> 文档中心 > RocketMQ(三)SpringBoot整合RocketMQ

RocketMQ(三)SpringBoot整合RocketMQ

目录

  • 1、基础整合
    • 1.1.POM
    • 1.2.yaml配置
    • 1.3.Producer代码
    • 1.4.Consumer代码
    • 1.5.代码测试
    • 1.6.topic与tag的指定
  • 2、六大消息类型代码实现
    • 2.1.基本消息样例
    • 2.2.顺序消息样例
    • 2.3.延时消息样例
    • 2.4.标签过滤消息样例
    • 2.5.SQL92过滤消息样例
    • 2.6.批量消息样例
    • 2.7.回馈消息样例
  • 3、超时设置与MessageEx
  • 4、消息发送重试机制
  • 5、消息幂等处理
  • 6、源码地址

1、基础整合

在SpringBoot启动时,Producer就会向RocketMQ注册所有的topic信息,并且topic名称不能重复。
一个监听器只能监听一个topic。

1.1.POM

<dependency>     <groupId>org.apache.rocketmq</groupId>     <artifactId>rocketmq-spring-boot-starter</artifactId>     <version>2.2.1</version></dependency>

1.2.yaml配置

# rocketmq配置rocketmq:  #rocketmq服务地址集群由`;`分开  name-server: http://162.14.119.135:9876  #自定义的组名称  producer:   group: producer_test   #消息发送超时时长   send-message-timeout: 5000

1.3.Producer代码

@Service@Slf4jpublic class TestProducer {@ResourcerocketmqTemplate rocketmqTemplate;    public void send() { String text = "测试发送"; Message<String> message = MessageBuilder.withPayload(text).build(); log.info("开始发送..."); rocketMQTemplate.send("test_topic", message); log.info("已发送...");    }}

1.4.Consumer代码

@Component@RocketMQMessageListener(topic = "test_topic", consumerGroup = "test_group")@Slf4jpublic class TestConsumer implements RocketMQListener<String> {    /     *     * @param message     */    @Override    public void onMessage(String message) { log.info("TestConsumer - 接受到消息:" + message);    }}

1.5.代码测试

通过Controller接口调用Producer发送消息到队列中,Consumer通过监听器监听是否有消息,如果有则获取成功。
RocketMQ(三)SpringBoot整合RocketMQ

1.6.topic与tag的指定

使用rocketmqTemplate发送消息时没有指定设置topic及tag的参数,而是由参数destination实现,调用send方法的源码如下:

