跳转到内容

DS模型数据变更消息用户指南

1.模型订阅

想要订阅并消费模型数据变更消息,有以下几个前提:

  • DS Server配置了MQ相关环境变量
  • DS Server模型订阅功能开关开启
  • 关注的模型被订阅

1.1 DS Server配置MQ相关环境变量(环境变量配置完成后需要重启DS服务)

模型数据变更消息功能依赖于MQ实现,目前支持RocketMQ,RocketMQ目前主流主要有两种,一种是开源版的RocketMQ,另一种是阿里云的RocketMQ, 这两个在具体使用和配置上存在一些差别,因此DS Server在配置MQ环境变量时存在一些差异。

1.1.1 使用开源版RocketMQ

SUBSCRIBE=true
SUBSCRIBE_VERSION=V2 #0.16使用V2版本的消息格式
MQ_SERVER_ADDRESS=127.0.0.1:9876 #MQ的Server Address
CLIENT_TYPE=ROCKETMQ #当前MQ类型

开源版的RocketMQ可以自动创建Topic,DS Server发送模型数据变更消息的Topic为固定前缀”DATASTORE_SUBSCRIBE_“加当前模型的租户id,比如当前 被订阅模型的租户id为trantor,那么Topic为DATASTORE_SUBSCRIBE_trantor

1.1.2 使用阿里云RocketMQ

使用阿里云的RocketMQ(ONS)时,需要提前在阿里云控制台上创建好DS Server发送模型数据变更消息需要的Topic, 如当前需要的Topic为DATASTORE_SUBSCRIBE_trantor ,

阿里云创建Topic

消息类型选择分区顺序消息

SUBSCRIBE=true
SUBSCRIBE_VERSION=V2 #0.16使用V2版本的消息格式
MQ_SERVER_ADDRESS=127.0.0.1:9876 #MQ的Server Address
CLIENT_TYPE=ONS #当前MQ类型
ALIYUN_ACCESSKEY= #阿里云ONS AccessKey
ALIYUN_SECRETKEY= #阿里云ONS Secret

1.1.3 补充

DS Server发送的模型变更消息为分区顺序消息,分区Key为当前模型名称,对同一个模型来说,模型的增删改应该是严格有序的。

1.2 调用DS接口订阅模型

DS Server提供订阅模型/取消订阅模型的接口,所以直接调用DS的接口即可完成对模型的订阅/取消订阅功能,

  • 订阅 AutumnClient.subscribe(ModelSubscribeRequest),
  • 取消订阅 AutumnClient.unsubscribe(ModelUnsubscribeRequest) demo:
    // 订阅目标模型,以 base_User,base_Staff 模型为例
    ModelSubscribeRequest subscribeRequest = new ModelSubscribeRequest();
    // projectId来源于当前业务模块的DS_PROJECT_ID环境变量
    subscribeRequest.setProjectId("trantor_060");
    // 这里记录要订阅的模型名称以及相关事件类型,详情参考文档"发送原理"一节
    HashMap<String, String> subscribeModels = new HashMap<>();
    subscribeModels.put("base_User", "CUD");
    subscribeModels.put("base_Staff", "CUD");
    subscribeRequest.setSubscribeModels(subscribeModels);
    autumnClient.subscribe(subscribeRequest);
    // 取消订阅目标模型,以 base_User 模型为例
    ModelUnsubscribeRequest unSubscribeRequest = new ModelUnsubscribeRequest();
    // projectId来源于当前业务模块的DS_PROJECT_ID环境变量
    unSubscribeRequest.setProjectId("trantor_060");
    // 这里记录要取消订阅的模型名称
    HashSet<String> modelNames = new HashSet<>();
    modelNames.add("base_User");
    unSubscribeRequest.setModelNames(modelNames);
    autumnClient.unsubscribe(unSubscribeRequest);

2.业务端订阅消费被订阅的模型变更消息

DS提供了消费模型数据变更消息的SDK,该SDK主要简化了相关配置,规范化相关代码,提供统一的Consumer接口

2.1 依赖DS-MQ-SDK

<dependency>
<groupId>io.terminus.platform</groupId>
<artifactId>autumn-data-subscribe</artifactId>
<version>5.2.0-SNAPSHOT</version>
</dependency>

2.2 配置MQ相关环境变量

2.2.1 使用开源版RocketMQ

application.yml中添加如下配置:

autumn:
subscribe:
enable: true #MQ开关
terminus:
mqServerAddress: ${MQ_SERVER_ADDRESS:127.0.0.1:9876}
clientType: ${CLIENT_TYPE:ROCKETMQ}
maxReconsumeTimes: ${MAX_RECONSUME_TIMES:-1}

环境变量中的配置:

MQ_SERVER_ADDRESS= #RocketMQ地址
DS_MQ_CONSUMER_GROUP= #consumerGroup 非必填,具体介绍请看2.2.2章节
DS_PROJECT_ID= #当前租户id

2.2.2 使用阿里云RocketMQ

在阿里云RocketMQ控制台上创建当前模块的ConsumerGroup,比如当前ConsumerGroup为GID_TRADR_CONSUMER_GROUP

创建消费者组

application.yml中添加如下配置:

autumn:
subscribe:
enable: true #MQ开关
terminus:
mqServerAddress: ${MQ_SERVER_ADDRESS:127.0.0.1:9876} #阿里云RocketMQ地址
clientType: ${CLIENT_TYPE:ONS}
maxReconsumeTimes: ${MAX_RECONSUME_TIMES:-1}
aliyun:
accessKey: ${ALIYUN_ACCESSKEY:}
secretKey: ${ALIYUN_SECRETKEY:}

环境变量中的配置:

MQ_SERVER_ADDRESS= #阿里云ONS地址
ALIYUN_ACCESSKEY= #阿里云AccessKey
ALIYUN_SECRETKEY= #阿里云Secret
DS_MQ_CONSUMER_GROUP= #consumerGroup 必填,具体介绍请看2.2.2章节
DS_PROJECT_ID= #当前租户id

2.2.2 部分环境变量解释

  • MAX_RECONSUME_TIMES:MAX_RECONSUME_TIMES为消息投递失败后的最大重试次数,-1代表无线重试,比如某一模型消息消费失败,MAX_RECONSUME_TIMES当前设置的值为-1,因为消息为 分区顺序消息,分区key为当前模型名称,那么当前模型的消息会阻塞住,知道该消息消费成功或忽略掉。某一个模型消息阻塞不会影响其他模型消息的正常消费。

  • DS_MQ_CONSUMER_GROUP:DS_MQ_CONSUMER_GROUP用来自定义设置消费者组ConsumerGroup。使用开源版的RocketMQ时,如果没有主动配置该环境变量, SDK会根据当前模块全路径计算出一个ConsumerGroup,同一个模块无论部署几个节点都会生成相同的ConsumerGroup;使用阿里云RocketMQ时,由于ConsumerGroup需要 手动创建,所以需要在阿里云RocketMQ控制台提前创建好ConsumerGroup,然后配置DS_MQ_CONSUMER_GROUP环境变量。

2.3 实现autumn-data-subscribe SDK中提供的DataStoreSubscribeMsgConsumer接口

//监听base_User的Listener
@DataStoreEventListener(listenModels={"base_User"})
@Slf4j
public class BaseUserMsgConsumerDemo implements DataStoreSubscribeMsgConsumer {
@Override
public boolean consume(DataStoreTransactionMsg message) {
log.info("成功接收到MQ消息:message:{}",message);
//业务逻辑
return true;
}
}
//监听base_Staff的Listener
@DataStoreEventListener(listenModels={"base_Staff"})
@Slf4j
public class BaseStaffMsgConsumerDemo implements DataStoreSubscribeMsgConsumer {
@Override
public boolean consume(DataStoreTransactionMsg message) {
log.info("成功接收到MQ消息:message:{}",message);
//业务逻辑
return true;
}
}
//或者在一个Listener中同时监听base_User和base_Staff,
@DataStoreEventListener(listenModels={"base_User","base_Staff"})
@Slf4j
public class BaseUserAndBaseStaffMsgConsumerDemo implements DataStoreSubscribeMsgConsumer {
@Override
public boolean consume(DataStoreTransactionMsg message) {
log.info("成功接收到MQ消息:message:{}",message);
//业务逻辑
return true;
}
}

@DataStoreEventListener注解配置当前Consumer订阅哪些模型的数据变更消息,默认为*,代表监听当前项目下所有被订阅的模型。

2.4 消息格式(V2版本)

上面介绍了如何订阅模型,如何消费消息,以及相关的配置,本章节介绍一些模型数据变更消息的消息格式,以便拿到模型数据变更消息后能正确处理消费。 0.16版本之前为V1版本的消息格式,在0.16迭代中,V1版本的消息格式不能满足业务需求,因此添加了V2版本的消息格式,为了兼容老版本,在DS Server段采用 环境变量控制(1.1章节),以下消息格式为V2版本的消息格式。

2.4.1 单个和批量创建举例

{
"messageList": [
{
"type": "Create",
"modelName": "user_center_LogicalUser",
"id": "601",
"createMsg": {
"body": {
"createdAt": "2020-03-30 16:55:03",
"isDeleted": 0,
"phone": "18358105381",
"name": "user6",
"id": "601",
"age": 18,
"updatedAt": "2020-03-30 16:55:03"
}
},
"messageTime": "Mar 30, 2020 4:55:03 PM"
},
{
"type": "Create",
"modelName": "user_center_LogicalUser",
"id": "602",
"createMsg": {
"body": {
"createdAt": "2020-03-30 16:55:03",
"isDeleted": 0,
"phone": "18358105381",
"name": "user6",
"id": "602",
"age": 18,
"updatedAt": "2020-03-30 16:55:03"
}
},
"messageTime": "Mar 30, 2020 4:55:03 PM"
}
],
"projectId": "ds_unit_test_project"
}

2.4.2 单个和批量修改举例

{
"messageList": [
{
"type": "Update",
"modelName": "user_center_User",
"id": "1000002",
"updateMsg": {
"before": {
"json_field": "{}",
"createdAt": "Mar 30, 2020 4:55:02 PM",
"datetime": "May 22, 1970 4:36:53 PM",
"isDeleted": 0,
"phone": "18358105381",
"name": "user3",
"id": "1000002",
"age": 18,
"updatedAt": "Mar 30, 2020 4:55:02 PM"
},
"after": {
"id": "1000002",
"name": "user3",
"phone": "18358105381",
"datetime": "May 22, 1970 4:36:53 PM",
"age": 18,
"isDeleted": 0,
"createdAt": "Mar 30, 2020 4:55:02 PM",
"updatedAt": "Mar 30, 2020 4:55:03 PM"
}
},
"messageTime": "Mar 30, 2020 4:55:03 PM"
},
{
"type": "Update",
"modelName": "user_center_User",
"id": "1000003",
"updateMsg": {
"before": {
"json_field": "{}",
"createdAt": "Mar 30, 2020 4:55:02 PM",
"datetime": "May 22, 1970 4:36:53 PM",
"isDeleted": 0,
"phone": "18358105381",
"name": "user3",
"id": "1000003",
"age": 18,
"updatedAt": "Mar 30, 2020 4:55:02 PM"
},
"after": {
"id": "1000003",
"name": "user3",
"phone": "18358105381",
"datetime": "May 22, 1970 4:36:53 PM",
"age": 18,
"isDeleted": 0,
"createdAt": "Mar 30, 2020 4:55:02 PM",
"updatedAt": "Mar 30, 2020 4:55:03 PM"
}
},
"messageTime": "Mar 30, 2020 4:55:03 PM"
}
],
"projectId": "ds_unit_test_project",
}

2.4.3 单个和批量删除消息格式举例

{
"projectId": "test",
"timestamp": 1586748574139,
"md5OfProjectIdAndTimestamp": "0b03c9a1574d3450b2acafea9755aadd",
"messageList": [
{
"type": "Delete",
"modelName": "People",
"id": "601",
"messageTime": "Apr 13, 2020 11:29:34 AM",
"deleteMsg": {
"body": {
"createdAt": "2020-03-30 16:55:03",
"isDeleted": 0,
"phone": "18358105381",
"name": "user6",
"id": "601",
"age": 18,
"updatedAt": "2020-03-30 16:55:03"
}
}
},
{
"type": "Delete",
"modelName": "People",
"id": "602",
"messageTime": "Apr 13, 2020 11:29:34 AM",
"deleteMsg": {
"body": {
"createdAt": "2020-03-30 16:55:03",
"isDeleted": 0,
"phone": "18358105381",
"name": "user6",
"id": "602",
"age": 18,
"updatedAt": "2020-03-30 16:55:03"
}
}
}
]
}

常见问题及排查方案

1.消费者没有收到订阅的模型的消息

以开源版RocketMQ为例

  1. DSServer: 1.首先检查该模型是否被订阅(这一步有问题的概率目前比较小)。 2.目前DS发送模型变更消息已经由事务前发送消息改为事务后发送消息,并且MQ消息发送失败不影响事务的正常提交,所以可以先排查DS日志,找到相关模型是否消息发送失败,或者再次触发当前模型CUD操作,查看当前时间MQ中是否存在相应的消息。

  2. MQ: 如果MQ中存在消息,说明DS-Server正常,可以查看未收到消息的业务模块的ConsumerGroup是否已经消费该消息。

  • RocketMQ控制台切换到消息页,设置开始时间和结束时间,切换到对应环境的Topic,DS的Topic规则为DATASTORE_SUBSCRIBE_租户ID Topic

  • 如果MQ中存在相应的消息,查看MESSAGE DETAIL Message

  • 找到相对应的ConsumerGroup,业务模块对应的ConsumerGroup在业务模块刚启动的时候会打印出来,关键日志如下,consumer group : comsumer

3.如果消息已经被消费,说明业务方已经收到并且处理过该消息,请检查业务代码。如果消息未被消费,并且没有投递,说明当前ConsumerGroup不在线。

2.有时会丢失某些消息

典型真实案例:某客户使用了DS的模型数据变更消息监听功能,使用的MQ为阿里云ONS,开发环境和测试环境使用了同一个MQ实例,不同的Topic,客户那边反馈开发环境经常会丢消息,检查阿里云RocketMQ控制台发现丢掉消息确实被投递了, 但是一直消费失败,但是当前业务模块中却找不到相关日志,非常奇怪,经过最后排查发现该问题是测试环境和开发环境配置成了相同的ConsumerGroup导致的。如下图:

错误实例

在同一个MQ实例中,开发环境和测试环境配置成了同一个ConsumerGroup,导致GID_dev会同时监听两个Topic,并且GID_dev存在两个消费节点,所以,dev_topic的消息有时候会投递到 测试环境节点中,导致开发环境节点消息丢失。

正确配置如下:

正确实例