springBoot集成RabbitMq
1 增加依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2 写rabbitmq配置类
根据不同的模式 写法略有差异
不过总的来说 都需要 交换机(有的模式默认交换机) 队列 和 绑定
import org.springframework.amqp.core.*;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;@Configurationpublic class MqConfig { public static final String SECKILL_QUEUE = "seckill_queue"; public static final String QUEUE = "queue"; public static final String TOPIC_QUEUE1 = "topic.queue1"; public static final String TOPIC_QUEUE2 = "topic.queue2"; public static final String HEADER_QUEUE = "header.queue"; public static final String TOPLIC_EXCHANGE = "topicExchange"; public static final String FANOUT_EXCHANGE = "fanoutExchange"; public static final String HEADERS_EXCHANGE = "headersExchange"; /** * Direct 模式 交换机exchange(默认交换机) * @return 消息队列 durable 代表是否持久化 */ @Bean public Queue queue() { return new Queue(QUEUE,true) ;} /** * */ @Bean public Queue seckillQueue() { return new Queue(SECKILL_QUEUE,true);} /** * topic模式 交换机exchange * @return */ @Bean public Queue toplicQueue1() { return new Queue(TOPIC_QUEUE1,true);} @Bean public Queue toplicQueue2() { return new Queue(TOPIC_QUEUE2,true);} /** * 创建一个交换机 */ @Bean public TopicExchange toplicExchange() { return new TopicExchange(TOPLIC_EXCHANGE);} @Bean public Binding topicBing1(){ return BindingBuilder.bind(toplicQueue1()).to(toplicExchange()).with("topic.key1"); } @Bean public Binding topicBing2(){ return BindingBuilder.bind(toplicQueue2()).to(toplicExchange()).with("topic.#"); } /** * fanout模式 交换机Exchange */ @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange(FANOUT_EXCHANGE); } /** * fanout 绑定exchange * @return */ @Bean public Binding FanoutBing1() { return BindingBuilder.bind(toplicQueue1()).to(fanoutExchange());} @Bean public Binding FanoutBing2() { return BindingBuilder.bind(toplicQueue2()).to(fanoutExchange());} /** * Header模式 * @return */ @Bean public HeadersExchange headersExchange(){ return new HeadersExchange(HEADERS_EXCHANGE);} @Bean public Queue headerQueue1() { return new Queue(HEADER_QUEUE,true); } @Bean public Binding headerBing(){ Map<String,Object> map = new HashMap<>(); map.put("header1","value1"); map.put("header2","value2"); return BindingBuilder.bind(headerQueue1()).to(headersExchange()).whereAll(map).match(); }}
3 创建生产者
使用AmqpTemplate减少手动实现创建连接和通道的过程 只需要关心以何种模式发送消息到何处
import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessageProperties;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;@Servicepublic class Sender { private static Logger logger = LoggerFactory.getLogger(Sender.class); @Autowired AmqpTemplate amqpTemplate; public void send(String message){ logger.info("MQ SEDN MESSAGE:" + message); // 发送的队列 , 发送的消息 amqpTemplate.convertAndSend(MqConfig.QUEUE,message); } //发送到topic 的交换机 public void sendTopic(String msg){ logger.info("MQ SEND MESSAGE:" + msg); amqpTemplate.convertAndSend(MqConfig.TOPLIC_EXCHANGE,"topic.key1",msg); amqpTemplate.convertAndSend(MqConfig.TOPLIC_EXCHANGE,"topic.key2",msg); } //发送到fanout 的交换机 public void sentFanout(String msg){ logger.info("MQ SEND MESSAGE:" + msg); amqpTemplate.convertAndSend(MqConfig.FANOUT_EXCHANGE,"",msg); } //发送到header 交换机 public void sendHeader(String msg){ logger.info("MQ SEND MESSAGE:" + msg); //使用Message对msg进行修饰 并封装成一个Message对象进行传输 MessageProperties properties = new MessageProperties(); properties.setHeader("header1","value1"); properties.setAppId("2022"); Message obj = new Message(msg.getBytes(),properties); amqpTemplate.convertAndSend(MqConfig.HEADERS_EXCHANGE,"",obj); }}
4创建消费者
import edu.uestc.rabbitmq.MQConfig;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;@Servicepublic class Receiver { @Autowired AmqpTemplate amqpTemplate; private static Logger logger = LoggerFactory.getLogger(Receiver.class); @RabbitListener(queues = MQConfig.QUEUE) public void receive(String message) { logger.info("MQ: message: " + message); } @RabbitListener(queues = MQConfig.TOPIC_QUEUE1) public void receiveTopic1(String message) { logger.info("topic queue1 message: " + message); } @RabbitListener(queues = MQConfig.TOPIC_QUEUE2) public void receiveTopic2(String message) { logger.info("topic queue2 message: " + message); } @RabbitListener(queues = MQConfig.HEADER_QUEUE) public void receiveHeaderQueue(byte[] message) { logger.info("header queue message: " + new String(message)); }}