> 技术文档 > rabbitMq内容整理

rabbitMq内容整理


一、RabbitMQ基础概念

RabbitMQ是一个开源的、实现AMQP(高级消息队列协议)的消息代理中间件,负责应用间异步消息传递、解耦、削峰、异步处理任务等。

核心概念有:

  • 生产者(Producer):发送消息的程序。

  • 消费者(Consumer):接收并处理消息的程序。

  • Broker:消息服务器,RabbitMQ服务端程序。

  • Exchange(交换机):负责根据规则分发消息到队列。

  • Queue(队列):存放消息的地方。

  • Binding(绑定):Exchange和Queue之间的关系,包含路由规则。

  • Routing Key(路由键):决定消息如何路由到队列。


二、RabbitMQ的消息模型

RabbitMQ支持以下Exchange类型:

交换机类型 工作方式 使用场景示例 direct 根据消息routing key精确匹配分发消息 精准消息发送,如支付通知、订单确认 fanout 忽略routing key广播到所有绑定的队列 广播场景,如通知系统内所有服务刷新缓存 topic 根据routing key模式匹配分发消息 模糊匹配场景,如日志收集、监控数据分发 headers 根据消息头部匹配规则 极少使用,功能类似topic,但不关注routing key

常用场景举例:

  • direct模式

    • 一个exchange,一个queue绑定routingKey为user.register,生产者发送时携带user.register路由键,消息被精确发送到该队列。

  • fanout模式

    • 一个exchange绑定多个queue,发送消息时所有队列都能接收,适合发布订阅广播场景。

  • topic模式

    • 可以使用通配符*匹配单个单词,#匹配0或多个单词,例如:

      • log.error -> 精确匹配

      • log.* 匹配log.errorlog.info

      • log.# 匹配log.error.http


三、RabbitMQ重要特性与工作中常用知识点

1、消息可靠性(必掌握)

RabbitMQ实现消息可靠传递的机制:

  • 生产者端确认(Publisher Confirm)

    • RabbitMQ确认生产者的消息已成功到达Broker。

    • 设置channel.confirmSelect()开启确认模式。

  • Broker端持久化

    • Exchange、Queue、Message都需要开启持久化(durable=True),消息会落盘。

    • 注意:消息可靠性会影响性能。

  • 消费者端ACK机制

    • 消费者明确告知消息消费完成,Broker才会删除消息。

    • 自动ACK和手动ACK(推荐)两种模式:

      • 自动ACK (autoAck=true),消费异常时可能导致消息丢失。

      • 手动ACK (channel.basicAck(deliveryTag, false)),保障数据可靠性。

2、消息幂等性(必掌握)

  • 防止重复消息消费引发的数据一致性问题。

  • 一般做法:

    • 使用消息唯一ID(UUID)去重;

    • 利用Redis或数据库的唯一键做去重校验。

3、消息死信队列(DLX)

  • 用于处理未被正常消费的消息(超过重试次数、消息过期、队列满了)。

  • DLX机制:

    • 创建单独死信交换机和队列;

    • 将业务队列绑定到死信交换机,消息异常或超时后自动进入死信队列。

4、消息延迟队列

  • 利用TTL(消息存活时间)和DLX实现延迟队列功能:

    • 设置消息或队列TTL,消息过期进入DLX。

  • 常用场景:

    • 订单超时未支付自动取消;

    • 延迟发送通知。

