rabbitMq内容整理
一、RabbitMQ基础概念
RabbitMQ是一个开源的、实现AMQP(高级消息队列协议)的消息代理中间件,负责应用间异步消息传递、解耦、削峰、异步处理任务等。
核心概念有:
-
生产者(Producer):发送消息的程序。
-
消费者(Consumer):接收并处理消息的程序。
-
Broker:消息服务器,RabbitMQ服务端程序。
-
Exchange(交换机):负责根据规则分发消息到队列。
-
Queue(队列):存放消息的地方。
-
Binding(绑定):Exchange和Queue之间的关系,包含路由规则。
-
Routing Key(路由键):决定消息如何路由到队列。
二、RabbitMQ的消息模型
RabbitMQ支持以下Exchange类型:
常用场景举例:
-
direct模式
-
一个exchange,一个queue绑定routingKey为
user.register
,生产者发送时携带user.register
路由键,消息被精确发送到该队列。
-
-
fanout模式
-
一个exchange绑定多个queue,发送消息时所有队列都能接收,适合发布订阅广播场景。
-
-
topic模式
-
可以使用通配符
*
匹配单个单词,#
匹配0或多个单词,例如:-
log.error
-> 精确匹配 -
log.*
匹配log.error
、log.info
-
log.#
匹配log.error.http
-
-
三、RabbitMQ重要特性与工作中常用知识点
1、消息可靠性(必掌握)
RabbitMQ实现消息可靠传递的机制:
-
生产者端确认(Publisher Confirm)
-
RabbitMQ确认生产者的消息已成功到达Broker。
-
设置
channel.confirmSelect()
开启确认模式。
-
-
Broker端持久化
-
Exchange、Queue、Message都需要开启持久化(durable=True),消息会落盘。
-
注意:消息可靠性会影响性能。
-
-
消费者端ACK机制
-
消费者明确告知消息消费完成,Broker才会删除消息。
-
自动ACK和手动ACK(推荐)两种模式:
-
自动ACK (
autoAck=true
),消费异常时可能导致消息丢失。 -
手动ACK (
channel.basicAck(deliveryTag, false)
),保障数据可靠性。
-
-
2、消息幂等性(必掌握)
-
防止重复消息消费引发的数据一致性问题。
-
一般做法:
-
使用消息唯一ID(UUID)去重;
-
利用Redis或数据库的唯一键做去重校验。
-
3、消息死信队列(DLX)
-
用于处理未被正常消费的消息(超过重试次数、消息过期、队列满了)。
-
DLX机制:
-
创建单独死信交换机和队列;
-
将业务队列绑定到死信交换机,消息异常或超时后自动进入死信队列。
-
4、消息延迟队列
-
利用TTL(消息存活时间)和DLX实现延迟队列功能:
-
设置消息或队列TTL,消息过期进入DLX。
-
-
常用场景:
-
订单超时未支付自动取消;
-
延迟发送通知。
-
5、消息优先级队列
-
设置队列为priority模式:
Map args = new HashMap();args.put(\"x-max-priority\", 10);channel.queueDeclare(queue, durable, exclusive, autoDelete, args);
-
消息发送时指定优先级:
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .priority(5) .build();
6、流控与消息积压问题处理(必掌握)
-
消费端限流(QoS)
java
复制编辑
channel.basicQos(10); // 一次最多处理10条未确认消息
-
设置合理的消费者数量,防止消息堆积。
-
监控队列长度和消费速率,及时预警处理。
四、RabbitMQ集群与高可用(常用)
-
镜像队列(Mirror Queue):
-
同步队列数据到多个节点,提高可用性,防止单点故障。
-
-
集群模式(Cluster):
-
主节点维护元数据,其他节点从节点复制数据。
-
-
Federation(联合)模式:
-
异地多数据中心间消息复制。
-
五、RabbitMQ性能优化(常用)
-
使用长连接,避免频繁开关。
-
批量发送消息,减少网络IO。
-
调整消息持久化策略,非关键数据可关闭。
-
合理配置消费并发数和线程池。
六、工作中常见问题与解决方法(重点)
七、监控管理(必掌握)
-
使用RabbitMQ Management Plugin
-
默认端口
15672
; -
查看队列、连接、交换机、消息堆积、消费者情况;
-
可手动删除消息、重置连接。
-
-
Prometheus + Grafana 监控方案:
-
深入监控队列深度、消息消费速率、连接数、资源消耗等。
-
分界线
-------------------------------------------------------------------------------------------------------------
一、为什么选择 RabbitMQ 而不是其他中间件(如 Kafka、RocketMQ)?
面试官一般想了解你对常用MQ的理解差异,考察你技术选型能力。
举例场景:
-
RabbitMQ适合高可靠业务通知场景,如订单支付结果通知;
-
Kafka适合日志传输和大数据处理场景;
-
RocketMQ更适合电商领域,有严格顺序消息需求。
二、如何保证RabbitMQ的消息可靠性、幂等性?
1. 消息可靠性(生产者→MQ→消费者):
生产者:
-
开启confirm确认机制:
channel.confirmSelect(); // 开启确认模式channel.basicPublish(exchange, routingKey, null, message.getBytes());if (channel.waitForConfirms()) { System.out.println(\"消息投递成功\");} else { System.out.println(\"消息投递失败,需要重试或记录\");}
MQ服务器:
-
设置队列和Exchange持久化:
channel.exchangeDeclare(\"exchange\", \"direct\", true);channel.queueDeclare(\"queue\", true, false, false, null);channel.queueBind(\"queue\", \"exchange\", \"routingKey\");
-
消息持久化发送:
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder() .deliveryMode(2) // 2代表持久化消息 .build();channel.basicPublish(\"exchange\", \"routingKey\", props, message.getBytes());
消费者:
-
使用手动ACK机制(关键):
channel.basicConsume(\"queue\", false, (consumerTag, delivery) -> { try { // 处理消息 System.out.println(\"收到消息: \" + new String(delivery.getBody())); // 手动确认 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } catch (Exception e) { // 拒绝消息并重回队列 channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true); }}, consumerTag -> {});
2. 消息幂等性(防止消息重复消费):
幂等性就是同一消息多次处理结果不变。
-
常见实现方式:
① 数据库唯一索引去重:
-- 使用订单号做唯一索引,消费前insert去重 CREATE UNIQUE INDEX order_unique ON order_table(order_id);
② Redis去重(推荐):
String msgId = message.getMsgId(); // 消息唯一标识if (redis.setnx(\"msg:\"+msgId, \"1\") == 1) { redis.expire(\"msg:\"+msgId, 3600); // 设置过期时间1小时 processMessage(message);} else { // 消息重复,直接丢弃}
三、RabbitMQ消费端如何限流?
消费者限流主要依靠basicQos:
// 同一时间最多允许处理10条消息(未确认状态) channel.basicQos(10);
解释:
RabbitMQ不会一次性推送超过设定值的消息给消费者,未ACK的消息数量达到10时,队列暂停向该消费者发送新消息。
四、RabbitMQ消息堆积如何处理?
常见处理办法:
-
增加消费者数量或并发数;
-
消费者限流合理设置;
-
临时创建新队列和消费者分流;
-
严重时,考虑清理无用消息。
优化方案举例:
-
增加消费者(并发处理):
// 启动多个consumer实例(独立进程或线程) for (int i = 0; i < 10; i++) { new Thread(new ConsumerRunnable()).start(); }
五、如何使用RabbitMQ实现延迟消息功能?
延迟消息通过TTL(消息生存时间)+ DLX(死信队列)实现。
实现步骤(详细示例):
① 定义正常队列与死信队列
// 死信交换机和队列channel.exchangeDeclare(\"dlx_exchange\", \"direct\", true);channel.queueDeclare(\"dlx_queue\", true, false, false, null);channel.queueBind(\"dlx_queue\", \"dlx_exchange\", \"timeout\");// 正常业务队列绑定死信交换机Map args = new HashMap();args.put(\"x-dead-letter-exchange\", \"dlx_exchange\");args.put(\"x-dead-letter-routing-key\", \"timeout\");args.put(\"x-message-ttl\", 60000); // 队列TTL 60秒channel.queueDeclare(\"order_queue\", true, false, false, args);
② 发送消息到order_queue
,过期进入死信队列:
channel.basicPublish(\"\", \"order_queue\", null, message.getBytes());
③ 消费死信队列,实现延迟处理:
channel.basicConsume(\"dlx_queue\", true, (consumerTag, delivery) -> { System.out.println(\"延迟消息:\" + new String(delivery.getBody()));}, consumerTag -> {});
六、RabbitMQ的确认机制与死信队列机制怎么用?
-
确认机制前文(第二节)已详细解释。
-
死信队列适用于消息无法被消费(超时、异常):
关键场景:
-
消息TTL过期;
-
队列满了;
-
消费者拒绝消息(basicNack)。
实际使用:
-
常用于异常消息降级处理、失败重试,防止无限重试导致队列拥堵。
七、RabbitMQ集群有哪些方式?如何实现高可用?
1. 集群类型:
-
普通模式(集群元数据同步,无队列同步)
-
镜像队列模式(常用,高可用)
-
Federation模式(跨数据中心场景)
2. 镜像队列实现高可用(推荐):
通过策略实现自动镜像队列:
shell
复制编辑
rabbitmqctl set_policy ha-all \"^\" \'{\"ha-mode\":\"all\"}\'
解释:
-
^
表示所有队列; -
ha-mode:all
表示复制到所有节点。