> 文档中心 > Spring Boot【定制化】~ 集成RabbitMQ

Spring Boot【定制化】~ 集成RabbitMQ

目录

1、简介

1.1、概述

1.2、两种消息发布模式

一、点对点发布模式

二、发布订阅模式

2、整合RabbitMQ

2.1、引入RabbitMQ依赖

2.2、自定义消息发送转换器

2.3、生产消息与消费消息

2.4、amqp管理器

2.5、开启注解模式RabbitMQ


1、简介

1.1、概述

  • MQ全称为Message Queue,即消息队列
  • “消息队列”是在消息的传输过程中保存消息的容器。
  • 它是典型的:生产者、消费者模型。
  • 生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。
  • 消息的生产和消费都是异步的。

1.2、两种消息发布模式

一、点对点发布模式

  • 消息生产者把生产的消息放进指定的消息队列中,消息消费者从队列中取出消息内容(消息被取出就会从队列中移除)。

二、发布订阅模式

  • 消息生产者(发布者)发送消息到主题,多个接收者(订阅者)监听(订阅)这个主题,那么 就会在消息到达时同时收到消息。

2、整合RabbitMQ

2.1、引入RabbitMQ依赖

    org.springframework.boot    spring-boot-starter-amqp

2.2、自定义消息发送转换器

作用:以JSON的形式发送消息到MQ客户端

/** * mq配置 * * @author cms * @version 1.0.0.0 * @Date: 2022/5/25 13:23 */@Configurationpublic class RuanjiaMqpConfig{    /**     * 自定义消息发送转换器(Jackson 2 Json 消息转换器)     * @return     */    @Bean    public MessageConverter messageConverter()    { return new Jackson2JsonMessageConverter();    }}

2.3、生产消息与消费消息

使用方法:通过amqp提供的消息操作模板 RabbitTemplate 完成生产消息与消费消息

