RabbitMQ介绍以及基本使用
文章目录
二、消息队列的作用(优点)
1、解耦
2、流量削峰
3、异步
4、顺序性
三、RabbitMQ基本结构
四、RabbitMQ队列模式
1、简单队列模式
2、工作队列模式
3、发布/订阅模式
4、路由模式
5、主题模式
6、RPC模式
7、发布者确认模式
五、RabbitMQ相关属性描述
总结
一、什么是消息队列?
消息队列是一种用于在分布式系统中进行通信的技术。它是一种存储和转发消息的中间件,可以用
于将应用程序之间的通信解耦,从而实现高效的异步通信。消息队列允许发送者将消息发送到队列
中,而接收者则可以从队列中获取消息并进行处理。这种方式可以帮助系统实现高可用性、高性
能、松耦合和可伸缩性。消息队列通常包括生产者(发送消息的应用程序)、消费者(接收消息的
应用程序)和队列(存储消息的缓冲区)。
RabbitMQ:是由erlang语言开发,基于AMQP(高级消息队列协议)协议实现的一种消息队列。市面
上还有很多消息队列,比如Kafka、RocketMQ、Redis等,各有优劣,本文主要介绍RabbitMQ。
官方文档:RabbitMQ Tutorials | RabbitMQ
二、消息队列的作用(优点)
1、解耦
应用程序解耦,通过引入消息队列,不同的应用程序之间可以通过消息队列进行通信,而无需直接
调用对方的接口或方法。这样可以降低系统中各个应用程序之间的耦合度,使得它们可以独立演化
和扩展,而不会因为对方的变化而受到影响。
2、流量削峰
消息队列可以作为一个缓冲区,暂时存储流入的消息,直到系统有足够的资源来处理它们。当系统
出现流量高峰时,消息队列可以暂时存储过多的消息,以平滑处理流量的波动,避免系统被突发的
高负载压垮。
3、异步
发送者在发送消息后可以立即继续执行其他操作,而不需要等待接收者的响应。这样可以提高系统
的并发性和响应速度。也可以帮助提高系统的吞吐量,特别是在面对大量请求或处理复杂计算时。
发送者可以并行地向多个接收者发送消息,而不会因为等待接收者的响应而阻塞。
4、顺序性
虽然并不是所有消息队列都能保证消息的绝对顺序性,但是在许多情况下,消息队列可以保证消息
的相对顺序性。即按照发送顺序进行处理,对某些场景要求顺序执行很适合。
三、RabbitMQ基本结构
交换机是消息的接收和分发中心,负责接收生产者发送的消息,并根据指定的路由规则发送到一个或多个队列中。
(Exchange相当于Queue的代理,可以设置不同的写入策略,写入到对应的队列中。对于队列的写入,更加灵活)
交换机的类型有:fanout扇出、topic主题、direct直接
四、RabbitMQ队列模式
基于Exchange交换机,RabbitMQ截至目前有七种队列模式。
1、简单队列模式
一个消息生产者,一个消息消费者,一个队列。也称为点对点模式。
图中P代表生产者,C代表消费者,Queue是队列名称。
我们看到是没有Exchange的,但是RabbitMQ也会有一个默认的交换机。这个默认的交换机通常被
称为\"amq.default\"或者\"\"(空字符串),是RabbitMQ自动创建的,用于在没有指定交换机的情况
下将消息发送到队列。
//生产者var factory = new ConnectionFactory { HostName = \"localhost\"}; //初始化连接信息using var connection = factory.CreateConnection(); //创建连接using var channel = connection.CreateModel(); //创建信道//声明一个队列,并将信道与队列绑定channel.QueueDeclare(queue: \"hello\", durable: false, exclusive: false, autoDelete: false, arguments: null);//发送消息的内容string message = $\"Hello World!\";var body = Encoding.UTF8.GetBytes(message);//信道绑定交换机channel.BasicPublish(exchange: string.Empty, routingKey: string.Empty, basicProperties: null, body: body);Console.WriteLine($\" [x] Sent {message}\");Console.WriteLine(\" Press [enter] to exit.\");//消费者var factory = new ConnectionFactory { HostName = \"localhost\" };using var connection = factory.CreateConnection();using var channel = connection.CreateModel();channel.QueueDeclare(queue: \"hello\", durable: false, exclusive: false, autoDelete: false, arguments: null);Console.WriteLine(\" [*] Waiting for messages.\");var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{ var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); Console.WriteLine($\" [x] Received {message}\");};channel.BasicConsume(queue: \"hello\", autoAck: true, consumer: consumer);Console.WriteLine(\" Press [enter] to exit.\");
此时就会生产者发送一条消息,消费者就会接收一条消息。
2、工作队列模式
工作队列又叫做任务队列,正常会按顺序把消息发送给每一个订阅的消费者,平均而言,每个消费
者将获得相同数量的消息。(不是P发送一条消息,C1和C2都会收到,而是第一条C1消费,第二
条C2消费。每个消息只会被一个消费者接收和处理)。
这样的好处是可以提高吞吐量,因为生产者发送了很多消息,但是消费者只有一个,消费者处理很
慢,就会造成消息积压。
//生产者var factory = new ConnectionFactory { HostName = \"localhost\"};using var connection = factory.CreateConnection();using var channel = connection.CreateModel();channel.QueueDeclare(queue: \"task_queue\", durable: true, exclusive: false, autoDelete: false, arguments: null);var message = $\"work queue\";var body = Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchange: string.Empty, routingKey: string.Empty, basicProperties: null, body: body);Console.WriteLine($\" [x] Sent {message}\");Console.WriteLine(\" Press [enter] to exit.\");//消费者var factory = new ConnectionFactory { HostName = \"localhost\" };using var connection = factory.CreateConnection();using var channel = connection.CreateModel();channel.QueueDeclare(queue: \"task_queue\", durable: true, exclusive: false, autoDelete: false, arguments: null);Console.WriteLine(\" [*] Waiting for messages.\");var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{ byte[] body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); Console.WriteLine($\" [x] Received {message}\"); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);};channel.BasicConsume(queue: \"task_queue\", autoAck: false, consumer: consumer);Console.WriteLine(\" Press [enter] to exit.\");
工作队列与简单队列一致,会有一个默认的交换机。
3、发布/订阅模式
发布/订阅模式是一种消息传递模式,它允许发送者(发布者)将消息发布到多个接收者(订阅
者)。消息传递模型的核心思想是生产者从不直接向队列发送任何消息。实际上,生产者通常根本
不知道消息是否会被传递到任何队列。
所以消息传递模式,发布者不需要指定队列。
发布/订阅模式交换机类型为Fanout。
//发布者var factory = new ConnectionFactory { HostName = \"localhost\"};using var connection = factory.CreateConnection();using var channel = connection.CreateModel();//声明一个交换机,叫做logs,并且交换机的类型是Fanoutchannel.ExchangeDeclare(exchange: \"logs\", type: ExchangeType.Fanout);var message = \"publish_subscribe\";var body = Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchange: \"logs\", routingKey: string.Empty, basicProperties: null, body: body);Console.WriteLine($\" [x] Sent {message}\");Console.WriteLine(\" Press [enter] to exit.\");//接收者var factory = new ConnectionFactory { HostName = \"localhost\"};using var connection = factory.CreateConnection();using var channel = connection.CreateModel();channel.ExchangeDeclare(exchange: \"logs\", type: ExchangeType.Fanout);//创建一个具有生成名称的非持久、独占、自动删除队列var queueName = channel.QueueDeclare().QueueName;channel.QueueBind(queue: queueName, exchange: \"logs\", routingKey: string.Empty);Console.WriteLine(\" [*] Waiting for logs.\");var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{ byte[] body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); Console.WriteLine($\" [x] {message}\");};channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);Console.WriteLine(\" Press [enter] to exit.\");
注:如果发布者已经发布消息到交换机,但还没有队列绑定到交换机,消息将会丢失。
4、路由模式
路由模式也是一种消息传递模式,是基于消息的路由键(routing key)来将消息从交换机
(exchange)发送到一个或多个队列中。相比较于发布/订阅模式,路由模式多了一个routing key
的概念。
路由模式交换机类型为Direct。
//生产者var factory = new ConnectionFactory { HostName = \"localhost\"};using var connection = factory.CreateConnection();using var channel = connection.CreateModel();//定义交换机名称以及类型为Directchannel.ExchangeDeclare(exchange: \"direct_logs\", type: ExchangeType.Direct);//定义路由键string routingKey = \"direct_test\";//发送消息体string message = \"direct_message\";var body = Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchange: \"direct_logs\", routingKey: routingKey, basicProperties: null, body: body);Console.WriteLine($\" [x] Sent \'{routingKey}\':\'{message}\'\");Console.WriteLine(\" Press [enter] to exit.\");//消费者var factory = new ConnectionFactory { HostName = \"localhost\" };using var connection = factory.CreateConnection();using var channel = connection.CreateModel();channel.ExchangeDeclare(exchange: \"direct_logs\", type: ExchangeType.Direct);//创建一个具有生成名称的非持久、独占、自动删除队列var queueName = channel.QueueDeclare().QueueName;//路由键集合var routeKeyArr = new string[] { \"direct_test\", \"direct_test2\" };foreach (var routeKey in routeKeyArr){ channel.QueueBind(queue: queueName,exchange: \"direct_logs\",routingKey: routeKey);}Console.WriteLine(\" [*] Waiting for messages.\");var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{ var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); var routingKey = ea.RoutingKey; Console.WriteLine($\" [x] Received \'{routingKey}\':\'{message}\'\");};channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);Console.WriteLine(\" Press [enter] to exit.\");
路由模式,消费者可以监听多个路由键。
5、主题模式
基于路由模式,仍然有局限性——它不能基于多个标准进行路由。也就是一个消费者只能接收完全
与routing key相匹配的交换机。主题模式主要解决路由模式的不足,可以模糊匹配routing key。
路由模式交换机类型为Topic。
在生产者方面,基于 . 作为分隔符,用于routing key。比如“ stock.usd.nyse
”、“ nyse.vmw
”、
“ quick.orange.rabbit
”。可以是任何单词,但最多只有255 个字节。
在消费者方面,绑定routing key有两种重要的情况:
(1)*(星号):匹配一个单词。
具体语法:
var routeing_key = \"info.debug.error\";//匹配 info\"info.*.*\"//匹配debug\"*.debug.*\"//匹配error\"*.*.error\"
(2)#(散列):匹配零个或多个单词。
具体语法:
var routeing_key = \"info.debug.error\";//匹配 info\"info.#\"//匹配debug\"#.debug.#\"//匹配error\"*.*.error\"
6、RPC模式
RPC模式又叫\"请求/回复模式\"。
RPC(Remote Procedure Call,远程过程调用)是一种用于在分布式系统中进行通信的技术。它
允许一个进程(或线程)调用另一个进程(或线程)的过程(函数或方法),就像调用本地函数一
样,而不需要开发者显式处理底层通信细节。
(就是生产者发送一条消息,消费者端执行某个方法,获取值的同时,并返回到生产者。)
//生产者var factory = new ConnectionFactory { HostName = \"localhost\"};using var connection = factory.CreateConnection();using var channel = connection.CreateModel();//定义接收返回结果的队列var replyQueueName = channel.QueueDeclare().QueueName;var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{ var body = ea.Body.ToArray(); var response = Encoding.UTF8.GetString(body); Console.WriteLine(\" [.] Got \'{0}\'\", response);};//发送消息var correlationId = Guid.NewGuid().ToString(); //消息唯一性var props = channel.CreateBasicProperties();props.CorrelationId = correlationId;props.ReplyTo = replyQueueName; //回调队列名称string message = \"30\";var messageBytes = Encoding.UTF8.GetBytes(message);channel.BasicPublish( exchange: \"\", routingKey: \"rpc_queue\", basicProperties: props, body: messageBytes);channel.BasicConsume( consumer: consumer, queue: replyQueueName, autoAck: true);//消费者var factory = new ConnectionFactory { HostName = \"localhost\" };using var connection = factory.CreateConnection();using var channel = connection.CreateModel();channel.QueueDeclare(queue: \"rpc_queue\", durable: false, exclusive: false, autoDelete: false, arguments: null);channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);var consumer = new EventingBasicConsumer(channel);channel.BasicConsume(queue: \"rpc_queue\", autoAck: false, consumer: consumer);Console.WriteLine(\" [x] Awaiting RPC requests\");consumer.Received += (model, ea) =>{ string response = string.Empty; var body = ea.Body.ToArray(); var props = ea.BasicProperties; var replyProps = channel.CreateBasicProperties(); replyProps.CorrelationId = props.CorrelationId; try { var message = Encoding.UTF8.GetString(body); int n = int.Parse(message); Console.WriteLine($\" [.] Fib({message})\"); response = FibHelper.Fib(n).ToString(); } catch (Exception e) { Console.WriteLine($\" [.] {e.Message}\"); response = string.Empty; } finally { //回调到生产者队列 var responseBytes = Encoding.UTF8.GetBytes(response); channel.BasicPublish(exchange: string.Empty, routingKey: props.ReplyTo, basicProperties: replyProps, body: responseBytes); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }};//执行函数,并返回结果public class FibHelper{ public static int Fib(int n) { if (n == 0 || n == 1) return n; return Fib(n - 1) + Fib(n - 2); }}
RPC模式不是消息传递模式,消息只会被一个消费者消费。
7、发布者确认模式
发布者确认模式(Publisher Confirmation)是 RabbitMQ 提供的一种机制,用于确保消息被成功
发送到交换机(exchange)并被接收到,以及确保消息被正确地路由到队列中。在传统的消息发
布过程中,发布者发送消息到交换机后,并不知道消息是否已经被正确地处理。为了解决这个问
题,RabbitMQ 提供了发布者确认模式,允许发布者确认消息是否已经被成功接收到。
//生产者var factory = new ConnectionFactory() { HostName = \"localhost\" };using (var connection = factory.CreateConnection())using (var channel = connection.CreateModel()){ // 设置信道为确认模式 channel.ConfirmSelect(); // 声明一个队列 channel.QueueDeclare(queue: \"hello\", durable: false, exclusive: false, autoDelete: false, arguments: null); // 消息内容 string message = \"Hello World!\"; var body = Encoding.UTF8.GetBytes(message); try { // 发送消息 channel.BasicPublish(exchange: \"\", routingKey: \"\", basicProperties: null, body: body); // 等待消息确认 if (channel.WaitForConfirms()) { Console.WriteLine(\" [x] Sent {0}\", message); } else { Console.WriteLine(\" [x] Failed to send {0}\", message); } } catch (Exception ex) { Console.WriteLine($\"An error occurred: {ex.Message}\"); }}Console.WriteLine(\" Press [enter] to exit.\");Console.ReadLine();//消费者var factory = new ConnectionFactory() { HostName = \"localhost\" };using (var connection = factory.CreateConnection())using (var channel = connection.CreateModel()){ // 声明一个队列 channel.QueueDeclare(queue: \"hello\", durable: false, exclusive: false, autoDelete: false, arguments: null); // 创建消费者 var consumer = new EventingBasicConsumer(channel); // 消费消息 consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(\" [x] Received {0}\", message); }; // 消费者开始接收消息 channel.BasicConsume(queue: \"hello\", autoAck: true, consumer: consumer); Console.WriteLine(\" Press [enter] to exit.\"); Console.ReadLine();}
五、RabbitMQ相关属性描述
上述代码中,有很多属性的设置,下面解释一下。
arguments
true
时,表示消费者会自动确认收到的消息,此时队列中表示该消息已被消费成功了。当设置为 false
时,表示消费者需要显式地调用确认方法来告知 RabbitMQ 已经成功处理了消息,否则消息将被重新放回队列,等待其他消费者处理。总结
RabbitMQ 是一个消息队列,主要作用就是异步、顺序性、削峰等。
七种队列模式,可以根据不同的场景具体使用。
1. 简单队列模式
最简单的消息模式。一个生产者发送消息到一个队列,一个消费者从队列中接收消息并处理。适用
于单个生产者-单个消费者的简单场景。
2. 工作队列模式
多个消费者共同消费消息。消费者从队列中取出消息并处理,消息会平均地分配给消费者。
是基于简单队列模式的缺点,做了提升。适用于负载均衡和任务分发的场景。
3. 发布/订阅模式
生产者将消息发送到交换机,交换机将消息广播到所有与之绑定的队列。多个消费者可以订阅不同
的队列,从而接收消息的副本。适用于消息广播和通知的场景。
4. 路由模式
生产者发送消息到交换机,并使用路由键指定消息的目标队列。交换机根据消息的路由键将消息路
由到与之匹配的队列中。适用于根据消息内容进行精确路由的场景。
5. 主题模式
类似于路由模式,但是路由键可以使用通配符进行匹配。适用于消息的多样化路由和灵活的匹配需
求。
6. RPC模式
客户端(RPC请求者)发送请求消息到队列中,并等待服务器(RPC响应者)返回响应消息。
服务器监听请求队列,处理请求并将响应发送回客户端指定的队列。适用于需要请求-响应式通信
的场景,类似于远程调用。
7. 发布者确认模式
发布者确认模式是 RabbitMQ 提供的一种机制,用于确保消息在发送到交换机并被路由到队列时的
可靠性。