DataStore事务概述
数据库事务(transaction)是访问并可能操作各种数据项的一个数据库操作序列,这些操作要么全部执行,要么全部不执行,是一个不可分割的工作单位。事务由事务开始与事务结束之间执行的全部数据库操作组成。
数据库事务拥有以下四个特性,习惯上被称之为ACID特性。
- 原子性(Atomicity):事务作为一个整体被执行,包含在其中的对数据库的操作要么全部被执行,要么都不执行。
- 一致性(Consistency):事务应确保数据库的状态从一个一致状态转变为另一个一致状态。一致状态的含义是数据库中的数据应满足完整性约束。
- 隔离性(Isolation):多个事务并发执行时,一个事务的执行不应影响其他事务的执行。
- 持久性(Durability):已被提交的事务对数据库的修改应该永久保存在数据库中。
DataStore事务是基于Spring的AOP机制,使用者在期望开启事务的方法实现上通过注解的方式进行声明,事务AOP中将会做事务的开启和提交操作。DS事务与数据库事务的区别点在于DS事务可以支持跨线程(Trantor LF)甚至跨JVM的事务传播,DS会保证事务内调用DataStore的DML操作要么全部成功,要么全部失败。
DataStore会自己维护数据库连接以及跨线程/进程事务上下文,不会使用Spring的TransactionManager。
1. 使用方式
@Target({ElementType.METHOD})@Retention(RetentionPolicy.RUNTIME)@Inherited@Documentedpublic @interface DSTransaction { /** * 默认超时时间50s: 50000,单位:毫秒 */ int DEFAULT_TRANSACTION_TIMEOUT_MILLISECONDS = 50000; // 单元测试时可以调小,比如:1000
/** * 默认50s 单位ms * @return */ int timeout() default DEFAULT_TRANSACTION_TIMEOUT_MILLISECONDS;}示例1,通过模型DAO操作数据,声明开启事务,不指定超时时间(默认50秒)
class UserFacade { @Resource private UserDAO userDAO;
@DSTransaction public Response register(String mobile,String password){ val user = this.userDAO.queryByMobile(mobile); if(user == null){ // create user... // notify other // create other table } ... }}示例2,通过模型DAO操作数据,并指定超时时间为60秒
class UserFacade { @Resource private UserDAO userDAO;
@DSTransaction(timeout = 60000) public Response register(String mobile,String password){ val user = this.userDAO.queryByMobile(mobile); if(user == null){ // create user... // notify other // create other table } ... }}示例3,Flow中声明事务
Flow接口定义
/** * 删除活动flow * */@Flow(name = "delete campaign")public interface DeleteCampaignFlow { void execute(CampaignBO campaignBO);}Flow实现
@FlowImpl(name = "Delete Campaign Flow Impl")@RequiredArgsConstructorpublic class DeleteCampaignFlowImpl implements DeleteCampaignFlow { private final DeleteCampaignFunc deleteCampaignFunc; private final ValidateCampaignCanDeleteFunc validateCampaignCanDeleteFunc; private final ValidateCampaignExistFunc validateCampaignExistFunc;
@DSTransaction @Override public void execute(CampaignBO campaignBO) { final CampaignBO dbCampaign = validateCampaignExistFunc.execute(campaignBO); validateCampaignCanDeleteFunc.execute(dbCampaign); deleteCampaignFunc.execute(campaignBO); }}示例4,Function中声明事务
Function定义
/** * 删除活动 * */@Function(name = "Delete Campaign Func")public interface DeleteCampaignFunc {
/** * 删除活动 * * @param campaign 活动 */ void execute(CampaignBO campaign);}Function实现
@FunctionImpl(name = "Delete Campaign Func Impl")public class DeleteCampaignFuncImpl implements DeleteCampaignFunc {
@DSTransaction @Override public void execute(CampaignBO campaign) { DS.delete(campaign); if (campaign.getCampaignFlowDefinition() != null) { DS.delete(campaign.getCampaignFlowDefinition()); } }}2. 事务传播
DS支持显式或者隐式的事务传播,包含进程内以及跨JVM两种
2.1 进程内(Trantor所有版本)
@Servicepublic class A { @Autowired private FooModelDAO fooDAO; @Autowired private BarModelDAO barDAO; @Autowired private B b;
@DSTransaction(timeout = 50000) public Result doSomething(String someArgs){ // 调用DAO的方法访问DS fooDAO.create(someArgs); barDAO.update(someArgs); // 调用其他事务方法 b.doSomething(someArgs); ... }}
@Servicepublic class B { @Autowired private FooooModelDAO fooDAO; @Autowired private BarrrrModelDAO barDAO;
@DSTransaction(timeout = 30000) public Result doSomething(String someArgs){ // 调用DAO的方法访问DS fooDAO.create(someArgs); barDAO.update(someArgs); ... }}调用A.doSomething(“xxx”)时,首先A Service中的doSomething方法内的逻辑在一个DS事务内,这个是显而易见的,但是存在一个b.doSomething的调用,这个方法内的所有请求会复用A中的事务,因此从逻辑上事务传播到了B的doSomething方法,无论B的doSomething是否声明了@DSTransaction,只要上层调用(这里的调用指的是同步调用或者使用LF Api的异步调用)当前方法的方法开启了事务,当前方法逻辑都会嵌套在外层事务中。
但是这并不意味着B的doSomething不需要主动声明事务,因为存在另一种场景,假如有一个C服务,也调用了B的doSomething
@Servicepublic class C { @Autowired private FooModelDAO fooDAO; @Autowired private BarModelDAO barDAO; @Autowired private B b;
public Result doSomething(String someArgs){ // 调用DAO的方法访问DS fooDAO.create(someArgs); barDAO.update(someArgs); // 调用其他事务方法 b.doSomething(someArgs); ... }}
@Servicepublic class B { @Autowired private FooooModelDAO fooDAO; @Autowired private BarrrrModelDAO barDAO;
public Result doSomething(String someArgs){ // 调用DAO的方法访问DS fooDAO.create(someArgs); barDAO.update(someArgs); ... }}由于B没有开启事务,然后上层调用方C的doSomething也没有开启事务,因此B的doSomething方法逻辑不具备事务性,像这种具有隐藏含义的方法声明有可能不符合预期(A调用时候在一个事务内,C调用时候不在事务内)
2.2 跨JVM(Trantor 0.16)
交易模块的支付回执Flow
public class CallbackAfterPaidFlowImpl implements CallbackAfterPaidFlow { private final QueryAndValidateTradeContractDetailByIdFunc queryAndValidateTradeContractDetailByIdFunc; private final ValidateIfTradeContractCanUpdateToPaidFunc validateIfTradeContractCanUpdateToPaidFunc; private final CalculateTradeContractAndContractLinePayStatusAndTimeFunc calculateTradeContractAndContractLinePayStatusAndTimeFunc; private final CalculateTradeOrderAndOrderLinePayStatusAndTimeFunc calculateTradeOrderAndOrderLinePayStatusAndTimeFunc; private final CalculateTradeContractAndContractLinePaidAmtFunc calculateTradeContractAndContractLinePaidAmtFunc; private final CalculateTradeOrderAndOrderLinePaidAmtFunc calculateTradeOrderAndOrderLinePaidAmtFunc; private final UpdateTradeContractAndLinkModelFunc updateTradeContractAndLinkModelFunc; private final ConvertPaymentOrderToTradeContractQueryFunc convertPaymentOrderToTradeContractQueryFunc; private final ConfirmOccupyInventoryFlow confirmOccupyInventoryFlow; private final AssembleOccupyConfirmRequestFunc assembleOccupyConfirmRequestFunc; private final AssemblePaymentOrderWithTradeContractFunc assemblePaymentOrderWithTradeContractFunc; private final UpdateTradeOrderSOTimelineAndStatusFunc updateTradeOrderSOTimelineAndStatusFunc; private final SendNoticeToSellerForShippingJobFunc sendNoticeToSellerForShippingJobFunc; private final FilterContractLineForBundleSkuFunc filterContractLineForBundleSkuFunc; private final FilterOrderLineForBundleSkuFunc filterOrderLineForBundleSkuFunc;
@DSTransaction public void execute(PaymentOrderBO currentPaymentOrderBO) { QTradeContractBO tradeContractToQuery = this.convertPaymentOrderToTradeContractQueryFunc.execute(currentPaymentOrderBO); TradeContractBO originTradeContractBO = this.queryAndValidateTradeContractDetailByIdFunc.execute(tradeContractToQuery); TradeContractBO filterBundleContractLine = this.filterContractLineForBundleSkuFunc.execute(originTradeContractBO); TradeContractBO filterBundleOrderLine = this.filterOrderLineForBundleSkuFunc.execute(filterBundleContractLine); this.validateIfTradeContractCanUpdateToPaidFunc.execute(filterBundleOrderLine, currentPaymentOrderBO); TradeContractBO calculatedContractPayStatusTradeContractBO = this.calculateTradeContractAndContractLinePayStatusAndTimeFunc.execute(originTradeContractBO, currentPaymentOrderBO); TradeContractBO calculatedOrderPayStatusTradeContractBO = this.calculateTradeOrderAndOrderLinePayStatusAndTimeFunc.execute(calculatedContractPayStatusTradeContractBO, currentPaymentOrderBO); TradeContractBO calculatedContractPaidAmtTradeContractBO = this.calculateTradeContractAndContractLinePaidAmtFunc.execute(calculatedOrderPayStatusTradeContractBO, currentPaymentOrderBO); TradeContractBO calculatedOrderPaidAmtTradeContractBO = this.calculateTradeOrderAndOrderLinePaidAmtFunc.execute(calculatedContractPaidAmtTradeContractBO, currentPaymentOrderBO); TradeContractBO assembledTradeContract = this.assemblePaymentOrderWithTradeContractFunc.execute(currentPaymentOrderBO, calculatedOrderPaidAmtTradeContractBO); List<TradeOrderSO> existedTradeOrderSOList = DS.findAll(TradeOrderSO.class, "*", "tradeContractId = ?", new Object[]{assembledTradeContract.getId()}); this.updateTradeContractAndLinkModelFunc.execute(assembledTradeContract); this.updateTradeOrderSOTimelineAndStatusFunc.execute(assembledTradeContract.getTradeOrderList(), existedTradeOrderSOList); List<InventoryLineBO> inventoryLineBOS = this.assembleOccupyConfirmRequestFunc.execute(calculatedOrderPaidAmtTradeContractBO); this.confirmOccupyInventoryFlow.execute(inventoryLineBOS); LF.foreach(assembledTradeContract.getTradeOrderList(), (tradeOrderBO) -> { this.sendNoticeToSellerForShippingJobFunc.execute(tradeOrderBO); }); }在这个Flow中,交易模块会调用跨JVM的库存模块的Flow,由交易模块发起的请求开启了事务,这个事务会传播到库存模块,如果库存模块执行Flow时候抛出了异常,交易模块的Flow中DB相关逻辑也会回滚
3. 注意事项
3.1 传播级别
目前只支持PROPAGATION_REQUIRED级别的事务传播机制,即支持当前事务,如果当前没有事务,则新建一个事务,这是最常见的选择,也是 Spring 默认的一个事务传播属性。
3.2 事务影响范围
DS只能保证事务内的通过GQL/TSQL执行的db操作具有事务性,业务逻辑内其他redis、mq、异步调用无法具备事务性,需要使用者自己确保一致性。
3.3 数据变更消息
DS提供了数据变更消息监听机制,当模型数据发生了变更,DS将发送模型变更消息,由于一个事务内可能有N次db操作,DS会确保只有事务提交时才会把所有的变更消息发送出去,在事务还未提交时不会发送消息。
3.4 跨库写
Trantor 0.16开始提供了跨库写功能,事务内出现跨库操作也可以保证事务性,0.16之前版本不支持跨库写,只支持跨库读。