@SpringBootTestclass SpringbootDay06MqApplicationTests{    @Autowired    private RabbitTemplate rabbitTemplate;    /** 单播(direct):点对点发送 */    @Test    public void sendInfoTest01()    { // 三个参数:交换器,路由Key,消息内容(该参数类型是Object) rabbitTemplate.convertAndSend("exchanges.direct","ruanjia.news", "点对点消息发送");    }    /** 接收指定消息队列消息 */    @Test    public void queryMqDataTest01()    { Object data = rabbitTemplate.receiveAndConvert("ruanjia.news"); if (data != null) {     System.out.println(data.getClass());     System.out.println(data); }    }    /** 单播(direct):点对点发送 */    @Test    public void sendInfoTest02()    { Commodity commodity = new Commodity(); commodity.setCommodityId(2); commodity.setCommodityName("手机111"); commodity.setCommodityPrice("3999 元"); commodity.setCommodityType("H"); commodity.setCommodityAddDateTime(new Date()); // 三个参数:交换器,路由Key,消息内容(该参数类型是Object) rabbitTemplate.convertAndSend("exchanges.direct","ruanjia", commodity);    }    /** 接收指定消息队列消息 */    @Test    public void queryMqDataTest02()    { // 接收并转换消息队列数据(根据消息队列名字接收) Object data = rabbitTemplate.receiveAndConvert("ruanjia"); if (data != null) {     System.out.println(data.getClass());     System.out.println(data); }    }    /** 广播(fanout):广播形式发送(全部队列都会收到发送的消息)*/    @Test    public void sendInfoTest03()    { rabbitTemplate.convertAndSend("exchanges.fanout", null,"广播形式发送(全部队列都会收到发送的消息)!");    }    /** 接收指定消息队列消息 */    @Test    public void queryMqDataTest03()    { // 接收并转换消息队列数据(根据消息队列名字接收) String exchange = rabbitTemplate.getExchange(); System.out.println(exchange); rabbitTemplate.receiveAndConvert("ruanjia"); rabbitTemplate.receiveAndConvert("ruanjia.news"); rabbitTemplate.receiveAndConvert("ruanjia.emps");    }}

2.4、amqp管理器

作用:创建交换器、创建消息队列/删除交换器、删除消息队列...等等

/** * ma管理控制(创建交换器、创建消息队列/删除交换器、删除消息队列...等等) * * @author cms * @version 1.0.0.0 * @Date: 2022/5/25 14:27 */@SpringBootTestpublic class MqAdminTest01{    /** ma管理控制器 */    @Autowired    private AmqpAdmin amqpAdmin;    @Autowired    private RabbitTemplate rabbitTemplate;    /** 创建交换器 */    @Test    public void addExchange()    { //创建:exchange.test交换器 amqpAdmin.declareExchange(new DirectExchange("exchange.test01")); //删除交换器 //amqpAdmin.deleteExchange("exchange.test01");    }    /** 创建消息队列 */    @Test    public void addQueue()    { //创建:queue.test01队列 amqpAdmin.declareQueue(new Queue("queue.test01", true)); //删除消息队列 //amqpAdmin.deleteQueue("queue.test01");    }    /** 交换器绑定队列 */    @Test    public void bindingTest01()    { /**  * 参数一:被绑定的队列  * 参数二:目标类型  * 参数三:绑定的交换器  * 参数四:路由Key  * 参数五:设置null即可  */ amqpAdmin.declareBinding(new Binding("queue.test01", Binding.DestinationType.QUEUE, "exchange.test01", "router.key01", null));    }    /** 发消息 */    @Test    public void sendInfo()    { rabbitTemplate.convertAndSend("exchange.test01", "router.key01", "消息发送测试!");    }    /** 取消息 */    @Test    public void queryInfo()    { Object o = rabbitTemplate.receiveAndConvert("queue.test01"); System.out.println(o);    }    /** 创建广播类型转换器(fanout) */    @Test    public void createExchange()    { amqpAdmin.declareExchange(new Exchange() {     @Override     public String getName() {  return "ruanjia.exchange.fanout"; //转换器名称     }     @Override     public String getType() {  return "fanout"; //转换器类型     }     @Override     public boolean isDurable() {  return false;     }     @Override     public boolean isAutoDelete() {  return false;     }     @Override     public Map getArguments() {  return null;     }     @Override     public boolean isDelayed() {  return false;     }     @Override     public boolean isInternal() {  return false;     }     @Override     public boolean shouldDeclare() {  return false;     }     @Override     public Collection getDeclaringAdmins() {  return null;     }     @Override     public boolean isIgnoreDeclarationExceptions() {  return false;     } });    }}

2.5、开启注解模式RabbitMQ

开启方式:启动类添加 @EnableRabbit 注解即可,然后使用 @RabbitListener 注解,实时监听消息队列内容。

/** * @EnableRabbit    开启基于注解的RabbitMQ模式 * @RabbitListener  监听消息队列的内容 */@EnableRabbit //开启注解模式mq@SpringBootApplicationpublic class SpringbootDay06MqApplication{    public static void main(String[] args) { SpringApplication.run(SpringbootDay06MqApplication.class, args);    }}
/** * @author cms * @version 1.0.0.0 * @Date: 2022/5/25 13:42 */@Servicepublic class CommodityService{    /**     * @RabbitListener 实时监听消息队列内容     *      queues:根据消息队列名实时监听接收消息(如果队列中有消息)     *     * @param commodity 商品对象     * @return 返回结果     */    @RabbitListener(queues = {"ruanjia"}) //ruanjia 消息队列    public void queryCommodityInfo(Commodity commodity)    { System.out.println("接收消息:"+commodity);    }    /**     *      * @param message amqp提供的消息对象     */    @RabbitListener(queues = {"ruanjia.news"})    public void queryFanoutInfo(Message message)    { if (message !=null) {     System.out.println("----------------------------------------------------------------------");     System.out.println("body:"+ message.getBody()); //获取消息体(ruanjia.news中的消息内容)     System.out.println("body:"+ message.getMessageProperties()); //获取消息属性     System.out.println("----------------------------------------------------------------------"); }    }}

blibli找视频看看就懂了!

书本网