RabbitMQ:延时消息(死信交换机、延迟消息插件)_rabbitmq延时插件
目录
延时消息:生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才收到消息。
延时任务:设置一定时间之后才执行的任务。
一、死信交换机【不推荐】
当一个队列的消息满足下列情况之一时,就会成为死信(dead letter):
- 消费者使用basic.reject或basic.nack声明消费失败,并且消息的requeue参数设置为false。
- 消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费。
- 要投递的队列消息堆积满了,最早的消息可能成为死信。
如果队列通过dead-letter-exchange属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中。这个交换机也称之为死信交换机
。
具体实现流程如下:
- 首先创建两个队列
direct.queue、dlx.queue
,需要注意的是在创建direct.queue
队列时,需要绑定死信交换机。
如何绑定死信交换机:选中Dead letter exchange
输入交换机的名称
2. 创建两个交换机分别绑定两个队列mt.direct、mt.dlx.direct
3. 消费者监听死信队列,并给mt.direct
发送定时消息
@Testpublic void dlxExchangeTest(){ String exchangeName = \"mt.direct\"; String message = \"黄色警报 ......\"; rabbitTemplate.convertAndSend(exchangeName, \"dlx\", message, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setExpiration(\"1000\"); // 设置过期时间,单位ms,1000=1s return message; } });}
@RabbitListener(queues = \"dlx.queue\")public void listenDlxQueue(String message){ System.out.println(String.format(\"消费者收到了dlx.queue: %s\", message));}
二、延迟消息插件【推荐】
要想使用延迟消息,需要先安装延迟消息插件rabbitmq_delayed_message_exchange,根据自己RabbitMQ的版本去下载。
2.1 安装插件【Linux】
去官网下载插件。
将插件放入RabbitMQ的plugins
中,具体路径如下:
/usr/lib/rabbitmq/lib/rabbitmq_server-3.13.7/plugins
安装插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
重启RabbitMQ服务
systemctl restart rabbitmq-server
再次登录rabbitmq,如果exchange的类型中出现:x-delayed-message,说明该插件安装成功!
2.2 安装插件【Windows】
将插件放入RabbitMQ的plugins
中,具体路径如下:
xxx\\RabbitMQ Server\\rabbitmq_server-3.12.10\\plugins
然后进入到RabbitMQ额度sbin
目录下,执行以下命令:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
2.3 如何使用
rabbitmq_delayed_message_exchange
插件的实现原理是设计了一种支持延迟消息功能的交换机,当消息投递到交换机后可以暂存一段时间,到期后在投递到队列。
Java注解的实现方式
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = \"delay.queue\", durable = \"true\"), exchange = @Exchange(name = \"mt.delay.direct\", delayed = \"true\"), key = \"delay\"))public void listenDelayQueue(String message){ System.out.println(String.format(\"消费者收到了delay.queue: %s\", message));}
Java Bean的实现方式
@Beanpublic DirectExchange delayExchange() { return ExchangeBuilder.directExchange(\"mt.delay.direct\").durable(true).delayed().build();}
消费者
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = \"delay.queue\", durable = \"true\"), exchange = @Exchange(name = \"mt.delay.direct\", delayed = \"true\"), key = \"delay\"))public void listenDelayQueue(String message){ System.out.println(String.format(\"消费者收到了delay.queue: %s\", message));}
生产者
@Testpublic void delayExchangeTest(){ String exchangeName = \"mt.delay.direct\"; String message = \"延迟警报 ......\"; rabbitTemplate.convertAndSend(exchangeName, \"delay\", message, new DelayMessageProcessor(5000));}
package com.ming.processor;import lombok.RequiredArgsConstructor;import org.springframework.amqp.AmqpException;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessagePostProcessor;/** * @RequiredArgsConstructor 是Lombok库提供的一个注解,用于自动生成包含必需参数的构造函数。必需参数是指那些被声明为 final 或者有 @NonNull 注解的成员变量。 */@RequiredArgsConstructorpublic class DelayMessageProcessor implements MessagePostProcessor { /** * 定义延迟时间 */ private final int delay; @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setDelay(delay); return message; }}