springboot集成activemq记录
docker 安装activemq
docker pull webcenter/activemqdocker run -d --name activemq_serve \ -p 61616:61616 -p 8161:8161 -p 61613:61613 \ --restart=always \-v /home/activemq:/data/activemq \-v /home/activemq/log:/var/log/activemq \webcenter/activemq:latest
访问http://localhost:8161/admin/
用户名 密码 均是admin
集成springboot
依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency>
yml
spring: activemq: broker-url: tcp://你的ip:61616 # MQ所在的服务器的地址 in-memory: true # 是否使用内置的MQ pool: enabled: false #true表示使用连接池 #jms: #pub-sub-domain: true # true为主题模式,false为队列模式 packages: trust-all: true user: admin password: admin
队列模式Queue和主题模式Topic
队列模式
:
点对点模式:生产者发送一条消息到queue,只有一个消费者能收到。可以消费之前的消息
主题模式
:
发布订阅模式:发布者发送到topic的消息,只要订阅了topic的订阅者都会收到消息。这个模式消费者只能消费到订阅之后的数据,还没订阅之前的数据接收不到
生产者消费者测试
配置
在启动类上开启jms@EnableJms
编写配置类ActiveMQConfig
public class ActiveMQConfig { @Value("${spring.activemq.broker-url}") private String brokerUrl; @Bean public Queue queue() { //队列 return new ActiveMQQueue("queue"); // 参数就是队列名 } @Bean public Topic topic(){ // 主题 return new ActiveMQTopic("topic");// 参数就是主题名 } @Bean public ActiveMQConnectionFactory connectionFactory() { return new ActiveMQConnectionFactory(brokerUrl); } //Queue模式连接注入 @Bean public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ActiveMQConnectionFactory connectionFactory){ DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory(); bean.setConnectionFactory(connectionFactory); return bean; } //Topic模式连接注入 @Bean public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ActiveMQConnectionFactory connectionFactory){ DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory(); //设置为发布订阅方式, 默认情况下使用的生产消费者方式 bean.setPubSubDomain(true); // 主要是这个 bean.setConnectionFactory(connectionFactory); return bean; }}
队列模式
默认为队列模式
生产者
@Autowiredprivate JmsMessagingTemplate jmsMessagingTemplate;@Autowiredprivate Queue queue; @GetMapping("/send")public void send (){jmsMessagingTemplate.convertAndSend("hello","activemq");}@GetMapping("/send")public void send (){jmsMessagingTemplate.convertAndSend(queue,"activemq");}或者@GetMapping("/send")public void send (){ //定义队列模式,而非Topic模式 ActiveMQQueue activeMQQueue = new ActiveMQQueue("queue");jmsMessagingTemplate.convertAndSend(activeMQQueue ,"activemq");}
消费者
@Componentpublic class Listener {@JmsListener(destination = "hello")public void receiveMsg(String text) {System.out.println("接收到消息 : "+text);} // 使用JmsListener配置消费者监听的队列,其中text是接收到的消息 // destination参数是队列名 containerFactory参数是队列配置名,用于区别两种不同模式的连接 @JmsListener(destination = "queue",containerFactory = "jmsListenerContainerQueue") public void receiveQueue(String text) { System.out.println("消息消費者收到的报文为:"+text); }}
主题模式
生产者
@Autowired private JmsMessagingTemplate jmsMessagingTemplate; @Autowired private Topic topic;//订阅模式(topic)发送消息 @RequestMapping("/topicSend") public String topicSend(String text){jmsMessagingTemplate.convertAndSend(topic,text);return "topic 发送成功"; }或者@GetMapping("/topic")public void topic (){ActiveMQTopic activeMQTopic = new ActiveMQTopic("topic");jmsMessagingTemplate.convertAndSend(activeMQTopic,"activemq");}
消费者
// destination参数是队列名 containerFactory参数是队列配置名,用于区别两种不同模式的连接@JmsListener(destination = "topic",containerFactory ="jmsListenerContainerTopic" )public void receiveMsg(String msg) throws JMSException {System.out.println("接收到消息 : "+msg);}