> 文档中心 > RocketMQ(九)RocketMQ分布式事务实现

RocketMQ(九)RocketMQ分布式事务实现

目录

  • 1、事务场景
  • 2、解决思路
  • 3、事务消息概念
  • 4、事务消息实现
    • 4.1 代码说明:
    • 4.2 完整代码

1、事务场景

场景:工行用户A向建行用户B转账1万元;
在这里插入图片描述
步骤:

  • 1.工行系统发送一个给B增款1万元的同步消息M给Broker
  • 2.消息被Broker成功接收后,向工行系统发送成功ACK
  • 3.工行系统收到成功ACK后从用户A中扣款1万元
  • 4.建行系统从Broker中获取到消息M
  • 5.建行系统消费消息M,即向用户B中增加1万元

这其中是有问题的:若第3步中的扣款操作失败,但消息已经成功发送到了Broker。对于MQ来说,只要消息写入成功,那么这个消息就可以被消费;此时建行系统中用户B会增加了1万元,出现了数据不一致问题。

2、解决思路

解决思路: 让上述步骤的第1、2、3步具有原子性,要么全部成功,要么全部失败。即消息发送成功后,必须要保证扣款成功。如果扣款失败,则回滚发送成功的消息,简单来说就是先预提交一个消息,当本地事务没有异常的情况下,再次发送一个确认消息;该思路即使用事务消息,这里要使用分布式事务解决方案;
在这里插入图片描述
使用事务消息来处理该需求场景过程:

  1. 事务管理器TM向事务协调器TC发起指令,开启全局事务

  2. 工行系统发一个给B增款1万元的事务消息M给TC

  3. TC会向Broker发送半事务消息prepareHalf,将消息M预提交到Broker,此时的建行系统是看不到Broker中的消息M的

  4. Broker会将预提交执行结果Report给TC。

  5. 如果预提交失败,则TC会向TM上报预提交失败的响应,全局事务结束;如果预提交成功,TC会调用工行系统的回调操作,去完成工行用户A的预扣款1万元的操作

  6. 工行系统会向TC发送预扣款执行结果,即本地事务的执行状态

  7. TC收到预扣款执行结果后,会将结果上报给TM

  8. TM会根据上报结果向TC发出不同的确认指令

    • 若预扣款成功(本地事务状态为COMMIT_MESSAGE),则TM向TC发送Global Commit指令
    • 若预扣款失败(本地事务状态为ROLLBACK_MESSAGE),则TM向TC发送Global Rollback指令
    • 若现未知状态(本地事务状态为UNKNOW),则会触发工行系统的本地事务状态回查操作。回查操作会将回查结果,即COMMIT_MESSAGE或ROLLBACK_MESSAGE Report给TC。TC将结果上报给TM,TM会再向TC发送最终确认指令Global Commit或Global Rollback
  9. TC在接收到指令后会向Broker与工行系统发出确认指令

    • TC接收的若是Global Commit指令,则向Broker与工行系统发送Branch Commit指令。此时Broker中的消息M才可被建行系统看到;此时的工行用户A中的扣款操作才真正被确认;
    • TC接收到的若是Global Rollback指令,则向Broker与工行系统发送Branch Rollback指令。此时Broker中的消息M将被撤销;工行用户A中的扣款操作将被回滚;

3、事务消息概念

RocketMQ的事务消息,主要是通过消息的异步处理,可以保证本地事务和消息发送同时成功执行或失败,从而保证数据的最终一致性,具体流程如下:
在这里插入图片描述
事务消息共有三种状态,提交状态、回滚状态、中间状态:

  • RocketMQLocalTransactionState.COMMIT: 提交事务,它允许消费者消费此消息。
  • RocketMQLocalTransactionState.ROLLBACK: 回滚事务,它代表该消息将被删除,不允许被消费。
  • RocketMQLocalTransactionState.UNKNOWN: 中间状态,它代表需要检查消息队列来确定状态。

事务过程:
通过sendMessageInTransaction方法将消息发送到broker,并回调事务监听器的方法,此时消息为半消息状态,需要进行二次确认才能发送到队列并由消费者进行消费。

如果MQ收到的事务状态一直是UNKNOWN,那么将不断的向MQ发送方发起回调检查本地事务状态,直到收Commit/Rollback状态的消息或者人工干预删除UNKNOWN状态的消息;

注意本地SQL事务一定要在MQ监听器的回调方法中执行

4、事务消息实现

4.1 代码说明:

核心类为TxConsumerTxProducerTxProducerListener实现流程。TxProducer调用sendMessageInTransaction方法后进入到TxProducerListener

注意本地SQL事务一定要在MQ监听器的回调方法中执行

需要注意的是: 一个RocketMQTemplate只能注册一个事务监听器,如果存在多个事务监听器监听不同的Producer,需要通过注解@ExtRocketMQTemplateConfiguration定义不同的RocketMQTemplate, ,比如:

  // 定义RocketMQTemplate  @ExtRocketMQTemplateConfiguration  public class XXXRocketMQTemplate extends RocketMQTemplate {    }    // 消费者发送事务消息  // 引入自定义的RocketMQTemplate  @Resource(name = "xxxRocketMQTemplate")  RocketMQTemplate rocketMQTemplate;    public void send() {    Message<String> message = MessageBuilder.withPayload("text").build();    rocketMQTemplate.sendMessageInTransaction("topic", message, null);  }

要使用事务消息,需要自定义消息监听器,并且与RocketMQTemplate进行绑定:

// 监听器,默认的为rocketMQTemplate// 如果使用ExtRocketMQTemplateConfiguration是自定义了RocketMQTemplate,则需要绑定自定义的bean名称@RocketMQTransactionListener(rocketMQTemplateBeanName="txRocketMQTemplate")public class TxProducerListener implements RocketMQLocalTransactionListener {@Override    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { .....    }    @Override    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { .....    }}

