RocketMQ(三)SpringBoot整合RocketMQ
目录
- 1、基础整合
-
- 1.1.POM
- 1.2.yaml配置
- 1.3.Producer代码
- 1.4.Consumer代码
- 1.5.代码测试
- 1.6.topic与tag的指定
- 2、六大消息类型代码实现
- 3、超时设置与MessageEx
- 4、消息发送重试机制
- 5、消息幂等处理
- 6、源码地址
1、基础整合
在SpringBoot启动时,Producer
就会向RocketMQ
注册所有的topic
信息,并且topic名称不能重复。
一个监听器只能监听一个topic。
1.1.POM
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.1</version></dependency>
1.2.yaml配置
# rocketmq配置rocketmq: #rocketmq服务地址集群由`;`分开 name-server: http://162.14.119.135:9876 #自定义的组名称 producer: group: producer_test #消息发送超时时长 send-message-timeout: 5000
1.3.Producer代码
@Service@Slf4jpublic class TestProducer {@ResourcerocketmqTemplate rocketmqTemplate; public void send() { String text = "测试发送"; Message<String> message = MessageBuilder.withPayload(text).build(); log.info("开始发送..."); rocketMQTemplate.send("test_topic", message); log.info("已发送..."); }}
1.4.Consumer代码
@Component@RocketMQMessageListener(topic = "test_topic", consumerGroup = "test_group")@Slf4jpublic class TestConsumer implements RocketMQListener<String> { / * * @param message */ @Override public void onMessage(String message) { log.info("TestConsumer - 接受到消息:" + message); }}
1.5.代码测试
通过Controller接口调用Producer
发送消息到队列中,Consumer
通过监听器监听是否有消息,如果有则获取成功。
1.6.topic与tag的指定
使用rocketmqTemplate
发送消息时没有指定设置topic及tag的参数,而是由参数destination
实现,调用send方法的源码如下:
/* @param destination formats: `topicName:tags`* @param message {@link org.springframework.messaging.Message}*/public void send(D destination, Message<?> message) {this.doSend(destination, message);}private static Message getAndWrapMessage(String destination, MessageHeaders headers, byte[] payloads) { if (destination != null && destination.length() >= 1) { if (payloads != null && payloads.length >= 1) { // 分别获取topic与tag String[] tempArr = destination.split(":", 2); String topic = tempArr[0]; String tags = ""; if (tempArr.length > 1) { tags = tempArr[1]; } } ... } ... }
可以发现,destination
参数,可以由:
分开,第一个参数为topic,第二个参数为tag,默认可以不设置tag参数,tag的设置应该根据实际使用场景来绝对。
消费者监听器通过selectorExpression ="tagName"
获取指定的TAG消息
// 需要获取多个tag时,使用||分隔:"a||b||c"@rocketmqMessageListener(selectorExpression ="tagName"...)
2、六大消息类型代码实现
2.1.基本消息样例
基本消息样例分为以下三重:
- 同步消息:这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
- 异步消息:用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
- 单向发送消息:这种方式主要用在不关心发送结果的场景,例如日志发送。
BaseConsumer:
@Component@RocketMQMessageListener(selectorExpression = "", topic = "base_topic", consumerGroup = "base_group")@Slf4jpublic class BaseConsumer implements RocketMQListener<String> { / * * @param message */ @Override public void onMessage(String message) { log.info("基本信息案例-接受到消息:" + message); }}
BaseProducer:
@Service@Slf4jpublic class BaseProducer { @Resource RocketMQTemplate rocketMQTemplate; / * 这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。 */ public void sync() { String text = "基本信息案例-同步发送" + System.currentTimeMillis(); log.info(text); rocketMQTemplate.syncSend("base_topic", text); log.info("同步发送-已发送..."); } / * 异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。 */ public void async() { String text = "基本信息案例-异步发送" + System.currentTimeMillis(); log.info(text); for (int a = 1; a <= 10; a++) { rocketMQTemplate.asyncSend("base_topic", text + ",ID:" + a, new SendCallback() { // SendCallback接收异步返回结果的回调 // 成功发送 @Override public void onSuccess(SendResult sendResult) { log.info("异步发送 - 发送成功"); } // 发送失败 @Override public void onException(Throwable throwable) { log.info("异步发送 - 发送失败"); throwable.printStackTrace(); } }); } log.info("异步发送-已发送..."); } / * 这种方式主要用在不特别关心发送结果的场景,例如日志发送。 */ public void oneWay() { String text = "基本信息案例-单向发送" + System.currentTimeMillis(); log.info(text); rocketMQTemplate.sendOneWay("base_topic", text); log.info("单向发送-已发送..."); }}
Controller:
@RestController@RequestMapping("test")public class TestController { / * 基本信息案例 */ @Resource private BaseProducer baseProducer; @GetMapping("/base") public Object base() { // 同步发送 baseProducer.sync(); // 异步发送 baseProducer.async(); // 单向发送 baseProducer.oneWay(); return "基本消息样例"; }}
效果:
2.2.顺序消息样例
消息有序指的是可以按照消息的发送顺序来消费(FIFO),rocketmq可以严格的保证消息有序,可以分为分区有序或者全局有序。
顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号(topic)相同的消息会被先后发送到同一个队列中,消费时,同一个订单号获取到的肯定是同一个队列。
OrderConsumer:
@Component@RocketMQMessageListener(topic = "order_topic", consumerGroup = "order_group", consumeMode = ConsumeMode.ORDERLY)@Slf4jpublic class OrderConsumer implements RocketMQListener<String> { / * * @param message */ @Override public void onMessage(String message) { log.info("顺序消息生产-接受到消息:" + message); }}
OrderProducer:
@Service@Slf4jpublic class OrderProducer { @Resource RocketMQTemplate rocketMQTemplate; public void order() { log.info("顺序消息"); try { for (int i = 1; i <= 10; i++) { int num = (int) (Math.random() * 10000); // 设置一个延时,表示同一个消息先后进入到队形中 TimeUnit.MILLISECONDS.sleep(50); log.info("顺序消息,ID:" + num); // 第一个参数为topic,第二个参数为内容,第三个参数为Hash值,不同hash值在不同的队列中 rocketMQTemplate.syncSendOrderly("order_topic", "顺序消息,ID:" + num, "order"); } log.info("已发送..."); } catch (Exception e) { e.printStackTrace(); } }}
Controller:
@RestController@RequestMapping("test")public class TestController { / * 顺序消息发送样例 */ @Resource private OrderProducer orderProducer; @GetMapping("/order") public Object order() { orderProducer.order(); return "发送顺序消息"; }}
2.3.延时消息样例
通过设置延时等级,实现消费者延时消费数据,比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
RocketMQ并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18 消息消费失败会进入延时消息队列。
// 固定延时,设置参数时,对应数值为:1~18private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
延时原理:
Producer将消息发送到Broker后,Broker会首先将消息写入到commitlog文件,然后需要将其分发到相应的consumequeue。不过,在分发之前,系统会先判断消息中是否带有延时等级。若没有,则直接正常分发;
若有则需要经历一个复杂的过程:修改消息、投递延时消息、将消息重新写入commitlog
-
修改消息
-
修改消息的Topic为SCHEDULE_TOPIC_XXXX
-
根据延时等级,在consumequeue目录中SCHEDULE_TOPIC_XXXX主题下创建出相应的queueId目录与consumequeue文件。
延迟等级delayLevel与queueId的对应关系为:queueId = delayLevel -1
-
修改消息索引单元内容。将MessageTagHashCode中原本存放消息的Tag的Hash值,现修改为消息的投递时间。
投递时间是指该消息被重新修改为原Topic后再次被写入到commitlog中的时间。
投递时间 = 消息存储时间 + 延时等级时间。
消息存储时间指的是消息被发送到Broker时的时间戳。 -
将消息索引写入到SCHEDULE_TOPIC_XXXX主题下相应的consumequeue中
-
-
投递延时消息
Broker内部有⼀个延迟消息服务类ScheuleMessageService,其会消费SCHEDULE_TOPIC_XXXX中的消息,即按照每条消息的==投递时间,将延时消息投递到⽬标Topic中。==不过,在投递之前会从commitlog中将原来写入的消息再次读出,并将其原来的延时等级设置为0,即原消息变为了一条不延迟的普通消息,然后再次将消息投递到目标Topic中。
-
消息重新写入commitlog
延迟消息服务类ScheuleMessageService将延迟消息再次发送给了commitlog,并再次形成新的消息索引条目,分发到相应Queue。
这其实就是一次普通消息发送。只不过这次的消息Producer是延迟消息服务类ScheuleMessageService。
ScheduleConsumer:
@Component@RocketMQMessageListener(topic = "scheduled_topic", consumerGroup = "scheduled_group")@Slf4jpublic class ScheduleConsumer implements RocketMQListener<String> { / * 测试接收将参数topic定死,实际开发写入到配置文件 * @param message */ @Override public void onMessage(String message) { log.info("延时消息-接受到消息:" + message); }}
ScheduledProducer:
@Service@Slf4jpublic class ScheduledProducer { / * 测试发送将参数topic定死,实际开发写入到配置文件 */ @Resource RocketMQTemplate rocketMQTemplate; public void scheduled() { String text = "延时消息"+ System.currentTimeMillis(); log.info(text); // 设置延时等级2,这个消息将在5s之后发送 // 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h Message<String> message = MessageBuilder.withPayload(text).build(); rocketMQTemplate.syncSend("scheduled_topic", message, 1000, 2); log.info("已发送..."); }}
Controller:
@RestController@RequestMapping("test")public class TestController { / * 延时消息 */ @Resource private ScheduledProducer scheduledProducer; @GetMapping("/scheduled") public Object scheduled() { scheduledProducer.scheduled(); return "发送延时消息"; }}
效果:
2.4.标签过滤消息样例
一个应用尽可能用一个Topic,消息子类型用tag
来标识,tag
可以由应用自由设置。 在使用rocketmqTemplate
发送消息时,通过设置发送方法的destination
参数来设置消息的目的地,destination
的格式为topicName:tagName
,:
前面表示topic的名称,后面表示tag
名称。
TagConsumer:
@Component@RocketMQMessageListener(selectorExpression = "TAG-A||TAG-B", topic = "tag_topic", consumerGroup = "tag_group")@Slf4jpublic class TagConsumer implements RocketMQListener<String> { / * * @param message */ @Override public void onMessage(String message) { log.info("标签过滤消息-接受到消息:" + message); }}
TagProducer:
@Service@Slf4jpublic class TagProducer { @Resource RocketMQTemplate rocketMQTemplate; public void tag() { String text = "标签过滤消息," + System.currentTimeMillis(); log.info(text); // 任何类型的send方法均可以指定TAG,默认可以不指定则为* Message<String> message = MessageBuilder.withPayload(text).build(); rocketMQTemplate.syncSend("tag_topic:TAG-A", message); log.info("已发送..."); }}
Controller:
@RestController@RequestMapping("test")public class TestController { / * 标签过虑消息 */ @Resource private TagProducer tagProducer; @GetMapping("/tag") public Object tag() { // TAG过滤 tagProducer.tag(); return "指定标签消息"; }}
2.5.SQL92过滤消息样例
TAG过滤消息只能有一个标签,这对于复杂的场景可能不起作用,通过SQL表达式筛选消息可以实现复杂的情况。
默认情况下,rocketmq不支持SQL92过滤形式,需要对broker
进行配置
# 配置vim /opt/rocketmq/rocketmq-4.9.2/conf/broker.conf# 在配置文件最后加上,并且需要重启enablePropertyFilter=true# 进入目录,进行重启操作cd /opt/rocketmq/rocketmq-4.9.2# 关闭sh bin/mqshutdown broker# 启动Broker,-n 指向NameServer地址,多个由`;`分开,也可以在配置文件中设置nohup sh bin/mqbroker -n 162.14.119.135:9876 -c /opt/rocketmq/rocketmq-4.9.2/conf/broker.conf autoCreateTopicEnable=true &>/opt/rocketmq/rocketmq-4.9.2/logs/broker.log 2>&1 &
rocketmq执行一次SQL的基础语法:
语法特性:
- 数值比较,比如:>,>=,<,<=,BETWEEN,=;
- 字符比较,比如:=,,IN;
- IS NULL 或者 IS NOT NULL;
- 逻辑符号 AND,OR,NOT;
常量支持类型为:
-
数值,比如:123,4.1415;
-
字符,比如:‘abc’,必须用单引号包裹起来;
-
NULL,特殊的常量
-
布尔值,TRUE 或 FALSE
使用:
通过putUserProperty(key,value)
指定一个参数的值(putUserProperty可以存在多个)
Consumer通过SQL语法来筛选是否满足设置的参数条件,如果满足则消费消息
SQLConsumer:
@Component@RocketMQMessageListener(selectorType = SelectorType.SQL92, selectorExpression = "a between 0 and 6 or b > 8", topic = "sql_topic", consumerGroup = "sql_group")@Slf4jpublic class SQLConsumer implements RocketMQListener<String> { / * * @param message */ @Override public void onMessage(String message) { log.info("SQL92过滤消息-接受到消息:" + message); }}
SQLProducer:
@Service@Slf4jpublic class SQLProducer { @Resource RocketMQTemplate rocketMQTemplate; / * SQL92过滤消息 */ public void selector() { String text = "SQL92过滤消息" + System.currentTimeMillis(); log.info(text); Message<String> message = MessageBuilder.withPayload(text).build(); // 设置参数 Map<String, Object> map = new HashMap<>(4); map.put("a", 2); map.put("b", 10); rocketMQTemplate.convertAndSend("sql_topic", message, map); log.info("已发送..."); }}
Controller:
@RestController@RequestMapping("test")public class TestController { / * SQL92过滤消息 */ @Resource private SQLProducer SQLProducer; @GetMapping("/selector") public Object selector() { // SQL92过滤 SQLProducer.selector(); return "过滤消息样例"; }}
2.6.批量消息样例
生产者进行消息发送时可以一次发送多条消息,这可以大大提升Producer的发送效率。
发送限制:
- 批量发送的消息必须具有相同的Topic。
- 批量发送的消息必须具有相同的刷盘策略。
- 批量发送的消息不能是延时消息与事务消息。
- 消息的总大小不应超过4MB。
发送大小:
默认情况下,一批发送的消息总大小不能超过4MB字节,如果想超出该值,有两种解决方案:
- 方案一:将批量消息进行拆分,拆分为若干不大于4M的消息集合分多次批量发送,一般不大于1M。
- 方案二:在Producer端与Broker端修改属性。
Producer端需要在发送之前设置Producer的maxMessageSize属性
Broker端需要修改其加载的配置文件中的maxMessageSize属性
BatchConsumer:
@Component@RocketMQMessageListener(topic = "batch_topic", consumerGroup = "batch_group")@Slf4jpublic class BatchConsumer implements RocketMQListener<String> { / * * @param message */ @Override public void onMessage(String message) { log.info("批量消息-接受到消息:" + message); }}
MessageSplitter:
import org.springframework.messaging.Message;import java.util.Iterator;import java.util.List;public class MessageSplitter implements Iterator<List<Message>> { / * 分割数据大小 */ private final int sizeLimit = 1024 * 1024; ; / * 分割数据列表 */ private final List<Message> messages; / * 分割索引 */ private int currIndex; public MessageSplitter(List<Message> messages) { this.messages = messages; // 保证单条数据的大小不大于sizeLimit messages.forEach(m -> { if (m.toString().length() > sizeLimit) { throw new RuntimeException("单挑消息不能大于" + sizeLimit + "B"); } }); } @Override public boolean hasNext() { return currIndex < messages.size(); } @Override public List<Message> next() { int nextIndex = currIndex; int totalSize = 0; for (; nextIndex < messages.size(); nextIndex++) { Message t = messages.get(nextIndex); totalSize = totalSize + t.toString().length(); if (totalSize > sizeLimit) { break; } } List<Message> subList = messages.subList(currIndex, nextIndex); currIndex = nextIndex; return subList; }}
BatchProducer:
import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.client.producer.SendStatus;import org.apache.rocketmq.spring.core.RocketMQTemplate;import org.springframework.messaging.Message;import org.springframework.messaging.support.MessageBuilder;import org.springframework.stereotype.Service;import javax.annotation.Resource;import java.util.ArrayList;import java.util.List;@Service@Slf4jpublic class BatchProducer { / * 测试发送将参数topic定死,实际开发写入到配置文件 */ @Resource RocketMQTemplate rocketMQTemplate; public void batch() { String text = "批量消息"; log.info(text); List<Message> messageList = new ArrayList<>(); for (int i = 1; i <= 10; i++) { messageList.add(MessageBuilder.withPayload(text + "--" + i).build()); } log.info("开始发送..."); //把大的消息分裂成若干个小的消息 MessageSplitter splitter = new MessageSplitter(messageList); while (splitter.hasNext()) { List<Message> nextList = splitter.next(); SendResult result = rocketMQTemplate.syncSend("batch_topic", nextList); if (result.getSendStatus() == SendStatus.SEND_OK) { log.info("发送批量消息成功!消息ID为:{}", result.getMsgId()); } else { log.info("发送批量消息失败!消息ID为:{},消息状态为:{}", result.getMsgId(), result.getSendStatus()); } } log.info("已发送..."); }}
Controller:
@RestController@RequestMapping("test")public class TestController { / * 批量消息发送 */ @Resource private BatchProducer batchProducer; @GetMapping("/batch") public Object batch() { // 批量消息样例 batchProducer.batch(); return "批量消息样例"; }}
2.7.回馈消息样例
生产者通过sendAndReceive
发送消息,消费者需要实现rocketmqReplyListener接口
如果连续通过sendAndReceive
发送消息,生产者必须收到消费者的回复才能发送下一条消息。
ReplyConsumer:
import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQReplyListener;import org.springframework.stereotype.Component;@Component@RocketMQMessageListener(topic = "reply_topic", consumerGroup = "reply_group")@Slf4jpublic class ReplyConsumer implements RocketMQReplyListener<String, byte[]> { @Override public byte[] onMessage(String message) { log.info("接受到消息:" + message); // 返回消息到生成者 return "返回消息到生产者".getBytes(); }}
ReplyProducer:
import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.spring.core.RocketMQTemplate;import org.springframework.stereotype.Service;import javax.annotation.Resource;@Service@Slf4jpublic class ReplyProducer { @Resource RocketMQTemplate rocketMQTemplate; public void reply() { // 如果消费者没有回馈消息,则不会发送下一条消息 for (int i = 1; i <= 10; i++) { String text = "回馈消息" + "--" + i; log.info("发送" + text); Object obj = rocketMQTemplate.sendAndReceive("reply_topic", text, String.class); log.info("消费者返回的消息:" + obj); } }}
Controller:
@RestController@RequestMapping("test")public class TestController { / * 回馈消息样例 */ @Resource private ReplyProducer replyProducer; @GetMapping("/reply") public Object reply() { // 消息事务 replyProducer.reply(); return "回馈消息样例"; }}
效果:
3、超时设置与MessageEx
消费者监听器需要实现rocketmqListener 接口,通过传入的泛型为String
,则可以直接获取生产者传递的消息,如果想要获取消息的额外详情信息,需要传入泛型MessageEx
@Component@rocketmqMessageListener(topic = "topicName", consumerGroup = "groupName")public class ExConsumer implements rocketmqListener<MessageExt> { @Override public void onMessage(MessageExt message) { }}
发送消息时可以设置超时时间,通过配置rocketmq.producer.send-message-timeout =
实现全局消息超时设置,也可以对每个发送的消息配置单独的超时时间,比如:
rocketmqTemplate.syncSend("topic", message, 1000);
2、在SpringBoot整合中,一个监听器只能监听一个topic,并且topic不能重复。
4、消息发送重试机制
Producer对发送失败的消息进行重新发送的机制,称为消息发送重试机制,也称为消息重投机制。
对于消息重投,需要注意以下几点:
- 生产者在发送消息时,若采用同步或异步发送方式,发送失败会重试,但oneway消息发送方式发送失败是没有重试机制的。
- 只有普通消息具有发送重试机制,顺序消息是没有的。
- 消息重投机制可以保证消息尽可能发送成功、不丢失,但可能会造成消息重复,消息重复在rocketmq中是无法避免的问题。
- 消息发送重试有三种策略可以选择:同步发送失败策略、异步发送失败策略、消息刷盘失败策略
5、消息幂等处理
由于某种原因(比如:网络),导致MQ不知道消息已经被消费,再次将该消息分发给其他的消费者。(因为消息重试等机制的原因,如果一个consumer断了,rocketmq有consumer集群,会将该消息重新发给其他consumer)。
因此为了防止重复消费,需要进行幂等处理,方案如下:
1、 将已经处理的消息存入数据库中,每次处理前先进行查询操作,判断当前消息是否成功处理。
2、 将已经处理的消息存入redis库中,每次处理前先进行查询操作,判断当前消息是否成功处理。
6、源码地址
源码地址:https://gitee.com/lhzlx/spring-boot-rocket-mq-demo.git