RocketMQ(九)RocketMQ分布式事务实现
目录
1、事务场景
场景:工行用户A向建行用户B转账1万元;
步骤:
- 1.工行系统发送一个给B增款1万元的同步消息M给Broker
- 2.消息被Broker成功接收后,向工行系统发送成功ACK
- 3.工行系统收到成功ACK后从用户A中扣款1万元
- 4.建行系统从Broker中获取到消息M
- 5.建行系统消费消息M,即向用户B中增加1万元
这其中是有问题的:若第3步中的扣款操作失败,但消息已经成功发送到了Broker。对于MQ来说,只要消息写入成功,那么这个消息就可以被消费;此时建行系统中用户B会增加了1万元,出现了数据不一致问题。
2、解决思路
解决思路: 让上述步骤的第1、2、3步具有原子性,要么全部成功,要么全部失败。即消息发送成功后,必须要保证扣款成功。如果扣款失败,则回滚发送成功的消息,简单来说就是先预提交一个消息,当本地事务没有异常的情况下,再次发送一个确认消息;该思路即使用事务消息,这里要使用分布式事务解决方案;
使用事务消息来处理该需求场景过程:
-
事务管理器TM向事务协调器TC发起指令,开启全局事务
-
工行系统发一个给B增款1万元的事务消息M给TC
-
TC会向Broker发送半事务消息prepareHalf,将消息M预提交到Broker,此时的建行系统是看不到Broker中的消息M的
-
Broker会将预提交执行结果Report给TC。
-
如果预提交失败,则TC会向TM上报预提交失败的响应,全局事务结束;如果预提交成功,TC会调用工行系统的回调操作,去完成工行用户A的预扣款1万元的操作
-
工行系统会向TC发送预扣款执行结果,即本地事务的执行状态
-
TC收到预扣款执行结果后,会将结果上报给TM
-
TM会根据上报结果向TC发出不同的确认指令
- 若预扣款成功(本地事务状态为COMMIT_MESSAGE),则TM向TC发送Global Commit指令
- 若预扣款失败(本地事务状态为ROLLBACK_MESSAGE),则TM向TC发送Global Rollback指令
- 若现未知状态(本地事务状态为UNKNOW),则会触发工行系统的本地事务状态回查操作。回查操作会将回查结果,即COMMIT_MESSAGE或ROLLBACK_MESSAGE Report给TC。TC将结果上报给TM,TM会再向TC发送最终确认指令Global Commit或Global Rollback
-
TC在接收到指令后会向Broker与工行系统发出确认指令
- TC接收的若是Global Commit指令,则向Broker与工行系统发送Branch Commit指令。此时Broker中的消息M才可被建行系统看到;此时的工行用户A中的扣款操作才真正被确认;
- TC接收到的若是Global Rollback指令,则向Broker与工行系统发送Branch Rollback指令。此时Broker中的消息M将被撤销;工行用户A中的扣款操作将被回滚;
3、事务消息概念
RocketMQ的事务消息,主要是通过消息的异步处理,可以保证本地事务和消息发送同时成功执行或失败,从而保证数据的最终一致性,具体流程如下:
事务消息共有三种状态,提交状态、回滚状态、中间状态:
- RocketMQLocalTransactionState.COMMIT: 提交事务,它允许消费者消费此消息。
- RocketMQLocalTransactionState.ROLLBACK: 回滚事务,它代表该消息将被删除,不允许被消费。
- RocketMQLocalTransactionState.UNKNOWN: 中间状态,它代表需要检查消息队列来确定状态。
事务过程:
通过sendMessageInTransaction
方法将消息发送到broker
,并回调事务监听器的方法,此时消息为半消息状态,需要进行二次确认才能发送到队列并由消费者进行消费。
如果MQ收到的事务状态一直是UNKNOWN,那么将不断的向MQ发送方发起回调检查本地事务状态
,直到收Commit/Rollback状态的消息或者人工干预删除UNKNOWN状态的消息;
注意本地SQL事务一定要在MQ监听器的回调方法中执行
4、事务消息实现
4.1 代码说明:
核心类为TxConsumer
、TxProducer
、TxProducerListener
实现流程。TxProducer
调用sendMessageInTransaction
方法后进入到TxProducerListener
中
注意本地SQL事务一定要在MQ监听器的回调方法中执行
需要注意的是: 一个RocketMQTemplate只能注册一个事务监听器,如果存在多个事务监听器监听不同的Producer
,需要通过注解@ExtRocketMQTemplateConfiguration
定义不同的RocketMQTemplate, ,比如:
// 定义RocketMQTemplate @ExtRocketMQTemplateConfiguration public class XXXRocketMQTemplate extends RocketMQTemplate { } // 消费者发送事务消息 // 引入自定义的RocketMQTemplate @Resource(name = "xxxRocketMQTemplate") RocketMQTemplate rocketMQTemplate; public void send() { Message<String> message = MessageBuilder.withPayload("text").build(); rocketMQTemplate.sendMessageInTransaction("topic", message, null); }
要使用事务消息,需要自定义消息监听器,并且与RocketMQTemplate
进行绑定:
// 监听器,默认的为rocketMQTemplate// 如果使用ExtRocketMQTemplateConfiguration是自定义了RocketMQTemplate,则需要绑定自定义的bean名称@RocketMQTransactionListener(rocketMQTemplateBeanName="txRocketMQTemplate")public class TxProducerListener implements RocketMQLocalTransactionListener {@Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { ..... } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { ..... }}
4.2 完整代码
TxConsumer:
import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Component;@Component@RocketMQMessageListener(topic = "tx_topic", consumerGroup = "tx_group")@Slf4jpublic class TxConsumer implements RocketMQListener<String> { /** * * @param message */ @Override public void onMessage(String message) { log.info("消息事务-接受到消息:" + message); }}
TxRocketMQTemplate:
// 一个RocketMQTemplate只能注册一个事务监听器,如果存在多个事务监听器监听不同的`Producer`// 需要通过注解`@ExtRocketMQTemplateConfiguration`定义不同的RocketMQTemplate@ExtRocketMQTemplateConfigurationpublic class TxRocketMQTemplate extends RocketMQTemplate {}
TxProducerListener:
import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;import org.apache.rocketmq.spring.support.RocketMQHeaders;import org.springframework.messaging.Message;import java.util.concurrent.ConcurrentHashMap;@Slf4j@RocketMQTransactionListener(rocketMQTemplateBeanName = "txRocketMQTemplate")public class TxProducerListener implements RocketMQLocalTransactionListener { /** * 记录各个事务Id的状态:1-正在执行,2-执行成功,3-执行失败 */ private ConcurrentHashMap<String, Integer> transMap = new ConcurrentHashMap<>(); /** * 执行本地事务 * * @param msg * @param arg * @return */ @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { // 执行本地事务 String transId = msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID).toString(); log.info("消息事务id为:" + transId); // 状态为正在执行 transMap.put(transId, 1); try { // 本地SQL事务一定要在MQ监听器的回调方法中执行 log.info("正在执行本地事务"); // 模拟耗时操作估计出发mq回查操作:当RocketMQ长时间(1分钟)没有收到本地事务的返回结果 // TimeUnit.SECONDS.sleep(80); // 模拟业代码执行,比如模拟插入user数据到数据库中,并且失败的情况 // System.out.println(1 / 0); log.info("事务执行完成."); } catch (Exception e) { // 状态为执行失败 transMap.put(transId, 3); log.error("事务执行异常."); // 出现异常 // 如果不需要重试 则设置为:ROLLBACK // 如果需要检查事务重试,1分钟后发起检查 则设置为:UNKNOWN return RocketMQLocalTransactionState.UNKNOWN; } // 状态为执行成功 transMap.put(transId, 2); return RocketMQLocalTransactionState.COMMIT; } /** * 事务超时,回查方法 * 检查本地事务,如果RocketMQ长时间(1分钟左右)没有收到本地事务的返回结果,则会定时主动执行改方法,查询本地事务执行情况。 * * @param msg * @return */ @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { //根据transaction的id回查该事务的状态,并返回给消息队列 //未知状态:查询事务状态,但始终无结果,或者由于网络原因发送不成功,对mq来说都是未知状态 //正确提交返回LocalTransactionState.COMMIT_MESSAGE //事务执行失败返回LocalTransactionState.ROLLBACK_MESSAGE String transId = (String) msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID); Integer status = transMap.get(transId); // 执行状态 1-正在执行,2-执行成功,3-执行失败 log.info("回查的事务id为:" + transId + ",当前的状态为:" + status); //正在执行 if (status == 1) { log.info("回查结果为:正在执行状态"); return RocketMQLocalTransactionState.UNKNOWN; } else if (status == 2) { //执行成功,返回commit log.info("回查结果为:成功状态"); transMap.remove(transId); return RocketMQLocalTransactionState.COMMIT; } else if (status == 3) { //执行失败,返回rollback log.info("回查结果为:失败状态"); return RocketMQLocalTransactionState.ROLLBACK; // 通过伪代码表示 检查本地事务执行情况 // User user = selectByUserId(userId); // if (user!=null) { // //插入成功(本地事务完成) // transMap.remove(transId); // return RocketMQLocalTransactionState.COMMIT; // } else { // // 插入失败 // // 如果不需要再重试 则设置为:ROLLBACK // // 如果还需要检查事务重试 则设置为:UNKNOWN // return RocketMQLocalTransactionState.UNKNOWN; // } } // 其他未知情况,统一返回不重试,删除消息 transMap.remove(transId); return RocketMQLocalTransactionState.ROLLBACK; }}
TxProducer:
import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.spring.core.RocketMQTemplate;import org.apache.rocketmq.spring.support.RocketMQHeaders;import org.springframework.messaging.Message;import org.springframework.messaging.support.MessageBuilder;import org.springframework.stereotype.Service;import javax.annotation.Resource;import java.util.UUID;@Service@Slf4jpublic class TxProducer { /** * 一个RocketMQTemplate只能注册一个事务监听器, * 如果存在多个事务监听器监听不同的`Producer`, * 需要通过注解`@ExtRocketMQTemplateConfiguration`定义不同的RocketMQTemplate */ @Resource(name = "txRocketMQTemplate") RocketMQTemplate rocketMQTemplate; public void tx() { String text = "消息事务发送" + System.currentTimeMillis(); log.info(text); UUID transactionId = UUID.randomUUID(); log.info("事务ID:" + transactionId); Message<String> message = MessageBuilder.withPayload(text) // 设置事务Id .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId) .build(); // 调研sendMessageInTransaction后进行到监听器中 rocketMQTemplate.sendMessageInTransaction("tx_topic", message, null); log.info("已发送..."); }}