> 技术文档 > 微服务消息队列之——RabbitMQ

微服务消息队列之——RabbitMQ


本篇文章是基于黑马微服务的课程学习,一些自己理解和看法。让大家对RabbitMQ有一个基本的了解。

1.什么是RabbitMQ

微服务之间的调用是采用OpenFeign,而OpenFeign又是同步调用,可以清晰看下面的例子:

微服务消息队列之——RabbitMQ

我们执行图片这样一个业务,我们需要在支付服务完成三步:

  1. 调用用户服务,执行扣减余额
  2. 更新支付服务的交易流水
  3. 调用交易服务,更新订单的状态

第一步和第二步操作需要保证为原子操作

完成三步后,整个支付服务才算完成。这里我们需要思考:支付服务是否需要交易服务反馈过来结果?我们可以发现是不需要的。因此,我们可以采用异步调用的方式去执行交易服务。那么支付服务和交易服务之间媒介,由谁负责——消息队列

1.1 同步调用会导致的问题

  1. 拓展性差
    • 倘若我们需要到支付服务中拓展新的服务,比如:短信服务,计分服务。这样需要到业务修改很多代码,不符合OPC开闭原则,拓展性不好。
  2. 性能下降
    • 支付服务里面子服务太多,影响支付的性能
  3. 级联失败
    • 可能由于没有发送成功短信导致,整个支付失败,这是因小失大

1.2 几种常见的消息队列

RabbitMQ ActiveMQ RocketMQ Kafka 公司/社区 Rabbit Apache 阿里 Apache 开发语言 Erlang Java Java Scala&Java 协议支持 AMQP,XMPP,SMTP,STOMP OpenWire,STOMP,REST,XMPP,AMQP 自定义协议 自定义协议 可用性 高 一般 高 高 单机吞吐量 一般 差 高 非常高 消息延迟 微秒级 毫秒级 毫秒级 毫秒以内 消息可靠性 高 一般 高 一般

追求可用性:Kafka、 RocketMQ 、RabbitMQ

追求可靠性:RabbitMQ、RocketMQ

追求吞吐能力:RocketMQ、Kafka

追求消息低延迟:RabbitMQ、Kafka

今天我们介绍的是RabbitMQ

1.3 RabbitMQ

1.3.1 核心概念速览

RabbitMQ 是一个开源的“消息代理”(message broker),它实现了 AMQP 0-9-1 协议,也支持 STOMP、MQTT、HTTP 等多种协议,用来在分布式系统或微服务之间做 可靠、异步、解耦 的消息收发。官网地址:https://www.rabbitmq.com/

概念 类比/作用 Producer 发信人:把消息发到 Exchange Exchange 分拣员:按规则(路由键、通配符等)把消息分到 Queue Queue 信箱:真正存储消息的地方 Binding 分拣规则:Exchange 与 Queue 的绑定关系 Consumer 收信人:从 Queue 里拿消息 Virtual Host 虚拟邮局:逻辑隔离(多租户) Connection TCP 长连接 Channel 轻量级“会话”,减少 TCP 复用开销
1.3.2 四种交换机类型(路由策略)
  1. Direct(直连)
    路由键完全匹配才投递。
    例:routingKey = \"order.paid\" → 只进绑定了该 key 的队列。
  2. Fanout(广播)
    消息发到所有绑定的队列,忽略路由键。
  3. Topic(主题通配)
    路由键支持 *(单级)和 #(多级)。
    例:order.* 匹配 order.paidorder.created
  4. Headers(头信息匹配)
    根据消息头里的键值对过滤,较少用。

2. 部署RabbitMQ

2.1 在Docker中部署

MQ压缩包下载

先把压缩包放在linux系统中,然后dacker load -f [RabbitMQ压缩包] 的镜像加载到Docker容器中。

然后执行创建容器的命令

docker run \\ -e RABBITMQ_DEFAULT_USER=admin \\ -e RABBITMQ_DEFAULT_PASS=123456 \\ -v mq-plugins:/plugins \\ --name mq \\ --hostname mq \\ -p 15672:15672 \\ -p 5672:5672 \\ --network hm-net\\ -d \\ rabbitmq:3.8-management

部署好了,访问地址:RabbitMQ Management,需要根据自己的虚拟机地址修改前面网络号。

2.2 访问网址

微服务消息队列之——RabbitMQ

RabbitMQ 官方管理网站(Web UI) 中完成“创建队列、交换机、用户隔离”只需三步,全程图形界面操作

第 1 步:创建用户(实现用户隔离)

  1. 登录管理后台:http://:15672
  2. 顶部菜单 → AdminAdd a user
  3. 填写用户名、密码、角色(如 management),点击 Add user
  4. 新用户默认 无任何虚拟主机权限,继续下一步。

第 2 步:创建虚拟主机(Vhost)实现数据隔离

  1. 仍在 Admin 页面 → Virtual HostsAdd a new virtual host
  2. 输入名称,如:/pay_vhostAdd virtual host
  3. 点击刚创建的 vhost → Permissions → 给第 1 步的用户 Set permission(赋予读写配置权限)
    • 完成后该用户只能看到 /pay_vhost 下的资源,彻底与其他业务隔离。

第 3 步:在 vhost 内创建交换机和队列

  1. 右上角 Virtual Host 下拉框切换为 /pay_vhost
  2. 顶部菜单 → QueuesAdd a new queue
    • 输入队列名(如 order.queue)、选择 Durable(持久化)→ Add queue
  3. 顶部菜单 → ExchangesAdd a new exchange
    • 输入交换机名(如 order.exchange)、类型(如 direct)、DurableAdd exchange
  4. 进入交换机详情页 → Bindings → 绑定刚创建的队列,填写 Routing key(如 order.paid)→ Bind

