RabbitMQ最新入门教程_rabbitmq教程
文章目录
- RabbitMQ最新入门教程
-
- 1.什么是消息队列
- 2.为什么使用消息队列
- 3.消息队列协议
- 4.安装Erlang
- 5.安装RabbitMQ
- 6.RabbitMQ核心模块
- 7.RabbitMQ六大模式
-
- 7.1 简单模式
- 7.2 工作模式
- 7.3 发布订阅模式
- 7.4 路由模式
- 7.5 主题模式
- 7.6 RPC模式
- 8.RabbitMQ四种交换机
-
- 8.1 直连交换机
- 8.2 主题交换机
- 8.3 扇形交换机
- 8.4 首部交换机
- 9.RabbitMQ工作原理
- 10.RabbitMQ Management使用
-
- 10.1 Queues操作
- 10.2 Exchanges操作
- 11.在Java中使用RabbitMQ
- 12.SpringBoot整合RabbitMQ
- 参考
RabbitMQ最新入门教程
1.什么是消息队列
消息指的是两个应用间传递的数据。数据的类型有很多种形式,可能只包含文本字符串,也可能包含嵌入对象。
“消息队列(Message Queue)”是在消息的传输过程中保存消息的容器。在消息队列中,通常有生产者和消费者两个角色。生产者只负责发送数据到消息队列,谁从消息队列中取出数据处理,他不管。消费者只负责从消息队列中取出数据处理,他不管这是谁发送的数据。
常用的 MQ 有 ActiveMQ、RabbitMQ、Kafka、RocketMQ。
2.为什么使用消息队列
-
解耦。如图所示。假设有系统B、C、D都需要系统A的数据,于是系统A调用三个方法发送数据到B、C、D。这时,系统D不需要了,那就需要在系统A把相关的代码删掉。假设这时有个新的系统E需要数据,这时系统A又要增加调用系统E的代码。为了降低这种强耦合,就可以使用MQ,系统A只需要把数据发送到MQ,其他系统如果需要数据,则从MQ中获取即可。
-
异步。如图所示。一个客户端请求发送进来,系统A会调用系统B、C、D三个系统,同步请求的话,响应时间就是系统A、B、C、D的总和,也就是800ms。如果使用MQ,系统A发送数据到MQ,然后就可以返回响应给客户端,不需要再等待系统B、C、D的响应,可以大大地提高性能。对于一些非必要的业务,比如发送短信,发送邮件等等,就可以采用MQ。
-
削峰。如图所示。这其实是MQ一个很重要的应用。假设系统A在某一段时间请求数暴增,有5000个请求发送过来,系统A这时就会发送5000条SQL进入MySQL进行执行,MySQL对于如此庞大的请求当然处理不过来,MySQL就会崩溃,导致系统瘫痪。如果使用MQ,系统A不再是直接发送SQL到数据库,而是把数据发送到MQ,MQ短时间积压数据是可以接受的,然后由消费者每次拉取2000条进行处理,防止在请求峰值时期大量的请求直接发送到MySQL导致系统崩溃。
3.消息队列协议
常见的消息中间件协议有:OpenWire、AMQP、MQTT、Kafka、OpenMessage协议。RabbitMQ 实现的两个核心协议:AMQP 1.0 和 AMQP 0-9-1。
4.安装Erlang
-
安装 Erlang。RabbitMQ 服务器是用 Erlang 语言编写的,它的安装包里并没有集成 Erlang 的环境,因此需要先安装 Erlang。下载链接,傻瓜式安装,一直点next就行。
-
为 Erlang 配置环境变量。「编辑系统环境变量」->「环境变量」->「系统变量」->「path」->「编辑」->「新建」,填入 Erlang 的 bin 路径(
D:\\Erlang\\Erlang OTP\\bin
)。 -
验证是否安装成功。「cmd」->「输入
erl -version
」
5.安装RabbitMQ
-
安装 RabbitMQ 服务器端,下载链接,傻瓜式安装,一直点 next 就行。
-
安装完成后,进入安装目录(
sbin
目录下),运行 cmd,输入rabbitmqctl.bat status
可确认 RabbitMQ 的启动状态。 -
输入
rabbitmq-plugins enable rabbitmq_management
,启用客户端管理 UI 的插件。 -
在浏览器地址栏输入 http://localhost:15672/ 进入管理端界面。账号与密码默认是:guest/guest。出现如下界面,则表示安装成功完成。
note:在通过 rabbitmqctl.bat status
查看 RabbitMQ 的启动状态时,出现了错误。Error: unable to perform an operation on node \'rabbit@WIN-H34GJTET6NT\'. Please see diagnostics information and suggestions below.
这个错误是因为 erlang 和 rabbitmq 的 .erlang.cookie
不同步所导致的。
解决办法:将 C:\\Users\\Administrator\\.erlang.cookie
复制到 C:\\Windows\\System32\\config\\systemprofile
目录,重启 rabbitMQ 服务。
6.RabbitMQ核心模块
- Publisher:生产者,发送消息给交换机。
- consumer:消费者,消费消息,和队列进行绑定。
- exchange:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。交换机只能路由消息,无法存储消息。交换机只会路由消息给与其绑定的队列,因此队列必须与交换机绑定。
- queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理。
- Virtual Host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue,因为RabiitMQ性能很强,单个项目使用会造成巨大的浪费,所以多个项目,实现一套MQ,virtual host就是为了不同交换机产生隔离。
- Broker:就是 RabbitMQ 服务,用于接收和分发消息,接受客户端的连接,实现 AMQP 实体服务。
- Connection:连接,生产者/消费者与 Broker 之间的 TCP 网络连接。
- Channel:网络信道,如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立连接的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销。
- Message:消息,应用程序和服务器之间传送的数据,消息可以非常简单,也可以很复杂。由 Properties 和 Body 组成。Properties 为外包装,可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body 就是消息体内容。
- Binding:绑定,交换器和消息队列之间的虚拟连接,绑定中可以包含一个或者多个 RoutingKey。
- RoutingKey:路由键,生产者将消息发送给交换器的时候,会发送一个 RoutingKey,用来指定路由规则,这样交换器就知道把消息发送到哪个队列。路由键通常为一个“.”分割的字符串,例如“com.rabbitmq”。
7.RabbitMQ六大模式
7.1 简单模式
简单模式是最基本的工作模式,也是最简单的消息传递模式。在简单模式中,一个生产者将消息发到一个队列中,一个消费者从队列中获取并消费消息。这种模式适用于单个生产者和单个消费者的简单场景,消息的处理是同步的。
7.2 工作模式
工作模式用于实现一个任务在多个消费者之间的并发处理,在工作队列模式中,一个生产者将消息发到一个队列中,多个消费者从队列中获取并处理消息,每个消息只能被一个消费者处理。这种模式适用于多个消费者并发处理消息的情况,提高了系统的处理能力和吞吐量。
7.3 发布订阅模式
发布/订阅模式用于实现一条消息被多个消费者同时接受和处理。在发布/订阅模式中,一个生产者将消息发送到交换器(Exchange)中,交换器将消息广播到所有绑定的队列,每个队列对应一个消费者。这种模式适用于消息需要被多个消费者同时接受和处理的广播场景,如日志订阅和消息通知等。
7.4 路由模式
路由模式用于实现根据消息的路由键(Routing Key)将消息路由到不同的队列中。在路由模式中,一个生产者将消息发送到交换器中,并制定消息的路由键,交换器根据路由键将消息路由到与之匹配的队列中。这种模式适用于根据不同的条件将消息分发到不同的队列中,以实现消息的筛选和分发。
7.5 主题模式
主题模式是一种更灵活的消息路由,它使用通配符匹配路由键,将消息路由到多个队列中。在主题模式中,一个生产者将消息发送到交换器中,并指定主题(Topic)作为路由键,交换器根据通配符匹配将消息路由到与之匹配的队列中。
7.6 RPC模式
RPC模式是一种用于实现分布式系统中远程调用的工作模式。指的是通过RabbitMQ来实现一种RPC的能力。
8.RabbitMQ四种交换机
生产者发布消息、消费者接收消息,但是这中间的消息是怎么传递的,就用到了一个很重要的概念交换机(Exchange),RabbitMQ 消息投递到交换机上之后,通过路由关系再投递到指定的一个或多个队列上。
Exchange 参数介绍:
- Name:交换机名称
- Type:交换机类型 direct、topic、fanout、headers
- Durability:是否需要持久化
- Auto Delete:最后一个绑定到 Exchange 上的队列删除之后自动删除该 Exchange
- Internal:当前 Exchange 是否应用于 RabbitMQ 内部使用,默认false。
- Arguments:扩展参数
Exchange 四种类型:
- direct:不需要 Exchange 进行绑定,根据 RoutingKey 匹配消息路由到指定的队列。
- topic:生产者指定 RoutingKey 消息根据消费端指定的队列通过模糊匹配的方式进行相应转发,两种通配符模式:
#
:可匹配一个或多个关键字*
:只能匹配一个关键字
- fanout:这种模式只需要将队列绑定到交换机上即可,是不需要设置路由键的。
- headers:根据发送消息内容中的 headers 属性来匹配
8.1 直连交换机
direct 通过 RoutingKey 匹配消息路由到指定的队列,因此也可以无需指定交换机,在不指定交换机的情况下会使用 AMQP default
默认的交换机,另外在消息投递时要注意 RoutingKey 要完全匹配才能被队列所接收,否则消息会被丢弃的。
上图三个队列,第一个队列的 Binding routingKey 为 black,第二个队列和第三个队列的 Binding routingKey 为 green,也很清晰的能看到消息投递 1 仅被 Queue1 接收,而 消息投递 2 同时可以被广播到 Queue2 和 Queue3,这是因为 Queue2 和 Queue3 的路由键是相同的,再一次的说明了交换机的 direct 模式是通过 RoutingKey 进行消息路由的。
8.2 主题交换机
生产者指定 RoutingKey ,消费端根据指定的队列通过模糊匹配的方式进行相应转发。
上图展示了交换机 Topic 模式的消息流转过程,Queue1 的路由键通过使用 \\*
符合匹配到了 black.test1 和 black.test2 但是 black.test3.1 由于有多个关键字是匹配不到的。另一个队列 Queue2 使用了 #
符号即可以一个也可以匹配多个关键字,同时匹配到了 black.test4 和 black.test5.1。
8.3 扇形交换机
fanout 只需要将队列绑定到交换机上即可,是不需要设置路由键的,便可将消息转发到绑定的队列上,由于不需要路由键,所以 fanout 也是四个交换机类型中最快的一个,如果是做广播模式的就很适合。
8.4 首部交换机
Headers 类型的交换机是根据发送消息内容中的 headers 属性来匹配的,headers 类型的交换机基本上不会用到,因此这里也不会过多介绍,掌握以上三种类型的交换机模型在平常的业务场景中就足够了。
9.RabbitMQ工作原理
AMQP 协议模型由三部分组成:生产者、消费者和服务端,执行流程如下:
- 生产者是连接到 Server,建立一个连接,开启一个信道。
- 生产者声明交换器和队列,设置相关属性,并通过路由键将交换器和队列进行绑定。
- 消费者也需要进行建立连接,开启信道等操作,便于接收消息。
- 生产者发送消息,发送到服务端中的虚拟主机。
- 虚拟主机中的交换器根据路由键选择路由规则,发送到不同的消息队列中。
- 订阅了消息队列的消费者就可以获取到消息,进行消费。
10.RabbitMQ Management使用
通过 http://localhost:15672 访问 RabbitMQ 的控制台管理工具 RabbitMQ Management,用户名/密码都是 guest。
10.1 Queues操作
-
创建队列。点击「导航栏Queues」->「Add a new queue」展开队列信息,填好信息后,add 添加。
-
添加成功后,可以在 All queues 下看到添加的队列。点击队列名就会跳转到队列详情页面。
-
在队列详情页面,可以进行一些操作,如下图所示。
10.2 Exchanges操作
-
创建队列。点击「导航栏Exchanges」->「Add a new exchange」展开交换机信息,填好信息后,add 添加。
-
在交换机详情页面,同样可以进行一些操作,如下图所示。
11.在Java中使用RabbitMQ
模拟一个最简单的场景,一个生产者发送消息到队列中,一个消费者从队列中读取消息并打印。
-
第一步,在项目中添加 RabbitMQ 客户端依赖:
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.22.0</version></dependency><dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>2.0.16</version></dependency><dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>2.0.16</version> <type>jar</type></dependency>
note:如果没有导入 slf4j-api 和 slf4j-simple 依赖,会报错 SLF4J: Failed to load class “org.slf4j.impl.StaticLoggerBinder”. SLF4J: Defaulting to no-operation (NOP) logger implementation. 如下:
-
第二步,创建生产者类。
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.nio.charset.StandardCharsets;import java.util.concurrent.TimeoutException;public class Producer { // 声明队列名称,设置为final static表示全局唯一常量 private final static String QUEUE_NAME = \"test\"; public static void main(String[] args) throws IOException, TimeoutException { // 创建RabbitMQ连接工厂实例 ConnectionFactory factory = new ConnectionFactory(); // 使用try-with-resources自动关闭连接和通道 try(Connection connection = factory.newConnection(); Channel channel = connection.createChannel()){ // 声明持久化队列(durable=true),若队列不存在则创建 channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 定义要发送的消息内容 String message = \"this is a rabbitmq test in java\"; // 发布消息到默认交换机(\"\"),路由到指定队列 channel.basicPublish(\"\", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); // 打印发送成功的消息内容 System.out.println(\"发送消息:\"+ message); } // 资源会在此自动关闭 }}
-
第三步,创建消费者类。
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.DeliverCallback;import java.io.IOException;import java.nio.charset.StandardCharsets;import java.util.concurrent.TimeoutException;public class Consumer { // 与生产者保持一致的队列名称 private final static String QUEUE_NAME = \"test\"; public static void main(String[] args) throws IOException, TimeoutException { // 创建RabbitMQ连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 建立TCP连接 Connection connection = factory.newConnection(); // 创建AMQP通道(轻量级连接) Channel channel = connection.createChannel(); // 声明持久化队列(需与生产者配置一致) channel.queueDeclare(QUEUE_NAME, true, false, false, null); System.out.println(\"等待接收消息\"); // 定义消息回调函数,处理接收到的消息 DeliverCallback deliverCallback = (consumerTag, delivery) -> { // 将字节数组转换为UTF-8字符串 String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println(\"接收消息:\" + message); }; // 启动消费者并设置自动确认模式(autoAck=true) channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { // 消费者取消时的回调(空实现) }); }}
-
第四步,运行,查看效果。
12.SpringBoot整合RabbitMQ
-
新建一个 SpringBoot 项目。
-
修改
pom.xml
文件,添加 RabbitMQ 依赖。<project xmlns=\"http://maven.apache.org/POM/4.0.0\" xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xsi:schemaLocation=\"http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd\"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.5.9</version> <relativePath/> </parent> <groupId>org.example</groupId> <artifactId>RabbitMQSpring</artifactId> <version>0.0.1-SNAPSHOT</version> <name>RabbitMQSpring</name> <description>RabbitMQSpring</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build></project>
-
将配置文件
application.properties
重命名为application.yml
,并修改配置文件的内容。spring: application: name: RabbitMQSpring rabbitmq: host: localhost port: 5672 username: guest password: guest virtual-host: /server: port: 8088logging: level: org.example.rabbitmqspring: debug
-
在项目根目录下创建 config 包,在 config 包下创建一个 RabbitMQ 配置类 RabbitMQConfig。
package org.example.rabbitmqspring.config;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class RabbitMQConfig { // 交换机的名称 public static final String DEFAULT_EXCHANGE = \"exchange\"; // 路由Key的名称 public static final String DEFAULT_ROUTE = \"route\"; // 队列的名称 public static final String DEFAULT_QUEUE = \"queue\"; /** * 声明交换机 * @return DirectExchange */ @Bean public DirectExchange exchange() { return new DirectExchange(DEFAULT_EXCHANGE); } /** * 声明队列 * @return Queue */ @Bean public Queue queue(){ return new Queue(DEFAULT_QUEUE); } /** * 声明路由Key(交换机和队列的关系) * @param exchange DirectExchange * @param queue Queue * @return Binding */ @Bean public Binding binding(DirectExchange exchange, Queue queue){ return BindingBuilder.bind(queue).to(exchange).with(DEFAULT_ROUTE); }}
-
在项目根目录下创建 producer 包,在 producer 包下创建一个 RabbitProducer 类。
package org.example.rabbitmqspring.producer;import org.example.rabbitmqspring.config.RabbitMQConfig;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;/** * RabbitMQ消息生产者组件,负责将消息发送到指定队列 * 基于Spring AMQP的RabbitTemplate实现消息发送 */@Component // 声明为Spring管理的组件,会自动扫描并注册到应用上下文中public class RabbitProducer { private final RabbitTemplate rabbitTemplate; // RabbitMQ操作模板,用于发送和接收消息 @Autowired // 通过构造器注入RabbitTemplate实例 public RabbitProducer(RabbitTemplate rabbitTemplate){ this.rabbitTemplate = rabbitTemplate; } /** * 发送消息到默认交换机和路由 * @param message 待发送的消息内容 */ public void sendMessage(String message){ // 调用RabbitTemplate将消息转换并发送到指定交换机和路由键 rabbitTemplate.convertAndSend(RabbitMQConfig.DEFAULT_EXCHANGE, RabbitMQConfig.DEFAULT_ROUTE, message); }}
-
在项目根目录下创建 consumer 包,在 consumer 包下创建一个 RabbitConsumer 类。
package org.example.rabbitmqspring.consumer;import org.example.rabbitmqspring.config.RabbitMQConfig;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;/** * RabbitMQ消息消费者组件,自动监听配置的默认队列 * 基于Spring AMQP的注解驱动模型实现消息监听 */@Component // 声明为Spring组件,由Spring容器管理实例化@RabbitListener(queues = RabbitMQConfig.DEFAULT_QUEUE) // 监听配置类中定义的默认队列public class RabbitMQConsumer { /** * 消息处理方法,当队列接收到新消息时自动触发 * @param message 从队列中获取的字符串消息体 */ @RabbitHandler // 声明为RabbitMQ消息处理方法 public void receive(String message){ System.out.printf(\"收到一条消息: %s\", message); }}
-
在项目根目录下创建 controller 包,在 controller 包下创建一个 MessageController 类。
package org.example.rabbitmqspring.controller;import org.example.rabbitmqspring.producer.RabbitProducer;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RequestMethod;import org.springframework.web.bind.annotation.RestController;@RestController // 声明该类为RESTful风格的控制器,会自动将返回值转换为JSON格式@RequestMapping(path = \"/message\", produces = \"application/json;charset=utf-8\") // 映射请求路径,设置响应内容类型为JSON且字符集为UTF-8public class MessageController { private final RabbitProducer producer; // 依赖注入RabbitMQ消息生产者 @Autowired // 自动注入RabbitProducer实例 public MessageController(RabbitProducer producer){ this.producer = producer; } @RequestMapping(value = \"/send\", method = RequestMethod.POST) // 映射/send路径的POST请求 public void sendMessage(String message){ // 处理发送消息的请求,接收一个字符串类型的消息 producer.sendMessage(message); // 调用生产者的发送消息方法 }}
-
使用 API 测试工具,测试发送消息。
测试工具推荐:Postman、Apifox。
由于 Postman 需要访问外网才能使用,不支持中文等使用门槛,对于国内开发者来说并不是一个最好的首选 API 管理工具,所以 Apifox 会更适合我们使用。
🤗🤗🤗
参考
- 超详细的RabbitMQ入门,看这篇就够了!
- 图文实践 RabbitMQ 不同类型交换机消息投递机制
- RabbitMQ的介绍
- RabbitMQ安装和使用详细教程