> 文档中心 > springboot集成activemq记录

springboot集成activemq记录


docker 安装activemq

docker pull webcenter/activemqdocker run  -d  --name activemq_serve \ -p 61616:61616 -p 8161:8161  -p 61613:61613 \ --restart=always \-v /home/activemq:/data/activemq \-v /home/activemq/log:/var/log/activemq \webcenter/activemq:latest

访问http://localhost:8161/admin/
用户名 密码 均是admin
在这里插入图片描述

集成springboot

依赖

 <dependency>     <groupId>org.springframework.boot</groupId>     <artifactId>spring-boot-starter-activemq</artifactId> </dependency>

yml

spring:  activemq:    broker-url: tcp://你的ip:61616 # MQ所在的服务器的地址    in-memory: true  # 是否使用内置的MQ    pool:      enabled: false #true表示使用连接池    #jms:      #pub-sub-domain: true # true为主题模式,false为队列模式    packages:      trust-all: true    user: admin    password: admin

队列模式Queue和主题模式Topic

队列模式:
点对点模式:生产者发送一条消息到queue,只有一个消费者能收到。可以消费之前的消息

主题模式
发布订阅模式:发布者发送到topic的消息,只要订阅了topic的订阅者都会收到消息。这个模式消费者只能消费到订阅之后的数据,还没订阅之前的数据接收不到

生产者消费者测试

配置

在启动类上开启jms@EnableJms
编写配置类ActiveMQConfig

public class ActiveMQConfig {     @Value("${spring.activemq.broker-url}")    private  String brokerUrl;     @Bean    public Queue queue() { //队列 return new ActiveMQQueue("queue"); // 参数就是队列名    }     @Bean    public Topic topic(){ // 主题 return new ActiveMQTopic("topic");// 参数就是主题名    }      @Bean    public ActiveMQConnectionFactory connectionFactory() { return new ActiveMQConnectionFactory(brokerUrl);    } //Queue模式连接注入    @Bean    public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ActiveMQConnectionFactory connectionFactory){ DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory(); bean.setConnectionFactory(connectionFactory); return bean;    } //Topic模式连接注入    @Bean    public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ActiveMQConnectionFactory connectionFactory){ DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory(); //设置为发布订阅方式, 默认情况下使用的生产消费者方式 bean.setPubSubDomain(true); // 主要是这个 bean.setConnectionFactory(connectionFactory); return bean;    }}

队列模式

默认为队列模式
生产者

@Autowiredprivate JmsMessagingTemplate jmsMessagingTemplate;@Autowiredprivate Queue queue;  @GetMapping("/send")public void send (){jmsMessagingTemplate.convertAndSend("hello","activemq");}@GetMapping("/send")public void send (){jmsMessagingTemplate.convertAndSend(queue,"activemq");}或者@GetMapping("/send")public void send (){ //定义队列模式,而非Topic模式 ActiveMQQueue activeMQQueue = new ActiveMQQueue("queue");jmsMessagingTemplate.convertAndSend(activeMQQueue ,"activemq");}

消费者

@Componentpublic class Listener {@JmsListener(destination = "hello")public void receiveMsg(String text) {System.out.println("接收到消息 : "+text);}  // 使用JmsListener配置消费者监听的队列,其中text是接收到的消息  // destination参数是队列名  containerFactory参数是队列配置名,用于区别两种不同模式的连接    @JmsListener(destination = "queue",containerFactory = "jmsListenerContainerQueue")     public void receiveQueue(String text) { System.out.println("消息消費者收到的报文为:"+text);   }}

主题模式

生产者

   @Autowired   private JmsMessagingTemplate jmsMessagingTemplate;   @Autowired   private Topic topic;//订阅模式(topic)发送消息   @RequestMapping("/topicSend")   public String topicSend(String text){jmsMessagingTemplate.convertAndSend(topic,text);return "topic 发送成功";   }或者@GetMapping("/topic")public void topic (){ActiveMQTopic activeMQTopic = new ActiveMQTopic("topic");jmsMessagingTemplate.convertAndSend(activeMQTopic,"activemq");}

消费者

// destination参数是队列名  containerFactory参数是队列配置名,用于区别两种不同模式的连接@JmsListener(destination = "topic",containerFactory ="jmsListenerContainerTopic" )public void receiveMsg(String msg) throws JMSException {System.out.println("接收到消息 : "+msg);}

springboot集成activemq记录