RabbitMQ结合SpringBoot使用干货
Springboot配置
@Configurationpublic class ApiRetryQueueConfig { // 普通交换机的名称 public static final String API_RETRY_EXCHANGE = \"API_RETRY_EXCHANGE\"; // 死信交换机的名称 public static final String DEAD_API_RETRY_EXCHANGE = \"DEAD_API_RETRY_EXCHANGE\"; // 一分钟普通队列的名称 public static final String API_RETRY_ONE_MINUTE_QUEUE = \"API_RETRY_ONE_MINUTE_QUEUE\"; // 死信队列的名称 public static final String DEAD_API_RETRY_QUEUE = \"DEAD_API_RETRY_QUEUE\"; // 一分钟普通队列路由key public static final String ONE_MINUTE_API_RETRY = \"ONE_MINUTE_API_RETRY\"; // 死信路由key public static final String DEAD_LETTER_ROUTING_KEY = \"API_RETRY\"; // 死信配置参数 public static final String DEAD_EXCHANGE = \"x-dead-letter-exchange\"; public static final String DEAD_ROUTING_KEY = \"x-dead-letter-routing-key\"; public static final String MESSAGE_TTL = \"x-message-ttl\"; //声明普通交换机 @Bean(\"apiRetryExchange\") public DirectExchange apiRetryExchange() { return new DirectExchange(API_RETRY_EXCHANGE); } //声明死信交换机 @Bean(\"deadApiRetryExchange\") public DirectExchange deadApiRetryExchange() { return new DirectExchange(DEAD_API_RETRY_EXCHANGE); } // 普通队列 @Bean(\"apiRetryOneMinuteQueue\") public Queue apiRetryOneMinuteQueue() { Map arguments = new HashMap(3); // 设置死信交换机 arguments.put(DEAD_EXCHANGE, DEAD_API_RETRY_EXCHANGE); // 设置死信RoutingKey arguments.put(DEAD_ROUTING_KEY, DEAD_LETTER_ROUTING_KEY); // 设置TTL 1分钟 arguments.put(MESSAGE_TTL, 60 * 1000); return QueueBuilder.durable(API_RETRY_ONE_MINUTE_QUEUE).withArguments(arguments).build(); } // 声明死信队列 @Bean(\"deadApiRetryQueue\") public Queue deadApiRetryQueue() { return QueueBuilder.durable(DEAD_API_RETRY_QUEUE).build(); } // 一分钟队列绑定普通交换机 @Bean public Binding apiRetryOneMinuteQueueBindingApiRetryExchange(@Qualifier(\"apiRetryOneMinuteQueue\") Queue apiRetryOneMinuteQueue, @Qualifier(\"apiRetryExchange\") DirectExchange apiRetryExchange) { return BindingBuilder.bind(apiRetryOneMinuteQueue).to(apiRetryExchange).with(ONE_MINUTE_API_RETRY); } // 死信交换机绑定死信队列 @Bean public Binding deadApiRetryQueueDeadApiRetryExchange(@Qualifier(\"deadApiRetryQueue\") Queue deadApiRetryQueue, @Qualifier(\"deadApiRetryExchange\") DirectExchange deadApiRetryExchange) { return BindingBuilder.bind(deadApiRetryQueue).to(deadApiRetryExchange).with(DEAD_LETTER_ROUTING_KEY); }}
@Configuration@Slf4jpublic class RabbitConfig { @Resource private CachingConnectionFactory connectionFactory; @Resource private BroadCastErrorInfoService broadCastErrorInfoService; @Value(\"${rabbitmq.publishe-to-exchange.retry.count}\") private Integer retryCount; @Bean public RabbitTemplate rabbitTemplate() { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(converter()); // 使用事务模式,不能使用publish-confirm // 消息是否成功发送到Exchange rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { // 消息成功发送 log.info(\"消息发发送成功:{}\", correlationData); } else { // 消息发送失败 log.error(\"消息发送到Exchange失败, {}, cause: {}\", correlationData, cause); // 进行重发 if (correlationData instanceof MQCorrelationData) { MQCorrelationData mqCorrelationData = (MQCorrelationData) correlationData; Object message = mqCorrelationData.getMessage(); String exchange = mqCorrelationData.getExchange(); String routingKey = mqCorrelationData.getRoutingKey(); int retryNum = mqCorrelationData.getRetryCount(); if (retryNum = retryCount) { log.error(\"丢进死信队列:{}\", message); BroadCastErrorInfo broadCastErrorInfo = new BroadCastErrorInfo(); broadCastErrorInfo.setData(message.toString()); broadCastErrorInfo.setExchange(exchange); broadCastErrorInfo.setRoutingKey(routingKey); broadCastErrorInfo.setReason(cause); broadCastErrorInfoService.save(broadCastErrorInfo); } } } }); // 触发setReturnCallback回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调 rabbitTemplate.setMandatory(true); // 消息是否从Exchange路由到Queue, 注意: 这是一个失败回调, 只有消息从消费失败 rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { BroadCastErrorInfo broadCastErrorInfo = new BroadCastErrorInfo(); broadCastErrorInfo.setData(new String(message.getBody())); broadCastErrorInfo.setExchange(exchange); broadCastErrorInfo.setRoutingKey(routingKey); broadCastErrorInfo.setReason(replyText); broadCastErrorInfoService.save(broadCastErrorInfo); log.error(\"消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}\", exchange, routingKey, replyCode, replyText, message); }); return rabbitTemplate; } @Bean public Jackson2JsonMessageConverter converter() { ObjectMapper mapper = new ObjectMapper().findAndRegisterModules(); return new Jackson2JsonMessageConverter(mapper); } @Bean public MessageConverter messageConverter() { ObjectMapper mapper = new ObjectMapper().findAndRegisterModules(); return new Jackson2JsonMessageConverter(mapper); }}
Nacos配置
spring: rabbitmq: host: 127.0.0.1 port: 5672 virtual-host: / username: admin password: admin # 开启消息确认模式 # 消息发送到交换机确认机制,发布消息成功到交换器后会触发回调方法 publisher-confirm-type: correlated # 是否返回生产者 publisher-returns: true template: #开启mandatory: true, basic.return方法将消息返还给生产者 mandatory: true listener: simple: # 手动应答 acknowledge-mode: manual # 最少消费者数量 concurrency: 1 # 最多消费者数量 max-concurrency: 10 # 支持重试 retry: enabled: true
确保消息处理不丢失
1、持久化
(1)队列持久化
@Bean(\"Queue\")public Queue queue() { return QueueBuilder.durable(\"queue\").build(); }
(2)消息持久化
Message msg = MessageBuilder.withBody(\"消息内容\".getBytes()) .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN) .setDeliveryMode(MessageDeliveryMode.PERSISTENT) //持久化 .build();// 发送MQ消息rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, RabbitConfig.ROUTING_KEY, msg);
2、配置 publisher-confirm-type:correlated 以及实现 RabbitTemplate.ConfirmCallback 确保了生产者消息发送到了Exchange交换机
只要生产者发布消息,交换机不管是否收到消息,都会调用该类的 confirm 方法
@Slf4j@Componentpublic class MyCallBack implements RabbitTemplate.ConfirmCallback { //注入 @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void init(){ //注入 rabbitTemplate.setConfirmCallback(this); } /** * 交换机不管是否收到消息的一个回调方法 * 1. 发消息 交换机接收到了 回调 * @param correlationData 保存回调信息的Id及相关信息 * @param ack 交换机收到消息 为true * @param cause 未收到消息的原因 * */ @Override public void confirm(CorrelationData correlationData, boolean ack,String cause) { String id = correlationData!=null?correlationData.getId():\"\"; if(ack){ log.info(\"交换机已经收到了ID为:{}的消息\",id); }else { // todo 可以进行重试操作,也可以保存在数据库人工处理 log.info(\"交换机还未收到ID为:{}的消息,由于原因:{}\",id,cause); } }}
3、配置 publisher-returns:true 和 mandatory: true 以及实现 RabbitTemplate.ReturnsCallback 确保了 Exchange交换机路由到了Queue队列
获取回退的消息,需要自定义类实现 RabbitTemplate.ReturnsCallback 接口,并且初始化时,使用该自定义类作为回退消息的处理类,同时开启 Mandatory,设置为 true
在启动开启 Mandatory,或者在代码里手动开启 Mandatory 参数,或者都开启。会将消息返回给生产者。
@Slf4j@Componentpublic class MyCallBack implements RabbitTemplate.ReturnsCallback { //注入 @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void init(){ //注入 rabbitTemplate.setReturnsCallback(this); } //可以在当消息传递过程中不可达目的地时将消息返回给生产者 //只有不可达目的地的时候 才进行回退 /** * 当消息无法路由的时候的回调方法 * message 消息 * replyCode 编码 * replyText 退回原因 * exchange 从哪个交换机退回 * routingKey 通过哪个路由 key 退回 */ @Override public void returnedMessage(ReturnedMessage returned) { // todo 可以进行重试操作,也可以保存在数据库人工处理 log.error(\"消息{},被交换机{}退回,退回原因:{},路由key:{}\", new String(returned.getMessage().getBody()),returned.getExchange(), returned.getReplyText(),returned.getRoutingKey()); }}
4、配置 acknowledge-mode: manual 以及 在消费者增加 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); 手动处理消费者消费的消息是否成功
@RabbitListener(queues = CustomizedScheduleEmailBroadcast.CUSTOMIZED_SCHEDULE_QUEUE)public void sendEmail(Message message, Channel channel) throws IOException { try { String body = new String(message.getBody()); log.info(\"消息体:{}\", JSON.toJSONString(body)); // 消费成功,返回给消息队列ack,第一个参数:消息标识,第二个参数:是否批量处理 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { log.error(\"消费失败\"); 保存到消费失败的数据库,人工处理... // 第一个参数:消息标识 // 第二个参数:是否重新入队 channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); }}