> 技术文档 > RabbitMQ Work Queues (工作队列模式) 使用案例_mq中queue模式

RabbitMQ Work Queues (工作队列模式) 使用案例_mq中queue模式


Hi~!这里是奋斗的明志,很荣幸您能阅读我的文章,诚请评论指点,欢迎欢迎 ~~
🌱🌱个人主页:奋斗的明志
🌱🌱所属专栏:RabbitMQ

📚本系列文章为个人学习笔记,在这里撰写成文一为巩固知识,二为展示我的学习过程及理解。文笔、排版拙劣,望见谅。

在这里插入图片描述

Work Queues 工作队列

  • 前言
  • Work Queues (工作队列)
    • 1、引入依赖
    • 2、编写生产者代码
    • 3、编写消费者代码
    • 4、运行程序, 观察结果

前言

在前面学习了简单模式的写法, 接下来学习另外几种工作模式的写法
简单模式
快速入门程序就是简单模式.
RabbitMQ Work Queues (工作队列模式) 使用案例_mq中queue模式

默契之舞 之 生产者消费者模式(RabbitMQ)

Work Queues (工作队列)

简单模式的增强版, 和简单模式的区别就是: 简单模式有一个消费者, 工作队列模式支持多个消费者接收消息, 消费者之间是竞争关系, 每个消息只能被一个消费者接收

RabbitMQ Work Queues (工作队列模式) 使用案例_mq中queue模式

1、引入依赖


<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --><dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.20.0</version></dependency>

2、编写生产者代码

工作队列模式和简单模式区别是有多个消费者, 所以生产者消费者代码差异不大
相比简单模式, 生产者的代码基本⼀样, 为了能看到多个消费者竞争的关系, 我们一次发送10条消息
我们把发送消息的地方, 改为一发送10条消息


for (int i = 0; i < 10; i++) { String mag = \"hello work queue......\" + i; channel.basicPublish(\"\", Constants.WORK_QUEUE, null, mag.getBytes());}

Constant 包下 Constants 类

package rabbitmq.constant;public class Constants { public static final String HOST = \"123.57.16.61\"; public static final Integer PORT = 5672; public static final String USERNAME = \"study\"; public static final String PASSWORD = \"study\"; public static final String VIRTUAL_HOST = \"bite\"; //声明一个工作队列 public static final String WORK_QUEUE = \"work.queue\";}

完整代码:

package rabbitmq.work;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import rabbitmq.constant.Constants;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * 工作队列模式 */public class Prooducer { public static void main(String[] args) throws IOException, TimeoutException { //创建一个连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); connectionFactory.setUsername(Constants.USERNAME); connectionFactory.setPassword(Constants.PASSWORD); connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //创建一个新的连接 Connection connection = connectionFactory.newConnection(); //开启一个通信 Channel channel = connection.createChannel(); //声明交换机,使用内置的交换机 //声明一个队列 channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null); //发送消息 for (int i = 0; i < 10; i++) { String mag = \"hello work queue......\" + i; channel.basicPublish(\"\", Constants.WORK_QUEUE, null, mag.getBytes()); } System.out.println(\"消息发送成功~\"); //资源释放 channel.close(); connection.close(); }}

RabbitMQ Work Queues (工作队列模式) 使用案例_mq中queue模式

3、编写消费者代码

消费者代码和简单模式⼀样, 只是复制两份. 两个消费者代码可以是⼀样的

消费者1:

package rabbitmq.work;import com.rabbitmq.client.*;import rabbitmq.constant.Constants;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Consumer1 { public static void main(String[] args) throws IOException, TimeoutException { //创建一个连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); connectionFactory.setUsername(Constants.USERNAME); connectionFactory.setPassword(Constants.PASSWORD); connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //创建一个新的连接 Connection connection = connectionFactory.newConnection(); //开启一个通信 Channel channel = connection.createChannel(); //声明一个队列,如果队列不存在,则创建,如果队列存在,则不创建 channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null); //消费消息 //消费的逻辑 DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(\"接受到消息:\" + new String(body)); } }; channel.basicConsume(Constants.WORK_QUEUE, true, consumer);// channel.close();// connection.close(); }}

RabbitMQ Work Queues (工作队列模式) 使用案例_mq中queue模式

消费者2:

package rabbitmq.work;import com.rabbitmq.client.*;import rabbitmq.constant.Constants;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Consumer2 { public static void main(String[] args) throws IOException, TimeoutException { //创建一个连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); connectionFactory.setUsername(Constants.USERNAME); connectionFactory.setPassword(Constants.PASSWORD); connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //创建一个新的连接 Connection connection = connectionFactory.newConnection(); //开启一个通信 Channel channel = connection.createChannel(); //声明一个队列,如果队列不存在,则创建,如果队列存在,则不创建 channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null); //消费消息 //消费的逻辑 DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(\"接受到消息:\" + new String(body)); } }; channel.basicConsume(Constants.WORK_QUEUE, true, consumer);// channel.close();// connection.close(); }}

4、运行程序, 观察结果

先启动两个消费者运行, 再启动生产者
如果先启动⽣产者, 在启动消费者, 由于消息较少, 处理较快, 那么第⼀个启动的消费者就会瞬间把10条
消息消费掉, 所以我们先启动两个消费者, 再启动生产者

RabbitMQ Work Queues (工作队列模式) 使用案例_mq中queue模式


RabbitMQ Work Queues (工作队列模式) 使用案例_mq中queue模式


观察RabbitMQ客户端:

RabbitMQ Work Queues (工作队列模式) 使用案例_mq中queue模式


RabbitMQ Work Queues (工作队列模式) 使用案例_mq中queue模式


RabbitMQ Work Queues (工作队列模式) 使用案例_mq中queue模式

启动生产者:

RabbitMQ Work Queues (工作队列模式) 使用案例_mq中queue模式


RabbitMQ Work Queues (工作队列模式) 使用案例_mq中queue模式


RabbitMQ Work Queues (工作队列模式) 使用案例_mq中queue模式

在这里插入图片描述

在这里插入图片描述