> 技术文档 > 深入浅出 RabbitMQ:工作队列实战(轮训策略VS公平策略)_讲讲rabbitmq实战经验

深入浅出 RabbitMQ:工作队列实战(轮训策略VS公平策略)_讲讲rabbitmq实战经验

大家好,我是工藤学编程 🦉 一个正在努力学习的小博主,期待你的关注 实战代码系列最新文章😉 C++实现图书管理系统(Qt C++ GUI界面版) SpringBoot实战系列🐷 【SpringBoot实战系列】SpringBoot3.X 整合 MinIO 存储原生方案 分库分表 分库分表之实战-sharding-JDBC分库分表执行流程原理剖析 消息队列 深入浅出 RabbitMQ-简单队列实战

前情摘要:

1、深入浅出 RabbitMQ-核心概念介绍与容器化部署
2、深入浅出 RabbitMQ-简单队列实战


【亲测宝藏】发现一个让 AI 学习秒变轻松的神站!不用啃高数、不用怕编程,高中生都能看懂的人工智能教程来啦!

👉点击跳转,和 thousands of 小伙伴一起用快乐学习法征服 AI,说不定下一个开发出爆款 AI 程序的就是你!


本文章目录

  • 深入浅出 RabbitMQ:工作队列实战(轮训策略VS公平策略)
    • 一、什么是工作队列?
    • 二、轮训策略(Round Robin)实战
      • 2.1 轮训策略原理
      • 2.2 代码实现
        • ① 消息生产者(Send)
        • ② 消息消费者(Recv1 & Recv2)
      • 2.3 轮训策略验证
        • 测试步骤:
        • 预期结果:
      • 2.4 轮训策略的缺点
    • 三、公平策略(Fair Dispatch)实战
      • 3.1 公平策略原理
      • 3.2 代码修改(仅需调整消费者)
      • 3.3 公平策略验证
        • 测试调整:
        • 测试步骤:
        • 预期结果:
    • 四、关键知识点总结
      • 注意事项:
    • 五、测试步骤回顾
    • 六、总结

深入浅出 RabbitMQ:工作队列实战(轮训策略VS公平策略)

一、什么是工作队列?

在实际业务中,当消息生产速度超过消费速度时(比如秒杀场景的订单消息、日志采集的大量数据),单消费者可能会导致消息堆积。工作队列(Work Queue) 通过引入多个消费者共同处理同一队列的消息,实现消息的分布式消费,解决\"生产快、消费慢\"的问题。

工作队列的核心特点:

  • 多个消费者监听同一队列,形成竞争关系
  • 消息一旦被一个消费者处理,其他消费者不会再收到该消息
  • 默认采用\"轮训策略\"分配消息,可通过配置优化为\"公平策略\"

二、轮训策略(Round Robin)实战

2.1 轮训策略原理

RabbitMQ默认的消息分配方式:将消息依次轮流发送给每个消费者,不考虑消费者的处理速度。例如10条消息会被均匀分给2个消费者(各处理5条)。

2.2 代码实现

① 消息生产者(Send)

发送10条消息到队列work_mq_rr,模拟高频率生产场景:

public class Send { private final static String QUEUE_NAME = \"work_mq_rr\"; public static void main(String[] argv) throws Exception { // 1. 配置连接工厂(同简单队列,确保虚拟主机一致) ConnectionFactory factory = new ConnectionFactory(); factory.setHost(\"192.168.229.128\"); factory.setUsername(\"admin\"); factory.setPassword(\"password\"); factory.setVirtualHost(\"/dev\"); // 生产者与消费者需使用同一虚拟主机 factory.setPort(5672); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 2. 声明队列(非持久化,非独占,不自动删除) channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 3. 循环发送10条消息 for (int i = 0; i < 10; i++) { String message = \"Hello World! \" + i; // 发送消息(使用默认交换机,路由键=队列名) channel.basicPublish(\"\", QUEUE_NAME, null,  message.getBytes(StandardCharsets.UTF_8)); System.out.println(\" [x] 发送消息: \'\" + message + \"\'\"); } } }}
② 消息消费者(Recv1 & Recv2)

两个消费者监听同一队列,模拟不同处理速度(此处为简化,暂用相同休眠时间,实际可调整差异):

// 消费者1public class Recv1 { private final static String QUEUE_NAME = \"work_mq_rr\"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(\"192.168.229.128\"); factory.setUsername(\"admin\"); factory.setPassword(\"password\"); factory.setVirtualHost(\"/dev\"); // 注意:与生产者保持一致(原代码误写为/xdclass1,已修正) factory.setPort(5672); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(\" [*] Recv1 等待消息...(按CTRL+C退出)\"); // 消息处理回调(模拟处理耗时:3秒) DeliverCallback deliverCallback = (consumerTag, delivery) -> { try { TimeUnit.SECONDS.sleep(3); // 模拟业务处理时间 } catch (InterruptedException e) { e.printStackTrace(); } String message = new String(delivery.getBody(), \"UTF-8\"); System.out.println(\"Recv1 接收: \'\" + message + \"\'\"); // 手动确认消息(必须,否则消息会一直存在队列中) // 参数2:false表示仅确认当前消息,true表示确认所有小于当前tag的消息 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }; // 关闭自动确认(autoAck=false),开启手动确认 channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {}); }}
// 消费者2(代码与Recv1一致,仅打印标识不同)public class Recv2 { private final static String QUEUE_NAME = \"work_mq_rr\"; public static void main(String[] argv) throws Exception { // 连接配置与Recv1相同 ConnectionFactory factory = new ConnectionFactory(); factory.setHost(\"192.168.229.128\"); factory.setUsername(\"admin\"); factory.setPassword(\"password\"); factory.setVirtualHost(\"/dev\"); factory.setPort(5672); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(\" [*] Recv2 等待消息...(按CTRL+C退出)\"); // 消息处理回调(同样模拟3秒耗时) DeliverCallback deliverCallback = (consumerTag, delivery) -> { try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } String message = new String(delivery.getBody(), \"UTF-8\"); System.out.println(\"Recv2 接收: \'\" + message + \"\'\"); // 手动确认消息 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }; channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {}); }}

2.3 轮训策略验证

测试步骤:
  1. 先启动两个消费者(Recv1、Recv2)
    深入浅出 RabbitMQ:工作队列实战(轮训策略VS公平策略)_讲讲rabbitmq实战经验
    深入浅出 RabbitMQ:工作队列实战(轮训策略VS公平策略)_讲讲rabbitmq实战经验

