SpringBoot整合RabbitMQ(六大消息模式、消息手动应答机制)
目录
1、环境搭建
引入pom:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId></dependency>
配置yaml:
server: port: 9090 servlet: context-path: /rabbitspring: application: name: rabbit # rabbitmq配置 rabbitmq: host: 127.0.0.1 port: 5672 username: admin password: admin virtual-host: / # 消息开启手动确认 listener: direct: acknowledge-mode: manual
常量类配置:
配置使用过程中可能用到的常量
public class RabbitmqContext { /** * 工作队列模式 */ public static String QUEUE_WORK = "queue_work"; /** * 订阅发布模式 */ public static String QUEUE_FANOUT_ONE = "queue_fanout_one"; public static String QUEUE_FANOUT_TWO = "queue_fanout_two"; public static String EXCHANGE_FANOUT = "exchange_fanout"; /** * 路由模式 */ public static String QUEUE_DIRECT_ONE = "queue_direct_one"; public static String QUEUE_DIRECT_TWO = "queue_direct_two"; public static String EXCHANGE_DIRECT = "exchange_direct"; public static String ROUTING_DIRECT_ONE = "routing_direct_one"; public static String ROUTING_DIRECT_TWO = "routing_direct_two"; public static String ROUTING_DIRECT_THREE = "routing_direct_three"; /** * 主题模式 */ public static String QUEUE_TOPIC_ONE = "queue_topic_one"; public static String QUEUE_TOPIC_TWO = "queue_topic_two"; public static String EXCHANGE_TOPIC = "exchange_topic"; public static String ROUTING_TOPIC_ONE = "topic.one"; public static String ROUTING_TOPIC_TWO = "topic.one.two"; /** * 手动确认机制演示 */ public static String QUEUE_ACK = "queue_ack"; /** * 延时队列模式 */ public static String QUEUE_DELAY = "delay_queue"; public static String EXCHANGE_DELAY = "delay_exchange"; public static String ROUTING_DELAY = "delay";}
消息监听与配置:
整合SpringBoot后,通过RabbitListener
监听器与Config
配置实现消息的监听(消费);
消息发送:
// 引入RabbitTemplate @Resourceprivate RabbitTemplate rabbitTemplate;// 发送消息rabbitTemplate.convertAndSend(String routingKey, Object object);rabbitTemplate.convertSendAndReceive(Object object);
发送消息时,通过**rabbitTemplate.convertAndSend()或者rabbitTemplate.convertSendAndReceive()**方法进行发送,区别在于:
- convertAndSend: 消息没有顺序,不管是否消费者是否确认,会一直发送消息;
- convertSendAndReceive: 按照一定的顺序,只有确定消费者接收到消息,才会发送下一条信息,每条消息之间会有间隔时间;
2、队列模式
Controller:
@RestControllerpublic class RabbitmqController { @Resource private RabbitTemplate rabbitTemplate; @PostMapping("/sendWork") public Object sendWordQueue() { String msg = "工作模式消息,ID:"; // 绑定队列发送消息 for (int i = 1; i <= 10; i++) { String sendMsg = msg + i; rabbitTemplate.convertAndSend(RabbitmqContext.QUEUE_WORK, sendMsg); } return "发送成功..."; }}
Config:
@Configurationpublic class WorkConfig { /** * 配置队列名称 - 工作模式 * * @return */ @Bean public Queue queueWork() { return new Queue(RabbitmqContext.QUEUE_WORK); }}
Listener:
@Component@Slf4jpublic class WorkReceiveListener { /** * @param msg 接收的文本消息 * @param channel 通道信息 * @param message 附加的参数信息 */ @RabbitListener(queues = "queue_work") public void receiveQueueWork1(String msg, Channel channel, Message message) { log.info("消费者01-接收到消息:" + msg); } @RabbitListener(queues = "queue_work") public void receiveQueueWork2(String msg, Channel channel, Message message) { log.info("消费者02-接收到消息:" + msg); }}
结果:
3、发布订阅模式
Controller:
@RestControllerpublic class RabbitmqController { @Resource private RabbitTemplate rabbitTemplate; @PostMapping("/sendFanout") public String sendFanout() { String msg = "发布|订阅模式消息,ID:"; for (int i = 1; i <= 10; i++) { String sendMsg = msg + i; // 绑定交换机发送消息,路由key为 "" rabbitTemplate.convertAndSend(RabbitmqContext.EXCHANGE_FANOUT, "", sendMsg); } return "发送成功..."; }}
Config:
@Configurationpublic class FanoutConfig { /** * 配置队列名称 - 发布订阅模式 * * @return */ @Bean public Queue queueFanoutOne() { return new Queue(RabbitmqContext.QUEUE_FANOUT_ONE); } @Bean public Queue queueFanoutTwo() { return new Queue(RabbitmqContext.QUEUE_FANOUT_TWO); } /** * 定义FanoutExchange类型的交换机 * * @return */ @Bean public FanoutExchange exchangeFanout() { return new FanoutExchange(RabbitmqContext.EXCHANGE_FANOUT); } /** * 将交换机和队列进行绑定 * * @return */ @Bean public Binding bindingExchangeOne() { return BindingBuilder.bind(queueFanoutOne()).to(exchangeFanout()); } @Bean public Binding bindingExchangeTwo() { return BindingBuilder.bind(queueFanoutTwo()).to(exchangeFanout()); }}
Listener:
@Component@Slf4jpublic class FanoutReceiveListener { /** * @param msg 接收的文本消息 * @param channel 通道信息 * @param message 附加的参数信息 */ @RabbitListener(queues = "queue_fanout_one") public void consumerOne1(String msg, Channel channel, Message message) { System.out.println("queue_fanout_one队列 消费者1:收到消息:" + msg); } @RabbitListener(queues = "queue_fanout_one") public void consumerOne2(String msg, Channel channel, Message message) { System.out.println("queue_fanout_one队列 消费者2:收到消息:" + msg); } /** * -------------一个队列绑定两个消费者 -------------------------------- */ @RabbitListener(queues = "queue_fanout_two") public void consumerTwo1(String msg, Channel channel, Message message) { System.out.println("queue_fanout_two队列 消费者1:收到消息:" + msg); } @RabbitListener(queues = "queue_fanout_two") public void consumerTwo2(String msg, Channel channel, Message message) { System.out.println("queue_fanout_two队列 消费者2:收到消息:" + msg); }}
结果:
4、路由模式
路由模式与发布订阅模式相同,就是定义了路由,在将队列与交换机绑定时 以及 发送消息时设置路由名称
Controller:
@RestControllerpublic class RabbitmqController { @Resource private RabbitTemplate rabbitTemplate; @PostMapping("/sendDirect") public String sendDirect() { String msg = "路由模式消息,ID:"; for (int i = 1; i <= 12; i++) { String sendMsg = null; String routingKey = ""; if (i % 2 == 0) { sendMsg = msg + i + ",路由为:" + RabbitmqContext.ROUTING_DIRECT_TWO; routingKey = RabbitmqContext.ROUTING_DIRECT_TWO; } else if (i % 3 == 0) { sendMsg = msg + i + ",路由为:" + RabbitmqContext.ROUTING_DIRECT_THREE; routingKey = RabbitmqContext.ROUTING_DIRECT_THREE; } else { sendMsg = msg + i + ",路由为:" + RabbitmqContext.ROUTING_DIRECT_ONE; routingKey = RabbitmqContext.ROUTING_DIRECT_ONE; } // 绑定交换机,并且设置 路由Key 发送消息 rabbitTemplate.convertAndSend(RabbitmqContext.EXCHANGE_DIRECT, routingKey, sendMsg); } return "发送成功..."; }}
Config:
@Configurationpublic class DirectConfig { /** * 队列一 * * @return */ @Bean public Queue directQueueOne() { return new Queue(RabbitmqContext.QUEUE_DIRECT_ONE); } /** * 队列二 * * @return */ @Bean public Queue directQueueTwo() { return new Queue(RabbitmqContext.QUEUE_DIRECT_TWO); } /** * 定义交换机 direct类型 * * @return */ @Bean public DirectExchange myDirectExchange() { return new DirectExchange(RabbitmqContext.EXCHANGE_DIRECT); } /** * 队列 绑定到交换机 再指定一个路由键 * directQueueOne() 会找到上方定义的队列bean * * @return */ @Bean public Binding directExchangeOne() { return BindingBuilder.bind(directQueueOne()).to(myDirectExchange()).with(RabbitmqContext.ROUTING_DIRECT_ONE); } @Bean public Binding directExchangeTwo() { return BindingBuilder.bind(directQueueOne()).to(myDirectExchange()).with(RabbitmqContext.ROUTING_DIRECT_TWO); } /** * 队列 绑定到交换机 再指定一个路由键 * * @return */ @Bean public Binding directExchangeThree() { return BindingBuilder.bind(directQueueTwo()).to(myDirectExchange()).with(RabbitmqContext.ROUTING_DIRECT_THREE); }}
Listener:
@Component@Slf4jpublic class DirectReceiveListener { /** * @param msg 接收的文本消息 * @param channel 通道信息 * @param message 附加的参数信息 */ @RabbitListener(queues = "queue_direct_one") public void consumerOne1(String msg, Channel channel, Message message) { System.out.println("queue_direct_one队列 消费者1:收到消息:" + msg); } @RabbitListener(queues = "queue_direct_one") public void consumerTwo(String msg, Channel channel, Message message) { System.out.println("queue_direct_one队列 消费者2:收到消息:" + msg); } @RabbitListener(queues = "queue_direct_two") public void consumerOne(String msg, Channel channel, Message message) { System.out.println("queue_direct_two队列 消费者1:收到消息:" + msg); } @RabbitListener(queues = "queue_direct_two") public void consumerDirect(String msg, Channel channel, Message message) { System.out.println("queue_direct_two队列 消费者2:收到消息:" + msg); }}
结果:
5、主题模式
与路由模式的区别在于:可以通过不同规则匹配多个路由;
Controller:
@RestControllerpublic class RabbitmqController { @Resource private RabbitTemplate rabbitTemplate; @PostMapping("/sendTopic") public String sendTopic() { String msg = "消息ID:"; for (int i = 1; i <= 10; i++) { String sendMsg = null; String routingKey = ""; if (i % 2 == 0) { sendMsg = msg + i + ",路由为:" + RabbitmqContext.ROUTING_TOPIC_TWO; routingKey = RabbitmqContext.ROUTING_TOPIC_TWO; } else { sendMsg = msg + i + ",路由为:" + RabbitmqContext.ROUTING_TOPIC_ONE; routingKey = RabbitmqContext.ROUTING_TOPIC_ONE; } // 绑定交换机,并且设置 路由Key 发送消息 rabbitTemplate.convertAndSend(RabbitmqContext.EXCHANGE_TOPIC, routingKey, sendMsg); } return "发送成功..."; }}
Config:
@Configurationpublic class TopicConfig { /** * 队列定义 * * @return */ @Bean public Queue topicQueueOne() { return new Queue(RabbitmqContext.QUEUE_TOPIC_ONE); } /** * 队列定义 * * @return */ @Bean public Queue topicQueueTwo() { return new Queue(RabbitmqContext.QUEUE_TOPIC_TWO); } /** * 定义 TopicExchange 类型交换机 * * @return */ @Bean public TopicExchange exchangeTopic() { return new TopicExchange(RabbitmqContext.EXCHANGE_TOPIC); } /** * 队列一绑定到交换机 且设置路由键为 topic.# * * @return */ @Bean public Binding bindingTopic1() { return BindingBuilder.bind(topicQueueOne()).to(exchangeTopic()).with("topic.#"); } /** * 队列二绑定到交换机 且设置路由键为 topic.* * * @return */ @Bean public Binding bindingTopic2() { return BindingBuilder.bind(topicQueueTwo()).to(exchangeTopic()).with("topic.*"); }}
Listener:
@Component@Slf4jpublic class TopicReceiveListener { /** * @param msg 接收的文本消息 * @param channel 通道信息 * @param message 附加的参数信息 */ @RabbitListener(queues = "queue_topic_one") public void listenOne1(String msg, Channel channel, Message message) { System.out.println("queue_topic_one队列 消费者1,路由匹配:topic.#,收到消息:" + msg); } @RabbitListener(queues = "queue_topic_one") public void listenOne2(String msg, Channel channel, Message message) { System.out.println("queue_topic_one队列 消费者2,路由匹配:topic.#,收到消息:" + msg); } @RabbitListener(queues = "queue_topic_two") public void listenTwo1(String msg, Channel channel, Message message) { System.out.println("queue_topic_two队列 消费者1,路由匹配:topic.*,收到消息:" + msg); } @RabbitListener(queues = "queue_topic_two") public void listenTwo2(String msg, Channel channel, Message message) { System.out.println("queue_topic_two队列 消费者2,路由匹配:topic.*,收到消息:" + msg); }}
结果:
6、消息手动应答机制
说明:
通过配置自定义的SimpleRabbitListenerContainerFactory
,并且在监听器中通过@RabbitListener
注解通过containerFactory
属性设置的SimpleRabbitListenerContainerFactory
,比如:@RabbitListener(containerFactory = “listenerContainerFactory”)
yml配置:
在原有的yaml
基础上追加,以下配置:
spring: rabbitmq: # 确认消息已发送到队列 return publisher-returns: true # 开启消息确认机制 confirm 异步 publisher-confirm-type: correlated listener: direct: # 消息开启手动确认 acknowledge-mode: manual # 拒绝消息是否重回队列 default-requeue-rejected: true
配置类:
@Configuration@Slf4jpublic class RabbitConfig {@Bean("listenerContainerFactory") public SimpleRabbitListenerContainerFactory listenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); // 如果需要批量确认消息,则做以下设置 // 设置批量 // factory.setBatchListener(true); // 设置BatchMessageListener生效 // factory.setConsumerBatchEnabled(true); // 设置批量确认数量 // factory.setBatchSize(10); //设置消息为手动确认 factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); return factory; }}
Controller:
@RestControllerpublic class RabbitmqController { @Resource private RabbitTemplate rabbitTemplate; @PostMapping("/sendAck") public String sendAck() { Object msg = "这是手动确认机制的消息"; rabbitTemplate.convertAndSend(RabbitmqContext.QUEUE_ACK, msg); return "发送成功..."; }}
Config:
@Configurationpublic class AckConfig { /** * 配置队列名称 - 工作模式 * * @return */ @Bean public Queue queueConfirm() { return new Queue(RabbitmqContext.QUEUE_ACK); }}
Listener:
@Component@Slf4jpublic class AckReceiveListener { /** * @param msg 接收的文本消息 * @param channel 通道信息 * @param message 附加的参数信息 */ @RabbitListener(queues = "queue_ack", containerFactory = "listenerContainerFactory") public void queueAck(String msg, Channel channel, Message message) { try { long deliveryTag = message.getMessageProperties().getDeliveryTag(); channel.basicAck(deliveryTag, false); System.out.println("手动确认机制,tag:" + deliveryTag + ",接收到消息:" + msg); } catch (Exception e) { e.printStackTrace(); } } /** * 批量手动确认机制 * * @param messages * @param channel */// @RabbitListener(queues = "queue_ack", containerFactory = "listenerContainerFactory")// public void queueAck(List messages, Channel channel) {// try {// for (Message message : messages) {// long deliveryTag = message.getMessageProperties().getDeliveryTag();// String msg = new String(message.getBody(), StandardCharsets.UTF_8);// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);// System.out.println("手动确认机制,tag:" + deliveryTag + ",接收到消息:" + msg);// channel.basicAck(deliveryTag, false);// }// } catch (Exception e) {// e.printStackTrace();// }// }}
7、回调函数-确认机制(发布确认模式)
通过配置消息确认机制,可以监听到:消息发送的情况,消息接收的情况;
yaml:
在原有yaml
的基础上,追加以下配置:
spring: rabbitmq: # 开启消息回退 return publisher-returns: true # 开启消息确认机制 confirm 异步 publisher-confirm-type: correlated
配置类:
import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/** * @Author: LiHuaZhi * @Date: 2021/10/24 23:06 * @Description: **/@Configuration@Slf4jpublic class RabbitConfig { @Bean public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(); rabbitTemplate.setConnectionFactory(connectionFactory); //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数 rabbitTemplate.setMandatory(true); /* * 触发条件: * 1、消息推送到server,但是在server里找不到交换机 * 2、消息推送到server,找到交换机了,但是没找到队列 * 3、消息推送到sever,交换机和队列啥都没找到 * 4、消息推送成功 */ rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { log.info("ConfirmCallback-------消息成功推送到MQ"); } else { log.error("ConfirmCallback------消息推送到MQ失败,原因:{}", cause); } }); /* * 触发条件: * 1、消息推送到server,找到交换机了,但是没找到队列,被交换机退回; */ rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { log.error(" 消 息 {}, 被交换机 {} 退回,退回原因 :{}, 路 由 key:{}", new String(message.getBody()), exchange, replyText, routingKey); }); return rabbitTemplate; }}
使用:
当消息发送后,会自动回调RabbitConfig
配置类中的rabbitTemplate.setConfirmCallback
方法;