Springboot整合RabbitMQ一篇足以
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
文章目录
前言
关于rabbitmq安装就不说了,直接安装下来是这个界面,默认账号 密码都是:guest,系统管理员角色,但是这个角色只能在本地使用,不可远程登陆,想要远程登录需要重写添加用户分配角色。
在可视化端可以手动添加交换机,队列,消息,创建用户,分配权限。
了解一下mq的生产消费流程,认真看别走神,要思考,带着疑问看下面的。
交换机有四种,消息发布(生产者)是发给哪一种交换机?交换机再把消息给到队列,给的又是哪个队列?订阅者又取哪个队列的消息?
写代码之前了解一下交换机都有哪些,里面都有啥属性。
交换机类型
Direct exchange(直连交换机)
Fanout exchange(扇型交换机)
Topic exchange(主题交换机)
Headers exchange(头交换机)
amq.* exchanges 默认交换机
交换机属性
Name 交换机名称
Type 交换机类型direct、topic、 fanout、 headers
Durability 是否需要持久化。如果持久化,则RabbitMQ重启后,交换机还存在
Auto-delete 当最后一个绑定到Exchange 上的队列删除后,自动删除该Exchange
Internal 当前Exchange是否于RabbitMQ内部使用,默认为False
导入依赖:
<!--rabbitmq--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
配置yml
server: port: 8080spring: #给项目来个名字 application: name: rabbitmq-provider #配置rabbitMq 服务器 rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest
Direct exchange(直连交换机)
/** * 直连交换机 生产者配置 * @Author ZYZ * @Date 2022-03-14 15:40 * @Version 1.0 */@Configurationpublic class DirectRabbitConfig { //队列 起名:TestDirectQueue @Bean public Queue TestDirectQueue(){ /** * durable:是否持久化,默认是false, * 持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,(被消费过得消息不会被持久化) * 暂存队列:当前连接有效,重启后队列消失 * exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable * autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。 * return new Queue("TestDirectQueue",true,true,false); * 一般设置一下队列的持久化就好,其余两个就是默认false */ return new Queue("TestDirectQueue",true); } //Direct交换机 起名:TestDirectExchange @Bean DirectExchange TestDirectExchange(){ return new DirectExchange("TestDirectExchange",true,false); } //绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting(路由键) @Bean Binding bindingDirect(){ return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting"); }
思想就是,定义一个队列,定义一个交换机,OK绑定他们,发消息的时候就可以把消息发送到定义的交换机,交换机把消息给绑定的队列,一个交换机可以绑定多个队列。
消息推送:
@Resource RabbitTemplate rabbitTemplate; //使用RabbitTemplate,这提供了接收/发送等等方法 //直连交换机 @GetMapping("/sendDirectMessage") public String sendDirectMessage() { //设置值 String messageId = String.valueOf(UUID.randomUUID()); String messageData = "Direct Exchange test message, hello!"; String createTime = DateUtil.now(); //封装map一次发送 Map<String, Object> map = new HashMap<>(); map.put("messageId", messageId); map.put("messageData", messageData); map.put("createTime", createTime); //将消息携带绑定键值:TestDirectRouting(路由键) // 发送到交换机TestDirectExchange rabbitTemplate.convertAndSend( "TestDirectExchange", "TestDirectRouting", map); return "ok"; }
消息已经加到交换机对应的队列下了。
创建消费模块,pom依赖不变,yml配置也不变。
/** * 直连交换机,消息接收监听 * @Author ZYZ * @Date 2022-03-14 16:25 * @Version 1.0 */@Component@RabbitListener(queues = "TestDirectQueue")//注:监听的队列名称:TestDirectQueuepublic class DirectReceiverService { @RabbitHandler public void process(Map testMessage) { System.out.println("DirectReceiver消费者收到消息 :"+testMessage.toString()); }}
收到消息,接收成功,实时消费。
注意:直连交换机是一个消费者监听一个队列,如果两个消费者同时监听一个队列,会出现轮询消费消息,消息不会重复。直白点儿就是消费者A,B都监听TestDirectQueue就会出现轮询消费的情况。
Fanout exchange(扇型交换机)
道理一样形式不同,上代码
/** * 扇型交换机 * @Author ZYZ * @Date 2022-03-15 8:18 * @Version 1.0 */@Configurationpublic class FanoutRabbitConfig { @Resource private RabbitTemplate rabbitTemplate; /** * 创建三个队列 :fanout.A fanout.B fanout.C * 将三个队列都绑定在交换机 fanoutExchange 上 * 因为是扇型交换机, 路由键无需配置,配置也不起作用 */ @Bean public Queue queueA() { return new Queue("fanout.A",true); } @Bean public Queue queueB() { return new Queue("fanout.B",true); } @Bean public Queue queueC() { return new Queue("fanout.C",true); } @Bean FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange",true,false); } @Bean Binding bindingExchangeA() { return BindingBuilder.bind(queueA()).to(fanoutExchange()); } @Bean Binding bindingExchangeB() { return BindingBuilder.bind(queueB()).to(fanoutExchange()); } @Bean Binding bindingExchangeC() { return BindingBuilder.bind(queueC()).to(fanoutExchange()); }
创建三个队列,一个交换机,将他们绑定在一起,不需要通过路由键绑定,定义了也无效。
推送消息
//扇型交换机 @GetMapping("/sendFanoutMessage") public String sendFanoutMessage() { String messageId = String.valueOf(UUID.randomUUID()); String messageData = "message: testFanoutMessage "; String createTime = DateUtil.now(); Map<String, Object> map = new HashMap<>(); map.put("messageId", messageId); map.put("messageData", messageData); map.put("createTime", createTime); rabbitTemplate.convertAndSend("fanoutExchange", null, map); return "ok"; }
这里的路由键直接设为null就行
消费类:
@Component@RabbitListener(queues = "fanout.A")public class FanoutReceiverA { @RabbitHandler public void process(Map testMessage) { System.out.println("FanoutReceiverA消费者收到消息 : " +testMessage.toString()); }}
@Component@RabbitListener(queues = "fanout.B")public class FanoutReceiverB { @RabbitHandler public void process(Map testMessage) { System.out.println("FanoutReceiverB消费者收到消息 : " +testMessage.toString()); }}
@Component@RabbitListener(queues = "fanout.C")public class FanoutReceiverC { @RabbitHandler public void process(Map testMessage) { System.out.println("FanoutReceiverC消费者收到消息 : " +testMessage.toString()); }}
只要绑定了交换机,三个监听类会同时受到这条消息
Topic exchange(主题交换机)
直接上代码
/** * 主题交换机 * @Author ZYZ * @Date 2022-03-14 17:02 * @Version 1.0 */@Configurationpublic class TopicRabbitConfig { //*可匹配一个单词 //#可匹配零或多个单词 //例:1.rabbit.test 2.rabbit.test.hello *可匹配到1不能匹配2 #可以匹配到1 2 //绑定键 public final static String man = "topic.man"; public final static String woman = "topic.woman"; //队列1 起名:topic.man @Bean public Queue firstQueue() { return new Queue(TopicRabbitConfig.man,true); } //队列2 起名:topic.woman @Bean public Queue secondQueue() { return new Queue(TopicRabbitConfig.woman,true); } //定义Topic交换机 起名:topicExchange @Bean TopicExchange exchange() { return new TopicExchange("topicExchange",true,false); } //将firstQueue和topicExchange绑定,而且绑定的键值为topic.man //这样只要是消息携带的路由键是topic.man,才会分发到该队列 @Bean Binding bindingExchangeMessage() { return BindingBuilder.bind(firstQueue()).to(exchange()).with(man); } //将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.# //这样只要是消息携带的路由键是以topic.开头,都会分发到该队列 @Bean Binding bindingExchangeMessage2() { return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#"); }}
发送消息
//主题交换机 @GetMapping("/sendTopicMessage1") public String sendTopicMessage1() { String messageId = String.valueOf(UUID.randomUUID()); String messageData = "message: M A N "; String createTime = DateUtil.now(); Map<String, Object> manMap = new HashMap<>(); manMap.put("messageId", messageId); manMap.put("messageData", messageData); manMap.put("createTime", createTime); rabbitTemplate.convertAndSend("topicExchange", "topic.man", manMap); return "ok"; } //主题交换机 @GetMapping("/sendTopicMessage2") public String sendTopicMessage2() { String messageId = String.valueOf(UUID.randomUUID()); String messageData = "message: woman is all "; String createTime = DateUtil.now(); Map<String, Object> womanMap = new HashMap<>(); womanMap.put("messageId", messageId); womanMap.put("messageData", messageData); womanMap.put("createTime", createTime); rabbitTemplate.convertAndSend("topicExchange", "topic.woman", womanMap); return "ok"; }
接收消息
** * Topic主题交换机,消费 * @Author ZYZ * @Date 2022-03-14 17:16 * @Version 1.0 */@Component@RabbitListener(queues = "topic.man")//注:监听的队列名称:topic.manpublic class TopicManReceiver { @RabbitHandler public void process(Map testMessage) { System.out.println("TopicManReceiver消费者收到消息 : " + testMessage.toString()); }}
/** * Topic主题交换机,消费 * @Author ZYZ * @Date 2022-03-14 17:16 * @Version 1.0 */@Component@RabbitListener(queues = "topic.woman")//注:监听的队列名称:topic.womanpublic class TopicWomanReceiver { @RabbitHandler public void process(Map testMessage) { System.out.println("TopicTotalReceiver woman消费者收到消息 : " + testMessage.toString()); }}
消息持久化
需要设置交换机和队列都是持久化状态就可以了,消息默认是持久化的。
1是非持久化
本篇暂先到这里了,想继续深入了解自动手动确认,死信号,AMQP协议,以及队列长度,消息大小限制,后续再见。