跳转到内容

DataStore事务概述

数据库事务(transaction)是访问并可能操作各种数据项的一个数据库操作序列,这些操作要么全部执行,要么全部不执行,是一个不可分割的工作单位。事务由事务开始与事务结束之间执行的全部数据库操作组成。

数据库事务拥有以下四个特性,习惯上被称之为ACID特性。

  1. 原子性(Atomicity):事务作为一个整体被执行,包含在其中的对数据库的操作要么全部被执行,要么都不执行。
  2. 一致性(Consistency):事务应确保数据库的状态从一个一致状态转变为另一个一致状态。一致状态的含义是数据库中的数据应满足完整性约束。
  3. 隔离性(Isolation):多个事务并发执行时,一个事务的执行不应影响其他事务的执行。
  4. 持久性(Durability):已被提交的事务对数据库的修改应该永久保存在数据库中。

DataStore事务是基于Spring的AOP机制,使用者在期望开启事务的方法实现上通过注解的方式进行声明,事务AOP中将会做事务的开启和提交操作。DS事务与数据库事务的区别点在于DS事务可以支持跨线程(Trantor LF)甚至跨JVM的事务传播,DS会保证事务内调用DataStore的DML操作要么全部成功,要么全部失败。

DataStore会自己维护数据库连接以及跨线程/进程事务上下文,不会使用Spring的TransactionManager。

1. 使用方式

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface DSTransaction {
/**
* 默认超时时间50s: 50000,单位:毫秒
*/
int DEFAULT_TRANSACTION_TIMEOUT_MILLISECONDS = 50000; // 单元测试时可以调小,比如:1000
/**
* 默认50s 单位ms
* @return
*/
int timeout() default DEFAULT_TRANSACTION_TIMEOUT_MILLISECONDS;
}

示例1,通过模型DAO操作数据,声明开启事务,不指定超时时间(默认50秒)

class UserFacade {
@Resource
private UserDAO userDAO;
@DSTransaction
public Response register(String mobile,String password){
val user = this.userDAO.queryByMobile(mobile);
if(user == null){
// create user...
// notify other
// create other table
}
...
}
}

示例2,通过模型DAO操作数据,并指定超时时间为60秒

class UserFacade {
@Resource
private UserDAO userDAO;
@DSTransaction(timeout = 60000)
public Response register(String mobile,String password){
val user = this.userDAO.queryByMobile(mobile);
if(user == null){
// create user...
// notify other
// create other table
}
...
}
}

示例3,Flow中声明事务

Flow接口定义

/**
* 删除活动flow
*
*/
@Flow(name = "delete campaign")
public interface DeleteCampaignFlow {
void execute(CampaignBO campaignBO);
}

Flow实现

@FlowImpl(name = "Delete Campaign Flow Impl")
@RequiredArgsConstructor
public class DeleteCampaignFlowImpl implements DeleteCampaignFlow {
private final DeleteCampaignFunc deleteCampaignFunc;
private final ValidateCampaignCanDeleteFunc validateCampaignCanDeleteFunc;
private final ValidateCampaignExistFunc validateCampaignExistFunc;
@DSTransaction
@Override
public void execute(CampaignBO campaignBO) {
final CampaignBO dbCampaign = validateCampaignExistFunc.execute(campaignBO);
validateCampaignCanDeleteFunc.execute(dbCampaign);
deleteCampaignFunc.execute(campaignBO);
}
}

示例4,Function中声明事务

Function定义

/**
* 删除活动
*
*/
@Function(name = "Delete Campaign Func")
public interface DeleteCampaignFunc {
/**
* 删除活动
*
* @param campaign 活动
*/
void execute(CampaignBO campaign);
}

Function实现

@FunctionImpl(name = "Delete Campaign Func Impl")
public class DeleteCampaignFuncImpl implements DeleteCampaignFunc {
@DSTransaction
@Override
public void execute(CampaignBO campaign) {
DS.delete(campaign);
if (campaign.getCampaignFlowDefinition() != null) {
DS.delete(campaign.getCampaignFlowDefinition());
}
}
}

2. 事务传播

