> 文档中心 > springBoot集成RabbitMq

springBoot集成RabbitMq

1 增加依赖

 <dependency>     <groupId>org.springframework.boot</groupId>     <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

2 写rabbitmq配置类
根据不同的模式 写法略有差异
不过总的来说 都需要 交换机(有的模式默认交换机) 队列 和 绑定

import org.springframework.amqp.core.*;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;@Configurationpublic class MqConfig {    public static final String SECKILL_QUEUE = "seckill_queue";    public static final String QUEUE = "queue";    public static final String TOPIC_QUEUE1 = "topic.queue1";    public static final String TOPIC_QUEUE2 = "topic.queue2";    public static final String HEADER_QUEUE = "header.queue";    public static final String TOPLIC_EXCHANGE = "topicExchange";    public static final String FANOUT_EXCHANGE = "fanoutExchange";    public static final String HEADERS_EXCHANGE = "headersExchange";    /**     * Direct 模式 交换机exchange(默认交换机)     * @return 消息队列 durable 代表是否持久化     */    @Bean    public Queue queue() { return new Queue(QUEUE,true) ;}    /**     *     */    @Bean    public Queue seckillQueue() { return new Queue(SECKILL_QUEUE,true);}    /**     * topic模式 交换机exchange     * @return     */    @Bean    public Queue toplicQueue1() { return new Queue(TOPIC_QUEUE1,true);}    @Bean    public Queue toplicQueue2() { return new Queue(TOPIC_QUEUE2,true);}    /**     * 创建一个交换机     */    @Bean    public TopicExchange toplicExchange() { return new TopicExchange(TOPLIC_EXCHANGE);}    @Bean    public Binding topicBing1(){ return BindingBuilder.bind(toplicQueue1()).to(toplicExchange()).with("topic.key1");    }    @Bean    public Binding topicBing2(){ return BindingBuilder.bind(toplicQueue2()).to(toplicExchange()).with("topic.#");    }    /**     * fanout模式 交换机Exchange     */    @Bean    public FanoutExchange fanoutExchange() { return  new FanoutExchange(FANOUT_EXCHANGE); }    /**     * fanout 绑定exchange     * @return     */    @Bean    public Binding FanoutBing1() { return BindingBuilder.bind(toplicQueue1()).to(fanoutExchange());}    @Bean    public Binding FanoutBing2() { return BindingBuilder.bind(toplicQueue2()).to(fanoutExchange());}    /**     * Header模式     * @return     */    @Bean    public HeadersExchange headersExchange(){ return new HeadersExchange(HEADERS_EXCHANGE);}    @Bean    public Queue headerQueue1() { return new Queue(HEADER_QUEUE,true);    }    @Bean    public Binding headerBing(){ Map<String,Object> map = new HashMap<>(); map.put("header1","value1"); map.put("header2","value2"); return BindingBuilder.bind(headerQueue1()).to(headersExchange()).whereAll(map).match();    }}

3 创建生产者
使用AmqpTemplate减少手动实现创建连接和通道的过程 只需要关心以何种模式发送消息到何处

import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessageProperties;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;@Servicepublic class Sender {    private  static Logger logger = LoggerFactory.getLogger(Sender.class);    @Autowired    AmqpTemplate amqpTemplate;    public void send(String message){  logger.info("MQ SEDN MESSAGE:" + message);  // 发送的队列 , 发送的消息  amqpTemplate.convertAndSend(MqConfig.QUEUE,message);    }    //发送到topic  的交换机    public void sendTopic(String msg){ logger.info("MQ SEND MESSAGE:" + msg); amqpTemplate.convertAndSend(MqConfig.TOPLIC_EXCHANGE,"topic.key1",msg); amqpTemplate.convertAndSend(MqConfig.TOPLIC_EXCHANGE,"topic.key2",msg);    }    //发送到fanout 的交换机    public void sentFanout(String msg){ logger.info("MQ SEND MESSAGE:" + msg); amqpTemplate.convertAndSend(MqConfig.FANOUT_EXCHANGE,"",msg);    }    //发送到header 交换机    public void sendHeader(String msg){ logger.info("MQ SEND MESSAGE:" + msg); //使用Message对msg进行修饰 并封装成一个Message对象进行传输 MessageProperties properties = new MessageProperties(); properties.setHeader("header1","value1"); properties.setAppId("2022"); Message obj = new Message(msg.getBytes(),properties); amqpTemplate.convertAndSend(MqConfig.HEADERS_EXCHANGE,"",obj);    }}

4创建消费者

import edu.uestc.rabbitmq.MQConfig;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;@Servicepublic class Receiver {    @Autowired    AmqpTemplate amqpTemplate;    private static Logger logger  = LoggerFactory.getLogger(Receiver.class);    @RabbitListener(queues = MQConfig.QUEUE)    public void receive(String message) { logger.info("MQ: message: " + message);    }    @RabbitListener(queues = MQConfig.TOPIC_QUEUE1)    public void receiveTopic1(String message) { logger.info("topic queue1 message: " + message);    }    @RabbitListener(queues = MQConfig.TOPIC_QUEUE2)    public void receiveTopic2(String message) { logger.info("topic queue2 message: " + message);    }    @RabbitListener(queues = MQConfig.HEADER_QUEUE)    public void receiveHeaderQueue(byte[] message) { logger.info("header queue message: " + new String(message));    }}