RabbitMQ03——面试题
目录
一、mq的作用和使用场景
二、mq的优点
三、mq的缺点
四、mq相关产品,每种产品的特点
4.1开源产品
4.2 商业/云产品
五、rabbitmq的搭建过程
六、rabbitmq相关角色
七、rabbitmq内部组件
八、生产者发送消息的过程?
九、消费者接收消息过程?
十、springboot项目中如何使用mq?
十一、如何保障消息不丢失?
如何创建死信队列
RabbitMQ规定消息符合以下某种情况时,将会成为死信
十三、延迟队列简介
ttl+死信队列
延迟插件
十四、RabbitMQ消息重复消费
十五、RabbitMQ消息积压
十六、消息入库(消息补偿)
一、mq的作用和使用场景
作用:
RabbitMQ 是一个开源的消息代理和队列服务器,主要用于实现应用程序之间的异步通信和解耦。它的主要作用包括:
-
应用解耦:将相互依赖的应用系统分离,降低系统间的耦合度
-
异步通信:实现系统间的非实时、异步消息传递
-
流量削峰:缓冲突发流量,避免系统被压垮
-
消息分发:支持多种消息路由模式(点对点、发布/订阅等)
-
可靠传递:提供消息持久化、确认机制等保证消息可靠传输
使用场景:
1、抢购活动,削峰填谷,防止系统崩塌。
2、延迟信息处理,比如 10 分钟之后给下单未付款的用户发送邮件提醒。
3、解耦系统,对于新增的功能可以单独写模块扩展,比如用户确认评价之后,新增了给用户返积分的功能,这个时候不用在业务代码里添加新增积分的功能,只需要把新增积分的接口订阅确认评价的消息队列即可,后面再添加任何功能只需要订阅对应的消息队列即可。
二、mq的优点
消息队列(MQ,Message Queue)是分布式系统中重要的中间件,具有很多优点,能够有效提升系统的解耦、可扩展性、可靠性和性能。以下是消息队列的主要优点:
1. 解耦系统模块
-
生产者与消费者解耦:发送消息的一方(生产者)不需要知道接收消息的一方(消费者)是否存在,也不需要等待其处理完成。
-
提升系统可维护性:模块之间通过消息通信,降低直接依赖,便于独立开发、测试和维护。
2. 异步处理,提升性能
-
异步非阻塞通信:将耗时操作通过消息队列异步处理,避免同步调用阻塞主线程,显著提高系统响应速度和吞吐量。
-
适用于高并发场景:如秒杀、订单处理、日志收集等,可以削峰填谷,缓解系统压力。
3. 流量削峰填谷
-
在高并发场景下,消息队列可以作为缓冲区,将突发的大量请求暂存起来,消费者按自己的处理能力逐步消费,避免系统被压垮。
4. 保障消息可靠传递
-
多数消息队列支持消息持久化、确认机制(Ack)、重试机制等,确保消息不丢失、不重复。
-
支持消息顺序性、事务消息等高级特性,满足不同业务场景需求。
5. 支持广播/发布订阅模式
-
消息可以被多个消费者消费,适用于通知、广播、事件驱动等场景。
6. 实现最终一致性
-
在分布式系统中,通过消息队列可以实现跨服务的数据同步,保证多个服务之间的数据最终一致性。
7. 易于扩展
-
消息队列支持横向扩展,可以通过增加消费者来提升处理能力,适用于大数据量、高并发的系统。
8. 丰富的生态支持
-
常见的 MQ 中间件如 RabbitMQ、Kafka、RocketMQ、ActiveMQ 等,都有成熟的生态和社区支持,适配各种业务场景。
📌 总结一句话:
消息队列通过解耦、异步、缓存和可靠传递等机制,提升了系统的可扩展性、稳定性和响应能力,是构建高并发、分布式系统的重要组件。
三、mq的缺点
消息队列(MQ)作为分布式系统中实现异步通信、削峰填谷、解耦的重要组件,虽然带来了诸多优势,但也存在一些缺点和挑战。以下是使用 MQ(如 RabbitMQ、Kafka、RocketMQ 等)时可能遇到的主要问题:
1. 系统复杂度增加
-
引入新的组件:使用 MQ 后,系统架构中新增了一个中间件,需要考虑其部署、配置、监控、容错等问题。
-
需要处理消息顺序性、幂等性、重复消费等复杂问题,增加了开发和维护成本。
2. 消息丢失风险
-
如果未正确配置持久化、确认机制等,消息可能在以下环节丢失:
-
生产端发送失败;
-
Broker 存储失败;
-
消费端处理失败且未正确 Ack。
-
-
需要额外机制(如 Confirm、Ack、持久化)来保障可靠性。
3. 消息重复消费
-
网络异常、Ack 超时、消费者宕机等情况可能导致消息被重复投递。
-
消费者必须实现幂等性处理逻辑,避免重复消费造成数据错误。
4. 消息顺序性难以保证
-
多队列、多分区、多线程消费时,消息的顺序性难以保证。
-
对于需要严格顺序的业务(如订单状态变更),需要额外处理逻辑。
5. 系统延迟增加
-
异步处理虽然提高了吞吐量,但也引入了额外的网络传输和队列排队时间。
-
对实时性要求高的系统(如金融交易),可能不适合使用 MQ。
6. 运维成本高
-
需要对 MQ 集群进行监控、扩容、备份、故障转移等操作。
-
不同 MQ 的运维方式不同,学习成本高。
7. 资源消耗
-
消息堆积时,会占用大量内存和磁盘资源。
-
消息压缩、持久化、复制等操作也会带来一定的 CPU 和 I/O 消耗。
8. 消息堆积问题
-
当消费速度跟不上生产速度时,会导致消息堆积,影响系统性能。
-
需要合理设置消费者的并发数、限流策略或自动扩容机制。
9. 一致性问题
-
使用 MQ 后,生产端和消费端的数据一致性不再是本地事务,需要引入事务消息、补偿机制、分布式事务等方案,增加了系统复杂性。
10. 安全性和权限管理
-
需要对消息队列进行访问控制、权限管理、加密传输等,防止数据泄露或非法访问。
总结:
缺点类型
说明
复杂度增加
架构更复杂,需额外维护 MQ 组件
消息丢失
需配置确认机制、持久化等
消息重复
消费端需实现幂等
顺序性问题
多线程/多分区下难保证顺序
延迟增加
异步带来额外延迟
运维成本
需监控、扩容、备份等
资源消耗
内存、磁盘、CPU 占用
消息堆积
消费能力不足导致堆积
数据一致性
需要事务或补偿机制
安全问题
需权限控制、加密等
建议:
在使用 MQ 之前,应根据实际业务场景权衡利弊。对于高可用、高吞吐、低实时性要求的系统,MQ 是非常合适的选择;而对于强一致性、低延迟、简单业务的系统,可能不需要引入 MQ。
四、mq相关产品,每种产品的特点
4.1开源产品
1. RabbitMQ
2. Apache Kafka
3. Apache RocketMQ
4. Apache Pulsar
5. ActiveMQ
4.2 商业/云产品
1. AWS SQS/SNS
2. Azure Service Bus
3. Google Cloud Pub/Sub
4. Alibaba Cloud MQ
-
需要复杂路由 → RabbitMQ
-
超高吞吐日志 → Kafka
-
金融交易场景 → RocketMQ
-
云原生/多租户 → Pulsar
-
全托管服务 → AWS SQS/Azure Service Bus
-
传统Java EE → ActiveMQ
五、rabbitmq的搭建过程
docker下安装配置rabbitmq
1、拉取镜像
docker pull rabbitmq:latestdocker pull rabbitmq:3.9.0-management
2、创建并启动容器
使用 Docker 运行一个名为 rabbitmq
的容器,基于 rabbitmq:3.9.0-management
镜像,映射 5672(AMQP协议端口) 和 15672(管理界面端口) 到宿主机,并将容器内的 /etc/rabbitmq
配置目录挂载到宿主机的 /etc/rabbitmq
,以后台模式(-d
)启动,同时分配一个交互式终端(-i
)以保持容器运行。
docker run -id --name rabbitmq -p 5672:5672 -p 15672:15672 -v /etc/rabbimq:/etc/rabbitmq rabbitmq:3.9.0-management
3、查看容器状态
docker ps
如果容器状态显示为Up,并且端口映射正确,那么RabbitMQ服务已经成功启动。
4、查看容器日志
docker logs rabbitmq
访问RabbitMQ管理界面
5、创建用户赋权
-
创建用户:添加一个名为
admin
、密码为123456
的 RabbitMQ 用户; -
赋予角色:将
admin
用户标记为administrator
(超级管理员),拥有所有管理权限; -
设置权限:在默认虚拟主机
/
中,授予admin
用户对所有资源(交换机、队列、绑定)的 配置、写入、读取 全部权限(\".*\"
通配符表示全部)。
# 进入容器docker exec -it rabbitmq bash #启用 RabbitMQ 的管理控制台插件rabbitmq-plugins enable rabbitmq_management# 在容器内执行rabbitmqctl add_user admin 123456rabbitmqctl set_user_tags admin administratorrabbitmqctl set_permissions -p / admin \".*\" \".*\" \".*\"
6、使用浏览器打开RabbitMQ管理界面。
默认情况下,管理界面端口为15672。在浏览器地址栏输入以下URL验证rabbitmq搭建成功:
http://:15672
六、rabbitmq相关角色
RabbitMQ 中重要的角色有:生产者、消费者和代理:
生产者:消息的创建者,负责创建和推送数据到消息服务器;
消费者:消息的接收方,用于处理数据和确认消息;
代理:就是 RabbitMQ 本身,用于扮演“快递”的角色,本身不生产消息,只是扮演“快递”的角色。
direct
(直连)、topic
(主题)、fanout
(扇出)、headers
(头匹配)routing key
(路由键)或 headers
进行匹配七、rabbitmq内部组件
1、ConnectionFactory(连接管理器):应用程序与Rabbit之间建立连接的管理器,程序代码中使用。
2、Channel(信道):消息推送使用的通道。
3、Exchange(交换器):用于接受、分配消息。
4、Queue(队列):用于存储生产者的消息。
5、RoutingKey(路由键):用于把生成者的数据分配到交换器上。
6、BindingKey(绑定键):用于把交换器的消息绑定到队列上。
message_store
插件)rabbitmq-management
提供 Web UI)八、生产者发送消息的过程?
发送消息的核心步骤
-
建立连接
-
创建通道
-
声明交换机/队列(可选)
-
发布消息
-
处理确认(可选)
-
关闭连接
在 RabbitMQ 中,生产者发送消息的过程通常包括以下步骤:
1、首先,生产者与 RabbitMQ Broker 建立一个 TCP 连接(Connection),然后通过该连接创建一个信道(Channel);
2、接着,生产者通过信道将消息发送到指定的交换机(Exchange),并指定路由键(Routing Key);
3、RabbitMQ 接收到消息后,根据交换机的类型和绑定规则,将消息路由到一个或多个对应的队列(Queue)中;
4、如果开启了确认机制(如 publisher confirm),RabbitMQ 还会返回确认信息给生产者,确保消息已成功投递。整个过程通过 AMQP 协议完成,确保了消息的可靠传输。
九、消费者接收消息过程?
-
建立连接和通道
-
声明队列(可选)
-
设置QoS(服务质量控制)
-
注册消费者
-
消息处理
-
发送确认(ACK/NACK)
-
处理失败消息
在 RabbitMQ 中,消费者接收消息的过程如下:
1、消费者首先通过已建立的 TCP 连接(Connection)创建一个信道(Channel),然后通过该信道订阅(consume)一个或多个队列(Queue);
2、RabbitMQ 会将队列中的消息主动推送给消费者(push 模式),或者消费者主动从队列中拉取消息(pull 模式,如 basic.get);
3、消费者接收到消息后进行业务处理,处理完成后通常会发送一个确认(ack)回执给 RabbitMQ,告知该消息已被正确消费;
4、RabbitMQ 收到确认后,才会从队列中删除这条消息;如果未收到确认或消费者断开连接,RabbitMQ 可以将消息重新入队并转发给其他消费者处理,确保消息不会丢失。
5、整个过程可以通过设置 QoS(服务质量)来控制消费者的预取数量,实现流量控制和负载均衡。
十、springboot项目中如何使用mq?
上图的解说:
在Spring Boot项目中使用消息队列(MQ),如图所示,主要涉及生产者、消息中间件(以RabbitMQ为例)和消费者三个部分。
1. 生产者:在Spring Boot应用中,生产者通过配置与RabbitMQ Broker建立TCP连接(Connection),并通过该连接创建多个信道(Channel)。每个信道可以独立发送消息。生产者将消息发送到指定的交换机(Exchange),并附带路由键(Routing Key),用于指导消息如何被路由到正确的队列(Queue)。
2. 消息中间件(Broker - RabbitMQ Server):作为消息的中介,RabbitMQ接收来自生产者的消息,并根据交换机类型和绑定规则(Binding)将消息路由到一个或多个队列中。虚拟主机(Virtual Host)提供了隔离环境,确保不同应用之间的消息不会相互干扰。队列是存储消息的地方,直到它们被消费者消费。
3. 消费者:同样地,消费者也通过TCP连接与RabbitMQ Broker建立通信,并创建自己的信道来订阅感兴趣的队列。当队列中有新消息时,RabbitMQ会根据消费者的订阅情况,将消息推送给消费者进行处理。消费者处理完消息后,通常需要向RabbitMQ发送确认(ack),表示消息已被成功消费,这样RabbitMQ才会从队列中移除这条消息。
整个过程在Spring Boot中可以通过配置文件和注解简化实现,例如使用`@EnableRabbit`启用RabbitMQ支持,通过`@RabbitListener`监听特定队列的消息等。此外,还可以利用Spring Boot的自动配置功能,简化与RabbitMQ的集成工作,使得开发者能够更专注于业务逻辑的实现。
具体实现细节:(详情参考rabbitmq02)
3.1引入jar包
spring-boot-starter-amqp.jarspring-boot-starter-web.jar
3.2Springboot自动装配原理,进行application.yml配置
连接rabbitmq下相关信息
host
port
username
password
virtualHost
3.3使用rabbitmq模版工具类 rabbitTemplate amqpTemplate
send发送消息方法,需要指定交换机,路由key,交换机的名称
receive接收消息 注解controller控制层类上,添加注解可以实时接收消息@RabbitListener在注解里指定队列的名字实时监听队列信息
3.4细化,创建一个RabbitmqConfig配置类
项目启动时,自动创建交换机、创建队列、指定交换机和队列的绑定
十一、如何保障消息不丢失?
-
发送阶段:保障消息到达交换机
在消息从生产者发送到 RabbitMQ 的交换机时,需要确保消息能够成功到达。RabbitMQ 提供了两种机制来保障这一点:- 事务机制(Transaction):通过开启事务,生产者可以确保消息被 RabbitMQ 成功接收。如果事务提交失败,生产者可以进行重试。
- Confirm 确认机制:当生产者启用 Confirm 模式后,RabbitMQ 会在消息被成功接收并路由到一个或多个队列后,发送一个确认给生产者。如果消息未能成功路由,RabbitMQ 会发送一个否定确认(Nack)。生产者可以根据确认结果决定是否重发消息。(消息被拒绝接收怎么办?消息被拒绝首先重新发送给这个队列,其次发送给死信队列由监听死信队列的队列对消息进行处理)
-
存储阶段:持久化机制
为了防止 RabbitMQ 服务器宕机导致消息丢失,需要对消息进行持久化处理。具体包括以下三个方面的持久化:- 交换机持久化:在声明交换机时,可以将其设置为持久化(
durable=true
),这样即使 RabbitMQ 服务器重启,交换机的定义也不会丢失。 - 队列持久化:声明队列时,同样可以将其设置为持久化(
durable=true
),确保队列在服务器重启后仍然存在。 - 消息内容的持久化:在发布消息时,可以将消息标记为持久化(
delivery_mode=2
),这样消息会被写入磁盘,而不是仅保留在内存中。只有同时保证交换机、队列和消息都持久化,才能确保消息在服务器重启后不会丢失。
- 交换机持久化:在声明交换机时,可以将其设置为持久化(
-
消费阶段:消息的确认机制
在消费者端,需要确保消息被正确处理后才从队列中删除,避免因消费者处理失败而导致消息丢失。RabbitMQ 提供了以下两种确认机制:- 自动 ACK(Auto Ack):消费者在接收到消息后,会自动向 RabbitMQ 发送确认(ACK),表示消息已被成功处理。这种方式简单高效,但如果消费者在处理消息时发生异常,消息可能会丢失。
- 手动 ACK(Manual Ack):消费者需要在处理完消息后,显式地向 RabbitMQ 发送确认(ACK)。如果消息处理失败,消费者可以选择不发送 ACK,这样 RabbitMQ 会将消息重新入队,供其他消费者再次尝试处理。这种方式更加可靠,推荐在生产环境中使用。
手动ack和自动ack的区别:当使用自动ACK时,消息一旦被发送到消费者,消息队列服务会立即认为该消息已被成功处理,并从队列中移除这条消息。使用手动ACK时,消费者在收到消息后不会立即被视为已成功处理,而是需要在消息确实被成功处理之后,由消费者主动向消息队列服务发送确认指令,告知该消息可以安全地从队列中移除。
以下是具体代码实现:
接收方消息确认机制
自动ack|手动ack
spring: rabbitmq: host: 主机号 port: 5672 username: admin password: 123456 virtual-host: /yan3 listener: simple: acknowledge-mode: manual direct: acknowledge-mode: manual
package com.hl.rabbitmq01.web;import com.rabbitmq.client.Channel;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import java.io.IOException;@RestController@RequestMapping(\"/c\")public class ConsumerController { @RabbitListener(queues = {\"topicQueue01\"}) public void receive(Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); System.out.println(msg); //业务逻辑 比如传入订单id,根据订单id,减少库存、支付等, // 如果操作成功,确认消息(从队列移除),如果操作失败,手动拒绝消息 if(msg.length() >= 5){ //确认消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); }else{ //拒绝消息 not ack // 第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端 channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);// channel.basicReject(message.getMessageProperties().getDeliveryTag(),true); } }}
消息的持久化机制
交换机的持久化
队列的持久化
消息内容的持久化
package com.hl.rabbitmq01.direct;import com.hl.rabbitmq01.util.MQUtil;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.MessageProperties;import java.io.IOException;import java.util.concurrent.TimeoutException;/*生产者 javaSE方式简单测试发布订阅-------direct模型生产者----消息队列----消费者 */public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //1、创建连接 Connection connection = MQUtil.getConnection(); //2、基于连接,创建信道 Channel channel = connection.createChannel(); //3、基于信道,创建队列 /* 参数: 1. queue:队列名称,如果没有一个名字叫simpleQueue01的队列,则会创建该队列,如果有则不会创建 2. durable:是否持久化,当mq重启之后,消息还在 3. exclusive: * 是否独占。只能有一个消费者监听这队列 4。当Connection关闭时,是否删除队列 autoDelete:是否自动删除。当没有Consumer时,自动删除掉 5. arguments:参数。 */ channel.queueDeclare(\"directQueue01\", true, false, false, null); channel.queueDeclare(\"directQueue02\", false, false, false, null); /* 声明交换机 参数1:交换机名称 参数2:交换机类型 */ channel.exchangeDeclare(\"directExchange01\", BuiltinExchangeType.DIRECT,true); /* 绑定交换机和队列 参数1:队列名 参数2:交换机名称 参数3:路由key 广播模型 不支持路由key \"\" */ channel.queueBind(\"directQueue01\",\"directExchange01\",\"error\"); channel.queueBind(\"directQueue02\",\"directExchange01\",\"error\"); channel.queueBind(\"directQueue02\",\"directExchange01\",\"info\"); channel.queueBind(\"directQueue02\",\"directExchange01\",\"trace\"); //发送消息到消息队列 /* 参数: 1. exchange:交换机名称。简单模式下交换机会使用默认的 \"\" 2. routingKey:路由名称,简单模式下路由名称使用消息队列名称 3. props:配置信息 4. body:发送消息数据 */ channel.basicPublish(\"directExchange01\",\"user\", MessageProperties.PERSISTENT_TEXT_PLAIN,(\"Hello World \").getBytes()); //4、关闭信道,断开连接 channel.close(); connection.close(); }}
package com.hl.rabbitmq01.web;import com.rabbitmq.client.Channel;import com.rabbitmq.client.MessageProperties;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RequestParam;import org.springframework.web.bind.annotation.RestController;import java.io.IOException;import java.nio.charset.StandardCharsets;@RestController@RequestMapping(\"/p\")public class ProducerController { @Autowired private AmqpTemplate amqpTemplate; @Autowired private RabbitTemplate rabbitTemplate; @RequestMapping(\"/send\") public void send(@RequestParam(defaultValue = \"user\") String key, @RequestParam(defaultValue = \"hello\") String msg) throws IOException { //amqpTemplate.convertAndSend(\"topicExchange\", key, msg);// rabbitTemplate.convertAndSend(\"topicExchange\",key,msg); Channel channel = rabbitTemplate .getConnectionFactory(). createConnection() .createChannel(false); //false 非事务模式运行 无需手动提交 channel.basicPublish( \"topicExchange\", key, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); }}/*创建交换机 */@Beanpublic TopicExchange topicExchange(){ return ExchangeBuilder .topicExchange(\"topicExchange\") .durable(true) //是否支持持久化机制 .build();}/*创建队列 */@Beanpublic Queue queue(){ return QueueBuilder.durable(\"topicQueue01\").build();}
发送方的消息确认机制
1、事务机制
消耗资源
RabbitMQ中与事务有关的主要有三个方法:
-
txSelect() 开始事务
-
txCommit() 提交事务
-
txRollback() 回滚事务
txSelect主要用于将当前channel设置成transaction模式,txCommit用于提交事务,txRollback用于回滚事务。
当我们使用txSelect提交开始事务之后,我们就可以发布消息给Broke代理服务器,如果txCommit提交成功了,则消息一定到达了Broke了,如果在txCommit执行之前Broker出现异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过txRollback方法进行回滚事务了。
示例
@RestControllerpublic class RabbitMQController { @Autowired private RabbitTemplate rabbitTemplate; @RequestMapping(\"/send\") public String sendMessage(String message){ rabbitTemplate.setChannelTransacted(true); //开启事务操作 rabbitTemplate.execute(channel -> { try { channel.txSelect();//开启事务 channel.basicPublish(\"Fanout_Exchange\",\"\",null,message.getBytes()); int i = 5/0; channel.txCommit();//没有问题提交事务 }catch (Exception e){ e.printStackTrace(); channel.txRollback();//有问题回滚事务 } return null; }); return \"success\"; }}
消费者没有任何变化。
通过测试会发现,发送消息时只要Broker出现异常崩溃或者由于其他原因抛出异常,就会捕获异常通过txRollback方法进行回滚事务了,则消息不会发送,消费者就获取不到消息。
2、confirm确认机制
推荐
同步通知
channel.confirmSelect(); //开始confirm操作channel.basicPublish(\"Fanout_Exchange\",\"\",null,message.getBytes());if (channel.waitForConfirms()){ System.out.println(\"发送成功\");}else{ //进行消息重发 System.out.println(\"消息发送失败,进行消息重发\");}
异步通知
channel.confirmSelect();channel.addConfirmListener(new ConfirmListener() { //消息正确到达broker,就会发送一条ack消息 @Override public void handleAck(long l, boolean b) throws IOException { System.out.println(\"发送消息成功\"); } //RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息 @Override public void handleNack(long l, boolean b) throws IOException { System.out.println(\"发送消息失败,重新发送消息\"); }});channel.basicPublish(\"Fanout_Exchange\",\"\",null,message.getBytes());
十二、死信交换机和死信队列
在实际开发项目时,在较为重要的业务场景中,要确保未被消费的消息不被丢弃(例如:订单业务),那为了保证消息数据的不丢失,可以使用RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入到死信队列中进行处理。
死信队列:RabbitMQ中并不是直接声明一个公共的死信队列,然后死信消息就会跑到死信队列中。而是为每个需要使用死信的消息队列配置一个死信交换机,当消息成为死信后,可以被重新发送到死信交换机,然后再发送给使用死信的消息队列。
死信交换机:英文缩写:DLX 。Dead Letter Exchange(死信交换机),死信交换机其实就是普通的交换机,通过给队列设置参数: x-dead-letter-exchange 和x-dead-letter-routing-key,来指向死信交换机
如何创建死信队列
创建死信队列的核心在于:在原始队列上配置两个关键参数 x-dead-letter-exchange
(死信交换机)和可选的 x-dead-letter-routing-key
(死信路由键)。当消息在原始队列中因消费失败、超时或队列达到上限等原因变成死信后,RabbitMQ 会自动将这些消息通过指定的交换机转发到绑定的死信队列中,从而实现消息的集中处理与问题排查。
RabbitMQ规定消息符合以下某种情况时,将会成为死信
-
队列消息长度到达限制(队列消息个数限制);
-
消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
-
原队列存在消息过期设置,消息到达超时时间未被消费;
死信消息会被RabbitMQ特殊处理,如果配置了死信队列,则消息会被丢到死信队列中,如果没有配置死信队列,则消息会被丢弃。
Map map = new HashMap(); map.put(\"x-dead-letter-exchange\",\"deadExchange\");//当前队列和死信交换机绑定 map.put(\"x-dead-letter-routing-key\",\"user.#\");//当前队列和死信交换机绑定的路由规则// map.put(\"x-max-length\",2);//队列长度 map.put(\"x-message-ttl\",10000);//队列消息过期时间,时间ms// return QueueBuilder.durable(\"topicQueue01\").build(); return QueueBuilder.durable(\"topicQueue\").withArguments(map).build();
十三、延迟队列简介
延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
RabbitMQ中没有延迟队列,但是可以用ttl+死信队列方式和延迟插件两种方式来实现
ttl+死信队列
基本原理:
-
TTL(Time-To-Live)
- TTL 表示消息的存活时间,单位为毫秒。当消息在队列中等待超过指定的 TTL 时间后,消息会被自动删除。
- 通过为队列或单个消息设置 TTL,可以控制消息在队列中的存活时间。
-
死信队列(DLX)
- 当消息因为某些原因(如 TTL 过期、队列达到最大长度、消费者拒绝消息等)无法被正常消费时,RabbitMQ 会将这些消息转发到一个指定的交换机(即死信交换机),然后由死信交换机将消息路由到死信队列中。
- 通过配置死信交换机和死信队列,可以实现消息的延迟处理。
ttl+死信队列代码在讲死信队列时已经实现,这个不再阐述。
延迟插件
人们一直在寻找用RabbitMQ实现延迟消息的传递方法,到目前为止,公认的解决方案是混合使用TTL和DLX。rabbitmq_delayed_message_exchange插件就是基于此来实现的,RabbitMQ延迟消息插件新增了一种新的交换器类型,消息通过这种交换器路由就可以实现延迟发送。
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
十四、RabbitMQ消息重复消费
RabbitMQ消息重复消费问题_rabbitmq重复消费的问题解决-CSDN博客
业务背景 消息队列在数据传输的过程中,为了保证消息传递的可靠性,一般会对消息采用ack确认机制,如果消息传递失败,消息队列会进行重试,此时便可能存在消息重复消费的问题。
比如,用户到银行取钱后会收到扣款通知短信,如果用户收到多条扣款信息通知则会有困惑。
解决方法一:send if not exist 首先将 RabbitMQ 的消息自动确认机制改为手动确认,然后每当有一条消息消费成功了,就把该消息的唯一ID记录在Redis 上,然后每次发送消息时,都先去 Redis 上查看是否有该消息的 ID,如果有,表示该消息已经消费过了,不再处理,否则再去处理。
解决方法二:利用数据库唯一约束实现幂等,insert if not exist 可以通过给消息的某一些属性设置唯一约束,比如增加唯一uuid,添加的时候查询是否存对应的uuid,存在不操作,不存在则添加,那样对于相同的uuid只会存在一条数据。
解决方法三:sql的乐观锁 比如给用户发送短信,变成如果该用户未发送过短信,则给用户发送短信,此时的操作则是幂等性操作。但在实际上,对于一个问题如何获取前置条件往往比较复杂,此时可以通过设置版本号version,每修改一次则版本号+1,在更新时则通过判断两个数据的版本号是否一致。
十五、RabbitMQ消息积压
RabbitMq——消息积压分析和解决思路_rabbitmq消息积压-CSDN博客
消息积压产生的原因 正常而言,一般的消息从消息产生到消息消费需要经过以下几种阶段。
以Direct模式为例:
消息由生产者产生,比如新订单的创建等,经过交换机,将消息发送至指定的队列中,然后提供给对应的消费者进行消费。
在这个链路中,存在消息积压的原因大致分为以下几种:
1、消费者宕机,导致消息队列中的消息无法及时被消费,出现积压。 2、消费者没有宕机,但因为本身逻辑处理数据耗时,导致消费者消费能力不足,引起队列消息积压。 3、消息生产方单位时间内产生消息过多,比如“双11大促活动”,导致消费者处理不过来。
消息积压问题解决
1、大促活动等,导致生产者流量过大,引起积压问题。提前增加服务器的数量,增加消费者数目,提升消费者针对指定队列消息处理的效率。
2、上线更多的消费者,处理消息队列中的数据。(和1中的大致类似)
3、如果成本有限,则可以专门针对这个队列,编写一个另类的消费者。当前另类消费者,不进行复杂逻辑处理,只将消息从队列中取出,存放至数据库中,然后basicAck反馈给消息队列。
十六、消息入库(消息补偿)
如果RabbitMQ收到消息还没来得及将消息持久化到硬盘时,RabbitMQ挂了,这样消息还是丢失了,或者RabbitMQ在发送确认消息给生产端的过程中,由于网络故障而导致生产端没有收到确认消息,这样生产端就不知道RabbitMQ到底有没有收到消息,这样也不太好进行处理。所以为了避免RabbitMQ持久化失败而导致数据丢失,我们自己也要做一些消息补偿机制,以应对一些极端情况。
在使用消息队列(Message Queue)时,消息的补偿机制是一种处理消息处理失败或异常情况的方法。当消息消费者无法成功处理消息时,补偿机制允许系统将消息重新发送或执行其他操作,以确保消息的可靠传递和处理。
补偿机制通常涉及以下几个方面:
-
重试机制:当消息处理失败时,补偿机制会尝试重新发送消息给消费者,以便重新处理。重试间隔和重试次数可以根据具体情况进行配置,以避免重复投递导致的消息处理失败。
-
延时队列:补偿机制还可以使用延时队列来处理无法立即处理的消息。当某个消息处理失败时,可以将该消息放入到延时队列中,在一定的延时之后再次尝试发送给消费者进行处理。
-
死信队列:当消息无法被成功处理时,可以将这些无法处理的消息发送到死信队列(Dead Letter Queue)。死信队列通常用于存储无法被消费者处理的消息,以便后续进行排查和处理。
-
可视化监控和报警:补偿机制还可以包括对消息队列的监控和报警功能,以便及时发现和处理异常情况。通过可视化监控工具可以实时查看消息队列的状态和处理情况,及时发现问题并采取相应的补救措施。
补偿机制的设计和实现密切依赖于具体的消息中间件和使用场景,不同的消息队列系统可能提供不同的补偿机制。因此,在选择和使用消息队列时,需要根据自身的需求和系统特点来选择适合的消息补偿机制。