1 引入依赖 org.apache.rocketmq rocketmq-spring-boot-starter 2.3.12配置rocketmq: name-server: 192.168.150.50:9876 producer: group: dist-test # 生产者组 pull-consumer: # pull模式消费者 group: test topic: MyTopic3 启动类引入配置创建主题:[root@localhost bin]# sh mqadmin updateTopic -n 192.168.150.50:9876 -b 192.168.150.50:10911 -t MyTopic -w 4 -r 4create topic to 192.168.150.50:10911 success.TopicConfig [topicName=MyTopic, readQueueNums=4, writeQueueNums=4, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false, attributes={}]-n 192.168.150.50:9876:Name Server地址。-b 192.168.150.50:10911:Broker地址。-t MyTopic:主题名称。-w 4:写队列数量。-r 4:读队列数量。引入配置类@SpringBootApplication@ImportAutoConfiguration({RocketMQAutoConfiguration.class})4 测试,用SendController测试发送,用SendController测试接收// RocketMQReplyListener 是一个接口,需要返回值的监听器,两个泛型分别是接收消息的类型和返回值类型,对应的发送者rocketMQTemplate.sendAndReceive// RocketMQListener 无需返回值 T 为接收消息的类型,对应的发送者rocketMQTemplate.convertAndSendbroker的配置也可以不用配置namesrvAddr=autoCreateTopicEnable=truebrokerIP1=192.168.150.50
import org.apache.rocketmq.spring.core.RocketMQTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;@RestController@RequestMapping(\"/rocketmq\")public class SendController { @Autowired private RocketMQTemplate rocketMQTemplate; @GetMapping(\"/send\") public String send() { /** * 普通消息:只负责发送无需等待响应 * 参数1:topic:tag tag可省略不写 * 参数2:Object类型的消息体 * 消费监听器:一般配合着 RocketMQListener 使用 */ rocketMQTemplate.convertAndSend(\"MyTopic2\", \"hello world\");// Message message = new Message(\"topic\", \"tag\", \"key\", \"message body\".getBytes());// message.putUserProperty(\"REPLY_TO_CLIENT\", \"yourClientID\"); // Set the reply property// producer.send(message); return \"success\"; } @GetMapping(\"/send2\") public String send2() { /** * 普通消息:等待消费者响应 * 参数信息和上面一样 * 消费者监听器:一般配合着 RocketMQReplyListener 使用 */ String res = rocketMQTemplate.sendAndReceive(\"MyTopic\", \"hello RocketMQ\", String.class); return res; }}
import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.client.producer.SendStatus;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.springframework.stereotype.Component;// RocketMQReplyListener 是一个接口,需要返回值的监听器,两个泛型分别是接收消息的类型和返回值类型,对应的发送者rocketMQTemplate.sendAndReceive// RocketMQListener 无需返回值 T 为接收消息的类型,对应的发送者rocketMQTemplate.convertAndSend@Slf4j@Component@RocketMQMessageListener(topic = \"MyTopic\", consumerGroup = \"test_consumer\")public class TestRocketMQMessageListener implements RocketMQReplyListener { @Override public String onMessage(String s) { log.info(\"接收到RocketMQ消息[topic={}] ======> {}\", \"test\", s); return SendStatus.SEND_OK.name(); }}
import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Component;// RocketMQReplyListener 是一个接口,需要返回值的监听器,两个泛型分别是接收消息的类型和返回值类型,对应的发送者rocketMQTemplate.sendAndReceive// RocketMQListener 无需返回值 T 为接收消息的类型,对应的发送者rocketMQTemplate.convertAndSend@Slf4j@Component@RocketMQMessageListener(topic = \"MyTopic2\", consumerGroup = \"test_consumer2\")public class TestRocketMQMessageListener2 implements RocketMQListener { @Override public void onMessage(String s) { log.info(\"接收到RocketMQ消息[topic={}] ======> {}\", \"test\", s); }}