微服务消息队列之——RabbitMQ
本篇文章是基于黑马微服务的课程学习,一些自己理解和看法。让大家对RabbitMQ有一个基本的了解。
1.什么是RabbitMQ
微服务之间的调用是采用OpenFeign,而OpenFeign又是同步调用,可以清晰看下面的例子:
我们执行图片这样一个业务,我们需要在支付服务完成三步:
- 调用用户服务,执行扣减余额
- 更新支付服务的交易流水
- 调用交易服务,更新订单的状态
第一步和第二步操作需要保证为原子操作
完成三步后,整个支付服务才算完成。这里我们需要思考:支付服务是否需要交易服务反馈过来结果?我们可以发现是不需要的。因此,我们可以采用异步调用的方式去执行交易服务。那么支付服务和交易服务之间媒介,由谁负责——消息队列。
1.1 同步调用会导致的问题
- 拓展性差
- 倘若我们需要到支付服务中拓展新的服务,比如:短信服务,计分服务。这样需要到业务修改很多代码,不符合OPC开闭原则,拓展性不好。
- 性能下降
- 支付服务里面子服务太多,影响支付的性能
- 级联失败
- 可能由于没有发送成功短信导致,整个支付失败,这是因小失大
1.2 几种常见的消息队列
追求可用性:Kafka、 RocketMQ 、RabbitMQ
追求可靠性:RabbitMQ、RocketMQ
追求吞吐能力:RocketMQ、Kafka
追求消息低延迟:RabbitMQ、Kafka
今天我们介绍的是RabbitMQ
1.3 RabbitMQ
1.3.1 核心概念速览
RabbitMQ 是一个开源的“消息代理”(message broker),它实现了 AMQP 0-9-1 协议,也支持 STOMP、MQTT、HTTP 等多种协议,用来在分布式系统或微服务之间做 可靠、异步、解耦 的消息收发。官网地址:https://www.rabbitmq.com/
1.3.2 四种交换机类型(路由策略)
- Direct(直连)
路由键完全匹配才投递。
例:routingKey = \"order.paid\"
→ 只进绑定了该 key 的队列。 - Fanout(广播)
消息发到所有绑定的队列,忽略路由键。 - Topic(主题通配)
路由键支持*
(单级)和#
(多级)。
例:order.*
匹配order.paid
、order.created
。 - Headers(头信息匹配)
根据消息头里的键值对过滤,较少用。
2. 部署RabbitMQ
2.1 在Docker中部署
MQ压缩包下载
先把压缩包放在linux系统中,然后dacker load -f [RabbitMQ压缩包]
的镜像加载到Docker容器中。
然后执行创建容器的命令
docker run \\ -e RABBITMQ_DEFAULT_USER=admin \\ -e RABBITMQ_DEFAULT_PASS=123456 \\ -v mq-plugins:/plugins \\ --name mq \\ --hostname mq \\ -p 15672:15672 \\ -p 5672:5672 \\ --network hm-net\\ -d \\ rabbitmq:3.8-management
部署好了,访问地址:RabbitMQ Management,需要根据自己的虚拟机地址修改前面网络号。
2.2 访问网址
在 RabbitMQ 官方管理网站(Web UI) 中完成“创建队列、交换机、用户隔离”只需三步,全程图形界面操作
第 1 步:创建用户(实现用户隔离)
- 登录管理后台:
http://:15672
- 顶部菜单 → Admin → Add a user
- 填写用户名、密码、角色(如
management
),点击 Add user - 新用户默认 无任何虚拟主机权限,继续下一步。
第 2 步:创建虚拟主机(Vhost)实现数据隔离
- 仍在 Admin 页面 → Virtual Hosts → Add a new virtual host
- 输入名称,如:
/pay_vhost
→ Add virtual host - 点击刚创建的 vhost → Permissions → 给第 1 步的用户 Set permission(赋予读写配置权限)
- 完成后该用户只能看到
/pay_vhost
下的资源,彻底与其他业务隔离。
- 完成后该用户只能看到
第 3 步:在 vhost 内创建交换机和队列
- 右上角 Virtual Host 下拉框切换为
/pay_vhost
- 顶部菜单 → Queues → Add a new queue
- 输入队列名(如
order.queue
)、选择 Durable(持久化)→ Add queue
- 输入队列名(如
- 顶部菜单 → Exchanges → Add a new exchange
- 输入交换机名(如
order.exchange
)、类型(如direct
)、Durable → Add exchange
- 输入交换机名(如
- 进入交换机详情页 → Bindings → 绑定刚创建的队列,填写 Routing key(如
order.paid
)→ Bind
3.SpringAMQP
Spring AMQP 是 Spring 生态中专为 AMQP(高级消息队列协议,如 RabbitMQ)设计的“全家桶”级解决方案。它把 RabbitMQ 的底层 API 包装成 Spring 风格的模板、注解和自动配置,开发者几乎不用写原生 RabbitMQ 代码就能完成消息的发送、接收、路由和治理。
3.1 广播交换机案例实现
简单的案例实现,三个模块:一个生产者,一个消费者,一个交换机
Demo文件下载
包括三部分:
- mq-demo:父工程,管理项目依赖
- publisher:消息的发送者
- consumer:消息的消费者
3.1.1 依赖安装
注意:JDK版本是11
<project xmlns=\"http://maven.apache.org/POM/4.0.0\" xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xsi:schemaLocation=\"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd\"> <modelVersion>4.0.0</modelVersion> <groupId>cn.itcast.demo</groupId> <artifactId>mq-demo</artifactId> <version>1.0-SNAPSHOT</version> <modules> <module>publisher</module> <module>consumer</module> </modules> <packaging>pom</packaging> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.7.12</version> <relativePath/> </parent> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> </dependencies></project>
备注:消费者和生产者都要安装这些依赖
3.1.2 配置文件(前提新建一个hmall用户)
spring: rabbitmq: host: 127.0.0.1 # 你的虚拟机IP port: 5672 # 端口 virtual-host: /hmall # 虚拟主机 username: hmall # 用户名 password: 123456 # 密码
怎么新建用户,前面已经写了
3.1.3 声明队列和交换机
在控制台创建队列fanout.queue1
和fanout.queue2
然后再创建一个交换机:
然后绑定两个队列到交换机,首先点击那个新建的交换机:
3.1.4 生成者发送消息
到测试类写
package com.itheima.publisher.amqp;import org.junit.jupiter.api.Test;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;@SpringBootTestpublic class SpringAmqpTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testExchange() { String exchangeName = \"hmall.fanout\"; // 消息 String message = \"hello, everyone!\"; rabbitTemplate.convertAndSend(exchangeName, \"\", message); }}
3.1.5 消费者接受消息
文件结构
package com.itheima.consumer.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.time.LocalDateTime;/** * @program: mq-demo * @description: * @author: WangXin * @create: 2025-07-29 19:55 **/@Componentpublic class SpringRabbitListener { @RabbitListener(queues = \"fanout.queue1\") public void listenFanoutQueue1(String msg) { System.out.println(\"消费者1接收到Fanout消息:【\" + msg + \"】\"); } @RabbitListener(queues = \"fanout.queue2\") public void listenFanoutQueue2(String msg) { System.out.println(\"消费者2接收到Fanout消息:【\" + msg + \"】\"); }}
3.1.6 测试
- 先启动测试类
- 再启动消费者
3.2 基于注解实现
在之前我们都是基于RabbitMQ控制台来创建队列、交换机。但是在实际开发时,队列和交换机是程序员定义的,将来项目上线,又要交给运维去创建。那么程序员就需要把程序中运行的所有队列和交换机都写下来,交给运维。在这个过程中是很容易出现错误的。
因此推荐的做法是由程序启动时检查队列和交换机是否存在,如果不存在自动创建。
这里介绍是采用注解的形式创建
3.2.1 Direct模式的交换机
新建一个配置类
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = \"direct.queue1\"), exchange = @Exchange(name = \"hmall.direct\", type = ExchangeTypes.DIRECT), key = {\"red\", \"blue\"}))public void listenDirectQueue1(String msg){ System.out.println(\"消费者1接收到direct.queue1的消息:【\" + msg + \"】\");}@RabbitListener(bindings = @QueueBinding( value = @Queue(name = \"direct.queue2\"), exchange = @Exchange(name = \"hmall.direct\", type = ExchangeTypes.DIRECT), key = {\"red\", \"yellow\"}))public void listenDirectQueue2(String msg){ System.out.println(\"消费者2接收到direct.queue2的消息:【\" + msg + \"】\");}
direct.queue1
,就按默认持久化、非排他、非自动删除的规则创建出来。hmall.direct
交换机,就创建一个 DIRECT 类型 的持久化交换机。direct.queue1
与交换机 hmall.direct
建立 两条绑定:hmall.direct
→ direct.queue1
的 key 分别是 red
和 blue
。