4.2 完整代码

TxConsumer:

import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Component;@Component@RocketMQMessageListener(topic = "tx_topic", consumerGroup = "tx_group")@Slf4jpublic class TxConsumer implements RocketMQListener<String> {    /**     *     * @param message     */    @Override    public void onMessage(String message) { log.info("消息事务-接受到消息:" + message);    }}

TxRocketMQTemplate:

// 一个RocketMQTemplate只能注册一个事务监听器,如果存在多个事务监听器监听不同的`Producer`// 需要通过注解`@ExtRocketMQTemplateConfiguration`定义不同的RocketMQTemplate@ExtRocketMQTemplateConfigurationpublic class TxRocketMQTemplate extends RocketMQTemplate {}

TxProducerListener:

import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;import org.apache.rocketmq.spring.support.RocketMQHeaders;import org.springframework.messaging.Message;import java.util.concurrent.ConcurrentHashMap;@Slf4j@RocketMQTransactionListener(rocketMQTemplateBeanName = "txRocketMQTemplate")public class TxProducerListener implements RocketMQLocalTransactionListener {    /**     * 记录各个事务Id的状态:1-正在执行,2-执行成功,3-执行失败     */    private ConcurrentHashMap<String, Integer> transMap = new ConcurrentHashMap<>();    /**     * 执行本地事务     *     * @param msg     * @param arg     * @return     */    @Override    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { // 执行本地事务 String transId = msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID).toString(); log.info("消息事务id为:" + transId); // 状态为正在执行 transMap.put(transId, 1); try {     // 本地SQL事务一定要在MQ监听器的回调方法中执行     log.info("正在执行本地事务");     // 模拟耗时操作估计出发mq回查操作:当RocketMQ长时间(1分钟)没有收到本地事务的返回结果     // TimeUnit.SECONDS.sleep(80);     // 模拟业代码执行,比如模拟插入user数据到数据库中,并且失败的情况     // System.out.println(1 / 0);     log.info("事务执行完成."); } catch (Exception e) {     // 状态为执行失败     transMap.put(transId, 3);     log.error("事务执行异常.");     // 出现异常     // 如果不需要重试 则设置为:ROLLBACK     // 如果需要检查事务重试,1分钟后发起检查 则设置为:UNKNOWN     return RocketMQLocalTransactionState.UNKNOWN; } // 状态为执行成功 transMap.put(transId, 2); return RocketMQLocalTransactionState.COMMIT;    }    /**     * 事务超时,回查方法     * 检查本地事务,如果RocketMQ长时间(1分钟左右)没有收到本地事务的返回结果,则会定时主动执行改方法,查询本地事务执行情况。     *     * @param msg     * @return     */    @Override    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { //根据transaction的id回查该事务的状态,并返回给消息队列 //未知状态:查询事务状态,但始终无结果,或者由于网络原因发送不成功,对mq来说都是未知状态 //正确提交返回LocalTransactionState.COMMIT_MESSAGE //事务执行失败返回LocalTransactionState.ROLLBACK_MESSAGE String transId = (String) msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID); Integer status = transMap.get(transId); // 执行状态 1-正在执行,2-执行成功,3-执行失败 log.info("回查的事务id为:" + transId + ",当前的状态为:" + status); //正在执行 if (status == 1) {     log.info("回查结果为:正在执行状态");     return RocketMQLocalTransactionState.UNKNOWN; } else if (status == 2) {     //执行成功,返回commit     log.info("回查结果为:成功状态");     transMap.remove(transId);     return RocketMQLocalTransactionState.COMMIT; } else if (status == 3) {     //执行失败,返回rollback     log.info("回查结果为:失败状态");     return RocketMQLocalTransactionState.ROLLBACK;     // 通过伪代码表示 检查本地事务执行情况     // User user = selectByUserId(userId);     // if (user!=null) {     //     //插入成功(本地事务完成)     //     transMap.remove(transId);     //     return RocketMQLocalTransactionState.COMMIT;     // } else {     //      // 插入失败     //      // 如果不需要再重试 则设置为:ROLLBACK     //      // 如果还需要检查事务重试 则设置为:UNKNOWN     //     return RocketMQLocalTransactionState.UNKNOWN;     // } } //  其他未知情况,统一返回不重试,删除消息 transMap.remove(transId); return RocketMQLocalTransactionState.ROLLBACK;    }}

TxProducer:

import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.spring.core.RocketMQTemplate;import org.apache.rocketmq.spring.support.RocketMQHeaders;import org.springframework.messaging.Message;import org.springframework.messaging.support.MessageBuilder;import org.springframework.stereotype.Service;import javax.annotation.Resource;import java.util.UUID;@Service@Slf4jpublic class TxProducer {    /**     * 一个RocketMQTemplate只能注册一个事务监听器,     * 如果存在多个事务监听器监听不同的`Producer`,     * 需要通过注解`@ExtRocketMQTemplateConfiguration`定义不同的RocketMQTemplate     */    @Resource(name = "txRocketMQTemplate")    RocketMQTemplate rocketMQTemplate;    public void tx() { String text = "消息事务发送" + System.currentTimeMillis(); log.info(text); UUID transactionId = UUID.randomUUID(); log.info("事务ID:" + transactionId); Message<String> message = MessageBuilder.withPayload(text)  // 设置事务Id  .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)  .build(); // 调研sendMessageInTransaction后进行到监听器中 rocketMQTemplate.sendMessageInTransaction("tx_topic", message, null); log.info("已发送...");    }}