DS模型数据变更消息用户指南
1.模型订阅
想要订阅并消费模型数据变更消息,有以下几个前提:
- DS Server配置了MQ相关环境变量
- DS Server模型订阅功能开关开启
- 关注的模型被订阅
1.1 DS Server配置MQ相关环境变量(环境变量配置完成后需要重启DS服务)
模型数据变更消息功能依赖于MQ实现,目前支持RocketMQ,RocketMQ目前主流主要有两种,一种是开源版的RocketMQ,另一种是阿里云的RocketMQ, 这两个在具体使用和配置上存在一些差别,因此DS Server在配置MQ环境变量时存在一些差异。
1.1.1 使用开源版RocketMQ
SUBSCRIBE=trueSUBSCRIBE_VERSION=V2 #0.16使用V2版本的消息格式MQ_SERVER_ADDRESS=127.0.0.1:9876 #MQ的Server AddressCLIENT_TYPE=ROCKETMQ #当前MQ类型开源版的RocketMQ可以自动创建Topic,DS Server发送模型数据变更消息的Topic为固定前缀”DATASTORE_SUBSCRIBE_“加当前模型的租户id,比如当前 被订阅模型的租户id为trantor,那么Topic为DATASTORE_SUBSCRIBE_trantor
1.1.2 使用阿里云RocketMQ
使用阿里云的RocketMQ(ONS)时,需要提前在阿里云控制台上创建好DS Server发送模型数据变更消息需要的Topic, 如当前需要的Topic为DATASTORE_SUBSCRIBE_trantor ,

消息类型选择分区顺序消息
SUBSCRIBE=trueSUBSCRIBE_VERSION=V2 #0.16使用V2版本的消息格式MQ_SERVER_ADDRESS=127.0.0.1:9876 #MQ的Server AddressCLIENT_TYPE=ONS #当前MQ类型ALIYUN_ACCESSKEY= #阿里云ONS AccessKeyALIYUN_SECRETKEY= #阿里云ONS Secret1.1.3 补充
DS Server发送的模型变更消息为分区顺序消息,分区Key为当前模型名称,对同一个模型来说,模型的增删改应该是严格有序的。
1.2 调用DS接口订阅模型
DS Server提供订阅模型/取消订阅模型的接口,所以直接调用DS的接口即可完成对模型的订阅/取消订阅功能,
- 订阅 AutumnClient.subscribe(ModelSubscribeRequest),
- 取消订阅 AutumnClient.unsubscribe(ModelUnsubscribeRequest)
demo:
// 订阅目标模型,以 base_User,base_Staff 模型为例ModelSubscribeRequest subscribeRequest = new ModelSubscribeRequest();// projectId来源于当前业务模块的DS_PROJECT_ID环境变量subscribeRequest.setProjectId("trantor_060");// 这里记录要订阅的模型名称以及相关事件类型,详情参考文档"发送原理"一节HashMap<String, String> subscribeModels = new HashMap<>();subscribeModels.put("base_User", "CUD");subscribeModels.put("base_Staff", "CUD");subscribeRequest.setSubscribeModels(subscribeModels);autumnClient.subscribe(subscribeRequest);// 取消订阅目标模型,以 base_User 模型为例ModelUnsubscribeRequest unSubscribeRequest = new ModelUnsubscribeRequest();// projectId来源于当前业务模块的DS_PROJECT_ID环境变量unSubscribeRequest.setProjectId("trantor_060");// 这里记录要取消订阅的模型名称HashSet<String> modelNames = new HashSet<>();modelNames.add("base_User");unSubscribeRequest.setModelNames(modelNames);autumnClient.unsubscribe(unSubscribeRequest);
2.业务端订阅消费被订阅的模型变更消息
DS提供了消费模型数据变更消息的SDK,该SDK主要简化了相关配置,规范化相关代码,提供统一的Consumer接口
2.1 依赖DS-MQ-SDK
<dependency> <groupId>io.terminus.platform</groupId> <artifactId>autumn-data-subscribe</artifactId> <version>5.2.0-SNAPSHOT</version></dependency>2.2 配置MQ相关环境变量
2.2.1 使用开源版RocketMQ
application.yml中添加如下配置:
autumn: subscribe: enable: true #MQ开关terminus: mqServerAddress: ${MQ_SERVER_ADDRESS:127.0.0.1:9876} clientType: ${CLIENT_TYPE:ROCKETMQ} maxReconsumeTimes: ${MAX_RECONSUME_TIMES:-1}环境变量中的配置:
MQ_SERVER_ADDRESS= #RocketMQ地址DS_MQ_CONSUMER_GROUP= #consumerGroup 非必填,具体介绍请看2.2.2章节DS_PROJECT_ID= #当前租户id2.2.2 使用阿里云RocketMQ
在阿里云RocketMQ控制台上创建当前模块的ConsumerGroup,比如当前ConsumerGroup为GID_TRADR_CONSUMER_GROUP