DS支持显式或者隐式的事务传播,包含进程内以及跨JVM两种

2.1 进程内(Trantor所有版本)

@Service
public class A {
@Autowired
private FooModelDAO fooDAO;
@Autowired
private BarModelDAO barDAO;
@Autowired
private B b;
@DSTransaction(timeout = 50000)
public Result doSomething(String someArgs){
// 调用DAO的方法访问DS
fooDAO.create(someArgs);
barDAO.update(someArgs);
// 调用其他事务方法
b.doSomething(someArgs);
...
}
}
@Service
public class B {
@Autowired
private FooooModelDAO fooDAO;
@Autowired
private BarrrrModelDAO barDAO;
@DSTransaction(timeout = 30000)
public Result doSomething(String someArgs){
// 调用DAO的方法访问DS
fooDAO.create(someArgs);
barDAO.update(someArgs);
...
}
}

调用A.doSomething(“xxx”)时,首先A Service中的doSomething方法内的逻辑在一个DS事务内,这个是显而易见的,但是存在一个b.doSomething的调用,这个方法内的所有请求会复用A中的事务,因此从逻辑上事务传播到了B的doSomething方法,无论B的doSomething是否声明了@DSTransaction,只要上层调用(这里的调用指的是同步调用或者使用LF Api的异步调用)当前方法的方法开启了事务,当前方法逻辑都会嵌套在外层事务中。

但是这并不意味着B的doSomething不需要主动声明事务,因为存在另一种场景,假如有一个C服务,也调用了B的doSomething

@Service
public class C {
@Autowired
private FooModelDAO fooDAO;
@Autowired
private BarModelDAO barDAO;
@Autowired
private B b;
public Result doSomething(String someArgs){
// 调用DAO的方法访问DS
fooDAO.create(someArgs);
barDAO.update(someArgs);
// 调用其他事务方法
b.doSomething(someArgs);
...
}
}
@Service
public class B {
@Autowired
private FooooModelDAO fooDAO;
@Autowired
private BarrrrModelDAO barDAO;
public Result doSomething(String someArgs){
// 调用DAO的方法访问DS
fooDAO.create(someArgs);
barDAO.update(someArgs);
...
}
}

由于B没有开启事务,然后上层调用方C的doSomething也没有开启事务,因此B的doSomething方法逻辑不具备事务性,像这种具有隐藏含义的方法声明有可能不符合预期(A调用时候在一个事务内,C调用时候不在事务内)

2.2 跨JVM(Trantor 0.16)

交易模块的支付回执Flow