5、消息优先级队列

  • 设置队列为priority模式:

    Map args = new HashMap();args.put(\"x-max-priority\", 10);channel.queueDeclare(queue, durable, exclusive, autoDelete, args);
  • 消息发送时指定优先级:

    AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .priority(5) .build();

6、流控与消息积压问题处理(必掌握)

  • 消费端限流(QoS)

    
    

    java

    复制编辑

    channel.basicQos(10); // 一次最多处理10条未确认消息

  • 设置合理的消费者数量,防止消息堆积。

  • 监控队列长度和消费速率,及时预警处理。


四、RabbitMQ集群与高可用(常用)

  • 镜像队列(Mirror Queue)

    • 同步队列数据到多个节点,提高可用性,防止单点故障。

  • 集群模式(Cluster)

    • 主节点维护元数据,其他节点从节点复制数据。

  • Federation(联合)模式

    • 异地多数据中心间消息复制。


五、RabbitMQ性能优化(常用)

  • 使用长连接,避免频繁开关。

  • 批量发送消息,减少网络IO。

  • 调整消息持久化策略,非关键数据可关闭。

  • 合理配置消费并发数线程池


六、工作中常见问题与解决方法(重点)

问题 常见原因 解决办法 消息重复消费 消费端未正确ACK、消息超时重投 保证幂等、正确ACK 消息丢失 Broker未持久化、生产者未确认、未ACK 确认模式、开启持久化 队列消息堆积 消费速率慢、消息大量堆积 限流、增加消费者数量 连接数过多 没用连接池、频繁创建连接 连接池、长连接 消息乱序 RabbitMQ默认先进先出,但并不完全保证顺序 业务逻辑自行排序或用Kafka

七、监控管理(必掌握)

  • 使用RabbitMQ Management Plugin

    • 默认端口15672

    • 查看队列、连接、交换机、消息堆积、消费者情况;

    • 可手动删除消息、重置连接。

  • Prometheus + Grafana 监控方案:

    • 深入监控队列深度、消息消费速率、连接数、资源消耗等。
       

分界线

-------------------------------------------------------------------------------------------------------------

一、为什么选择 RabbitMQ 而不是其他中间件(如 Kafka、RocketMQ)?

面试官一般想了解你对常用MQ的理解差异,考察你技术选型能力。

功能点/场景 RabbitMQ Kafka RocketMQ 协议 AMQP 标准 自定义协议 自定义协议 消息可靠性 非常高(事务、Confirm、ACK) 较高 高 吞吐量 中(万级) 极高(百万级) 高(十万级) 延迟消息 原生TTL+DLX支持 不支持原生延迟 原生支持 消息顺序性 不严格保证 分区内严格保证 严格保证 场景偏好 高可靠事务性消息、业务解耦 日志、埋点、大数据传输 电商场景 使用难度 简单、易管理 较难(生态复杂) 较容易(偏Java)

举例场景

  • RabbitMQ适合高可靠业务通知场景,如订单支付结果通知;

  • Kafka适合日志传输和大数据处理场景;

  • RocketMQ更适合电商领域,有严格顺序消息需求。


二、如何保证RabbitMQ的消息可靠性、幂等性?

1. 消息可靠性(生产者→MQ→消费者):

生产者

  • 开启confirm确认机制

channel.confirmSelect(); // 开启确认模式channel.basicPublish(exchange, routingKey, null, message.getBytes());if (channel.waitForConfirms()) { System.out.println(\"消息投递成功\");} else { System.out.println(\"消息投递失败,需要重试或记录\");}

MQ服务器

  • 设置队列和Exchange持久化:

channel.exchangeDeclare(\"exchange\", \"direct\", true);channel.queueDeclare(\"queue\", true, false, false, null);channel.queueBind(\"queue\", \"exchange\", \"routingKey\");
  • 消息持久化发送:

AMQP.BasicProperties props = new AMQP.BasicProperties.Builder() .deliveryMode(2) // 2代表持久化消息 .build();channel.basicPublish(\"exchange\", \"routingKey\", props, message.getBytes());

消费者

  • 使用手动ACK机制(关键):

channel.basicConsume(\"queue\", false, (consumerTag, delivery) -> { try { // 处理消息 System.out.println(\"收到消息: \" + new String(delivery.getBody())); // 手动确认 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } catch (Exception e) { // 拒绝消息并重回队列 channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true); }}, consumerTag -> {});

2. 消息幂等性(防止消息重复消费):

幂等性就是同一消息多次处理结果不变。

  • 常见实现方式:

数据库唯一索引去重


-- 使用订单号做唯一索引,消费前insert去重 CREATE UNIQUE INDEX order_unique ON order_table(order_id);

Redis去重(推荐)

String msgId = message.getMsgId(); // 消息唯一标识if (redis.setnx(\"msg:\"+msgId, \"1\") == 1) { redis.expire(\"msg:\"+msgId, 3600); // 设置过期时间1小时 processMessage(message);} else { // 消息重复,直接丢弃}

三、RabbitMQ消费端如何限流?

消费者限流主要依靠basicQos:

// 同一时间最多允许处理10条消息(未确认状态) channel.basicQos(10);

解释
RabbitMQ不会一次性推送超过设定值的消息给消费者,未ACK的消息数量达到10时,队列暂停向该消费者发送新消息。


四、RabbitMQ消息堆积如何处理?

常见处理办法:

  • 增加消费者数量或并发数;

  • 消费者限流合理设置;

  • 临时创建新队列和消费者分流;

  • 严重时,考虑清理无用消息。

优化方案举例

  • 增加消费者(并发处理):

// 启动多个consumer实例(独立进程或线程) for (int i = 0; i < 10; i++) { new Thread(new ConsumerRunnable()).start(); }


五、如何使用RabbitMQ实现延迟消息功能?

延迟消息通过TTL(消息生存时间)+ DLX(死信队列)实现。

实现步骤(详细示例)

① 定义正常队列与死信队列

// 死信交换机和队列channel.exchangeDeclare(\"dlx_exchange\", \"direct\", true);channel.queueDeclare(\"dlx_queue\", true, false, false, null);channel.queueBind(\"dlx_queue\", \"dlx_exchange\", \"timeout\");// 正常业务队列绑定死信交换机Map args = new HashMap();args.put(\"x-dead-letter-exchange\", \"dlx_exchange\");args.put(\"x-dead-letter-routing-key\", \"timeout\");args.put(\"x-message-ttl\", 60000); // 队列TTL 60秒channel.queueDeclare(\"order_queue\", true, false, false, args);

② 发送消息到order_queue,过期进入死信队列:


channel.basicPublish(\"\", \"order_queue\", null, message.getBytes());

③ 消费死信队列,实现延迟处理:

channel.basicConsume(\"dlx_queue\", true, (consumerTag, delivery) -> { System.out.println(\"延迟消息:\" + new String(delivery.getBody()));}, consumerTag -> {});

六、RabbitMQ的确认机制与死信队列机制怎么用?

  • 确认机制前文(第二节)已详细解释。

  • 死信队列适用于消息无法被消费(超时、异常):

关键场景

  • 消息TTL过期;

  • 队列满了;

  • 消费者拒绝消息(basicNack)。

实际使用

  • 常用于异常消息降级处理、失败重试,防止无限重试导致队列拥堵。


七、RabbitMQ集群有哪些方式?如何实现高可用?

1. 集群类型:

  • 普通模式(集群元数据同步,无队列同步)

  • 镜像队列模式(常用,高可用)

  • Federation模式(跨数据中心场景)

2. 镜像队列实现高可用(推荐):

通过策略实现自动镜像队列:


shell

复制编辑

rabbitmqctl set_policy ha-all \"^\" \'{\"ha-mode\":\"all\"}\'

解释

  • ^表示所有队列;

  • ha-mode:all表示复制到所有节点。