> 技术文档 > 详解RabbitMQ工作模式之工作队列模式_rabbitmq工作队列

详解RabbitMQ工作模式之工作队列模式_rabbitmq工作队列

目录

工作队列模式

概念

特点

应用场景

工作原理

注意事项

代码案例

引入依赖

常量类

编写生产者代码

编写消费者1代码

编写消费者2代码

先运行生产者,后运行消费者

先运行消费者,后运行生产者


工作队列模式

概念

在工作队列模式中,一个生产者(producer)将任务发布到队列中,多个消费者(consumer)从队列中获取任务并执行。这种模式的主要目标是提高任务的并行处理能力,从而提高系统的吞吐量和效率。

特点

可以有多个消费者,但一条消息只能被一个消费者获取。
消费者在处理完某条消息后,才会收到下一条消息。
RabbitMQ采用轮询(Round-Robin)或公平分发(Fair Dispatch)的方式将消息发送给消费者。 

应用场景

1.任务分发:将任务分发给多个工作者(消费者),以便并行处理。这对于需要高吞吐量和任务处理效率的应用程序非常有用。例如,图像处理、视频编码、数据转换等应用可以使用工作队列模式来并行处理大量任务。
2.负载均衡:当有多个消费者时,工作队列模式可以用来实现负载均衡。任务将均匀分布给可用的消费者,以确保每个消费者都有工作可做,而且不会超负荷。
3.后台任务处理:在Web应用程序中,后台任务处理是一个常见的需求。工作队列模式可用于处理与Web请求无关的长时间运行任务,而不会影响用户体验。例如,发送电子邮件、生成报告、备份数据等后台任务可以使用工作队列来处理。

工作原理

1.生产者发送任务:生产者将任务封装为消息,并将其发送到RabbitMQ队列中。
2.RabbitMQ分发任务:RabbitMQ根据配置的分发策略(如轮询或公平分发)将任务分发给消费者。
3.消费者处理任务:消费者从队列中获取任务并执行。在处理完任务后,消费者会向RabbitMQ发送确认消息,表示任务已完成。
4.RabbitMQ确认任务完成:在收到消费者的确认消息后,RabbitMQ会将该任务从队列中移除。

注意事项

1.消息确认:为了确保消息不会丢失,消费者在处理完任务后需要向RabbitMQ发送确认消息。如果消费者在处理任务时失败或崩溃,RabbitMQ会将该任务重新分发给其他消费者。
2.负载均衡:RabbitMQ默认采用轮询方式将消息分发给消费者。如果需要更复杂的负载均衡策略,可以考虑使用其他分发策略或自定义交换机类型。
3.错误处理:在生产者和消费者中都需要添加适当的错误处理逻辑,以处理可能出现的异常情况,如连接失败、消息发送失败等。

代码案例
引入依赖
 com.rabbitmq amqp-client 5.21.0
常量类
public class Constants { public static final String HOST = \"47.98.109.138\"; public static final int PORT = 5672; public static final String USER_NAME = \"study\"; public static final String PASSWORD = \"study\"; public static final String VIRTUAL_HOST = \"aaa\"; //工作队列模式 public static final String WORK_QUEUE = \"work.queue\";}
编写生产者代码
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import rabbitmq.constant.Constants;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //1. 建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); //需要提前开放端口号 connectionFactory.setUsername(Constants.USER_NAME);//账号 connectionFactory.setPassword(Constants.PASSWORD); //密码 connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机 Connection connection = connectionFactory.newConnection(); //2. 开启信道 Channel channel = connection.createChannel(); //3. 声明队列 使用内置的交换机 //如果队列不存在, 则创建, 如果队列存在, 则不创建 channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null); //4. 发送消息 for (int i = 0; i < 10; i++) { String msg = \"hello work queue....\"+i; channel.basicPublish(\"\",Constants.WORK_QUEUE, null, msg.getBytes()); } System.out.println(\"消息发送成功~\"); //6. 资源释放 channel.close(); connection.close(); }}
编写消费者1代码
import com.rabbitmq.client.*;import rabbitmq.constant.Constants;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Consumer1 { public static void main(String[] args) throws IOException, TimeoutException { //1. 建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); //需要提前开放端口号 connectionFactory.setUsername(Constants.USER_NAME);//账号 connectionFactory.setPassword(Constants.PASSWORD); //密码 connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机 Connection connection = connectionFactory.newConnection(); //2. 开启信道 Channel channel = connection.createChannel(); //3. 声明队列 使用内置的交换机 //如果队列不存在, 则创建, 如果队列存在, 则不创建 channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null); //4. 消费消息 DefaultConsumer consumer = new DefaultConsumer(channel){ //从队列中收到消息, 就会执行的方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(\"接收到消息:\"+ new String(body)); } }; channel.basicConsume(Constants.WORK_QUEUE, true, consumer); //6. 资源释放// channel.close();// connection.close(); }}
编写消费者2代码
import com.rabbitmq.client.*;import rabbitmq.constant.Constants;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Consumer2 { public static void main(String[] args) throws IOException, TimeoutException { //1. 建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); //需要提前开放端口号 connectionFactory.setUsername(Constants.USER_NAME);//账号 connectionFactory.setPassword(Constants.PASSWORD); //密码 connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机 Connection connection = connectionFactory.newConnection(); //2. 开启信道 Channel channel = connection.createChannel(); //3. 声明队列 使用内置的交换机 //如果队列不存在, 则创建, 如果队列存在, 则不创建 channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null); //4. 消费消息 DefaultConsumer consumer = new DefaultConsumer(channel){ //从队列中收到消息, 就会执行的方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //TODO System.out.println(\"接收到消息:\"+ new String(body)); } }; channel.basicConsume(Constants.WORK_QUEUE, true, consumer);// //6. 资源释放// channel.close();// connection.close(); }}
先运行生产者,后运行消费者

查看管理界面

我们此时会看到,先启动的消费者会消费掉队列中所有的消息。

先运行消费者,后运行生产者

此时我们能看到,两个消费者都能够消费消息。