/* @param destination formats: `topicName:tags`* @param message     {@link org.springframework.messaging.Message}*/public void send(D destination, Message<?> message) {this.doSend(destination, message);}private static Message getAndWrapMessage(String destination, MessageHeaders headers, byte[] payloads) { if (destination != null && destination.length() >= 1) {     if (payloads != null && payloads.length >= 1) {  // 分别获取topic与tag  String[] tempArr = destination.split(":", 2);  String topic = tempArr[0];  String tags = "";  if (tempArr.length > 1) {      tags = tempArr[1];  }    }    ...     }     ... }

可以发现,destination参数,可以由:分开,第一个参数为topic,第二个参数为tag,默认可以不设置tag参数,tag的设置应该根据实际使用场景来绝对。

消费者监听器通过selectorExpression ="tagName"获取指定的TAG消息

// 需要获取多个tag时,使用||分隔:"a||b||c"@rocketmqMessageListener(selectorExpression ="tagName"...)

2、六大消息类型代码实现

2.1.基本消息样例

基本消息样例分为以下三重:

  • 同步消息:这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
  • 异步消息:用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
  • 单向发送消息:这种方式主要用在不关心发送结果的场景,例如日志发送。

BaseConsumer:

@Component@RocketMQMessageListener(selectorExpression = "", topic = "base_topic", consumerGroup = "base_group")@Slf4jpublic class BaseConsumer implements RocketMQListener<String> {    /     *     * @param message     */    @Override    public void onMessage(String message) { log.info("基本信息案例-接受到消息:" + message);    }}

BaseProducer:

@Service@Slf4jpublic class BaseProducer { @Resource    RocketMQTemplate rocketMQTemplate;    /     * 这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。     */    public void sync() { String text = "基本信息案例-同步发送" + System.currentTimeMillis(); log.info(text); rocketMQTemplate.syncSend("base_topic", text); log.info("同步发送-已发送...");    }    /     * 异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。     */    public void async() { String text = "基本信息案例-异步发送" + System.currentTimeMillis(); log.info(text); for (int a = 1; a <= 10; a++) {     rocketMQTemplate.asyncSend("base_topic", text + ",ID:" + a, new SendCallback() {  // SendCallback接收异步返回结果的回调  // 成功发送  @Override  public void onSuccess(SendResult sendResult) {      log.info("异步发送 - 发送成功");  }  // 发送失败  @Override  public void onException(Throwable throwable) {      log.info("异步发送 - 发送失败");      throwable.printStackTrace();  }     }); } log.info("异步发送-已发送...");    }    /     * 这种方式主要用在不特别关心发送结果的场景,例如日志发送。     */    public void oneWay() { String text = "基本信息案例-单向发送" + System.currentTimeMillis(); log.info(text); rocketMQTemplate.sendOneWay("base_topic", text); log.info("单向发送-已发送...");    }}

Controller:

@RestController@RequestMapping("test")public class TestController {    /     * 基本信息案例     */    @Resource    private BaseProducer baseProducer; @GetMapping("/base")    public Object base() { // 同步发送 baseProducer.sync(); // 异步发送 baseProducer.async(); // 单向发送 baseProducer.oneWay(); return "基本消息样例";    }}

效果:
RocketMQ(三)SpringBoot整合RocketMQ

2.2.顺序消息样例

消息有序指的是可以按照消息的发送顺序来消费(FIFO),rocketmq可以严格的保证消息有序,可以分为分区有序或者全局有序。

​ 顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。

​ 下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号(topic)相同的消息会被先后发送到同一个队列中,消费时,同一个订单号获取到的肯定是同一个队列。


OrderConsumer:

@Component@RocketMQMessageListener(topic = "order_topic", consumerGroup = "order_group", consumeMode = ConsumeMode.ORDERLY)@Slf4jpublic class OrderConsumer implements RocketMQListener<String> {    /     *     * @param message     */    @Override    public void onMessage(String message) { log.info("顺序消息生产-接受到消息:" + message);    }}

OrderProducer:

@Service@Slf4jpublic class OrderProducer {    @Resource    RocketMQTemplate rocketMQTemplate;    public void order() { log.info("顺序消息"); try {     for (int i = 1; i <= 10; i++) {  int num = (int) (Math.random() * 10000);  // 设置一个延时,表示同一个消息先后进入到队形中  TimeUnit.MILLISECONDS.sleep(50);  log.info("顺序消息,ID:" + num);  // 第一个参数为topic,第二个参数为内容,第三个参数为Hash值,不同hash值在不同的队列中  rocketMQTemplate.syncSendOrderly("order_topic", "顺序消息,ID:" + num, "order");     }     log.info("已发送..."); } catch (Exception e) {     e.printStackTrace(); }    }}

Controller:

@RestController@RequestMapping("test")public class TestController {    /     * 顺序消息发送样例     */    @Resource    private OrderProducer orderProducer; @GetMapping("/order")    public Object order() { orderProducer.order(); return "发送顺序消息";    }}

2.3.延时消息样例

通过设置延时等级,实现消费者延时消费数据,比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。

RocketMQ并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18 消息消费失败会进入延时消息队列。

// 固定延时,设置参数时,对应数值为:1~18private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

延时原理:

​ Producer将消息发送到Broker后,Broker会首先将消息写入到commitlog文件,然后需要将其分发到相应的consumequeue。不过,在分发之前,系统会先判断消息中是否带有延时等级。若没有,则直接正常分发;

若有则需要经历一个复杂的过程:修改消息、投递延时消息、将消息重新写入commitlog

  • 修改消息

    • 修改消息的Topic为SCHEDULE_TOPIC_XXXX

    • 根据延时等级,在consumequeue目录中SCHEDULE_TOPIC_XXXX主题下创建出相应的queueId目录与consumequeue文件。

      延迟等级delayLevel与queueId的对应关系为:queueId = delayLevel -1

    • 修改消息索引单元内容。将MessageTagHashCode中原本存放消息的Tag的Hash值,现修改为消息的投递时间

      投递时间是指该消息被重新修改为原Topic后再次被写入到commitlog中的时间。

      投递时间 = 消息存储时间 + 延时等级时间。
      消息存储时间指的是消息被发送到Broker时的时间戳。

    • 将消息索引写入到SCHEDULE_TOPIC_XXXX主题下相应的consumequeue中

  • 投递延时消息

Broker内部有⼀个延迟消息服务类ScheuleMessageService,其会消费SCHEDULE_TOPIC_XXXX中的消息,即按照每条消息的==投递时间,将延时消息投递到⽬标Topic中。==不过,在投递之前会从commitlog中将原来写入的消息再次读出,并将其原来的延时等级设置为0,即原消息变为了一条不延迟的普通消息,然后再次将消息投递到目标Topic中。

  • 消息重新写入commitlog

    ​ 延迟消息服务类ScheuleMessageService将延迟消息再次发送给了commitlog,并再次形成新的消息索引条目,分发到相应Queue。

    这其实就是一次普通消息发送。只不过这次的消息Producer是延迟消息服务类ScheuleMessageService。


ScheduleConsumer:

@Component@RocketMQMessageListener(topic = "scheduled_topic", consumerGroup = "scheduled_group")@Slf4jpublic class ScheduleConsumer implements RocketMQListener<String> {    /     * 测试接收将参数topic定死,实际开发写入到配置文件     * @param message     */    @Override    public void onMessage(String message) { log.info("延时消息-接受到消息:" + message);    }}

ScheduledProducer:

@Service@Slf4jpublic class ScheduledProducer {   /     * 测试发送将参数topic定死,实际开发写入到配置文件     */    @Resource    RocketMQTemplate rocketMQTemplate;    public void scheduled() { String text = "延时消息"+ System.currentTimeMillis(); log.info(text); // 设置延时等级2,这个消息将在5s之后发送 // 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h Message<String> message = MessageBuilder.withPayload(text).build(); rocketMQTemplate.syncSend("scheduled_topic", message, 1000, 2); log.info("已发送...");    }}

Controller:

@RestController@RequestMapping("test")public class TestController {    /     * 延时消息     */    @Resource    private ScheduledProducer scheduledProducer; @GetMapping("/scheduled")    public Object scheduled() { scheduledProducer.scheduled(); return "发送延时消息";    }}

效果:
RocketMQ(三)SpringBoot整合RocketMQ

2.4.标签过滤消息样例

一个应用尽可能用一个Topic,消息子类型用tag来标识,tag可以由应用自由设置。 在使用rocketmqTemplate发送消息时,通过设置发送方法的destination参数来设置消息的目的地,destination的格式为topicName:tagName:前面表示topic的名称,后面表示tag名称。


TagConsumer:

@Component@RocketMQMessageListener(selectorExpression = "TAG-A||TAG-B", topic = "tag_topic", consumerGroup = "tag_group")@Slf4jpublic class TagConsumer implements RocketMQListener<String> {    /     *     * @param message     */    @Override    public void onMessage(String message) { log.info("标签过滤消息-接受到消息:" + message);    }}

TagProducer:

@Service@Slf4jpublic class TagProducer {    @Resource    RocketMQTemplate rocketMQTemplate;    public void tag() { String text = "标签过滤消息," + System.currentTimeMillis(); log.info(text); // 任何类型的send方法均可以指定TAG,默认可以不指定则为* Message<String> message = MessageBuilder.withPayload(text).build(); rocketMQTemplate.syncSend("tag_topic:TAG-A", message); log.info("已发送...");    }}

Controller:

@RestController@RequestMapping("test")public class TestController {    /     * 标签过虑消息     */    @Resource    private TagProducer tagProducer; @GetMapping("/tag")    public Object tag() { // TAG过滤 tagProducer.tag(); return "指定标签消息";    }}

2.5.SQL92过滤消息样例

TAG过滤消息只能有一个标签,这对于复杂的场景可能不起作用,通过SQL表达式筛选消息可以实现复杂的情况。

默认情况下,rocketmq不支持SQL92过滤形式,需要对broker进行配置

# 配置vim /opt/rocketmq/rocketmq-4.9.2/conf/broker.conf# 在配置文件最后加上,并且需要重启enablePropertyFilter=true# 进入目录,进行重启操作cd /opt/rocketmq/rocketmq-4.9.2# 关闭sh bin/mqshutdown broker# 启动Broker,-n 指向NameServer地址,多个由`;`分开,也可以在配置文件中设置nohup sh bin/mqbroker -n 162.14.119.135:9876 -c /opt/rocketmq/rocketmq-4.9.2/conf/broker.conf autoCreateTopicEnable=true &>/opt/rocketmq/rocketmq-4.9.2/logs/broker.log 2>&1 &

rocketmq执行一次SQL的基础语法:
语法特性:

  • 数值比较,比如:>,>=,<,<=,BETWEEN,=;
  • 字符比较,比如:=,,IN;
  • IS NULL 或者 IS NOT NULL;
  • 逻辑符号 AND,OR,NOT;

常量支持类型为:

  • 数值,比如:123,4.1415;

  • 字符,比如:‘abc’,必须用单引号包裹起来;

  • NULL,特殊的常量

  • 布尔值,TRUEFALSE

使用:
通过putUserProperty(key,value)指定一个参数的值(putUserProperty可以存在多个)
Consumer通过SQL语法来筛选是否满足设置的参数条件,如果满足则消费消息


SQLConsumer:

@Component@RocketMQMessageListener(selectorType = SelectorType.SQL92, selectorExpression = "a between 0 and 6 or b > 8", topic = "sql_topic", consumerGroup = "sql_group")@Slf4jpublic class SQLConsumer implements RocketMQListener<String> {    /     *     * @param message     */    @Override    public void onMessage(String message) { log.info("SQL92过滤消息-接受到消息:" + message);    }}

SQLProducer:

@Service@Slf4jpublic class SQLProducer {    @Resource    RocketMQTemplate rocketMQTemplate;    /     * SQL92过滤消息     */    public void selector() { String text = "SQL92过滤消息" + System.currentTimeMillis(); log.info(text); Message<String> message = MessageBuilder.withPayload(text).build(); // 设置参数 Map<String, Object> map = new HashMap<>(4); map.put("a", 2); map.put("b", 10); rocketMQTemplate.convertAndSend("sql_topic", message, map); log.info("已发送...");    }}

Controller:

@RestController@RequestMapping("test")public class TestController {    /     * SQL92过滤消息     */    @Resource    private SQLProducer SQLProducer; @GetMapping("/selector")    public Object selector() { // SQL92过滤 SQLProducer.selector(); return "过滤消息样例";    }}

2.6.批量消息样例

生产者进行消息发送时可以一次发送多条消息,这可以大大提升Producer的发送效率。

发送限制:

  • 批量发送的消息必须具有相同的Topic。
  • 批量发送的消息必须具有相同的刷盘策略。
  • 批量发送的消息不能是延时消息与事务消息。
  • 消息的总大小不应超过4MB。

发送大小:

默认情况下,一批发送的消息总大小不能超过4MB字节,如果想超出该值,有两种解决方案:

  • 方案一:将批量消息进行拆分,拆分为若干不大于4M的消息集合分多次批量发送,一般不大于1M。
  • 方案二:在Producer端与Broker端修改属性。
    Producer端需要在发送之前设置Producer的maxMessageSize属性
    Broker端需要修改其加载的配置文件中的maxMessageSize属性

BatchConsumer:

@Component@RocketMQMessageListener(topic = "batch_topic", consumerGroup = "batch_group")@Slf4jpublic class BatchConsumer implements RocketMQListener<String> {    /     *     * @param message     */    @Override    public void onMessage(String message) { log.info("批量消息-接受到消息:" + message);    }}

MessageSplitter:

import org.springframework.messaging.Message;import java.util.Iterator;import java.util.List;public class MessageSplitter implements Iterator<List<Message>> {    /     * 分割数据大小     */    private final int sizeLimit = 1024 * 1024;    ;    /     * 分割数据列表     */    private final List<Message> messages;    /     * 分割索引     */    private int currIndex;    public MessageSplitter(List<Message> messages) { this.messages = messages; // 保证单条数据的大小不大于sizeLimit messages.forEach(m -> {     if (m.toString().length() > sizeLimit) {  throw new RuntimeException("单挑消息不能大于" + sizeLimit + "B");     } });    }    @Override    public boolean hasNext() { return currIndex < messages.size();    }    @Override    public List<Message> next() { int nextIndex = currIndex; int totalSize = 0; for (; nextIndex < messages.size(); nextIndex++) {     Message t = messages.get(nextIndex);     totalSize = totalSize + t.toString().length();     if (totalSize > sizeLimit) {  break;     } } List<Message> subList = messages.subList(currIndex, nextIndex); currIndex = nextIndex; return subList;    }}

BatchProducer:

import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.client.producer.SendStatus;import org.apache.rocketmq.spring.core.RocketMQTemplate;import org.springframework.messaging.Message;import org.springframework.messaging.support.MessageBuilder;import org.springframework.stereotype.Service;import javax.annotation.Resource;import java.util.ArrayList;import java.util.List;@Service@Slf4jpublic class BatchProducer {    /     * 测试发送将参数topic定死,实际开发写入到配置文件     */    @Resource    RocketMQTemplate rocketMQTemplate;    public void batch() { String text = "批量消息"; log.info(text); List<Message> messageList = new ArrayList<>(); for (int i = 1; i <= 10; i++) {     messageList.add(MessageBuilder.withPayload(text + "--" + i).build()); } log.info("开始发送..."); //把大的消息分裂成若干个小的消息 MessageSplitter splitter = new MessageSplitter(messageList); while (splitter.hasNext()) {     List<Message> nextList = splitter.next();     SendResult result = rocketMQTemplate.syncSend("batch_topic", nextList);     if (result.getSendStatus() == SendStatus.SEND_OK) {  log.info("发送批量消息成功!消息ID为:{}", result.getMsgId());     } else {  log.info("发送批量消息失败!消息ID为:{},消息状态为:{}", result.getMsgId(), result.getSendStatus());     } } log.info("已发送...");    }}

Controller:

@RestController@RequestMapping("test")public class TestController {    /     * 批量消息发送     */    @Resource    private BatchProducer batchProducer; @GetMapping("/batch")    public Object batch() { // 批量消息样例 batchProducer.batch(); return "批量消息样例";    }}

2.7.回馈消息样例

生产者通过sendAndReceive发送消息,消费者需要实现rocketmqReplyListener接口

如果连续通过sendAndReceive发送消息,生产者必须收到消费者的回复才能发送下一条消息。

ReplyConsumer:

import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQReplyListener;import org.springframework.stereotype.Component;@Component@RocketMQMessageListener(topic = "reply_topic", consumerGroup = "reply_group")@Slf4jpublic class ReplyConsumer implements RocketMQReplyListener<String, byte[]> {    @Override    public byte[] onMessage(String message) { log.info("接受到消息:" + message); // 返回消息到生成者 return "返回消息到生产者".getBytes();    }}

ReplyProducer:

import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.spring.core.RocketMQTemplate;import org.springframework.stereotype.Service;import javax.annotation.Resource;@Service@Slf4jpublic class ReplyProducer {    @Resource    RocketMQTemplate rocketMQTemplate;    public void reply() { // 如果消费者没有回馈消息,则不会发送下一条消息 for (int i = 1; i <= 10; i++) {     String text = "回馈消息" + "--" + i;     log.info("发送" + text);     Object obj = rocketMQTemplate.sendAndReceive("reply_topic", text, String.class);     log.info("消费者返回的消息:" + obj); }    }}

Controller:

@RestController@RequestMapping("test")public class TestController {    /     * 回馈消息样例     */    @Resource    private ReplyProducer replyProducer; @GetMapping("/reply")    public Object reply() { // 消息事务 replyProducer.reply(); return "回馈消息样例";    }}

效果:
RocketMQ(三)SpringBoot整合RocketMQ

3、超时设置与MessageEx

消费者监听器需要实现rocketmqListener 接口,通过传入的泛型为String,则可以直接获取生产者传递的消息,如果想要获取消息的额外详情信息,需要传入泛型MessageEx

@Component@rocketmqMessageListener(topic = "topicName", consumerGroup = "groupName")public class ExConsumer implements rocketmqListener<MessageExt> {    @Override    public void onMessage(MessageExt message) {    }}

发送消息时可以设置超时时间,通过配置rocketmq.producer.send-message-timeout =实现全局消息超时设置,也可以对每个发送的消息配置单独的超时时间,比如:

rocketmqTemplate.syncSend("topic", message, 1000);

2、在SpringBoot整合中,一个监听器只能监听一个topic,并且topic不能重复。

4、消息发送重试机制

Producer对发送失败的消息进行重新发送的机制,称为消息发送重试机制,也称为消息重投机制。

对于消息重投,需要注意以下几点:

  • 生产者在发送消息时,若采用同步或异步发送方式,发送失败会重试,但oneway消息发送方式发送失败是没有重试机制的。
  • 只有普通消息具有发送重试机制,顺序消息是没有的。
  • 消息重投机制可以保证消息尽可能发送成功、不丢失,但可能会造成消息重复,消息重复在rocketmq中是无法避免的问题。
  • 消息发送重试有三种策略可以选择:同步发送失败策略、异步发送失败策略、消息刷盘失败策略

5、消息幂等处理

由于某种原因(比如:网络),导致MQ不知道消息已经被消费,再次将该消息分发给其他的消费者。(因为消息重试等机制的原因,如果一个consumer断了,rocketmq有consumer集群,会将该消息重新发给其他consumer)。

因此为了防止重复消费,需要进行幂等处理,方案如下:

1、 将已经处理的消息存入数据库中,每次处理前先进行查询操作,判断当前消息是否成功处理。

2、 将已经处理的消息存入redis库中,每次处理前先进行查询操作,判断当前消息是否成功处理。

6、源码地址

源码地址:https://gitee.com/lhzlx/spring-boot-rocket-mq-demo.git

妊娠纹产品大全