DS 模型数据变更消息订阅功能概述
模型数据变更是指当前模型产生了插入,更新或删除数据的操作。模型数据变更消息订阅主要提供了当被订阅的模型发生插入,更新或删除数据的操作时, DS会将该模型当前变更的全量数据发送到MQ中,供需要该场景的业务方消费,比如MetaStore的操作日志功能就是基于该方案实现,此外还有MetaStore的 触发器功能也是基于该方案。 下图为简易流程:

订阅模型,触发模型增删改,消费消息的动作均可以是不同的服务
1.订阅模型数据变更消息流程
下面先简单介绍下如果订阅模型,并消费模型数据变更消息,具体操作请参考 DS模型数据变更消息用户指南
1.1 如何订阅模型变更消息?
DS Server提供订阅模型/取消订阅模型的接口,所以直接调用DS的接口即可完成对模型的订阅/取消订阅功能,
- 订阅 AutumnClient.subscribe(ModelSubscribeRequest),
- 取消订阅 AutumnClient.unsubscribe(ModelUnsubscribeRequest)
demo:
// 订阅目标模型,以 base_User,base_Staff 模型为例ModelSubscribeRequest subscribeRequest = new ModelSubscribeRequest();// projectId来源于当前业务模块的DS_PROJECT_ID环境变量subscribeRequest.setProjectId("trantor_060");// 这里记录要订阅的模型名称以及相关事件类型,详情参考文档"发送原理"一节HashMap<String, String> subscribeModels = new HashMap<>();subscribeModels.put("base_User", "CUD");subscribeModels.put("base_Staff", "CUD");subscribeRequest.setSubscribeModels(subscribeModels);autumnClient.subscribe(subscribeRequest);// 取消订阅目标模型,以 base_User 模型为例ModelUnsubscribeRequest unSubscribeRequest = new ModelUnsubscribeRequest();// projectId来源于当前业务模块的DS_PROJECT_ID环境变量unSubscribeRequest.setProjectId("trantor_060");// 这里记录要取消订阅的模型名称HashSet<String> modelNames = new HashSet<>();modelNames.add("base_User");unSubscribeRequest.setModelNames(modelNames);autumnClient.unsubscribe(unSubscribeRequest);
1.2 如何消费被订阅的模型数据变更消息?
订阅模型变更消息简单来说是依赖DS提供的SDK,实现相关接口即可。
- 依赖DS提供的MQ消费SDK
<dependency> <groupId>io.terminus.platform</groupId> <artifactId>autumn-data-subscribe</artifactId> <version>5.2.0-SNAPSHOT</version></dependency>- 实现autumn-data-subscribe中的DataStoreSubscribeMsgConsumer接口
//监听base_User的Listener@DataStoreEventListener(listenModels={"base_User"})@Slf4jpublic class BaseUserMsgConsumerDemo implements DataStoreSubscribeMsgConsumer { @Override public boolean consume(DataStoreTransactionMsg message) { log.info("成功接收到MQ消息:message:{}",message); //业务逻辑 return true; }}//监听base_Staff的Listener@DataStoreEventListener(listenModels={"base_Staff"})@Slf4jpublic class BaseStaffMsgConsumerDemo implements DataStoreSubscribeMsgConsumer { @Override public boolean consume(DataStoreTransactionMsg message) { log.info("成功接收到MQ消息:message:{}",message); //业务逻辑 return true; }}//或者在一个Listener中同时监听base_User和base_Staff,@DataStoreEventListener(listenModels={"base_User","base_Staff"})@Slf4jpublic class BaseUserAndBaseStaffMsgConsumerDemo implements DataStoreSubscribeMsgConsumer { @Override public boolean consume(DataStoreTransactionMsg message) { log.info("成功接收到MQ消息:message:{}",message); //业务逻辑 return true; }}@DataStoreEventListener注解配置当前Consumer订阅哪些模型的数据变更消息,默认为*,代表监听当前项目下所有被订阅的模型。