application.yml中添加如下配置:
autumn: subscribe: enable: true #MQ开关terminus: mqServerAddress: ${MQ_SERVER_ADDRESS:127.0.0.1:9876} #阿里云RocketMQ地址 clientType: ${CLIENT_TYPE:ONS} maxReconsumeTimes: ${MAX_RECONSUME_TIMES:-1} aliyun: accessKey: ${ALIYUN_ACCESSKEY:} secretKey: ${ALIYUN_SECRETKEY:}环境变量中的配置:
MQ_SERVER_ADDRESS= #阿里云ONS地址ALIYUN_ACCESSKEY= #阿里云AccessKeyALIYUN_SECRETKEY= #阿里云SecretDS_MQ_CONSUMER_GROUP= #consumerGroup 必填,具体介绍请看2.2.2章节DS_PROJECT_ID= #当前租户id2.2.2 部分环境变量解释
-
MAX_RECONSUME_TIMES:MAX_RECONSUME_TIMES为消息投递失败后的最大重试次数,-1代表无线重试,比如某一模型消息消费失败,MAX_RECONSUME_TIMES当前设置的值为-1,因为消息为 分区顺序消息,分区key为当前模型名称,那么当前模型的消息会阻塞住,知道该消息消费成功或忽略掉。某一个模型消息阻塞不会影响其他模型消息的正常消费。
-
DS_MQ_CONSUMER_GROUP:DS_MQ_CONSUMER_GROUP用来自定义设置消费者组ConsumerGroup。使用开源版的RocketMQ时,如果没有主动配置该环境变量, SDK会根据当前模块全路径计算出一个ConsumerGroup,同一个模块无论部署几个节点都会生成相同的ConsumerGroup;使用阿里云RocketMQ时,由于ConsumerGroup需要 手动创建,所以需要在阿里云RocketMQ控制台提前创建好ConsumerGroup,然后配置DS_MQ_CONSUMER_GROUP环境变量。
2.3 实现autumn-data-subscribe SDK中提供的DataStoreSubscribeMsgConsumer接口
//监听base_User的Listener@DataStoreEventListener(listenModels={"base_User"})@Slf4jpublic class BaseUserMsgConsumerDemo implements DataStoreSubscribeMsgConsumer { @Override public boolean consume(DataStoreTransactionMsg message) { log.info("成功接收到MQ消息:message:{}",message); //业务逻辑 return true; }}//监听base_Staff的Listener@DataStoreEventListener(listenModels={"base_Staff"})@Slf4jpublic class BaseStaffMsgConsumerDemo implements DataStoreSubscribeMsgConsumer { @Override public boolean consume(DataStoreTransactionMsg message) { log.info("成功接收到MQ消息:message:{}",message); //业务逻辑 return true; }}//或者在一个Listener中同时监听base_User和base_Staff,@DataStoreEventListener(listenModels={"base_User","base_Staff"})@Slf4jpublic class BaseUserAndBaseStaffMsgConsumerDemo implements DataStoreSubscribeMsgConsumer { @Override public boolean consume(DataStoreTransactionMsg message) { log.info("成功接收到MQ消息:message:{}",message); //业务逻辑 return true; }}@DataStoreEventListener注解配置当前Consumer订阅哪些模型的数据变更消息,默认为*,代表监听当前项目下所有被订阅的模型。
2.4 消息格式(V2版本)
上面介绍了如何订阅模型,如何消费消息,以及相关的配置,本章节介绍一些模型数据变更消息的消息格式,以便拿到模型数据变更消息后能正确处理消费。 0.16版本之前为V1版本的消息格式,在0.16迭代中,V1版本的消息格式不能满足业务需求,因此添加了V2版本的消息格式,为了兼容老版本,在DS Server段采用 环境变量控制(1.1章节),以下消息格式为V2版本的消息格式。
2.4.1 单个和批量创建举例
{ "messageList": [ { "type": "Create", "modelName": "user_center_LogicalUser", "id": "601", "createMsg": { "body": { "createdAt": "2020-03-30 16:55:03", "isDeleted": 0, "phone": "18358105381", "name": "user6", "id": "601", "age": 18, "updatedAt": "2020-03-30 16:55:03" } }, "messageTime": "Mar 30, 2020 4:55:03 PM" }, { "type": "Create", "modelName": "user_center_LogicalUser", "id": "602", "createMsg": { "body": { "createdAt": "2020-03-30 16:55:03", "isDeleted": 0, "phone": "18358105381", "name": "user6", "id": "602", "age": 18, "updatedAt": "2020-03-30 16:55:03" } }, "messageTime": "Mar 30, 2020 4:55:03 PM" } ], "projectId": "ds_unit_test_project"}2.4.2 单个和批量修改举例
{ "messageList": [ { "type": "Update", "modelName": "user_center_User", "id": "1000002", "updateMsg": { "before": { "json_field": "{}", "createdAt": "Mar 30, 2020 4:55:02 PM", "datetime": "May 22, 1970 4:36:53 PM", "isDeleted": 0, "phone": "18358105381", "name": "user3", "id": "1000002", "age": 18, "updatedAt": "Mar 30, 2020 4:55:02 PM" }, "after": { "id": "1000002", "name": "user3", "phone": "18358105381", "datetime": "May 22, 1970 4:36:53 PM", "age": 18, "isDeleted": 0, "createdAt": "Mar 30, 2020 4:55:02 PM", "updatedAt": "Mar 30, 2020 4:55:03 PM" } }, "messageTime": "Mar 30, 2020 4:55:03 PM" }, { "type": "Update", "modelName": "user_center_User", "id": "1000003", "updateMsg": { "before": { "json_field": "{}", "createdAt": "Mar 30, 2020 4:55:02 PM", "datetime": "May 22, 1970 4:36:53 PM", "isDeleted": 0, "phone": "18358105381", "name": "user3", "id": "1000003", "age": 18, "updatedAt": "Mar 30, 2020 4:55:02 PM" }, "after": { "id": "1000003", "name": "user3", "phone": "18358105381", "datetime": "May 22, 1970 4:36:53 PM", "age": 18, "isDeleted": 0, "createdAt": "Mar 30, 2020 4:55:02 PM", "updatedAt": "Mar 30, 2020 4:55:03 PM" } }, "messageTime": "Mar 30, 2020 4:55:03 PM" } ], "projectId": "ds_unit_test_project",}2.4.3 单个和批量删除消息格式举例
{ "projectId": "test", "timestamp": 1586748574139, "md5OfProjectIdAndTimestamp": "0b03c9a1574d3450b2acafea9755aadd", "messageList": [ { "type": "Delete", "modelName": "People", "id": "601", "messageTime": "Apr 13, 2020 11:29:34 AM", "deleteMsg": { "body": { "createdAt": "2020-03-30 16:55:03", "isDeleted": 0, "phone": "18358105381", "name": "user6", "id": "601", "age": 18, "updatedAt": "2020-03-30 16:55:03" } } }, { "type": "Delete", "modelName": "People", "id": "602", "messageTime": "Apr 13, 2020 11:29:34 AM", "deleteMsg": { "body": { "createdAt": "2020-03-30 16:55:03", "isDeleted": 0, "phone": "18358105381", "name": "user6", "id": "602", "age": 18, "updatedAt": "2020-03-30 16:55:03" } } } ]}常见问题及排查方案
1.消费者没有收到订阅的模型的消息
以开源版RocketMQ为例
-
DSServer: 1.首先检查该模型是否被订阅(这一步有问题的概率目前比较小)。 2.目前DS发送模型变更消息已经由事务前发送消息改为事务后发送消息,并且MQ消息发送失败不影响事务的正常提交,所以可以先排查DS日志,找到相关模型是否消息发送失败,或者再次触发当前模型CUD操作,查看当前时间MQ中是否存在相应的消息。
-
MQ: 如果MQ中存在消息,说明DS-Server正常,可以查看未收到消息的业务模块的ConsumerGroup是否已经消费该消息。
-
RocketMQ控制台切换到消息页,设置开始时间和结束时间,切换到对应环境的Topic,DS的Topic规则为DATASTORE_SUBSCRIBE_租户ID

-
如果MQ中存在相应的消息,查看MESSAGE DETAIL

-
找到相对应的ConsumerGroup,业务模块对应的ConsumerGroup在业务模块刚启动的时候会打印出来,关键日志如下,consumer group :

3.如果消息已经被消费,说明业务方已经收到并且处理过该消息,请检查业务代码。如果消息未被消费,并且没有投递,说明当前ConsumerGroup不在线。
2.有时会丢失某些消息
典型真实案例:某客户使用了DS的模型数据变更消息监听功能,使用的MQ为阿里云ONS,开发环境和测试环境使用了同一个MQ实例,不同的Topic,客户那边反馈开发环境经常会丢消息,检查阿里云RocketMQ控制台发现丢掉消息确实被投递了, 但是一直消费失败,但是当前业务模块中却找不到相关日志,非常奇怪,经过最后排查发现该问题是测试环境和开发环境配置成了相同的ConsumerGroup导致的。如下图:

在同一个MQ实例中,开发环境和测试环境配置成了同一个ConsumerGroup,导致GID_dev会同时监听两个Topic,并且GID_dev存在两个消费节点,所以,dev_topic的消息有时候会投递到 测试环境节点中,导致开发环境节点消息丢失。
正确配置如下:
