> 文档中心 > RabbitMQ手动应答机制-案例代码梳理

RabbitMQ手动应答机制-案例代码梳理

目录

  • 消息应答
    • 1、概念
    • 2、自动应答
    • 3、手动应答
    • 4、手动批量应答(Multiple)
    • 5、消息自动重新入队
    • 6、手动应答代码实现
  • 7、源码地址

消息应答

参考代码:https://gitee.com/lhzlx/rabbit-simple-demo.git

1、概念

消息应答
消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,此时rabbitmq 可以把该消息删除了。

一个消费者处理一个长的任务并仅只完成了部分突然它挂掉了,RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息,为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制。

2、自动应答

消费者接收消息后就自动应答为接收成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接关闭,那么消息就丢失了;另一方面这种模式没有对传递的消息数量进行限制,有可能使得消费者由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽;所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。

3、手动应答

手动应答有三种方法:

  • Channel.basicAck(肯定确认) :RabbitMQ已知道该消息并且成功的处理消息,可以将其丢弃了
  • Channel.basicNack(否定确认) :不确认消息
  • Channel.basicReject(拒绝确认) :不处理该消息了直接拒绝,可以将其丢弃了

4、手动批量应答(Multiple)

手动批量应答的好处是可以并减少网络拥堵

当在进行手动应答时,需要传入是否批量应答的参数:
在这里插入图片描述
multiple参数的具体含义:

  • true 代表批量应答 channel 上未应答的消息
    channel 上有传送 tag 的消息 5,6,7,8;如果当前tag 是 8,那么此时 5-8 的这些还未应答的消息都会被确认收到消息应答。
  • false 代表不进行批量应答,只响应当前tag的消息
    只会应答 tag=8 的消息;而5,6,7 这三个消息依然不会被确认收到消息应答。
    在这里插入图片描述

5、消息自动重新入队

如果消费者由于某些原因失去连接,导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔宕机,也可以确保不会丢失任何消息。
在这里插入图片描述

6、手动应答代码实现

代码包路径:lhz.ack

public class Producer {    /**     * 设置队列名称     */    private final static String QUEUE_NAME = "hello";    public static void main(String[] args) throws Exception { try (Channel channel = RabbitMqUtils.getChannel();) {     channel.queueDeclare(QUEUE_NAME, false, false, false, null);     // 从控制台当中接受信息     Scanner scanner = new Scanner(System.in);     while (scanner.hasNext()) {  String message = scanner.next();  channel.basicPublish("", QUEUE_NAME, null, message.getBytes());  System.out.println("发送消息完成:" + message);     } }    }}

消费者:
消费者需要两个,分别为:Consumer01、Consumer02两者代码一致,只是名称、沉睡时间不同,下面以Consumer01代码为例:

// Consumer01与Consumer02代码一致,只是沉睡时间不一样public class Consumer01 {    /**     * 设置队列名称     */    private final static String ACK_QUEUE_NAME = "ack_queue_name";    public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); System.out.println("C1 等待接收消息处理时间较长"); //消息消费的时候如何处理消息 DeliverCallback deliverCallback = (consumerTag, delivery) -> {     String message = new String(delivery.getBody());     System.out.println("C1接收到消息:" + message); // 模拟处理时间     try {  // c1沉睡5S,c2沉睡1S  TimeUnit.SECONDS.sleep(5);     } catch (InterruptedException e) {  e.printStackTrace();     }     // 1.消息标记 tag     // 2.是否批量应答未应答消息     channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);     System.out.println("C1消息处理完成,已确认"); }; //采用手动应答 boolean autoAck = false; channel.basicConsume(ACK_QUEUE_NAME, autoAck, deliverCallback, (consumerTag) -> {     System.out.println(consumerTag + "消费者取消消费接口回调逻辑"); });    }}

目的:
为了演示在消费者01没有手动应答前并且消费者01出现了宕机,如果此时消息被消费者02进行了消费,则表示消息重新进入了队列。

步骤:
先启动一次消费者,再启动两个Consumer,然后重新启动Producer,在控制台依次录入aa、bb、cc;当生产者当发送cc后立刻关闭消费者01

结果:
正常情况下,由于轮询机制,消费者01会接收aa、cc并且进行确认,但是由于cc发送后消费者01在确认之前出现了宕机,所以消费者02会接收bb、cc并且进行确认。
在这里插入图片描述

7、源码地址

源码地址:https://gitee.com/lhzlx/rabbit-simple-demo.git