RabbitMQ 进阶
文章目录
- 一、发送者的可靠性
- 二、MQ 的可靠性
-
- 2.1 数据持久化:
-
- 2.1.1 交换机持久化:
- 2.1.2 队列持久化:
- 2.1.3 消息持久化:
- 2.2 LazyQueue:
-
- 2.2.1 控制台配置 Lazy 模式:
- 2.2.2 代码配置 Lazy 模式:
- 三、消费者可靠性
-
- 3.1 消费者确认机制:
- 3.2 失败重试机制:
- 3.3 失败处理策略:
- 3.4 业务幂等性:
-
- 3.4.1 唯一消息 ID:
- 3.4.2 业务判断:
- 3.5 兜底方案:
- 四、延迟消息
-
- 4.1 死信交换机和延迟消息:
-
- 4.1.1 死信交换机:
- 4.1.2 延迟消息:
- 4.2 DelayExchange 插件:
-
- 4.2.1 基于注解方式声明延迟交换机:
- 4.2.2 基于 @Bean 的方式:
- 4.2.3 发送延迟消息:
我们可以通过 MQ 异步调用,来使程序的性能更好和解耦合。但是如果 MQ 的消息没有成功的被对应的程序处理,那么这样不就会造成数据不一致的情况。因此,我们这里必须要尽可能的确保 MQ 消息的可靠性,即:消息应该至少被消费者处理一次。
那么问题来了:
- 我们该如何确保 MQ 消息的可靠性?
- 如果真的发送失败,有没有其它的兜底方案?
相信本篇博客能给你答案。
一、发送者的可靠性
首先,我们一起分析一下消息丢失的可能性有哪些。
消息从发送者发送消息,到消费者处理消息,需要经过的流程是这样的:
消息从生产者到消费者的每一步都可能导致消息丢失:
- 发送消息时丢失:
- 生产者发送消息时连接 MQ 失败。
- 生产者发送消息到达 MQ 后未找到
Exchange
- 生产者发送消息到达 MQ 的
Exchange
后,未找到合适的Queue
- MQ 导致消息丢失:
- 消息到达 MQ,保存到队列后,尚未消费就突然宕机
- 消费者处理消息时:
- 消息接收后尚未处理突然宕机
- 消息接收后处理过程中抛出异常
综上,我们要解决消息丢失问题,保证 MQ 的可靠性,就必须从 3 个方面入手:
- 确保生产者一定把消息发送到 MQ
- 确保 MQ 不会将消息弄丢
- 确保消费者一定要成功处理消息
我们先来看如何确保生产者一定能把消息发送到 MQ。
1.1 生产者重试机制:
首先第一种情况,就是生产者发送消息时,出现了网络故障,导致与 MQ 的连接中断。
为了解决这个问题,SpringAMQP 提供的消息发送时的重试机制。即:当RabbitTemplate
与 MQ 连接超时后,多次重试。
修改publisher
模块的application.yaml
文件,添加下面的内容:
spring: rabbitmq: connection-timeout: 1s # 设置MQ的连接超时时间 template: retry: enabled: true # 开启超时重试机制 initial-interval: 1000ms # 失败后的初始等待时间 multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier max-attempts: 3 # 最大重试次数
注意:当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过 SpringAMQP 提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的。
如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。
如果是断网的情况下,整个业务都会被影响,我们可以很容易的发现问题所在,并进行解决,不过断网一般是不太会出现。
1.2 生产者确认机制:
一般情况下,只要生产者与 MQ 之间的网路连接顺畅,基本不会出现发送消息丢失的情况,因此大多数情况下我们无需考虑这种问题。
不过,在少数情况下,也会出现消息发送到 MQ 之后丢失的现象,比如:
- MQ 内部处理消息的进程发生了异常
- 生产者发送消息到达 MQ 后未找到
Exchange
- 生产者发送消息到达 MQ 的
Exchange
后,未找到合适的Queue
,因此无法路由
针对上述情况,RabbitMQ 提供了生产者消息确认机制,包括Publisher Confirm
和Publisher Return
两种。在开启确认机制的情况下,当生产者发送消息给 MQ 后,MQ 会根据消息处理的情况返回不同的回执。
具体如图所示:
总结如下:
- 当消息投递到 MQ,但是路由失败时,通过Publisher Return返回异常信息,同时返回 ack 的确认信息(因为这里是程序员自己代码写错了,如果返回 NACK 后面生产者会继续投递该消息,但是代码错误,怎么投都不会成功),代表投递成功
- 临时消息投递到了 MQ,并且入队成功,返回 ACK,告知投递成功
- 持久消息投递到了 MQ,并且入队完成持久化,返回 ACK ,告知投递成功
- 其它情况都会返回 NACK,告知投递失败
其中ack
和nack
属于Publisher Confirm机制,ack
是投递成功;nack
是投递失败。而return
则属于Publisher Return机制。
默认两种机制都是关闭状态,需要通过配置文件来开启。
两个机制一般配合是配合使用。
1.2.1 开启生产者确认:
在 publisher 模块的application.yaml
中添加配置:
spring: rabbitmq: publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型 publisher-returns: true # 开启publisher return机制
这里publisher-confirm-type
有三种模式可选:
none
:关闭 confirm 机制simple
:同步阻塞等待 MQ 的回执correlated
:MQ 异步回调返回回执
我们一般推荐使用 correlated。
1.2.2 定义 ReturnCallback:
每个RabbitTemplate
只能配置一个ReturnCallback
,因此我们可以在配置类中统一设置。
内容如下:
package com.itheima.publisher.config;import lombok.AllArgsConstructor;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.ReturnedMessage;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.context.annotation.Configuration;import javax.annotation.PostConstruct;@Slf4j@AllArgsConstructor@Configurationpublic class MqConfig { private final RabbitTemplate rabbitTemplate; @PostConstruct public void init(){ rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returned) { log.error