> 技术文档 > RabbitMQ--消费端限流

RabbitMQ--消费端限流


📌 一、核心概念:prefetch(限流)

✅ 什么是 prefetch?

  • prefetchCount 是 RabbitMQ 提供的限流机制:

控制一个消费者在未 ack 之前,最多能收到多少条消息

例如:prefetch = 10,消费者最多收到 10 条未确认的消息,没确认之前不再推送新消息。


📦 二、原生 RabbitMQ:限流设置与 ack 影响


✅ 1. 原生限流设置:channel.basicQos()

channel.basicQos(10); // 限制最多处理10条未ack的消息

✅ 2. 开启“手动 ack”时,限流生效(推荐 ✅)

import com.rabbitmq.client.*;public class ManualAckLimit { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(\"localhost\"); Connection conn = factory.newConnection(); Channel channel = conn.createChannel(); String QUEUE_NAME = \"test_queue\"; channel.queueDeclare(QUEUE_NAME, true, false, false, null); // ✅ 设置限流:每次最多接收10条未ack的消息 channel.basicQos(10); // ❗ false 表示手动 ack 模式 channel.basicConsume(QUEUE_NAME, false, (consumerTag, delivery) -> { String msg = new String(delivery.getBody()); System.out.println(\"接收消息:\" + msg); // 业务处理... Thread.sleep(1000); // ✅ 手动确认 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }, consumerTag -> {}); }}
📌 说明:
  • basicQos(10) 配合手动 ack:RabbitMQ 会控制只发10条未确认消息

  • 限流生效 ✅


❌ 3. 自动 ack 模式下,限流失效(不推荐)

channel.basicQos(10); // 设置限流channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); // autoAck = true
📌 说明:
  • 设置虽然写了 basicQos(10),但因为 autoAck = true,RabbitMQ 会立即认为消息处理完成

  • 限流机制不生效,RabbitMQ 会一直推消息给消费者

  • 无法控制负载,消费者可能“被压垮”


☕ 三、Spring Boot 中限流与 ack 提交方式

Spring Boot 使用 Spring AMQP 封装,配置项如下:

spring: rabbitmq: listener: simple: prefetch: 10  # (不写默认是250)限制未确认消息数量(生效条件见下) acknowledge-mode: auto # 可选:auto、manual、none

✅ 1. auto 模式(默认值 ✅ 推荐)

@RabbitListener(queues = \"test_queue\")public void consume(String msg) { System.out.println(\"接收到消息:\" + msg); // 方法执行完毕后,Spring 自动 ack(前提是无异常)}
📌 说明:
  • Spring 会自动 ack(方法无异常)

  • 实际底层使用 channel.basicConsume(..., false),非原生 autoAck

  • 所以 prefetch 生效 ✅


✅ 2. manual 模式(完全手动 ack)

@RabbitListener(queues = \"test_queue\", ackMode = \"MANUAL\")public void consume(Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); try { System.out.println(\"处理消息:\" + msg); // 手动 ack channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { // 处理失败时可重回队列 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); }}
📌 说明:
  • 你自己完全控制 ack 时机

  • prefetch 一定生效 ✅

  • 适合复杂业务、批量确认、失败重试


⚠️ 3. none 模式(不确认 ❌ 极不推荐)

acknowledge-mode: none
📌 说明:
  • Spring 不会发送任何 ack,RabbitMQ 也不会等

  • 一发出就当作“处理完”,消息可能丢失

  • prefetch 无效 ❌

  • ⚠️ 生产环境严禁使用!


📊 四、整体对比总结表(必记)

场景 ack 模式 ack 控制方式 prefetch 生效? 说明 原生 + 手动 ack autoAck = false 手动 basicAck() ✅ 生效 推荐 原生 + 自动 ack autoAck = true 消息发出立刻 ack ❌ 无效 不推荐 Spring Boot - auto(默认) acknowledge-mode: auto 方法执行完自动 ack ✅ 生效 推荐 Spring Boot - manual acknowledge-mode: manual 需手动调用 basicAck() ✅ 生效 推荐 Spring Boot - none acknowledge-mode: none 不确认 ❌ 无效 危险!

✅ 五、建议使用方式(结论)

场景 推荐配置 原生客户端 basicQos(x) + autoAck = false Spring Boot(简单业务) acknowledge-mode: auto Spring Boot(复杂控制) acknowledge-mode: manual Spring Boot(测试) acknowledge-mode: none(仅测试)

小问题:不用线程池, channel.basicQos(10);用和不用有啥区别?不都是单线程执行?起到限流的作用了吗?

  • 单线程同步处理,basicQos 可不设置或设1,控制更严格,避免内存堆积

  • 多线程或异步消费,建议合理设置 basicQos 限流,控制客户端预取消息数量,避免消息积压和压力暴增。

单线程执行的情况下,basicQos(10)确实还是起到了限流作用,但效果体现得比较微妙:

  • 基本原理basicQos(10)告诉RabbitMQ,最多给该消费者推送10条未ack的消息,这限制了消息在网络和客户端缓存中的积压量。

  • 你的场景:单线程同步处理,每次只能处理一条消息,处理慢(1秒),每处理完一条才ack。
    RabbitMQ会先推1条消息,等你ack后推下一条,以此类推。
    因此,即使你设置了basicQos(10),由于ack速度慢,最多只有1条消息处于“未ack”状态,实际并不会同时有10条消息待处理。

  • 总结
    basicQos(10)确实限制了预取数量,但你的单线程同步场景本身“处理速度=ack速度”,
    实际上RabbitMQ只能推送1条消息后等待确认,
    所以看起来和设置basicQos(1)或没设置效果没区别
    也就是说限流机制起了作用,但由于业务处理本身的串行特性,没体现出多条并发的差别。


如果你改成多线程异步处理,basicQos(10)才会让你最多并发处理10条消息,限流效果明显。