> 文档中心 > RabbitMQ 消息可靠性问题

RabbitMQ 消息可靠性问题

目录

    • 一. 两种消费思路
    • 二. 确保消费成功的两种思路
    • 三. 消息确认
      • I. 自动确认
      • II. 手动确认
        • A. 推模式手动确认
        • B. 拉模式手动确认
      • III. 消息拒绝

一. 两种消费思路

  1. RabbitMQ 的消息消费中,整体上来说有两种不同的思路:

    (1) 推(push):MQ 主动将消息推送给消费者,这种方式需要消费设置一个缓冲区去缓存消息,对于消费者而言,内存中总是有一堆需要处理的消息,所以这种方式的效率比较高,这也是目前大多数应用采用的消费方式
    (2) 拉(pull):消费者主动从 MQ 拉取消息,这种方式效率并不是很高,不过有的时候如果服务端需要批量拉取消息,倒是可以采用这种方法

  2. 先来看第一种:推(push),这种其实我们上面一直都在使用,就是通过注解 @RabbitListener 注解去监听消费者

        @RabbitListener(queues = DirectConfig.MY_DIRECT_QUEUE_NAME_01)    public void handMsg(String msg) { System.out.println("handMsg = " + msg);// int i = 1 / 0;    }
  3. 大家可以自己在类中手动设置一条异常,然后启动,当监听到队列中有消息时,就会触发这个方法了

  4. 再开看第二种:拉(pull),这种是通过 rabbitTemplate 中的 receiveAndConvert 方法来拉取一条消息下来。如果该方法返回值为 null,表示该队列上没有消息了。

    @AutowiredRabbitTemplate rabbitTemplate;@Testpublic void test08(){//主动从队列中消息一条消息,即队列中拉一下消息下来String pull = (String) rabbitTemplate.receiveAndConvert(DirectConfig.MY_DIRECT_QUEUE_NAME_02);System.out.println("pull = " + pull);}

    注意:这里与我们上面用的 convertAndSend 是不一样的,这个是发送消息,而 receiveAndConvert 则是拉取消息。

  5. 如果需要从消息队列中持续获取消息,就可以使用推模式;如果只是单纯的消费一条消息,则使用拉模式即可。

  6. 切记将拉模式放到一个死循环中,变相订阅消息,这会严重影响 RabbitMQ 的性能

二. 确保消费成功的两种思路

  1. 为了保证消息能够可靠的到达消息消费者,RabbitMQ 中提供了消息消费确认机制。当消费者去消费消息的时候,可以通过指定 autoAck 参数来表示消息消费的确认方式。
    (1) 当 autoAck 为 false 时,此时即使消费者已经收到消息了,RabbitMQ 也不会立马将消息移除,而是等待消费者显式的回复确认信号后,才会将消息删除。
    (2) 当 autoAck 为 true 时,此时消息消费者就会自动把发送出去的消息设置为确认,然后将消息移除,即使这些消息并没有到达消费者。
  2. 换句话说,就是当 autoAck 为 false 时,消费者就变得非常从容了,它将会有足够的时间去处理这条消息,当消息正常处理完成后,再手动 ack ,此时 RabbitMQ 才会认为这条消息消费成功。
  3. 如果 RabbitMQ 一直没有收到客户端的反馈,并且此时客户端也已经断开连接了,那么 RabbitMQ 就会将刚刚的消息重新放回队列中,等待下一次被消费。
  4. 确保消息被成功消费,无非就是两种:手动 Ack 或者自动 Ack ,当然,无论这两种中的哪一种,最终都有可能导致消息被重复消费,所以一般来说我们还需要在处理消息的时候,解决幂等性的问题。
  5. 解决幂等性的问题:可以参考一下我的这篇文章:

三. 消息确认

I. 自动确认

  1. 在 Spring Boot 中,默认情况下,消息消费就是自动确认的。

  2. 下面就是消息消费的方法,其实与之前的也是一样的。因为推模式中,就是自动确认的。

    @Componentpublic class DirectConsumer {    /**     * 监听队列     *     * 默认情况下,这个方法是自动确认消息是否消费成功的     * 如果这个方法抛出异常,表示消息消费失败,消息会重新回到队列中(RabbitMQ 主页的 ready 中)     * 然后客户端会立马重试,然后又把消息拉到客户端来处理,又抛出异常...(陷入死循环)     */    @RabbitListener(queues = DirectConfig.MY_DIRECT_QUEUE_NAME_01)    public void handMsg(String msg) { System.out.println("handMsg = " + msg);// int i = 1 / 0;    }    }

    通过 @Componet 注解将当前类注入到 Spring 容器中,通过 @RabbitListenter 注解来标记一个消息消费,默认情况下,消息消费方法自带事务。

    即如果该方法执行的过程中,抛出异常,那么消息会重新回到队列中等待下一次被调用,如果该方法正常执行完成没有抛出异常,那么这条消息就算是被消费了

II. 手动确认

手动确认又分为两种:推模式手动确认拉模式手动确认

A. 推模式手动确认

  1. 要开启手动确认,首先需要在 application.properties 中开启手动确认

    # 设置消息的确认模式改为手动,默认是自动的spring.rabbitmq.listener.simple.acknowledge-mode=manual
  2. 改为手动确认之后,消费者中的代码就与之前不一样了

    @Componentpublic class DirectConsumer {    /**     * 这里是手动确认     */    @RabbitListener(queues = DirectConfig.MY_DIRECT_QUEUE_NAME_02)    public void handleMsg02(Message message, Channel channel) throws IOException { //获取消息的唯一标记 long deliveryTag = message.getMessageProperties().getDeliveryTag(); try {     //获取消息的内容     byte[] body = message.getBody();     //把消息转换成字符串     System.out.println("msg = " + new String(body, 0, body.length));     /**      * 能够执行到这里,说明没有问题,手动确认消息已经收到      *      * 参数一:消息的唯一标记;      * 参数二:false 表示仅确认当前消息消费成功; true 表示之前还没有确认的消息都消费成功      */     //int i = 1 / 0;     channel.basicAck(deliveryTag,false); } catch (IOException e) {     e.printStackTrace();     /**      * 拒绝当前消息的消费      *      * 参数一:消息的唯一标记      * 参数二:false 表示仅拒绝当前消息消费; true 表示拒绝之前所有没有被消费者消费的消息      * 参数三:被拒绝的消息是否重新入队      */     channel.basicNack(deliveryTag, false, true); }    }}

    (1) 将消费者需要做的事情都写入到 try-catch 里面

    (2) 如果消息正常消费,则执行 basicAck 确认消费成功

    (3) 反之,则执行 basicNack 高数 RabbitMQ 消息消费失败

  3. 在上面的代码中,对 basicAckbasicNack 这两个方法中的参数说明也已经写在注释中了,这里就不继续说了 。

  4. 那么怎么测试呢?

    测试之前先补充一个内容:

    • Unacked:表示已经发送给消费者但还收到消费 ack 的消息数量
    • Ready:表示待消费的消费数量

    (1) 首先在 RabbitMQ 的 web 管理端确认一下测试的队列中是否有待消费的消息
    确认是否有待消费的消息

    (2) 然后在 try-catch 中手动设置一个异常,启动项目,这时候因为生产者还没有发送消息,异常是不会报错的

    (3) 接着,在生产者中给消费者发送一条消息,我这里还是用前面使用多的单元测试中的 test04

    @AutowiredRabbitTemplate rabbitTemplate;@Testpublic void test04(){rabbitTemplate.convertAndSend(DirectConfig.MY_DIRECT_EXCHANGE_NAME,DirectConfig.MY_DIRECT_QUEUE_NAME_02,"Hello Peng");}

    (4) 启动之后,观察 RabbitMQ 页面以及启动类日志变化。此时会发现,启动类日志报错了,而 RabbitMQ 页面中 Unacked 中待确认的消息也显示一条,说明消费者那里出现了异常,还没有确认。
    RabbitMQ页面变化

    (5) 这时候把消费者关闭,再次观察 RabbitMQ 页面变化,就会发现 Unacked 没了,而待消费数量变成 1 。说明:消费者那里没有确认消息,消息从新回到了 ready 中。
    RabbitMQ页面变化2

B. 拉模式手动确认

补充扩展:

  1. 其实拉模式也是可以自动确认的,只需要在 @Transactional 注解在事务中 进行即可。代码就是我们上面介绍 拉模式 时候的 test08 ,加个注解即可

    @AutowiredRabbitTemplate rabbitTemplate;@Test    @Transactionalpublic void test08(){//主动从队列中消息一条消息,即队列中拉一下消息下来String pull = (String) rabbitTemplate.receiveAndConvert(DirectConfig.MY_DIRECT_QUEUE_NAME_02); //int i = 1 / 0;System.out.println("pull = " + pull);}
  2. 注意,此时的 @Transactional 注解只是一个标记,还需要配置一下 TransactionManager ,在配置类中增加一行配置。我们前面发送可靠性中:开启事务机制 的时候也用过

        @Bean    PlatformTransactionManager platformTransactionManager(ConnectionFactory connectionFactory) { return new RabbitTransactionManager(connectionFactory);    }
  3. 当有异常时,拉下来的消息就不会确认;正常的话就自动确认。

手动确认

  1. 拉模式的手动确认就比较麻烦了,在 Spring 中封装的 RabbitTemplate 中并未找到对应的方法,所以这里得使用原生的方法去操作

    @AutowiredRabbitTemplate rabbitTemplate;@Testpublic void test02(){//创建一个不带事务的消息通道。有了通道就好办了,就可以进行确认与取消了Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(false);long deliveryTag = 0;try {//这里是获取队列的一条消息//参数二:表示此时即使消费者已经收到消息了,RabbitMQ 也不会立马将消息移除,而是等待消费者显式的回复确认信号后,才会将消息删除GetResponse GetResponse = channel.basicGet(DirectConfig.MY_DIRECT_QUEUE_NAME_02, false);//获取标记deliveryTag = GetResponse.getEnvelope().getDeliveryTag();//int i = 1 / 0;//确认消息channel.basicAck(deliveryTag,false);} catch (IOException e) {e.printStackTrace();try {//取消消息channel.basicNack(deliveryTag, false, true);} catch (IOException ex) {ex.printStackTrace();}}}
  2. channel.basicGet() 这个方法中,第一个参数就是队列的名字,第二个参数则是上面提到的 autoAck 了,这个在上面 确保消费成功的两种思路 中提到过。

  3. 这里的异常测试与前面提到的都大同小异,通过观察 RabbitMQ 的 web 端管理页面自行测试即可。

III. 消息拒绝

  1. 当客户端收到消息时,可以选择消费这条消息,也可以选择拒绝这条消息。

  2. 拒绝的方式如下:

        @RabbitListener(queues = DirectConfig.MY_DIRECT_QUEUE_NAME_02)    public void handleMsg03(Message message, Channel channel) { //获取消息编号 long deliveryTag = message.getMessageProperties().getDeliveryTag(); try {     //拒绝消息     channel.basicReject(deliveryTag, true); } catch (IOException e) {     e.printStackTrace(); }    }
  3. 消息拒绝只是 basicReject 该方法的调用与前面不同,其他都是一样的,这里就不做演示了。