3.SpringAMQP

Spring AMQP 是 Spring 生态中专为 AMQP(高级消息队列协议,如 RabbitMQ)设计的“全家桶”级解决方案。它把 RabbitMQ 的底层 API 包装成 Spring 风格的模板、注解和自动配置,开发者几乎不用写原生 RabbitMQ 代码就能完成消息的发送、接收、路由和治理。

3.1 广播交换机案例实现

微服务消息队列之——RabbitMQ

简单的案例实现,三个模块:一个生产者,一个消费者,一个交换机

Demo文件下载

微服务消息队列之——RabbitMQ

微服务消息队列之——RabbitMQ

包括三部分:

  • mq-demo:父工程,管理项目依赖
  • publisher:消息的发送者
  • consumer:消息的消费者
3.1.1 依赖安装

注意:JDK版本是11

<project xmlns=\"http://maven.apache.org/POM/4.0.0\" xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xsi:schemaLocation=\"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd\"> <modelVersion>4.0.0</modelVersion> <groupId>cn.itcast.demo</groupId> <artifactId>mq-demo</artifactId> <version>1.0-SNAPSHOT</version> <modules> <module>publisher</module> <module>consumer</module> </modules> <packaging>pom</packaging> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.7.12</version> <relativePath/> </parent> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency>  <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>  <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> </dependencies></project>

备注:消费者和生产者都要安装这些依赖

3.1.2 配置文件(前提新建一个hmall用户)
spring: rabbitmq: host: 127.0.0.1 # 你的虚拟机IP port: 5672 # 端口 virtual-host: /hmall # 虚拟主机 username: hmall # 用户名 password: 123456 # 密码

怎么新建用户,前面已经写了

3.1.3 声明队列和交换机

在控制台创建队列fanout.queue1fanout.queue2

微服务消息队列之——RabbitMQ

然后再创建一个交换机:

微服务消息队列之——RabbitMQ

然后绑定两个队列到交换机,首先点击那个新建的交换机:

微服务消息队列之——RabbitMQ

微服务消息队列之——RabbitMQ

3.1.4 生成者发送消息

到测试类写

package com.itheima.publisher.amqp;import org.junit.jupiter.api.Test;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;@SpringBootTestpublic class SpringAmqpTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testExchange() { String exchangeName = \"hmall.fanout\"; // 消息 String message = \"hello, everyone!\"; rabbitTemplate.convertAndSend(exchangeName, \"\", message); }}
3.1.5 消费者接受消息

文件结构
微服务消息队列之——RabbitMQ

package com.itheima.consumer.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.time.LocalDateTime;/** * @program: mq-demo * @description: * @author: WangXin * @create: 2025-07-29 19:55 **/@Componentpublic class SpringRabbitListener { @RabbitListener(queues = \"fanout.queue1\") public void listenFanoutQueue1(String msg) { System.out.println(\"消费者1接收到Fanout消息:【\" + msg + \"】\"); } @RabbitListener(queues = \"fanout.queue2\") public void listenFanoutQueue2(String msg) { System.out.println(\"消费者2接收到Fanout消息:【\" + msg + \"】\"); }}
3.1.6 测试
  • 先启动测试类
  • 再启动消费者

微服务消息队列之——RabbitMQ

3.2 基于注解实现

在之前我们都是基于RabbitMQ控制台来创建队列、交换机。但是在实际开发时,队列和交换机是程序员定义的,将来项目上线,又要交给运维去创建。那么程序员就需要把程序中运行的所有队列和交换机都写下来,交给运维。在这个过程中是很容易出现错误的。

因此推荐的做法是由程序启动时检查队列和交换机是否存在,如果不存在自动创建。

这里介绍是采用注解的形式创建

3.2.1 Direct模式的交换机

新建一个配置类

@RabbitListener(bindings = @QueueBinding( value = @Queue(name = \"direct.queue1\"), exchange = @Exchange(name = \"hmall.direct\", type = ExchangeTypes.DIRECT), key = {\"red\", \"blue\"}))public void listenDirectQueue1(String msg){ System.out.println(\"消费者1接收到direct.queue1的消息:【\" + msg + \"】\");}@RabbitListener(bindings = @QueueBinding( value = @Queue(name = \"direct.queue2\"), exchange = @Exchange(name = \"hmall.direct\", type = ExchangeTypes.DIRECT), key = {\"red\", \"yellow\"}))public void listenDirectQueue2(String msg){ System.out.println(\"消费者2接收到direct.queue2的消息:【\" + msg + \"】\");}
注解/属性 含义 运行时效果 @RabbitListener 声明一个“消息监听器” Spring 容器启动时会自动注册一个监听器容器(Listener Container),负责从 RabbitMQ 拉消息并调用当前方法。 bindings = @QueueBinding(…) 一次性声明“队列-交换机-绑定”三者关系 在监听器启动之前,Spring AMQP 会检查 RabbitMQ 中是否存在这些对象; 不存在则自动创建@Queue(name = “direct.queue1”) 定义/引用队列 如果服务器上没有 direct.queue1,就按默认持久化、非排他、非自动删除的规则创建出来。 @Exchange(name = “hmall.direct”, type = ExchangeTypes.DIRECT) 定义/引用交换机 如果没有 hmall.direct 交换机,就创建一个 DIRECT 类型 的持久化交换机。 key = {“red”, “blue”} 指定路由键(Routing Key) 把队列 direct.queue1 与交换机 hmall.direct 建立 两条绑定
hmall.directdirect.queue1 的 key 分别是 redblue