> 技术文档 > RabbitMQ简述

RabbitMQ简述


RabbitMQ简述

RabbitMQ 是一个开源的 消息代理(Message Broker) 软件,实现了 高级消息队列协议(AMQP),用于在分布式系统中存储、转发消息,支持异步通信、解耦服务、负载均衡和消息缓冲。

核心概念

Producer(生产者):发送消息的应用。
Consumer(消费者):接收消息的应用。
Queue(队列):存储消息的缓冲区,遵循 FIFO(先进先出)。
Exchange(交换机):接收生产者消息并路由到队列(根据规则)。
Binding(绑定):定义交换机和队列之间的关联规则。
Message(消息):包含有效载荷(数据)和元数据(如路由键、头信息)。

交换机类型(Routing Strategies)

直连交换机(Direct Exchange)

Direct:精确匹配路由键(如点对点通信)。

  • 根据消息的routing key精确匹配队列
  • 常用于单播(unicast)消息路由
  • 典型应用场景:订单处理(不同订单类型路由到不同队列)

扇形交换机(Fanout Exchange)

Fanout:广播到所有绑定的队列(发布/订阅模式)。

  • 将消息广播到所有绑定的队列
  • 忽略routing key
  • 典型应用场景:广播通知、事件发布

主题交换机(Topic Exchange)

Topic:基于通配符匹配路由键(灵活的路由)。

  • 根据通配符匹配routing key
  • 支持*(匹配一个单词)和#(匹配零个或多个单词)
  • 典型应用场景:基于多维度路由(如日志级别.应用名称)

头交换机(Headers Exchange)

Headers:通过消息头属性路由(而非路由键)。

  • 根据消息头(header)属性匹配
  • 忽略routing key
  • 支持x-match参数(all需全部匹配,any只需匹配一个)

交换机属性

创建交换机时可设置以下主要属性:
Name:交换机名称
Type:交换机类型(direct, fanout, topic, headers)
Durability:是否持久化(重启后是否保留)
Auto-delete:当所有队列都解除绑定后是否自动删除
Arguments:额外参数(如alternate-exchange等)

模式

模式 交换机类型 核心机制 典型应用场景 简单模式 默认交换机 直接队列绑定 单任务异步处理 工作队列 默认交换机 多消费者竞争 并行任务处理 发布/订阅 Fanout 广播到所有队列 事件通知 路由模式 Direct 精确匹配路由键 选择性日志分发 主题模式 Topic 通配符匹配路由键 多维度消息分类 RPC 默认交换机 回调队列+关联ID 同步远程调用 头部交换机 Headers 键值对匹配 复杂条件路由 死信队列 任意类型(DLX) TTL/拒绝触发 异常消息处理 延迟队列 Delayed Message插件 延迟投递 定时任务/超时控制

简单模式

RabbitMQ简述
简单队列不介绍,直接看工作队列

工作队列

RabbitMQ简述

创建队列

创建一个名为xiri.queue的队列

RabbitMQ简述

消费者代码

模拟2个消费者互相抢消息

