跳转到内容

DS 模型数据变更消息订阅功能概述

模型数据变更是指当前模型产生了插入,更新或删除数据的操作。模型数据变更消息订阅主要提供了当被订阅的模型发生插入,更新或删除数据的操作时, DS会将该模型当前变更的全量数据发送到MQ中,供需要该场景的业务方消费,比如MetaStore的操作日志功能就是基于该方案实现,此外还有MetaStore的 触发器功能也是基于该方案。 下图为简易流程:

img.png

订阅模型,触发模型增删改,消费消息的动作均可以是不同的服务

1.订阅模型数据变更消息流程

下面先简单介绍下如果订阅模型,并消费模型数据变更消息,具体操作请参考 DS模型数据变更消息用户指南

1.1 如何订阅模型变更消息?

DS Server提供订阅模型/取消订阅模型的接口,所以直接调用DS的接口即可完成对模型的订阅/取消订阅功能,

  • 订阅 AutumnClient.subscribe(ModelSubscribeRequest),
  • 取消订阅 AutumnClient.unsubscribe(ModelUnsubscribeRequest) demo:
    // 订阅目标模型,以 base_User,base_Staff 模型为例
    ModelSubscribeRequest subscribeRequest = new ModelSubscribeRequest();
    // projectId来源于当前业务模块的DS_PROJECT_ID环境变量
    subscribeRequest.setProjectId("trantor_060");
    // 这里记录要订阅的模型名称以及相关事件类型,详情参考文档"发送原理"一节
    HashMap<String, String> subscribeModels = new HashMap<>();
    subscribeModels.put("base_User", "CUD");
    subscribeModels.put("base_Staff", "CUD");
    subscribeRequest.setSubscribeModels(subscribeModels);
    autumnClient.subscribe(subscribeRequest);
    // 取消订阅目标模型,以 base_User 模型为例
    ModelUnsubscribeRequest unSubscribeRequest = new ModelUnsubscribeRequest();
    // projectId来源于当前业务模块的DS_PROJECT_ID环境变量
    unSubscribeRequest.setProjectId("trantor_060");
    // 这里记录要取消订阅的模型名称
    HashSet<String> modelNames = new HashSet<>();
    modelNames.add("base_User");
    unSubscribeRequest.setModelNames(modelNames);
    autumnClient.unsubscribe(unSubscribeRequest);

1.2 如何消费被订阅的模型数据变更消息?

订阅模型变更消息简单来说是依赖DS提供的SDK,实现相关接口即可。

  • 依赖DS提供的MQ消费SDK
<dependency>
<groupId>io.terminus.platform</groupId>
<artifactId>autumn-data-subscribe</artifactId>
<version>5.2.0-SNAPSHOT</version>
</dependency>
  • 实现autumn-data-subscribe中的DataStoreSubscribeMsgConsumer接口
//监听base_User的Listener
@DataStoreEventListener(listenModels={"base_User"})
@Slf4j
public class BaseUserMsgConsumerDemo implements DataStoreSubscribeMsgConsumer {
@Override
public boolean consume(DataStoreTransactionMsg message) {
log.info("成功接收到MQ消息:message:{}",message);
//业务逻辑
return true;
}
}
//监听base_Staff的Listener
@DataStoreEventListener(listenModels={"base_Staff"})
@Slf4j
public class BaseStaffMsgConsumerDemo implements DataStoreSubscribeMsgConsumer {
@Override
public boolean consume(DataStoreTransactionMsg message) {
log.info("成功接收到MQ消息:message:{}",message);
//业务逻辑
return true;
}
}
//或者在一个Listener中同时监听base_User和base_Staff,
@DataStoreEventListener(listenModels={"base_User","base_Staff"})
@Slf4j
public class BaseUserAndBaseStaffMsgConsumerDemo implements DataStoreSubscribeMsgConsumer {
@Override
public boolean consume(DataStoreTransactionMsg message) {
log.info("成功接收到MQ消息:message:{}",message);
//业务逻辑
return true;
}
}

@DataStoreEventListener注解配置当前Consumer订阅哪些模型的数据变更消息,默认为*,代表监听当前项目下所有被订阅的模型。