> 文档中心 > Springboot整合RabbitMQ一篇足以

Springboot整合RabbitMQ一篇足以


提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

文章目录

  • 前言
  • 交换机类型
  • 交换机属性
  • Direct exchange(直连交换机)
  • Fanout exchange(扇型交换机)
  • Topic exchange(主题交换机)
  • 消息持久化

前言

关于rabbitmq安装就不说了,直接安装下来是这个界面,默认账号 密码都是:guest,系统管理员角色,但是这个角色只能在本地使用,不可远程登陆,想要远程登录需要重写添加用户分配角色。
在这里插入图片描述
在可视化端可以手动添加交换机,队列,消息,创建用户,分配权限。

了解一下mq的生产消费流程,认真看别走神,要思考,带着疑问看下面的。
在这里插入图片描述
交换机有四种,消息发布(生产者)是发给哪一种交换机?交换机再把消息给到队列,给的又是哪个队列?订阅者又取哪个队列的消息?

写代码之前了解一下交换机都有哪些,里面都有啥属性。

交换机类型

Direct exchange(直连交换机)

Fanout exchange(扇型交换机)

Topic exchange(主题交换机)

Headers exchange(头交换机)

amq.* exchanges 默认交换机

交换机属性

Name 交换机名称

Type 交换机类型direct、topic、 fanout、 headers

Durability 是否需要持久化。如果持久化,则RabbitMQ重启后,交换机还存在

Auto-delete 当最后一个绑定到Exchange 上的队列删除后,自动删除该Exchange

Internal 当前Exchange是否于RabbitMQ内部使用,默认为False

导入依赖:

 <!--rabbitmq--> <dependency>     <groupId>org.springframework.boot</groupId>     <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

配置yml

server:  port: 8080spring:  #给项目来个名字  application:    name: rabbitmq-provider  #配置rabbitMq 服务器  rabbitmq:    host: 127.0.0.1    port: 5672    username: guest    password: guest

Direct exchange(直连交换机)

