深入浅出 RabbitMQ-交换机详解与发布订阅模型实战
前情摘要:
1、深入浅出 RabbitMQ-核心概念介绍与容器化部署
2、深入浅出 RabbitMQ-简单队列实战
3、深入浅出 RabbitMQ-工作队列实战(轮训策略VS公平策略)
【亲测宝藏】发现一个让 AI 学习秒变轻松的神站!不用啃高数、不用怕编程,高中生都能看懂的人工智能教程来啦!
👉点击跳转,和 thousands of 小伙伴一起用快乐学习法征服 AI,说不定下一个开发出爆款 AI 程序的就是你!
本文章目录
- 玩转RabbitMQ:交换机详解与发布订阅模型实战
-
- 一、为什么需要交换机(Exchange)?
- 二、RabbitMQ交换机类型全解析
-
- 关键细节:
- 三、发布订阅模型:基于Fanout交换机的实战
-
- 3.1 发布订阅模型的典型场景
- 3.2 核心原理
- 四、代码实战:发布订阅模型实现
-
- 环境准备
- 4.1 消息生产者(Send):发送消息到Fanout交换机
- 4.2 消息消费者(Recv1 & Recv2):监听并接收广播消息
- 五、实战验证:广播消息是否生效
-
- 测试步骤:
- 关键现象解析:
- 六、核心知识点总结
玩转RabbitMQ:交换机详解与发布订阅模型实战
一、为什么需要交换机(Exchange)?
在简单队列和工作队列中,生产者直接将消息发送到队列,但实际业务中,我们常需要更灵活的消息路由能力:比如\"将订单消息发给物流系统和支付系统\"、“将error级别的日志只发给告警服务”。这时,交换机(Exchange) 就成了关键角色。
简单说,交换机是RabbitMQ中消息的\"中转站\":
- 生产者不再直接发送消息到队列,而是发送到交换机;
- 交换机根据预设的路由规则,将消息转发到一个或多个队列;
- 交换机本身不存储消息:若没有队列绑定它,或没有匹配的路由规则,消息会直接丢失。
二、RabbitMQ交换机类型全解析
RabbitMQ提供4种交换机类型,核心差异在于路由规则的不同。其中前3种最常用,最后一种(Headers)因灵活性低较少使用。
*
和#
)*
匹配1个词,#
匹配多个)关键细节:
- Direct:最严格的匹配,适合一对一的精准路由;
- Fanout:转发速度最快,因为无需解析路由键,适合广播场景;
- Topic:最灵活,支持通配符,能覆盖大部分复杂路由需求。
三、发布订阅模型:基于Fanout交换机的实战
发布订阅(Publish/Subscribe)是RabbitMQ中经典的消息模式,核心是\"一条消息被多个消费者同时接收\",其底层依赖Fanout交换机实现广播能力。
3.1 发布订阅模型的典型场景
- 微信公众号:作者发一篇文章,所有订阅者都能收到;
- 日志系统:一条错误日志,同时发给告警服务、存储服务、分析服务;
- 实时通知:秒杀活动开始,所有在线用户收到通知。
3.2 核心原理
- 生产者将消息发送到Fanout交换机;
- 交换机将消息广播到所有与之绑定的队列;
- 每个队列对应一个消费者,因此所有消费者都能收到消息。
注意:每个消费者需要创建自己的队列(通常是临时队列),并绑定到Fanout交换机,否则无法接收消息。
四、代码实战:发布订阅模型实现
环境准备
- RabbitMQ服务已启动(参考前文部署教程);
- 依赖:
amqp-client
(同前文,版本5.10.0+); - 虚拟主机:
/dev
(确保生产者和消费者一致)。
4.1 消息生产者(Send):发送消息到Fanout交换机
public class Send { // 交换机名称(全局唯一,生产者和消费者需一致) private final static String EXCHANGE_NAME = \"exchange_fanout\"; public static void main(String[] argv) throws Exception { // 1. 创建连接工厂并配置参数 ConnectionFactory factory = new ConnectionFactory(); factory.setHost(\"192.168.229.128\"); // RabbitMQ服务器地址 factory.setUsername(\"admin\"); // 用户名 factory.setPassword(\"password\"); // 密码 factory.setVirtualHost(\"/dev\"); // 虚拟主机(必须一致) factory.setPort(5672); // AMQP协议端口 // 2. 创建连接和信道(try-with-resources自动关闭) try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 3. 声明交换机:类型为Fanout(扇形) /** * 参数说明: * 1. exchange:交换机名称 * 2. type:交换机类型(BuiltinExchangeType.FANOUT) * 3. durable:是否持久化(重启后交换机依然存在) * 4. autoDelete:是否自动删除(最后一个绑定解除后删除) * 5. internal:是否内部交换机(一般为false,外部可发送消息) * 6. arguments:额外参数 */ channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, false, false, false, null); // 4. 发送消息到交换机(Fanout交换机无需指定路由键) String message = \"Hello World! 这是一条广播消息~\"; /** * 参数说明: * 1. exchange:目标交换机名称 * 2. routingKey:路由键(Fanout类型无用,可设为空) * 3. props:消息属性(如持久化、优先级等) * 4. body:消息体(字节数组) */ channel.basicPublish(EXCHANGE_NAME, \"\", null, message.getBytes(StandardCharsets.UTF_8)); System.out.println(\" [x] 生产者已发送消息: \'\" + message + \"\'\"); } }}
4.2 消息消费者(Recv1 & Recv2):监听并接收广播消息
两个消费者逻辑一致,均需绑定到Fanout交换机,通过临时队列接收消息。
// 消费者1public class Recv1 { private final static String EXCHANGE_NAME = \"exchange_fanout\"; public static void main(String[] argv) throws Exception { // 1. 连接配置(与生产者完全一致) ConnectionFactory factory = new ConnectionFactory(); factory.setHost(\"192.168.229.128\"); factory.setUsername(\"admin\"); factory.setPassword(\"password\"); factory.setVirtualHost(\"/dev\"); factory.setPort(5672); // 2. 创建连接和信道(消费者保持长连接,不自动关闭) Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 3. 声明交换机(与生产者一致,确保交换机存在) channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, false, false, false, null); // 4. 创建临时队列(发布订阅模式专用) /** * 特点: * - 随机生成队列名称(如amq.gen-xxxx) * - 独占队列(exclusive=true):仅当前连接可访问,连接关闭后自动删除 * - 自动删除(autoDelete=true):最后一个消费者断开后删除 */ String queueName = channel.queueDeclare().getQueue(); // 无参数表示创建临时队列 System.out.println(\"Recv1 绑定的临时队列名称:\" + queueName); // 5. 绑定队列与交换机(Fanout交换机无需指定路由键) channel.queueBind(queueName, EXCHANGE_NAME, \"\"); // 6. 定义消息处理回调 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), \"UTF-8\"); System.out.println(\"Recv1 收到消息:\" + message); // 若需手动确认消息,可添加 channel.basicAck(...)(此处简化用自动确认) }; // 7. 开始消费消息(自动确认模式,适合简单场景) channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {}); System.out.println(\"Recv1 已启动,等待接收消息...(按CTRL+C退出)\"); }}
// 消费者2(代码与Recv1完全一致,仅打印标识不同)public class Recv2 { private final static String EXCHANGE_NAME = \"exchange_fanout\"; public static void main(String[] argv) throws Exception { // 连接配置、交换机声明、临时队列创建、绑定逻辑与Recv1一致 ConnectionFactory factory = new ConnectionFactory(); factory.setHost(\"192.168.229.128\"); factory.setUsername(\"admin\"); factory.setPassword(\"password\"); factory.setVirtualHost(\"/dev\"); factory.setPort(5672); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, false, false, false, null); String queueName = channel.queueDeclare().getQueue(); System.out.println(\"Recv2 绑定的临时队列名称:\" + queueName); channel.queueBind(queueName, EXCHANGE_NAME, \"\"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), \"UTF-8\"); System.out.println(\"Recv2 收到消息:\" + message); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {}); System.out.println(\"Recv2 已启动,等待接收消息...(按CTRL+C退出)\"); }}
五、实战验证:广播消息是否生效
测试步骤:
-
启动消费者:先运行Recv1和Recv2,控制台会输出各自绑定的临时队列名称(如
amq.gen-abc123
);
-
启动生产者:运行Send,发送一条消息;
-
观察结果:Recv1和Recv2的控制台均会打印收到的消息,说明Fanout交换机成功将消息广播到两个队列。
关键现象解析:
- 若未绑定队列到交换机:生产者发送的消息会因\"无匹配队列\"而丢失;
- 临时队列的作用:每个消费者独占一个临时队列,确保消息能被各自接收,且消费者退出后队列自动清理,不占用资源。
六、核心知识点总结
- 交换机的核心职责:转发消息,不存储,路由规则由类型决定;
- Fanout交换机特点:
- 广播消息到所有绑定的队列,无视路由键;
- 适合\"一对多\"的发布订阅场景,转发效率最高;
- 临时队列设计:
- 随机名称+独占(exclusive=true)+自动删除(autoDelete=true);
- 避免手动创建队列的麻烦,适合临时消费场景;
- 消息不丢失的前提:必须有队列与交换机绑定,且路由规则匹配。
觉得有用请点赞收藏!
如果有相关问题,欢迎评论区留言讨论~