  2. 再启动生产者(Send),发送10条消息
    深入浅出 RabbitMQ:工作队列实战(轮训策略VS公平策略)_讲讲rabbitmq实战经验

预期结果:
  • Recv1 接收消息:0、2、4、6、8(偶数)
  • Recv2 接收消息:1、3、5、7、9(奇数)
  • 两者各处理5条,均匀分配,但忽略处理能力差异(即使某消费者处理更快,也不会多分配)

实际结果
深入浅出 RabbitMQ:工作队列实战(轮训策略VS公平策略)_讲讲rabbitmq实战经验
深入浅出 RabbitMQ:工作队列实战(轮训策略VS公平策略)_讲讲rabbitmq实战经验

2.4 轮训策略的缺点

当消费者处理能力不均时(比如Recv1处理需1秒,Recv2需5秒),轮训策略会导致:

  • 处理快的消费者(Recv1)处理完后处于空闲状态
  • 处理慢的消费者(Recv2)积压消息,整体消费效率低

三、公平策略(Fair Dispatch)实战

3.1 公平策略原理

为解决轮训的负载不均问题,公平策略让消费者处理完一条消息后,再接收下一条,实现\"能者多劳\"。核心配置:通过channel.basicQos(1)设置\"预取数\"为1,告诉RabbitMQ:“在我处理完当前消息并确认前,不要给我发新消息”。

3.2 代码修改(仅需调整消费者)

在消费者的channel.queueDeclare之后,添加basicQos配置:

// 消费者1(Recv1)添加公平策略配置channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 关键:设置预取数为1,开启公平策略channel.basicQos(1); // 每次只接收1条未确认的消息// 后续的消息处理逻辑不变...
// 消费者2(Recv2)同样添加:channel.basicQos(1);

3.3 公平策略验证

测试调整:

修改Recv1的处理耗时为1秒(快消费者),Recv2仍为3秒(慢消费者):

// Recv1的处理耗时改为1秒TimeUnit.SECONDS.sleep(1); 
测试步骤:
  1. 启动Recv1(1秒/条)和Recv2(3秒/条)
  2. 启动生产者发送10条消息
预期结果:
  • 快消费者(Recv1)会处理更多消息(比如7-8条)
    深入浅出 RabbitMQ:工作队列实战(轮训策略VS公平策略)_讲讲rabbitmq实战经验

  • 慢消费者(Recv2)处理较少消息(比如2-3条)
    深入浅出 RabbitMQ:工作队列实战(轮训策略VS公平策略)_讲讲rabbitmq实战经验

  • 整体消费效率显著提升,避免\"忙的忙死,闲的闲死\"

四、关键知识点总结

维度 轮训策略(Round Robin) 公平策略(Fair Dispatch) 核心逻辑 依次轮流分配,不考虑处理速度 处理完一条再分配下一条,按能力分配 配置要点 默认生效,无需额外配置 需设置channel.basicQos(1) 适用场景 所有消费者处理速度相近的场景 消费者处理速度差异较大的场景 消息确认 必须开启手动确认(autoAck=false) 必须开启手动确认(autoAck=false)

注意事项:

  1. 手动确认是前提:无论哪种策略,若使用basicQos,必须关闭自动确认(autoAck=false),并在处理完后调用basicAck确认,否则消息会一直堆积。
  2. 预取数调整basicQos(n)n可根据实际场景调整(如n=5表示允许预取5条未确认消息),避免网络频繁交互。

五、测试步骤回顾

  1. 轮训策略测试

    • 启动2个默认消费者(无basicQos
    • 发送10条消息,观察均匀分配结果
  2. 公平策略测试

    • 消费者添加channel.basicQos(1)
    • 差异化设置消费者处理速度,发送消息,观察\"能者多劳\"效果

六、总结

工作队列通过多消费者分布式消费,解决了\"生产快于消费\"的问题,而策略的选择直接影响效率:

  • 轮训策略实现简单,适合消费者能力均衡的场景;
  • 公平策略通过basicQos(1)实现负载均衡,适合消费者能力差异大的场景。

实际项目中,需根据消费者处理能力、消息紧急程度等因素选择策略,必要时结合消息优先级、持久化等特性,进一步优化消息处理链路。

觉得有用请点赞收藏!
如果有相关问题,欢迎评论区留言讨论~