> 文档中心 > SpringBoot整合RabbitMQ(六大消息模式、消息手动应答机制)

SpringBoot整合RabbitMQ(六大消息模式、消息手动应答机制)

目录

  • 1、环境搭建
  • 2、队列模式
  • 3、发布订阅模式
  • 4、路由模式
  • 5、主题模式
  • 6、消息手动应答机制
  • 7、回调函数-确认机制(发布确认模式)

1、环境搭建

引入pom:

<dependency>   <groupId>org.springframework.boot</groupId>   <artifactId>spring-boot-starter-amqp</artifactId></dependency>

配置yaml:

server:  port: 9090  servlet:    context-path: /rabbitspring:  application:    name: rabbit  # rabbitmq配置  rabbitmq:    host: 127.0.0.1    port: 5672    username: admin    password: admin    virtual-host: /    # 消息开启手动确认    listener:      direct: acknowledge-mode: manual

常量类配置:
配置使用过程中可能用到的常量

public class RabbitmqContext {    /**     * 工作队列模式     */    public static String QUEUE_WORK = "queue_work";    /**     * 订阅发布模式     */    public static String QUEUE_FANOUT_ONE = "queue_fanout_one";    public static String QUEUE_FANOUT_TWO = "queue_fanout_two";    public static String EXCHANGE_FANOUT = "exchange_fanout";    /**     * 路由模式     */    public static String QUEUE_DIRECT_ONE = "queue_direct_one";    public static String QUEUE_DIRECT_TWO = "queue_direct_two";    public static String EXCHANGE_DIRECT = "exchange_direct";    public static String ROUTING_DIRECT_ONE = "routing_direct_one";    public static String ROUTING_DIRECT_TWO = "routing_direct_two";    public static String ROUTING_DIRECT_THREE = "routing_direct_three";    /**     * 主题模式     */    public static String QUEUE_TOPIC_ONE = "queue_topic_one";    public static String QUEUE_TOPIC_TWO = "queue_topic_two";    public static String EXCHANGE_TOPIC = "exchange_topic";    public static String ROUTING_TOPIC_ONE = "topic.one";    public static String ROUTING_TOPIC_TWO = "topic.one.two";    /**     * 手动确认机制演示     */    public static String QUEUE_ACK = "queue_ack";    /**     * 延时队列模式     */    public static String QUEUE_DELAY = "delay_queue";    public static String EXCHANGE_DELAY = "delay_exchange";    public static String ROUTING_DELAY = "delay";}

消息监听与配置:
整合SpringBoot后,通过RabbitListener监听器与Config配置实现消息的监听(消费);

消息发送:

// 引入RabbitTemplate @Resourceprivate RabbitTemplate rabbitTemplate;// 发送消息rabbitTemplate.convertAndSend(String routingKey, Object object);rabbitTemplate.convertSendAndReceive(Object object);

发送消息时,通过**rabbitTemplate.convertAndSend()或者rabbitTemplate.convertSendAndReceive()**方法进行发送,区别在于:

  • convertAndSend: 消息没有顺序,不管是否消费者是否确认,会一直发送消息;
  • convertSendAndReceive: 按照一定的顺序,只有确定消费者接收到消息,才会发送下一条信息,每条消息之间会有间隔时间;

2、队列模式

Controller:

