-
Notifications
You must be signed in to change notification settings - Fork 38
Description
分布式事务概述
分布式事务本质上是一次业务请求,操作了不同库的不同数据。而在业务上是要求此次业务请求最终的结果是各数据库的数据是可对帐且一致的。
分布式事务解决方案有符合XA规范的2PC、3PC,最终一致性方案TCC、Saga、Seata及MQ事务消息;XA方案能最大程度保证一致性但性能开销大;TCC等方案对业务侵入深,对开发不友好,RTT损耗约为10%;本设计最终落地选择MQ事务消息,其对业务侵入相对较小、保证吞吐的同时最大程度保证数据最终一致。
MQ事务消息基于两个前提:
- MQ保证消息不丢失且至少消费一次(At Least Once)
可选RabbitMQ、RocketMQ、Kafka
- 消费方要确保幂等
Redis保证幂等防重入
阿里云RocketMQ支持事务消息,但本组件并不采用事务消息。如下图所示为阿里云事务消息流程。
RocketMQ事务消息需要先发送半消息,发送成功才能执行本地事务,半消息投递成功还需要提供callback供broker回查本地事务执行状态。然而实际生产中这样操作并不一定合理。
-
投递MQ成功与否并不能决定是否执行本地事务
-
如果MQ能保证投递的消息不丢失且至少消费一次,提供回查接口其实增加了业务复杂度
MQ事务设计
基于以上归纳,以用户购买储值卡后,需要更新积分、帐户余额、更改会员等级为例(单体应用时,下单与这些操作属于同一事务),选型RocketMQ提出以下事务消息设计方案:
下单事务逻辑包括:(下单业务逻辑+事务消息处理) => local transaction
伪代码如下:
tx = db.beginTransaction()
try {
addOrder(ctx,params)
//事务消息
addTransactionMsg(ctx,topic,body,tags)
tx.Commit()
} catch {
tx.Rollback()
}
- 下单业务逻辑
根据业务逻辑生成主订单、子订单(如果有)
- 事务消息处理
1、生成一条事务消息,字段如下:
字段 类型 描述 id uint64 自增ID msgid int64 消息ID ,此ID可用作投递至MQ中的消息的Key,MQ生成的MsgId并不一唯一,业务方应以自己的Key为准 body varchar 消息体 topic varchar topic tags varchar tags,需要投递的tag is_delivery uint32 是否已投递,0未投递 1已投递 retry_times uint32 已重试次数 max_retry_times uint32 最大重试次数 created_time uint64 创建时间 updated_time uint64 更新时间
2、根据步骤1的事务消息生成需要投递的MQ消息并先投递到本地发送队列中
将步骤1中的消息加入本地发送队列,有异步线程不断消费该队列投递至RocketMQ
如果投递失败,将失败的消息加入本地延迟队列中(根据失败次数决定重试时间点),有异步线程不断拿出该发送的消息投至本地发送队列
注意: 以上两个操作是包含在一个本地事务中,要么成功返回下单成功,要么失败返回下单失败。
- MQ事务组件内部功能
- 定期清理已投递的消息(硬删即可,事务消息表要保证轻量)
- 同时对于超出最大重试次数的消息进行报警,通知人工尽快介入异常业务事务处理
- 记录所有调用日志,方便回查问题
服务协调
-
积分服务幂等消费订单创建消息
-
帐户余额服务幂等消费订单创建消息
-
用户会员等级变更服务幂等消费订单创建消息
结论
该方案对业务侵入小,对于需要强一致性的业务,直接在正常的事务业务代码中调用addTransactionMsg方法即可(具体实现逻辑全部屏蔽在组件层),保证业务吞吐的同时,性能损耗非常小。
