> 技术文档 > RabbitMq:消费侧未限流导致的rabbitmq报错异常,消息不能正常消费。_rabbitmq消费端消费异常

RabbitMq:消费侧未限流导致的rabbitmq报错异常,消息不能正常消费。_rabbitmq消费端消费异常


1、问题现象

之前写过两篇关于rabbitmq异步消费的文章:一个异步架构设计:批量消费RabbitMQ,批量写入Elasticsearch(golang实现),一个优秀的rabbitmq消费者(consumer)设计,可直接上线使用。在调试这个消费者设计时,遇到了这么一个问题,当mq中消息很多出现堆积时,unacked的消息会突然增高,导致mq出现报错:

 

从这个报错上看,共出现了2个问题:

  • mq连接因为写入器发送超时而被关闭

  • 连接被关闭,并显示 enotconn 错误,这表明在尝试关闭连接时,连接已经断开。

这个时候,我们查看mq的消费者数量已经为0,确实消费者的链接已经断开:

 

2、问题分析

首先看一下消费者的代码,核心代码如下:

async def callback(self, message: aio_pika.IncomingMessage): \"\"\"callback.\"\"\" try: # print(f\"Consumer 1 received: {message.body.decode()}\") self.async_queue.put_nowait((message, message.body.decode())) except Exception: print(\"callback error!\")

为了提高消费者的数据吞吐量,在mq的接收回调中,只把消息放入了异步队列中,消息的处理放在主程序批量处理,这个步骤相对来说耗时较多,因此就会出现一个现象,callback不停的从mq中取消息,消息被消费后,才会执行ack的操作,因为批量处理消息耗时相对较长,所以unacked的消息会越来越多,当达到一定程度后,将会导致mq写入器写入超时,从而导致mq连接断开,在老版本的mq上甚至出现无法继续消费消息的现象。 

3、解决方案

出现该问题的本质原因,就是因为unacked的消息突然增多,消息的处理速度跟不上接收的速度,所以要想解决这个问题,就必须限制从mq中的接受速度,也就是unacked的数据量。

最开始想的是限制接收函数中异步队列的长度,将其最大长度限定在1000

self.async_queue = asyncio.Queue(1000)

但是没有奏效,程序启动后,unacked的消息量和之前一致,瞬间增加到万的数量级:

所以这个办法解决不了我们的问题,最后发现,需要对mq的channel设置这么一个参数:

await self.channel.set_qos(prefetch_count=1000)

这段代码的含义如下:

在 RabbitMQ 中,set_qos 方法用于设置 Quality of Service(服务质量)参数,以控制客户端如何接收和处理来自队列的消息。具体来说,prefetch_count 参数用于指定客户端在确认已处理的消息之前,最多可以预取多少条消息。

加上这行代码后,unacked的消息将会被限定在1000内,问题解决:

4、总结

上面这个问题,在消息量较少时不容易出现,因为批量处理的速度是能跟上消息的接收速度的,但是当消息量一旦较大,消息处理跟不上消息的接收时,问题将会暴露,之前两篇文章介绍的异步消费架构,必须加上对prefetch_count的设置,否则会有出现这个问题的可能。

欢迎各位热爱编程、热爱技术的小伙伴们关注,聊聊跑步、聊聊技术~~~。 

往期推荐

我在百度的这10年~~

云冈石窟:翻开这本距今1565年、与天地同久长的石头史书,感受北魏王朝雕刻艺术的巅峰之作。

历经沧桑的应县木塔,在风雨中已等你969年。

从北京到大同,走过600里,跨越1000年。

一个异步架构设计:批量消费RabbitMQ,批量写入Elasticsearch(golang实现)

一个优秀的rabbitmq消费者(consumer)设计,可直接上线使用。

一个读写excel的简单程序(golang)

命令行参数的艺术:Python、Golang、C++技术实现

supervisor,你理应知道。

借助tritonserver完成gpt2模型的本地私有化部署

Elasticsearch高级检索对决:search_after+pit和scroll,谁才是最佳选择?

protobuf c++开发快速上手指南

golang操作mysql之利器-gorm

Elasticsearch写入、读取、更新、删除以及批量操作(golang)

golang线程池ants-实现架构

跑步的第六年,才真正了解运动的意义

武汉抗疫英雄汪勇:平凡人的非凡之举。

李白:为何两次选择做了上门女婿?

纳兰性德-我是人间惆怅客,世间唯有『若』字,最难成真

2025年,我要做个自我介绍