@RestControllerpublic class RabbitmqController {      @Resource    private RabbitTemplate rabbitTemplate;    @PostMapping("/sendWork")    public Object sendWordQueue() { String msg = "工作模式消息,ID:"; // 绑定队列发送消息 for (int i = 1; i <= 10; i++) {     String sendMsg = msg + i;     rabbitTemplate.convertAndSend(RabbitmqContext.QUEUE_WORK, sendMsg); } return "发送成功...";    }}

Config:

@Configurationpublic class WorkConfig {    /**     * 配置队列名称 - 工作模式     *     * @return     */    @Bean    public Queue queueWork() { return new Queue(RabbitmqContext.QUEUE_WORK);    }}

Listener:

@Component@Slf4jpublic class WorkReceiveListener {    /**     * @param msg     接收的文本消息     * @param channel 通道信息     * @param message 附加的参数信息     */    @RabbitListener(queues = "queue_work")    public void receiveQueueWork1(String msg, Channel channel, Message message) { log.info("消费者01-接收到消息:" + msg);    }    @RabbitListener(queues = "queue_work")    public void receiveQueueWork2(String msg, Channel channel, Message message) { log.info("消费者02-接收到消息:" + msg);    }}

结果:
在这里插入图片描述

3、发布订阅模式

Controller:

@RestControllerpublic class RabbitmqController {      @Resource    private RabbitTemplate rabbitTemplate;    @PostMapping("/sendFanout")    public String sendFanout() { String msg = "发布|订阅模式消息,ID:"; for (int i = 1; i <= 10; i++) {     String sendMsg = msg + i;     // 绑定交换机发送消息,路由key为 ""     rabbitTemplate.convertAndSend(RabbitmqContext.EXCHANGE_FANOUT, "", sendMsg); } return "发送成功...";    }}    

Config:

@Configurationpublic class FanoutConfig {    /**     * 配置队列名称 - 发布订阅模式     *     * @return     */    @Bean    public Queue queueFanoutOne() { return new Queue(RabbitmqContext.QUEUE_FANOUT_ONE);    }    @Bean    public Queue queueFanoutTwo() { return new Queue(RabbitmqContext.QUEUE_FANOUT_TWO);    }    /**     * 定义FanoutExchange类型的交换机     *     * @return     */    @Bean    public FanoutExchange exchangeFanout() { return new FanoutExchange(RabbitmqContext.EXCHANGE_FANOUT);    }    /**     * 将交换机和队列进行绑定     *     * @return     */    @Bean    public Binding bindingExchangeOne() { return BindingBuilder.bind(queueFanoutOne()).to(exchangeFanout());    }    @Bean    public Binding bindingExchangeTwo() { return BindingBuilder.bind(queueFanoutTwo()).to(exchangeFanout());    }}

Listener:

@Component@Slf4jpublic class FanoutReceiveListener {    /**     * @param msg     接收的文本消息     * @param channel 通道信息     * @param message 附加的参数信息     */    @RabbitListener(queues = "queue_fanout_one")    public void consumerOne1(String msg, Channel channel, Message message) { System.out.println("queue_fanout_one队列 消费者1:收到消息:" + msg);    }    @RabbitListener(queues = "queue_fanout_one")    public void consumerOne2(String msg, Channel channel, Message message) { System.out.println("queue_fanout_one队列 消费者2:收到消息:" + msg);    }    /**     * -------------一个队列绑定两个消费者 --------------------------------     */    @RabbitListener(queues = "queue_fanout_two")    public void consumerTwo1(String msg, Channel channel, Message message) { System.out.println("queue_fanout_two队列 消费者1:收到消息:" + msg);    }    @RabbitListener(queues = "queue_fanout_two")    public void consumerTwo2(String msg, Channel channel, Message message) { System.out.println("queue_fanout_two队列 消费者2:收到消息:" + msg);    }}

结果:
在这里插入图片描述

4、路由模式

路由模式与发布订阅模式相同,就是定义了路由,在将队列与交换机绑定时 以及 发送消息时设置路由名称
Controller:

@RestControllerpublic class RabbitmqController {      @Resource    private RabbitTemplate rabbitTemplate;    @PostMapping("/sendDirect")    public String sendDirect() { String msg = "路由模式消息,ID:"; for (int i = 1; i <= 12; i++) {     String sendMsg = null;     String routingKey = "";     if (i % 2 == 0) {  sendMsg = msg + i + ",路由为:" + RabbitmqContext.ROUTING_DIRECT_TWO;  routingKey = RabbitmqContext.ROUTING_DIRECT_TWO;     } else if (i % 3 == 0) {  sendMsg = msg + i + ",路由为:" + RabbitmqContext.ROUTING_DIRECT_THREE;  routingKey = RabbitmqContext.ROUTING_DIRECT_THREE;     } else {  sendMsg = msg + i + ",路由为:" + RabbitmqContext.ROUTING_DIRECT_ONE;  routingKey = RabbitmqContext.ROUTING_DIRECT_ONE;     }     // 绑定交换机,并且设置 路由Key 发送消息     rabbitTemplate.convertAndSend(RabbitmqContext.EXCHANGE_DIRECT, routingKey, sendMsg); } return "发送成功...";    }}

Config:

@Configurationpublic class DirectConfig {    /**     * 队列一     *     * @return     */    @Bean    public Queue directQueueOne() { return new Queue(RabbitmqContext.QUEUE_DIRECT_ONE);    }    /**     * 队列二     *     * @return     */    @Bean    public Queue directQueueTwo() { return new Queue(RabbitmqContext.QUEUE_DIRECT_TWO);    }    /**     * 定义交换机 direct类型     *     * @return     */    @Bean    public DirectExchange myDirectExchange() { return new DirectExchange(RabbitmqContext.EXCHANGE_DIRECT);    }    /**     * 队列 绑定到交换机 再指定一个路由键     * directQueueOne() 会找到上方定义的队列bean     *     * @return     */    @Bean    public Binding directExchangeOne() { return BindingBuilder.bind(directQueueOne()).to(myDirectExchange()).with(RabbitmqContext.ROUTING_DIRECT_ONE);    }    @Bean    public Binding directExchangeTwo() { return BindingBuilder.bind(directQueueOne()).to(myDirectExchange()).with(RabbitmqContext.ROUTING_DIRECT_TWO);    }    /**     * 队列 绑定到交换机 再指定一个路由键     *     * @return     */    @Bean    public Binding directExchangeThree() { return BindingBuilder.bind(directQueueTwo()).to(myDirectExchange()).with(RabbitmqContext.ROUTING_DIRECT_THREE);    }}

Listener:

@Component@Slf4jpublic class DirectReceiveListener {    /**     * @param msg     接收的文本消息     * @param channel 通道信息     * @param message 附加的参数信息     */    @RabbitListener(queues = "queue_direct_one")    public void consumerOne1(String msg, Channel channel, Message message) { System.out.println("queue_direct_one队列 消费者1:收到消息:" + msg);    }    @RabbitListener(queues = "queue_direct_one")    public void consumerTwo(String msg, Channel channel, Message message) { System.out.println("queue_direct_one队列 消费者2:收到消息:" + msg);    }    @RabbitListener(queues = "queue_direct_two")    public void consumerOne(String msg, Channel channel, Message message) { System.out.println("queue_direct_two队列 消费者1:收到消息:" + msg);    }    @RabbitListener(queues = "queue_direct_two")    public void consumerDirect(String msg, Channel channel, Message message) { System.out.println("queue_direct_two队列 消费者2:收到消息:" + msg);    }}

结果:
在这里插入图片描述

5、主题模式

与路由模式的区别在于:可以通过不同规则匹配多个路由;

Controller:

@RestControllerpublic class RabbitmqController {      @Resource    private RabbitTemplate rabbitTemplate;    @PostMapping("/sendTopic")    public String sendTopic() { String msg = "消息ID:"; for (int i = 1; i <= 10; i++) {     String sendMsg = null;     String routingKey = "";     if (i % 2 == 0) {  sendMsg = msg + i + ",路由为:" + RabbitmqContext.ROUTING_TOPIC_TWO;  routingKey = RabbitmqContext.ROUTING_TOPIC_TWO;     } else {  sendMsg = msg + i + ",路由为:" + RabbitmqContext.ROUTING_TOPIC_ONE;  routingKey = RabbitmqContext.ROUTING_TOPIC_ONE;     }     // 绑定交换机,并且设置 路由Key 发送消息     rabbitTemplate.convertAndSend(RabbitmqContext.EXCHANGE_TOPIC, routingKey, sendMsg); } return "发送成功...";    }}

Config:

@Configurationpublic class TopicConfig {    /**     * 队列定义     *     * @return     */    @Bean    public Queue topicQueueOne() { return new Queue(RabbitmqContext.QUEUE_TOPIC_ONE);    }    /**     * 队列定义     *     * @return     */    @Bean    public Queue topicQueueTwo() { return new Queue(RabbitmqContext.QUEUE_TOPIC_TWO);    }    /**     * 定义 TopicExchange 类型交换机     *     * @return     */    @Bean    public TopicExchange exchangeTopic() { return new TopicExchange(RabbitmqContext.EXCHANGE_TOPIC);    }    /**     * 队列一绑定到交换机 且设置路由键为 topic.#     *     * @return     */    @Bean    public Binding bindingTopic1() { return BindingBuilder.bind(topicQueueOne()).to(exchangeTopic()).with("topic.#");    }    /**     * 队列二绑定到交换机 且设置路由键为 topic.*     *     * @return     */    @Bean    public Binding bindingTopic2() { return BindingBuilder.bind(topicQueueTwo()).to(exchangeTopic()).with("topic.*");    }}

Listener:

@Component@Slf4jpublic class TopicReceiveListener {    /**     * @param msg     接收的文本消息     * @param channel 通道信息     * @param message 附加的参数信息     */    @RabbitListener(queues = "queue_topic_one")    public void listenOne1(String msg, Channel channel, Message message) { System.out.println("queue_topic_one队列 消费者1,路由匹配:topic.#,收到消息:" + msg);    }    @RabbitListener(queues = "queue_topic_one")    public void listenOne2(String msg, Channel channel, Message message) { System.out.println("queue_topic_one队列 消费者2,路由匹配:topic.#,收到消息:" + msg);    }    @RabbitListener(queues = "queue_topic_two")    public void listenTwo1(String msg, Channel channel, Message message) { System.out.println("queue_topic_two队列 消费者1,路由匹配:topic.*,收到消息:" + msg);    }    @RabbitListener(queues = "queue_topic_two")    public void listenTwo2(String msg, Channel channel, Message message) { System.out.println("queue_topic_two队列 消费者2,路由匹配:topic.*,收到消息:" + msg);    }}

结果:
在这里插入图片描述

6、消息手动应答机制

说明:
通过配置自定义的SimpleRabbitListenerContainerFactory,并且在监听器中通过@RabbitListener注解通过containerFactory属性设置的SimpleRabbitListenerContainerFactory,比如:@RabbitListener(containerFactory = “listenerContainerFactory”)

yml配置:
在原有的yaml基础上追加,以下配置:

spring:  rabbitmq:    # 确认消息已发送到队列 return    publisher-returns: true    # 开启消息确认机制 confirm 异步    publisher-confirm-type: correlated    listener:      direct: # 消息开启手动确认 acknowledge-mode: manual # 拒绝消息是否重回队列 default-requeue-rejected: true

配置类:

@Configuration@Slf4jpublic class RabbitConfig {@Bean("listenerContainerFactory")    public SimpleRabbitListenerContainerFactory listenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); // 如果需要批量确认消息,则做以下设置 // 设置批量 // factory.setBatchListener(true); // 设置BatchMessageListener生效 // factory.setConsumerBatchEnabled(true); // 设置批量确认数量 // factory.setBatchSize(10); //设置消息为手动确认 factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); return factory;    }}

Controller:

@RestControllerpublic class RabbitmqController {      @Resource    private RabbitTemplate rabbitTemplate;    @PostMapping("/sendAck")    public String sendAck() { Object msg = "这是手动确认机制的消息"; rabbitTemplate.convertAndSend(RabbitmqContext.QUEUE_ACK, msg); return "发送成功...";    }}

Config:

@Configurationpublic class AckConfig {    /**     * 配置队列名称 - 工作模式     *     * @return     */    @Bean    public Queue queueConfirm() { return new Queue(RabbitmqContext.QUEUE_ACK);    }}

Listener:

@Component@Slf4jpublic class AckReceiveListener {    /**     * @param msg     接收的文本消息     * @param channel 通道信息     * @param message 附加的参数信息     */    @RabbitListener(queues = "queue_ack", containerFactory = "listenerContainerFactory")    public void queueAck(String msg, Channel channel, Message message) { try {     long deliveryTag = message.getMessageProperties().getDeliveryTag();     channel.basicAck(deliveryTag, false);     System.out.println("手动确认机制,tag:" + deliveryTag + ",接收到消息:" + msg); } catch (Exception e) {     e.printStackTrace(); }    }    /**     * 批量手动确认机制     *     * @param messages     * @param channel     *///    @RabbitListener(queues = "queue_ack", containerFactory = "listenerContainerFactory")//    public void queueAck(List messages, Channel channel) {// try {//     for (Message message : messages) {//  long deliveryTag = message.getMessageProperties().getDeliveryTag();//  String msg = new String(message.getBody(), StandardCharsets.UTF_8);//  channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//  System.out.println("手动确认机制,tag:" + deliveryTag + ",接收到消息:" + msg);//  channel.basicAck(deliveryTag, false);//     }// } catch (Exception e) {//     e.printStackTrace();// }//    }}

7、回调函数-确认机制(发布确认模式)

通过配置消息确认机制,可以监听到:消息发送的情况,消息接收的情况;

yaml:
在原有yaml的基础上,追加以下配置:

spring:    rabbitmq:    # 开启消息回退 return    publisher-returns: true    # 开启消息确认机制 confirm 异步    publisher-confirm-type: correlated

配置类:

import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/** * @Author: LiHuaZhi * @Date: 2021/10/24 23:06 * @Description: **/@Configuration@Slf4jpublic class RabbitConfig {    @Bean    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(); rabbitTemplate.setConnectionFactory(connectionFactory); //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数 rabbitTemplate.setMandatory(true); /*  * 触发条件:  * 1、消息推送到server,但是在server里找不到交换机  * 2、消息推送到server,找到交换机了,但是没找到队列  * 3、消息推送到sever,交换机和队列啥都没找到  * 4、消息推送成功  */ rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {     if (ack) {  log.info("ConfirmCallback-------消息成功推送到MQ");     } else {  log.error("ConfirmCallback------消息推送到MQ失败,原因:{}", cause);     } }); /*  * 触发条件:  * 1、消息推送到server,找到交换机了,但是没找到队列,被交换机退回;  */ rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {     log.error(" 消 息 {}, 被交换机 {} 退回,退回原因 :{}, 路 由 key:{}", new      String(message.getBody()), exchange, replyText, routingKey); }); return rabbitTemplate;    }}

使用:
当消息发送后,会自动回调RabbitConfig配置类中的rabbitTemplate.setConfirmCallback方法;