@Componentpublic class SpringRabbitListener {@RabbitListener(queues = {\"xiri.queue\"})public void listener1(String mes){System.out.println(\"消费者1接受消息:\"+mes);}@RabbitListener(queues = {\"xiri.queue\"})public void listener2(String mes){System.out.println(\"消费者2接受消息:\"+mes);}}

生产者代码

模拟50条消息

@SpringBootTestpublic class ProducerTest {@AutowiredRabbitTemplate rabbitTemplate;@Testvoid WorkQueueSent(){//队列名称String queueName = \"xiri.queue\";for (int i = 1; i <= 50; i++) {//发送消息rabbitTemplate.convertAndSend(queueName,\"消息-\"+i);}}}

运行结果

由此发现默认情况下,是轮询投递消息,并没有考虑到消费者已经处理完了消息,造成消息堆积
RabbitMQ简述

消息堆积处理方案(能者多劳)

设置每次只能给消费者投递1次消息,处理完成后才能获取下一个消息

  1. 修改yml配置文件
spring: rabbitmq: host: 127.0.0.1 #ip port: 5672 #端口 virtual-host: /xiri #虚拟主机 username: xiri #账号 password: 123 #密码 listener: simple: prefetch: 1 #每次只能获取一条消息,处理完成才能获取下一个消息
  1. 修改消费者代码
    给代码加上等待时间进行模拟测试
@Componentpublic class SpringRabbitListener {@RabbitListener(queues = {\"xiri.queue\"})public void listener1(String mes) throws InterruptedException {System.out.println(\"消费者1接受消息:\"+mes);Thread.sleep(20);}@RabbitListener(queues = {\"xiri.queue\"})public void listener2(String mes) throws InterruptedException {System.out.println(\"消费者2接受消息:\"+mes);Thread.sleep(100);}}
  1. 测试结果
    消费者1处理消息快,处理消息多,实现能者多劳
    RabbitMQ简述

发布/订阅

RabbitMQ简述

控制台设置

设置一个fanout交换机
RabbitMQ简述
设置两个队列
RabbitMQ简述
绑定2个队列
RabbitMQ简述

消费者

在消费者服务写两个消费者方法模拟,分别监听队列1和队列2

@Componentpublic class SpringRabbitListener {@RabbitListener(queues = {\"xiri.queue1\"})public void listener1(String mes) throws InterruptedException {System.out.println(\"消费者1接受消息:\"+mes);}@RabbitListener(queues = {\"xiri.queue2\"})public void listener2(String mes) throws InterruptedException {System.out.println(\"消费者2接受消息:\"+mes);}}

生产者

生产者向交换机发送消息

@SpringBootTestpublic class ProducerTest {@AutowiredRabbitTemplate rabbitTemplate;@Testvoid sent(){//队列名称String exchange = \"xiri.fanout\";//发送消息rabbitTemplate.convertAndSend(exchange,null,\"消息\");//routingKey没有设置,可以为空}}

运行结果

RabbitMQ简述

路由模式

直连交换机(Direct Exchange)会根据规则路由到指定的队列

控制台设置

创建类型为direct,名称为xiri.direct交换机
RabbitMQ简述
创建2个队列,名字分别为direct.queue1、direct.queue2
RabbitMQ简述
进行绑定
direct.queue1绑定key1
direct.queue2绑定key2
RabbitMQ简述

消费者

@Componentpublic class SpringRabbitListener {@RabbitListener(queues = {\"direct.queue1\"})public void listener1(String mes) throws InterruptedException {System.out.println(\"消费者1接受消息:\"+mes);}@RabbitListener(queues = {\"direct.queue2\"})public void listener2(String mes) throws InterruptedException {System.out.println(\"消费者2接受消息:\"+mes);}}

生产者

@SpringBootTestpublic class ProducerTest {@AutowiredRabbitTemplate rabbitTemplate;@Testvoid sent(){//队列名称String exchange = \"xiri.direct\";//发送消息rabbitTemplate.convertAndSend(exchange,\"key1\",\"消息1\");rabbitTemplate.convertAndSend(exchange,\"key1\",\"消息2\");rabbitTemplate.convertAndSend(exchange,\"key2\",\"消息3\");}}

运行结果

RabbitMQ简述

主题模式

直连交换机(Direct Exchange) 和 主题交换机(Topic Exchange)类似,区别在于Routing key可以是多个单词列表以 .(点) 分割

控制台设置

创建类型为topic,名为xiri.topic的交换机
RabbitMQ简述
创建队列
RabbitMQ简述
交换机绑定队列RabbitMQ简述

消费者

@Componentpublic class SpringRabbitListener {@RabbitListener(queues = {\"topic.queue1\"})public void listener1(String mes) throws InterruptedException {System.out.println(\"消费者1接受消息:\"+mes);}@RabbitListener(queues = {\"topic.queue2\"})public void listener2(String mes) throws InterruptedException {System.out.println(\"消费者2接受消息:\"+mes);}}

生产者

@SpringBootTestpublic class ProducerTest {@AutowiredRabbitTemplate rabbitTemplate;@Testvoid sent(){//队列名称String exchange = \"xiri.topic\";//发送消息rabbitTemplate.convertAndSend(exchange,\"topic.key1\",\"消息1\");rabbitTemplate.convertAndSend(exchange,\"topic.key2\",\"消息2\");rabbitTemplate.convertAndSend(exchange,\"topic.key.node1\",\"消息3\");}}

运行结果

根据通配符发到消费者
RabbitMQ简述

注解声明队列和交换机

在消费者端,通过 @RabbitListener 注解自动声明队列并绑定到交换机

@Componentpublic class SpringRabbitListener {//基于注解来声明队列和交换机,并且绑定@RabbitListener(bindings = {@QueueBinding(value = @Queue(value = \"direct.queue1\", durable = \"true\"),//设置队列,并且持久化exchange = @Exchange(value = \"xiri.direct\",type = ExchangeTypes.DIRECT),//设置交换机和类型key = {\"key1\"}//设置路由)})public void listener1(String mes) throws InterruptedException {System.out.println(\"消费者1接受消息:\"+mes);}}

消息转换器

转换成json格式传输
消费者和生产者都需要创建bean

创建Bean

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;import org.springframework.amqp.support.converter.MessageConverter;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class RabbitConverter {@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}}

这样数据就是以JSON格式传输的
获取消息
RabbitMQ简述

消息丢失问题

生产者发送消息丢失

生产者发送消息到 RabbitMQ 服务器时,由于网络问题或 RabbitMQ 服务崩溃,消息未到达交换机

解决方案

1. 生产者重试机制

通过配置重试机制,但是SpringAMQP是阻塞的,如果对性能有要求不能使用,这个只是对连接进行的重试,而不是消息失败的重试

spring: rabbitmq: host: 127.0.0.1 #ip port: 5672 #端口 virtual-host: /xiri #虚拟主机 username: xiri #账号 password: 123 #密码 connection-timeout: 1s #超时时间 template: retry: enabled: true #开启超时重试机制 initial-interval: 1000ms #失败后初始等待时间 multiplier: 1 #失败后下次等待时长倍数=initial-interval*multiplier max-attempts: 3 #最大重试次数

测试效果,故意将网络故障,造成超时重试3次
RabbitMQ简述

2. 生产者确认

RabbitMQ 提供 ConfirmCallback 机制,确认消息是否成功到达交换机。
如果对消息可靠性要求不高,不需要开启确认机制,因为会影响性能
生产者yml文件配置

spring: rabbitmq: host: 127.0.0.1 #ip port: 5672 #端口 virtual-host: /xiri #虚拟主机 username: xiri #账号 password: 123 #密码 connection-timeout: 1s #超时时间 template: retry: enabled: true #开启超时重试机制 initial-interval: 1000ms #失败后初始等待时间 multiplier: 1 #失败后下次等待时长倍数=initial-interval*multiplier max-attempts: 3 #最大重试次数 publisher-confirm-type: correlated # 开启异步确认 publisher-returns: true # 开启路由失败回调

以下为以上内容中关键的配置信息

publisher-confirm-type: correlated # 开启异步确认publisher-returns: true # 开启路由失败回调

生产者代码配置

// Spring AMQP 配置@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMandatory(true); // 开启强制回调// 设置 ConfirmCallbackrabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {System.out.println(\"消息到达交换机,ID: \" + correlationData.getId());} else {System.err.println(\"消息未到达交换机,原因: \" + cause);}});// 设置 ReturnsCallbackrabbitTemplate.setReturnsCallback(returned -> {System.err.println(\"消息未路由到队列: \" + returned.getMessage());});return rabbitTemplate;}

生产者测试

@SpringBootTestpublic class ProducerTest {@AutowiredRabbitTemplate rabbitTemplate;@Testvoid sent(){//队列名称String exchange = \"xiri.direct\";//设置消息唯一编号CorrelationData id = new CorrelationData(UUID.randomUUID().toString());//发送消息rabbitTemplate.convertAndSend(exchange,\"key\",\"消息\",id);}}

消息结果
RabbitMQ简述

3. 数据持久化

RabbitMQ默认将数据保存在内存当中,如果宕机了,消息就会丢失,还会造成内存积压,引发阻塞问题
实现数据持久化三个方面:交换机持久化、队列持久化、消息持久化
spring发送消息默认就是持久的
设置非持久化

@SpringBootTestpublic class ProducerTest {@AutowiredRabbitTemplate rabbitTemplate;@Testvoid sent() {//队列名称String exchange = \"xiri.direct\";//设置消息唯一编号CorrelationData id = new CorrelationData(UUID.randomUUID().toString());//发送消息rabbitTemplate.convertAndSend(exchange, \"key\", \"消息\", new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) {message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);//发送持久化消息return message;}});}}

以下已经设置为1,表示非持久化模式
RabbitMQ简述

4. lazy queue

Lazy Queue 是 RabbitMQ 的一种特殊队列模式,它会尽可能将消息存储在磁盘,而不是内存中,从而减少内存使用,适合处理大量消息且消费较慢的场景
3.6.0(初始引入)
首次支持 Lazy Queue,允许消息直接存储到磁盘,减少内存占用。
3.12.0(默认模式)
从该版本开始,Lazy Queue 成为所有队列的默认模式,官方推荐升级到该版本或手动启用 Lazy 模式1。

消费者消息丢失问题

解决方案

1.确认机制

SpringAMQP消息确认机制有三种处理方式:

  1. none 不处理
  2. manual 手动处理,需要在业务代码中调用api
  3. auto 自动处理,利用aop处理,对代码没有破坏性
    当业务出现异常时,会自动返回nack
    如果是消息处理或校验异常,自动返回reject

开启消费者确认机制为auto,有spring确认消息处理完成后返回ack,异常返回nack

spring: rabbitmq: listener: simple: acknowledge-mode: auto #none:关闭ack,manual:手动ack,auto:自动ack
2.重试机制

在 Spring AMQP 的 RabbitMQ 配置中,stateless 是消费者重试机制(retry)的一个参数,用于控制重试时的状态管理方式
stateless=true(默认)

  1. 每次重试都是无状态的,即不保留前一次尝试的上下文(如数据库事务、Spring Session 等)。
  2. 适用场景:普通消息处理,无需依赖前一次重试的状态。
  3. 性能更好:因为不需要维护状态。

stateless=false

  1. 重试时会保留状态(如事务、Session 等),确保多次重试在同一个上下文中执行。
  2. 适用场景:需要事务一致性的操作(如支付处理)。
  3. 性能较低:因为需要维护状态。

开启重试机制

spring: rabbitmq: listener: simple: prefetch: 1 #每次只能获取一条消息,处理完成才能获取下一个消息 retry: enabled: true #开启消费者重试机制 initial-interval: 1000ms #失败后初始等待时间 multiplier: 1 #失败后下次等待时长倍数=initial-interval*multiplier max-attempts: 3 #最大重试次数 stateless: true #true为无状态,false为有状态。决定重试时是否保持消费者状态(如事务、Session等)

重试多次依然失败处理策略
在开启重试模式后,重试次数耗尽依然失败,则需要有MessageRecoverer接口来处理,它有三种实现:

实现类 行为 适用场景 RejectAndDontRequeueRecoverer(默认) 直接拒绝消息(reject),且不重新入队,消息可能丢失或进入死信队列(若配置)13 非关键消息,允许丢弃 ImmediateRequeueMessageRecoverer 立即将消息重新放回队列(nack + requeue=true),可能导致无限循环 临时性错误(如网络抖动) RepublishMessageRecoverer(推荐) 将消息重新发布到指定的异常交换机和队列,供人工或后续处理 关键业务,需

使用第三种方式演示
开启消费者失败重试机制,并设置MessageRecoverer,多次重试无效后将消息投递到异常交换机,交由人工处理问题
消费者ymy配置

spring: rabbitmq: listener: simple: retry: enabled: true #开启消费者重试机制

消费者配置

import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.Queue;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.amqp.rabbit.retry.MessageRecoverer;import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configuration//这个配置需要开启重试机制才会开启@ConditionalOnProperty(prefix = \"spring.rabbitmq.listener.simple.retry\",name=\"enabled\",havingValue = \"true\")public class ErrorConfig{@Beanpublic DirectExchange errorExchange(){return new DirectExchange(\"error.direct\");}@Beanpublic Queue errorQueue(){return new Queue(\"error.queue\");}@Beanpublic Binding errorBinding(DirectExchange errorExchange,Queue errorQueue){//队列绑定交换机return BindingBuilder.bind(errorQueue).to(errorExchange).with(\"error\");}@Beanpublic MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate,\"error.direct\",\"error\");}}

效果
将异常信息和消息全部转到了error.queue
RabbitMQ简述

业务层幂等设计

  1. 数据库唯一约束
  • 例如订单表对 order_id 设置唯一索引,重复插入会报错。
    Redis 原子操作
  • 用 SETNX 或分布式锁标记已处理的消息。
  1. 消息去重
  • 生产者生成唯一 ID,发送消息时携带 correlationId,消费者记录已处理的 ID。
  • 消费者记录消息 ID,用 Redis 或数据库存储已处理的消息 ID。

延迟消息

1.死信交换机

利用死信队列(DLX)+ TTL 实现延迟消息
死信队列(DLX):死信会被路由到指定的死信交换机(DLX),再进入死信队列,由消费者处理
消息设置 TTL(Time To Live):消息或队列可以设置过期时间(TTL),到期后消息会变成“死信”

RabbitMQ简述

消费者

声明队列

import org.springframework.amqp.core.*;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class DlxConfig {@Beanpublic DirectExchange xiriExchange(){return new DirectExchange(\"xiri.direct\",true,false);}@Beanpublic Queue xiriQueue(){return QueueBuilder.durable(\"xiri.queue\").withArgument(\"x-dead-letter-exchange\", \"dlx.direct\").withArgument(\"x-dead-letter-routing-key\",\"dlx.key\").build();}@Beanpublic Binding xiriBinding() {return BindingBuilder.bind(xiriQueue()).to(xiriExchange()).with(\"xiri.key\");}}

消费

import org.springframework.amqp.core.ExchangeTypes;import org.springframework.amqp.rabbit.annotation.*;import org.springframework.stereotype.Component;import java.time.LocalDateTime;@Componentpublic class SpringRabbitListener {@RabbitListener(bindings = @QueueBinding(value = @Queue(name = \"dlx.queue\", durable = \"true\"),exchange = @Exchange(name = \"dlx.direct\", type = ExchangeTypes.DIRECT),key = \"dlx.key\" // 死信路由键))public void listener(String mes) throws InterruptedException {System.out.println(LocalDateTime.now() +\" 死信接受消息:\"+mes);}}
生产者
import org.junit.jupiter.api.Test;import org.springframework.amqp.AmqpException;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessagePostProcessor;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import java.time.LocalDateTime;@SpringBootTestpublic class ProducerTest {@AutowiredRabbitTemplate rabbitTemplate;@Testvoid sent(){//队列名称String exchange = \"xiri.direct\";//发送消息rabbitTemplate.convertAndSend(exchange, \"xiri.key\", \"消息\", new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {//设置过期时间(5秒)message.getMessageProperties().setExpiration(\"5000\");return message;}});System.out.println(LocalDateTime.now() +\" 发送消息\");}}
结果

RabbitMQ简述
5秒后收到消息
RabbitMQ简述

缺点: 消息排序问题:如果队列中有不同 TTL 的消息,RabbitMQ 只会检查队头消息的 TTL,可能导致后进队的消息先过期

2.RabbitMQ延迟插件

使用 rabbitmq-delayed-message-exchange 插件
RabbitMQ 官方提供的插件,通过 自定义交换机类型(x-delayed-message) 实现真正的延迟投递,消息按延迟时间排序,到期后才会被路由到目标队列

  1. 下载插件(需匹配 RabbitMQ 版本):
    插件下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
  2. 将下载的文件放到RabbitMQ的plugins目录里面
  3. 启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  1. 重启RabbitMQ

关闭

rabbitmq-service.bat stop

启动

rabbitmq-server start

消费者

import org.springframework.amqp.core.ExchangeTypes;import org.springframework.amqp.rabbit.annotation.Exchange;import org.springframework.amqp.rabbit.annotation.Queue;import org.springframework.amqp.rabbit.annotation.QueueBinding;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.time.LocalDateTime;@Componentpublic class SpringRabbitListener {//延迟队列,关键点在交换机设置delayed属性为true@RabbitListener(bindings = @QueueBinding(value = @Queue(name = \"xiri.queue\", durable = \"true\"),exchange = @Exchange(name = \"xiri.direct\", type = ExchangeTypes.DIRECT,delayed = \"true\"),key = \"xiri.key\"))public void listener1(String mes) throws InterruptedException {System.out.println(LocalDateTime.now()+\" 消费者接受消息:\"+mes);}}

生产者

import org.junit.jupiter.api.Test;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import java.time.LocalDateTime;@SpringBootTestpublic class ProducerTest {@AutowiredRabbitTemplate rabbitTemplate;@Testvoid sent(){//队列名称String exchange = \"xiri.direct\";//发送消息rabbitTemplate.convertAndSend(exchange,\"xiri.key\",\"消息\",message -> {message.getMessageProperties().setDelayLong(5000L);//设置5秒过去return message;});System.out.println(LocalDateTime.now()+\" 发送消息\");}}

结果

RabbitMQ简述
RabbitMQ简述