public class CallbackAfterPaidFlowImpl implements CallbackAfterPaidFlow {
private final QueryAndValidateTradeContractDetailByIdFunc queryAndValidateTradeContractDetailByIdFunc;
private final ValidateIfTradeContractCanUpdateToPaidFunc validateIfTradeContractCanUpdateToPaidFunc;
private final CalculateTradeContractAndContractLinePayStatusAndTimeFunc calculateTradeContractAndContractLinePayStatusAndTimeFunc;
private final CalculateTradeOrderAndOrderLinePayStatusAndTimeFunc calculateTradeOrderAndOrderLinePayStatusAndTimeFunc;
private final CalculateTradeContractAndContractLinePaidAmtFunc calculateTradeContractAndContractLinePaidAmtFunc;
private final CalculateTradeOrderAndOrderLinePaidAmtFunc calculateTradeOrderAndOrderLinePaidAmtFunc;
private final UpdateTradeContractAndLinkModelFunc updateTradeContractAndLinkModelFunc;
private final ConvertPaymentOrderToTradeContractQueryFunc convertPaymentOrderToTradeContractQueryFunc;
private final ConfirmOccupyInventoryFlow confirmOccupyInventoryFlow;
private final AssembleOccupyConfirmRequestFunc assembleOccupyConfirmRequestFunc;
private final AssemblePaymentOrderWithTradeContractFunc assemblePaymentOrderWithTradeContractFunc;
private final UpdateTradeOrderSOTimelineAndStatusFunc updateTradeOrderSOTimelineAndStatusFunc;
private final SendNoticeToSellerForShippingJobFunc sendNoticeToSellerForShippingJobFunc;
private final FilterContractLineForBundleSkuFunc filterContractLineForBundleSkuFunc;
private final FilterOrderLineForBundleSkuFunc filterOrderLineForBundleSkuFunc;
@DSTransaction
public void execute(PaymentOrderBO currentPaymentOrderBO) {
QTradeContractBO tradeContractToQuery = this.convertPaymentOrderToTradeContractQueryFunc.execute(currentPaymentOrderBO);
TradeContractBO originTradeContractBO = this.queryAndValidateTradeContractDetailByIdFunc.execute(tradeContractToQuery);
TradeContractBO filterBundleContractLine = this.filterContractLineForBundleSkuFunc.execute(originTradeContractBO);
TradeContractBO filterBundleOrderLine = this.filterOrderLineForBundleSkuFunc.execute(filterBundleContractLine);
this.validateIfTradeContractCanUpdateToPaidFunc.execute(filterBundleOrderLine, currentPaymentOrderBO);
TradeContractBO calculatedContractPayStatusTradeContractBO = this.calculateTradeContractAndContractLinePayStatusAndTimeFunc.execute(originTradeContractBO, currentPaymentOrderBO);
TradeContractBO calculatedOrderPayStatusTradeContractBO = this.calculateTradeOrderAndOrderLinePayStatusAndTimeFunc.execute(calculatedContractPayStatusTradeContractBO, currentPaymentOrderBO);
TradeContractBO calculatedContractPaidAmtTradeContractBO = this.calculateTradeContractAndContractLinePaidAmtFunc.execute(calculatedOrderPayStatusTradeContractBO, currentPaymentOrderBO);
TradeContractBO calculatedOrderPaidAmtTradeContractBO = this.calculateTradeOrderAndOrderLinePaidAmtFunc.execute(calculatedContractPaidAmtTradeContractBO, currentPaymentOrderBO);
TradeContractBO assembledTradeContract = this.assemblePaymentOrderWithTradeContractFunc.execute(currentPaymentOrderBO, calculatedOrderPaidAmtTradeContractBO);
List<TradeOrderSO> existedTradeOrderSOList = DS.findAll(TradeOrderSO.class, "*", "tradeContractId = ?", new Object[]{assembledTradeContract.getId()});
this.updateTradeContractAndLinkModelFunc.execute(assembledTradeContract);
this.updateTradeOrderSOTimelineAndStatusFunc.execute(assembledTradeContract.getTradeOrderList(), existedTradeOrderSOList);
List<InventoryLineBO> inventoryLineBOS = this.assembleOccupyConfirmRequestFunc.execute(calculatedOrderPaidAmtTradeContractBO);
this.confirmOccupyInventoryFlow.execute(inventoryLineBOS);
LF.foreach(assembledTradeContract.getTradeOrderList(), (tradeOrderBO) -> {
this.sendNoticeToSellerForShippingJobFunc.execute(tradeOrderBO);
});
}

在这个Flow中,交易模块会调用跨JVM的库存模块的Flow,由交易模块发起的请求开启了事务,这个事务会传播到库存模块,如果库存模块执行Flow时候抛出了异常,交易模块的Flow中DB相关逻辑也会回滚

3. 注意事项

3.1 传播级别

目前只支持PROPAGATION_REQUIRED级别的事务传播机制,即支持当前事务,如果当前没有事务,则新建一个事务,这是最常见的选择,也是 Spring 默认的一个事务传播属性。

3.2 事务影响范围

DS只能保证事务内的通过GQL/TSQL执行的db操作具有事务性,业务逻辑内其他redis、mq、异步调用无法具备事务性,需要使用者自己确保一致性。

3.3 数据变更消息

DS提供了数据变更消息监听机制,当模型数据发生了变更,DS将发送模型变更消息,由于一个事务内可能有N次db操作,DS会确保只有事务提交时才会把所有的变更消息发送出去,在事务还未提交时不会发送消息。

3.4 跨库写

Trantor 0.16开始提供了跨库写功能,事务内出现跨库操作也可以保证事务性,0.16之前版本不支持跨库写,只支持跨库读。