详解RabbitMQ高级特性之延迟队列_ttl+死信队列
目录
延迟队列
应用场景
TTL+死信队列 实现延迟队列
添加配置
常量类
声明队列和交换机并绑定二者关系
编写生产消息代码
编写消费消息代码
观察效果
修改生产消息代码
观察效果
延迟队列
延迟队列(Delayed Queue),即消息被发送以后, 并不想让消费者⽴刻拿到消息, ⽽是等待特定时间后,消费者才能拿到这个消息进⾏消费.
应用场景
延迟队列的使⽤场景有很多, ⽐如:
1. 智能家居: ⽤⼾希望通过⼿机远程遥控家⾥的智能设备在指定的时间进⾏⼯作. 这时候就可以将⽤⼾指令发送到延迟队列, 当指令设定的时间到了再将指令推送到智能设备.
2. ⽇常管理: 预定会议后,需要在会议开始前⼗五分钟提醒参会⼈参加会议
3. ⽤⼾注册成功后, 7天后发送短信, 提⾼⽤⼾活跃度等
RabbitMQ本⾝没有直接⽀持延迟队列的的功能, 但是可以通过前⾯所介绍的TTL+死信队列的⽅式组合模拟出延迟队列的功能.
假设⼀个应⽤中需要将每条消息都设置为10秒的延迟, ⽣产者通过 normal_exchange 这个交换器将发送的消息存储在 normal_queue 这个队列中. 消费者订阅的并⾮是 normal_queue 这个队列, ⽽
是 dlx_queue 这个队列. 当消息从 normal_queue 这个队列中过期之后被存⼊ dlx_queue 这个
队列中,消费者就恰巧消费到了延迟10秒的这条消息.
TTL+死信队列 实现延迟队列
添加配置
spring: application: name: rabbit-extensions-demo rabbitmq: addresses: amqp://study:study@47.98.109.138:5672/extension
常量类
public class Constants { //死信 public static final String NORMAL_QUEUE = \"normal.queue\"; public static final String NORMAL_EXCHANGE = \"normal.exchange\"; public static final String DL_QUEUE = \"dl.queue\"; public static final String DL_EXCHANGE= \"dl.exchange\";}
声明队列和交换机并绑定二者关系
import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import rabbitextensionsdemo.constant.Constants;@Configurationpublic class DLConfig { //正常的交换机和队列 @Bean(\"normalQueue\") public Queue normalQueue(){ return QueueBuilder.durable(Constants.NORMAL_QUEUE) .deadLetterExchange(Constants.DL_EXCHANGE) .deadLetterRoutingKey(\"dlx\") .build(); } @Bean(\"normalExchange\") public DirectExchange normalExchange(){ return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).build(); } @Bean(\"normalBinding\") public Binding normalBinding(@Qualifier(\"normalQueue\") Queue queue, @Qualifier(\"normalExchange\") Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(\"normal\").noargs(); } //死信交换机和队列 @Bean(\"dlQueue\") public Queue dlQueue(){ return QueueBuilder.durable(Constants.DL_QUEUE).build(); } @Bean(\"dlExchange\") public DirectExchange dlExchange(){ return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).build(); } @Bean(\"dlBinding\") public Binding dlBinding(@Qualifier(\"dlQueue\") Queue queue, @Qualifier(\"dlExchange\") Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(\"dlx\").noargs(); }}
编写生产消息代码
@RequestMapping(\"/delay\") public String delay() { System.out.println(\"delay...\"); rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, \"normal\", \"ttl test 10s...\", message -> { message.getMessageProperties().setExpiration(\"10000\"); //单位: 毫秒, 过期时间为10s return message; }); rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, \"normal\", \"ttl test 30s...\", message -> { message.getMessageProperties().setExpiration(\"30000\"); //单位: 毫秒, 过期时间为30s return message; }); System.out.printf(\"%tc 消息发送成功 \\n\", new Date()); return \"消息发送成功\"; }
编写消费消息代码
import com.rabbitmq.client.Channel;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import rabbitextensionsdemo.constant.Constants;import java.util.Date;@Componentpublic class DLListener { @RabbitListener(queues = Constants.DL_QUEUE) public void dlHandMessage(Message message, Channel channel) throws Exception { //消费者逻辑 System.out.printf(\"[dl.queue] %tc 接收到消息: %s, deliveryTag: %d \\n\", new Date(), new String(message.getBody(),\"UTF-8\"), message.getMessageProperties().getDeliveryTag()); }}
观察效果
此时我们可以看到,基于TTL+死信队列实现出来了 延迟队列 的效果,但是这样就没问题了吗?
修改生产消息代码
@RequestMapping(\"/delay\") public String delay() { System.out.println(\"delay...\"); rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, \"normal\", \"ttl test 30s...\", message -> { message.getMessageProperties().setExpiration(\"30000\"); //单位: 毫秒, 过期时间为30s return message; }); rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, \"normal\", \"ttl test 10s...\", message -> { message.getMessageProperties().setExpiration(\"10000\"); //单位: 毫秒, 过期时间为10s return message; }); System.out.printf(\"%tc 消息发送成功 \\n\", new Date()); return \"消息发送成功\"; }
观察效果
此时我们看到,设置TTL为10秒的消息居然在30秒后才进入死信队列,这是为什么呢?
是因为RabbitMQ检查消息的TTL是在消息发送给消费方的时候进行检测的,而什么时候发送给发送方又根据队头消息的TTL,所以这就是问题所在,也是TTL+死信队列实现延迟队列所存在的问题。