/** * 直连交换机 生产者配置 * @Author ZYZ * @Date 2022-03-14 15:40 * @Version 1.0 */@Configurationpublic class DirectRabbitConfig {    //队列 起名:TestDirectQueue    @Bean    public Queue TestDirectQueue(){ /**  * durable:是否持久化,默认是false,  * 持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,(被消费过得消息不会被持久化)  * 暂存队列:当前连接有效,重启后队列消失  * exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable  * autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。  * return new Queue("TestDirectQueue",true,true,false);  * 一般设置一下队列的持久化就好,其余两个就是默认false  */ return new Queue("TestDirectQueue",true);    }    //Direct交换机 起名:TestDirectExchange    @Bean    DirectExchange TestDirectExchange(){ return new DirectExchange("TestDirectExchange",true,false);    }    //绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting(路由键)    @Bean    Binding bindingDirect(){ return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");    }

思想就是,定义一个队列,定义一个交换机,OK绑定他们,发消息的时候就可以把消息发送到定义的交换机,交换机把消息给绑定的队列,一个交换机可以绑定多个队列。
消息推送:

@Resource    RabbitTemplate rabbitTemplate; //使用RabbitTemplate,这提供了接收/发送等等方法    //直连交换机    @GetMapping("/sendDirectMessage")    public String sendDirectMessage() { //设置值 String messageId = String.valueOf(UUID.randomUUID()); String messageData = "Direct Exchange test message, hello!"; String createTime = DateUtil.now(); //封装map一次发送 Map<String, Object> map = new HashMap<>(); map.put("messageId", messageId); map.put("messageData", messageData); map.put("createTime", createTime); //将消息携带绑定键值:TestDirectRouting(路由键) // 发送到交换机TestDirectExchange rabbitTemplate.convertAndSend(  "TestDirectExchange",  "TestDirectRouting",  map); return "ok";    }

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

消息已经加到交换机对应的队列下了。

创建消费模块,pom依赖不变,yml配置也不变。

/** * 直连交换机,消息接收监听 * @Author ZYZ * @Date 2022-03-14 16:25 * @Version 1.0 */@Component@RabbitListener(queues = "TestDirectQueue")//注:监听的队列名称:TestDirectQueuepublic class DirectReceiverService {    @RabbitHandler    public void process(Map testMessage) { System.out.println("DirectReceiver消费者收到消息  :"+testMessage.toString());    }}

Springboot整合RabbitMQ一篇足以
收到消息,接收成功,实时消费。
注意:直连交换机是一个消费者监听一个队列,如果两个消费者同时监听一个队列,会出现轮询消费消息,消息不会重复。直白点儿就是消费者A,B都监听TestDirectQueue就会出现轮询消费的情况。

Fanout exchange(扇型交换机)

道理一样形式不同,上代码

/** * 扇型交换机 * @Author ZYZ * @Date 2022-03-15 8:18 * @Version 1.0 */@Configurationpublic class FanoutRabbitConfig {    @Resource    private RabbitTemplate rabbitTemplate;    /**     *  创建三个队列 :fanout.A   fanout.B  fanout.C     *  将三个队列都绑定在交换机 fanoutExchange 上     *  因为是扇型交换机, 路由键无需配置,配置也不起作用     */    @Bean    public Queue queueA() { return new Queue("fanout.A",true);    }    @Bean    public Queue queueB() { return new Queue("fanout.B",true);    }    @Bean    public Queue queueC() { return new Queue("fanout.C",true);    }    @Bean    FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange",true,false);    }    @Bean    Binding bindingExchangeA() { return BindingBuilder.bind(queueA()).to(fanoutExchange());    }    @Bean    Binding bindingExchangeB() { return BindingBuilder.bind(queueB()).to(fanoutExchange());    }    @Bean    Binding bindingExchangeC() { return BindingBuilder.bind(queueC()).to(fanoutExchange());    }

创建三个队列,一个交换机,将他们绑定在一起,不需要通过路由键绑定,定义了也无效。
推送消息

//扇型交换机    @GetMapping("/sendFanoutMessage")    public String sendFanoutMessage() { String messageId = String.valueOf(UUID.randomUUID()); String messageData = "message: testFanoutMessage "; String createTime = DateUtil.now(); Map<String, Object> map = new HashMap<>(); map.put("messageId", messageId); map.put("messageData", messageData); map.put("createTime", createTime); rabbitTemplate.convertAndSend("fanoutExchange", null, map); return "ok";    }

这里的路由键直接设为null就行
消费类:

@Component@RabbitListener(queues = "fanout.A")public class FanoutReceiverA {    @RabbitHandler    public void process(Map testMessage) { System.out.println("FanoutReceiverA消费者收到消息  : " +testMessage.toString());    }}
@Component@RabbitListener(queues = "fanout.B")public class FanoutReceiverB {    @RabbitHandler    public void process(Map testMessage) { System.out.println("FanoutReceiverB消费者收到消息  : " +testMessage.toString());    }}
@Component@RabbitListener(queues = "fanout.C")public class FanoutReceiverC {    @RabbitHandler    public void process(Map testMessage) { System.out.println("FanoutReceiverC消费者收到消息  : " +testMessage.toString());    }}

在这里插入图片描述
在这里插入图片描述
只要绑定了交换机,三个监听类会同时受到这条消息

Topic exchange(主题交换机)

直接上代码

/** * 主题交换机 * @Author ZYZ * @Date 2022-03-14 17:02 * @Version 1.0 */@Configurationpublic class TopicRabbitConfig {    //*可匹配一个单词    //#可匹配零或多个单词    //例:1.rabbit.test 2.rabbit.test.hello  *可匹配到1不能匹配2  #可以匹配到1 2    //绑定键    public final static String man = "topic.man";    public final static String woman = "topic.woman";    //队列1 起名:topic.man    @Bean    public Queue firstQueue() { return new Queue(TopicRabbitConfig.man,true);    }    //队列2 起名:topic.woman    @Bean    public Queue secondQueue() { return new Queue(TopicRabbitConfig.woman,true);    }    //定义Topic交换机 起名:topicExchange    @Bean    TopicExchange exchange() { return new TopicExchange("topicExchange",true,false);    }    //将firstQueue和topicExchange绑定,而且绑定的键值为topic.man    //这样只要是消息携带的路由键是topic.man,才会分发到该队列    @Bean    Binding bindingExchangeMessage() { return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);    }    //将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.#    //这样只要是消息携带的路由键是以topic.开头,都会分发到该队列    @Bean    Binding bindingExchangeMessage2() { return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");    }}

发送消息

//主题交换机    @GetMapping("/sendTopicMessage1")    public String sendTopicMessage1() { String messageId = String.valueOf(UUID.randomUUID()); String messageData = "message: M A N "; String createTime = DateUtil.now(); Map<String, Object> manMap = new HashMap<>(); manMap.put("messageId", messageId); manMap.put("messageData", messageData); manMap.put("createTime", createTime); rabbitTemplate.convertAndSend("topicExchange", "topic.man", manMap); return "ok";    }    //主题交换机    @GetMapping("/sendTopicMessage2")    public String sendTopicMessage2() { String messageId = String.valueOf(UUID.randomUUID()); String messageData = "message: woman is all "; String createTime = DateUtil.now(); Map<String, Object> womanMap = new HashMap<>(); womanMap.put("messageId", messageId); womanMap.put("messageData", messageData); womanMap.put("createTime", createTime); rabbitTemplate.convertAndSend("topicExchange", "topic.woman", womanMap); return "ok";    }

接收消息

** * Topic主题交换机,消费 * @Author ZYZ * @Date 2022-03-14 17:16 * @Version 1.0 */@Component@RabbitListener(queues = "topic.man")//注:监听的队列名称:topic.manpublic class TopicManReceiver {    @RabbitHandler    public void process(Map testMessage) { System.out.println("TopicManReceiver消费者收到消息  : " + testMessage.toString());    }}
/** * Topic主题交换机,消费 * @Author ZYZ * @Date 2022-03-14 17:16 * @Version 1.0 */@Component@RabbitListener(queues = "topic.woman")//注:监听的队列名称:topic.womanpublic class TopicWomanReceiver {    @RabbitHandler    public void process(Map testMessage) { System.out.println("TopicTotalReceiver woman消费者收到消息  : " + testMessage.toString());    }}

在这里插入图片描述
Springboot整合RabbitMQ一篇足以
在这里插入图片描述
Springboot整合RabbitMQ一篇足以

消息持久化

需要设置交换机和队列都是持久化状态就可以了,消息默认是持久化的。

Springboot整合RabbitMQ一篇足以
1是非持久化

本篇暂先到这里了,想继续深入了解自动手动确认,死信号,AMQP协议,以及队列长度,消息大小限制,后续再见。

女性物流园