> 文档中心 > MQRabbitMQ死信队列

MQRabbitMQ死信队列

这里写自定义目录标题

  • 什么是死信队列
    • 死信队列产生的三个条件
      • 消息TTL过期
      • 队列达到最大长度
      • 队列拒绝接收消息

什么是死信队列

当生产者通过交换机将消息传到消费者的队列中,消费者的队列由于种种原因,不能正常接收消息,这个时候就需要死信队列来处理

死信队列产生的三个条件

消息 TTL 过期
队列达到最大长度(队列满了,无法再添加数据到 mq 中)
消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.

消息TTL过期

说明:设置TTL过期时间可以在生产者处声明也可以在消费者处声明
在生产者中声明比较常用,可以根据信息的种类,设置不同的TTL时间

 AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();     channel.basicPublish(normalExchangeName, "zhangsan", properties, message.getBytes());

在消费者处声明

 Map<String, Object> params = new HashMap<String, Object>();// 正常队列设置 params.put("x-dead-letter-exchange", deadExchangeName);// 消息传送到死信交换机还得告诉他应该发送给谁,就是rottingkey params.put("x-dead-letter-routing-key", "lisi"); params.put("x-expires", 10000);//主要作用是当正常队列不能接收消息后,将消息发送到死信交换机 String normalQueue = "normalQueue1"; channel.queueDeclare(normalQueue, false, false, false, params);

队列达到最大长度

 Map<String, Object> params = new HashMap<String, Object>(); params.put("x-key-letter-exchange", deadlExchange); params.put("x-key-letter-exchange", "lisi");// 还能设置交换时间 params.put("x-max-length", 5); channel.queueDeclare(normalQueue, false, false, false, params);

队列拒绝接收消息

      DeliverCallback deliverCallback = new DeliverCallback() {     public void handle(String s, Delivery delivery) throws IOException {  System.out.println(new String(delivery.getBody(), "utf-8"));//  第二个参数为mulitply  true表示向生产者发送所有(可能会丢失数据,应答了一个,其他没接收到会发生数据丢失,false表示接收一个应答一个channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);//  false表示拒绝入队  channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);     } }; CancelCallback cancelCallback = new CancelCallback() {     public void handle(String s) throws IOException {     } };//取消自动应答 boolean autoAck = false; channel.basicConsume(normalQueue, autoAck, deliverCallback, cancelCallback);

实例代码

package xiang.test10死信队列;import com.rabbitmq.client.*;import xiang.utils.ConnectionUtils;import java.io.IOException;import java.util.HashMap;import java.util.Map;public class consumer01 {    public static final String normalExchangeName = "normalExchangeName";    public static final String deadExchangeName = "dead";    public static void main(String[] args) throws IOException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(normalExchangeName, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(deadExchangeName, BuiltinExchangeType.DIRECT); //声明死信队列 final String deadQueue = "deadQueue1"; channel.queueDeclare(deadQueue, false, false, false, null); //死信队列绑定交换机 channel.queueBind(deadQueue, deadExchangeName, "lisi");// 正常队列绑定死信队列 Map<String, Object> params = new HashMap<String, Object>();// 正常队列设置 params.put("x-dead-letter-exchange", deadExchangeName); params.put("x-dead-letter-routing-key", "lisi"); params.put("x-expires", 10000);// String normalQueue = "normalQueue1"; channel.queueDeclare(normalQueue, false, false, false, params); channel.queueBind(normalQueue, normalExchangeName, "zhangsan"); DeliverCallback deliverCallback = new DeliverCallback() {     public void handle(String s, Delivery delivery) throws IOException {  System.out.println(new String(delivery.getBody(), "utf-8"));     } }; CancelCallback cancelCallback = new CancelCallback() {     public void handle(java.lang.String s) throws IOException {     } }; channel.basicConsume(normalQueue, true, deliverCallback, cancelCallback);    }}
package xiang.test10死信队列;import com.rabbitmq.client.*;import xiang.utils.ConnectionUtils;import java.io.IOException;public class consumer02 {    public static final String deadExchangeName = "dead";    public static void main(String[] args) throws IOException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.exchangeDeclare(deadExchangeName, BuiltinExchangeType.DIRECT); final String deadQueue = "deadQueue1"; channel.queueBind(deadQueue, deadExchangeName, "lisi"); DeliverCallback deliverCallback = new DeliverCallback() {     public void handle(String s, Delivery delivery) throws IOException {  System.out.println(new String(delivery.getBody(), "utf-8"));     } }; CancelCallback cancelCallback = new CancelCallback() {     public void handle(java.lang.String s) throws IOException {     } }; channel.basicConsume(deadQueue, true, deliverCallback, cancelCallback);    }}
package xiang.test10死信队列;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import sun.reflect.generics.tree.BaseType;import xiang.utils.ConnectionUtils;import java.io.IOException;public class producer {    public static final String normalExchangeName = "normalExchangeName";    public static void main(String[] args) throws IOException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(normalExchangeName, BuiltinExchangeType.DIRECT);// 设置TTL时间 AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build(); for (int i = 0; i < 10; i++) {     String message = "info" + i;     channel.basicPublish(normalExchangeName, "zhangsan", properties, message.getBytes()); }    }}