Spring Boot【定制化】~ 集成RabbitMQ
目录
1、简介
1.1、概述
1.2、两种消息发布模式
一、点对点发布模式
二、发布订阅模式
2、整合RabbitMQ
2.1、引入RabbitMQ依赖
2.2、自定义消息发送转换器
2.3、生产消息与消费消息
2.4、amqp管理器
2.5、开启注解模式RabbitMQ
1、简介
1.1、概述
- MQ全称为Message Queue,即消息队列。
- “消息队列”是在消息的传输过程中保存消息的容器。
- 它是典型的:生产者、消费者模型。
- 生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。
- 消息的生产和消费都是异步的。
1.2、两种消息发布模式
一、点对点发布模式
- 消息生产者把生产的消息放进指定的消息队列中,消息消费者从队列中取出消息内容(消息被取出就会从队列中移除)。
二、发布订阅模式
- 消息生产者(发布者)发送消息到主题,多个接收者(订阅者)监听(订阅)这个主题,那么 就会在消息到达时同时收到消息。
2、整合RabbitMQ
2.1、引入RabbitMQ依赖
org.springframework.boot spring-boot-starter-amqp
2.2、自定义消息发送转换器
作用:以JSON的形式发送消息到MQ客户端
/** * mq配置 * * @author cms * @version 1.0.0.0 * @Date: 2022/5/25 13:23 */@Configurationpublic class RuanjiaMqpConfig{ /** * 自定义消息发送转换器(Jackson 2 Json 消息转换器) * @return */ @Bean public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); }}
2.3、生产消息与消费消息
使用方法:通过amqp提供的消息操作模板 RabbitTemplate 完成生产消息与消费消息
@SpringBootTestclass SpringbootDay06MqApplicationTests{ @Autowired private RabbitTemplate rabbitTemplate; /** 单播(direct):点对点发送 */ @Test public void sendInfoTest01() { // 三个参数:交换器,路由Key,消息内容(该参数类型是Object) rabbitTemplate.convertAndSend("exchanges.direct","ruanjia.news", "点对点消息发送"); } /** 接收指定消息队列消息 */ @Test public void queryMqDataTest01() { Object data = rabbitTemplate.receiveAndConvert("ruanjia.news"); if (data != null) { System.out.println(data.getClass()); System.out.println(data); } } /** 单播(direct):点对点发送 */ @Test public void sendInfoTest02() { Commodity commodity = new Commodity(); commodity.setCommodityId(2); commodity.setCommodityName("手机111"); commodity.setCommodityPrice("3999 元"); commodity.setCommodityType("H"); commodity.setCommodityAddDateTime(new Date()); // 三个参数:交换器,路由Key,消息内容(该参数类型是Object) rabbitTemplate.convertAndSend("exchanges.direct","ruanjia", commodity); } /** 接收指定消息队列消息 */ @Test public void queryMqDataTest02() { // 接收并转换消息队列数据(根据消息队列名字接收) Object data = rabbitTemplate.receiveAndConvert("ruanjia"); if (data != null) { System.out.println(data.getClass()); System.out.println(data); } } /** 广播(fanout):广播形式发送(全部队列都会收到发送的消息)*/ @Test public void sendInfoTest03() { rabbitTemplate.convertAndSend("exchanges.fanout", null,"广播形式发送(全部队列都会收到发送的消息)!"); } /** 接收指定消息队列消息 */ @Test public void queryMqDataTest03() { // 接收并转换消息队列数据(根据消息队列名字接收) String exchange = rabbitTemplate.getExchange(); System.out.println(exchange); rabbitTemplate.receiveAndConvert("ruanjia"); rabbitTemplate.receiveAndConvert("ruanjia.news"); rabbitTemplate.receiveAndConvert("ruanjia.emps"); }}
2.4、amqp管理器
作用:创建交换器、创建消息队列/删除交换器、删除消息队列...等等
/** * ma管理控制(创建交换器、创建消息队列/删除交换器、删除消息队列...等等) * * @author cms * @version 1.0.0.0 * @Date: 2022/5/25 14:27 */@SpringBootTestpublic class MqAdminTest01{ /** ma管理控制器 */ @Autowired private AmqpAdmin amqpAdmin; @Autowired private RabbitTemplate rabbitTemplate; /** 创建交换器 */ @Test public void addExchange() { //创建:exchange.test交换器 amqpAdmin.declareExchange(new DirectExchange("exchange.test01")); //删除交换器 //amqpAdmin.deleteExchange("exchange.test01"); } /** 创建消息队列 */ @Test public void addQueue() { //创建:queue.test01队列 amqpAdmin.declareQueue(new Queue("queue.test01", true)); //删除消息队列 //amqpAdmin.deleteQueue("queue.test01"); } /** 交换器绑定队列 */ @Test public void bindingTest01() { /** * 参数一:被绑定的队列 * 参数二:目标类型 * 参数三:绑定的交换器 * 参数四:路由Key * 参数五:设置null即可 */ amqpAdmin.declareBinding(new Binding("queue.test01", Binding.DestinationType.QUEUE, "exchange.test01", "router.key01", null)); } /** 发消息 */ @Test public void sendInfo() { rabbitTemplate.convertAndSend("exchange.test01", "router.key01", "消息发送测试!"); } /** 取消息 */ @Test public void queryInfo() { Object o = rabbitTemplate.receiveAndConvert("queue.test01"); System.out.println(o); } /** 创建广播类型转换器(fanout) */ @Test public void createExchange() { amqpAdmin.declareExchange(new Exchange() { @Override public String getName() { return "ruanjia.exchange.fanout"; //转换器名称 } @Override public String getType() { return "fanout"; //转换器类型 } @Override public boolean isDurable() { return false; } @Override public boolean isAutoDelete() { return false; } @Override public Map getArguments() { return null; } @Override public boolean isDelayed() { return false; } @Override public boolean isInternal() { return false; } @Override public boolean shouldDeclare() { return false; } @Override public Collection getDeclaringAdmins() { return null; } @Override public boolean isIgnoreDeclarationExceptions() { return false; } }); }}
2.5、开启注解模式RabbitMQ
开启方式:启动类添加 @EnableRabbit 注解即可,然后使用 @RabbitListener 注解,实时监听消息队列内容。
/** * @EnableRabbit 开启基于注解的RabbitMQ模式 * @RabbitListener 监听消息队列的内容 */@EnableRabbit //开启注解模式mq@SpringBootApplicationpublic class SpringbootDay06MqApplication{ public static void main(String[] args) { SpringApplication.run(SpringbootDay06MqApplication.class, args); }}
/** * @author cms * @version 1.0.0.0 * @Date: 2022/5/25 13:42 */@Servicepublic class CommodityService{ /** * @RabbitListener 实时监听消息队列内容 * queues:根据消息队列名实时监听接收消息(如果队列中有消息) * * @param commodity 商品对象 * @return 返回结果 */ @RabbitListener(queues = {"ruanjia"}) //ruanjia 消息队列 public void queryCommodityInfo(Commodity commodity) { System.out.println("接收消息:"+commodity); } /** * * @param message amqp提供的消息对象 */ @RabbitListener(queues = {"ruanjia.news"}) public void queryFanoutInfo(Message message) { if (message !=null) { System.out.println("----------------------------------------------------------------------"); System.out.println("body:"+ message.getBody()); //获取消息体(ruanjia.news中的消息内容) System.out.println("body:"+ message.getMessageProperties()); //获取消息属性 System.out.println("----------------------------------------------------------------------"); } }}
blibli找视频看看就懂了!