调度任务
调度任务是指系统为了自动完成特定任务,在约定的特定时刻去执行任务的过程。
调度任务有两种使用形态:
- 一种是在线编辑声明式, 即声明固定触发逻辑, 以固定的规则执行。
- 另外一种是动态触发式, 即可以通过代码动态的触发一些定时任务, 比如订单超期释放库存等。动态触发又分为一次性调度任务和动态调度任务, 以满足不同的业务场景。
在线编辑声明调度任务
进入 制品中心->配置->业务域->调度任务,新增调度任务:

其中:
| 字段 | 类型 | 规则 |
|---|---|---|
| 资源原标识 | string | 必填,50字以内的大写字母、小写字母或数字,必须大写字母开头编辑时不可修改 |
| 资源名称 | string | 必填,50个字以内的无限制字符 |
| 调用逻辑函数 functionKey | 逻辑函数 | 必填,选中逻辑资源,可选范围为当前应用-版本下所有业务域下的逻辑资源 |
| 逻辑资源类型 functionType | 枚举 | 必填,逻辑资源类型,可选 LogicFlow 或者 LogicFunction |
| 调用逻辑流 logicFlowKey | 逻辑流,废弃,尽量使用逻辑函数 | 必填,选择逻辑流,可选范围为当前应用-版本下所有业务域下的逻辑流 |
| 分片数量 | int | 必填,大于0,整数精度是1 |
| 触发类型 | dictionary枚举为表达式(Cron)固定间隔(FixedRate)固定延迟(FixedDelay) | 与具体配置联动,默认为Cron |
| 表达式(Cron) | string | 联动选择类型为Cron出现,必填,暂不做校验 |
| 固定间隔(s) | int | 联动选择类型为FixedRate时出现,必填,大于0,整数精度是1 |
| 固定延迟(s) | int | 联动选择类型为FixedDelay时出现,必填,大于0,整数精度是1 |
| 重试间隔(s) | int | 大于0,整数精度是1 |
| 超时时间(s) | int | 大于0,整数精度是1 |
| 最大重试次数 | int | 大于0,整数精度是1 |
| 描述 | string | 非必填,200个字以内的无限制格式字符 |
WARNING: 注意 logicFlowKey 已废弃,建议使用 functionType 和 functionKey 做对应的配置。 对应配套的ms镜像是2022年3月1日后的版本便可,否则元数据的上报和发布不会更新过来。 最新版本请点击Trantor Release Components 1.0
对应之前本地配置文件如下,具体参考 io.terminus.trantor.metaStore.meta.job.ScheduleJob.
{ "schemaVersion": 1, "name": "定时打日志任务", "functionType": "LogicFunction", "functionKey": "item_AutoEchoFunc", "config": { "triggerType": "Cron", "expression": "0/5 * * * * ?", "shardingCount" : 1 }}切换到elasticjob调度(since 0.17.37.RELEASE)
目前job的调度模式默认为Redisson,在开发测试环境使用资源会占用少些,不过Redisson实现的调度较为不稳定,另外出问题的时候排查也较为麻烦, 0.17.37.RELEASE 之后支持使用 elasticjob 进行调度。
对应配置调整主要以下两点, 注意 业务模块和运行态ms这两个runtime都要做以下两点处理:
- 启用 elasticjob 调度, 配置环境变量
TRANTOR_ELASTICJOB_ENABLED=true - 增加 zk 依赖,业务模块主要是调度job使用,运行态ms主要是管理job使用,如启停用
备注:注意运行态ms指统一工作台使用的 meta-store, 一般在dice上指 trantor-gaia-workspace 这个应用。
没开启会保留之前的redisson调度,开启后会在启动 elasticjob 调度的时候 cancel掉旧的redisson调度。
0.17.72.RELEASE 之后支持调整 trantor 任务在 elasticjob 里的 namespace, 可以实现开发测试环境共用一个 zk,需要的话可配置下。
变量为 TRANTOR_ELASTICJOB_NAMESPACE, 默认值为 trantor_job。
如果需要本地启动, 直接 brew install zookeeper 便可, 其中 zk 地址的默认值为 ${ZOOKEEPER_HOSTS:${ZOOKEEPER_HOST:127.0.0.1}:${ZOOKEEPER_PORT:2181}}
如果需要线上启用,则需erda添加zk依赖,需要注意的是 zk 版本必须为 3.6.x。
下边是erda添加zk的例子,进入dice.yml页面,点击右上角编辑按钮。

图形化点击添加,搜索对应的 zookeeper addon ,以下出现两个结果,注意只要添加 3.6.x 一个便可。

添加成功点保存便可, 最后走流水线发布。
新增cronjob分片参数(since 0.17.37.RELEASE)
固定job和动态job都添加了对应的分片参数, 详见 io.terminus.trantorframework.sdk.job.ScheduleJobConfig#shardingCount。
以下是固定job使用分片参数的例子, 主要关注shardingCount参数。
{ "schemaVersion": 1, "name": "定时打日志任务", "functionType": "LogicFunction", "functionKey": "item_AutoEchoFunc", "config": { "triggerType": "Cron", "expression": "0/5 * * * * ?", // 该参数是新增的分片参数, 没有默认为1 "shardingCount" : 2 }}执行的时候获取分片参数,执行分片逻辑,比如10w的用户,分开两个实例每个实例跑5w这样子。
@FunctionImpl@Slf4jpublic class AutoEchoFuncImpl implements AutoEchoFunc { @Override public void execute() {
int shardingItem = ShardingJobContext.getShardingItem(); int shardingCount = ShardingJobContext.getShardingCount(); log.info("{}/{} auto echo hello world!!", shardingItem , shardingCount-1);
// 分布式任务拆分逻辑,每个实例处理自己的分片数据 // 比如这里 item=0 的, 处理用户id为 2,4,6,8.. 的数据 // select * from user where id % shardingCount = shardingItem; }}一次性 Job
一次性job触发
示例如下:
本例是最简单的单次调度示例, 如果有复杂场景, 比如设置超时等, 可以Job 的其他 API
// 任务需要触发的 Func 声明如下@Functionpublic interface RefreshItemFunc {
// 其中会有一个入参是 ItemBO, 参数名是 item void execute(ItemBO item);}
@FunctionImpl(name = "XXX默认实现")public class TriggerJobFuncImpl implements TriggerJobFunc {
@Override public void execute(ItemBO item) { IntResult id = DS.create(item); // do something... business code...
// jobKey, 需要保证唯一, 如果不唯一会覆盖同 Key 任务 String jobKey = "XXXJob" + id.getValue(); // 创建任务后延迟多久触发, 单位是秒 // 本例是延迟一天, 则计算出一天所需要的秒数 long delay = 60 * 60 * 24; // 调用 flow/func 时传入数据, 类型是一个 Map<String, Object> // Key 是 flow/func 参数名, value 是传入值 Map<String, Object> data = new HashMap<>(); // 如例中参数名是 item, 则在 Map 中 put 一个 key 为 item 的 ItemBO 对象, 来传递相关参数 // 传入的值必须是可被序列化的, 否则会有问题 data.put("item", item);
// 调用 Job.createDisposableJob 方法创建一次性任务 Job.createDisposableJob(jobKey, FunctionType.LogicFunction, "item_RefreshItemFunc", delay, data); }}一次性job调整(since 0.17.32)
一次性job从redisson调度换成了mq调度,所以要跑一次性job的话,需要额外配置开关和MQ参数,其中 MQ_SERVER_ADDRESS 是必填的,具体如下:
| 变量名称 | 配置方 | 含义 | 备注 |
|---|---|---|---|
| TRANTOR_DISPOSABLE_JOB_ENABLED | 业务服务 | 是否启用一次性job | 非必填,默认true |
| MQ_CLIENT_TYPE | 业务服务 | mq类型 | 非必填,ROCKETMQ | ONS 默认ROCKETMQ |
| MQ_SERVER_ADDRESS | 业务服务 | mq地址 | 必填 |
| MQ_PRODUCER_GROUP | 业务服务 | 生产者id | 非必填 |
| DISPOSABLE_JOB_CONSUMER_GROUP | 业务服务 | 一次性job消费者id | 非必填,默认 GID_AsyncJob_${trantor.mainModule} |
| DISPOSABLE_JOB_CONSUMER_TAG | 业务服务 | 一次性job消费者tag | 非必填,默认为 mainModule || embedModule 格式 |
| ALIYUN_ACCESSKEY | 业务服务 | 阿里云accessKey | mq为ons 必填 |
| ALIYUN_SECRETKEY | 业务服务 | 阿里云secretKey | mq为ons 必填 |
注意WANRING:
使用rocketMq的话,topic和consumerGroup会默认自己创建,所以不用太关心。
使用阿里云的ons的话,topic和consumerGroup 需要额外在ons控制台创建。
- 其中topic为固定值:
trantor_disposable_job - consumerGroup默认值:
GID_AsyncJob_${trantor.mainModule}, 如果自定义可以通过DISPOSABLE_JOB_CONSUMER_GROUP变量去配置
另外 0.17.72.RELEASE 之后consumerGroup 加上了自动创建逻辑,需要自动配置的话可以加这些参数::
| 变量名 | 含义 |
|---|---|
| ALIYUN_ACCESSKEY | accessKey |
| ALIYUN_SECRETKEY | secretKey |
| ALIYUN_INSTANCEID | MQ实例ID |
| ALIYUN_REGIONID | MQ接入点, 如 mq-internet-access, 具体参考 |
最后切换到ons后,业务范围的trigger消息消费也会受到影响的,所以data-store也要一起切换过来。
延迟的说明
根据mq产品选择的不同,一次性job的 delay 执行也会受影响, 超过该延迟时间时间的任务 trantor 会做持久化后再重新投递,具体参考长延迟的支持。
-
阿里云 ons 是可以支持40天内的 delay , 不过收费。
-
开源的 RocketMQ 只能有以下时间间隔的 delay,最大2个钟。另外注意如果处于中间值这种,我们实现上会取偏后的,如15s会延后到30s跑。
具体参考 io.terminus.common.rocketmq.common.DelayTimeLevelEnum
public enum DelayTimeLevelEnum { LEVEL_ZERO(0, 0L, "0s"), LEVEL_ONE(1, 1000L, "1s"), LEVEL_TWO(2, 5000L, "5s"), LEVEL_THREE(3, 10000L, "10s"), LEVEL_FOUR(4, 30000L, "30s"), LEVEL_FIVE(5, 60000L, "1m"), LEVEL_SIX(6, 120000L, "2m"), LEVEL_SEVEN(7, 180000L, "3m"), LEVEL_EIGHT(8, 240000L, "4m"), LEVEL_NINE(9, 300000L, "5m"), LEVEL_TEN(10, 360000L, "6m"), LEVEL_ELEVEN(11, 420000L, "7m"), LEVEL_TWELVE(12, 480000L, "8m"), LEVEL_THIRTEEN(13, 540000L, "9m"), LEVEL_FOURTEEN(14, 600000L, "10m"), LEVEL_FIFTEEN(15, 1200000L, "20m"), LEVEL_SIXTEEN(16, 1800000L, "30m"), LEVEL_SEVENTEEN(17, 3600000L, "1h"), LEVEL_EIGHTEEN(18, 7200000L, "2h");}长延迟的支持(0.17/0.18/1.0最新版本)
mq不支持的长延迟任务,会默认持久化到 base__async_job 中,后续通过 elasticjob 将任务重新扫描扫描出来进行投递,
所以要支持长延迟任务得开启 elasticjob, 具体参考 [定时任务使用elasticjob](#切换到elasticjob调度(since 0.17.37.RELEASE))
长延迟任务的数据参考:
select * from base__async_job;以下对相关字段做具体介绍:
public class AsyncJob extends BaseModel<Long> {
@Field(name = "唯一Key") private String key;
@Field(name = "任务所属业务域") private String moduleKey;
@Field(name = "异步任务名称") private String name;
/** * 绑定的逻辑流资源 LogicFlow Key * @deprecated using {@link #functionKey}, 因 logicFlow 在 1.0 中定义更偏在线编排 */ @Deprecated private String logicFlowKey;
/** * 逻辑流或逻辑函数的接口key, 必填 */ private String functionKey;
/** * 逻辑资源类型, 只能使用flow或者func, 必填, 默认为逻辑流(兼容旧的情况) */ private FunctionType functionType = FunctionType.LogicFlow;
/** * 触发延迟, 单位是 秒 */ @Field(name = "延迟时间(秒)") private Long delay;
/** * 期望投递时间 */ @Field(name = "期望时间") private Date pushAt;
/** * 实际投递实际那 */ @Field(name = "投递时间") private Date pushedAt;
/** * 异步任务的参数, 会被传入到 LogicFlow 里, 作为入参 * 其中 Map 的 Key 是 Flow 的参数名, value 是对应的参数 */ @Field(name = "任务参数", type = FieldType.Json) private Map<String, Object> data;
/** * 投递成功之后记录的消息id, 根据msgId可以去消息的控制台来排查消息没有消费的问题 */ @Field(name = "消息id") private String msgId;
@Field(name = "是否已投递", defaultValue = "false", nullable = false) private Boolean pushed = false;
/** * 记录投递失败的重试次数, 避免失败过多一直死循环,目前尝试重新投递的次数为7 */ @Field(name = "重试次数", defaultValue = "0", nullable = false) private Integer retryTime = 0;}一次性job取消(0.17.32已废弃)
注意: 一次性job 从 redisson 调度换成了 mq 调度, 取消的意义是不大的,所以这个接口在后续版本中会废弃。
// 取消一次性 JobJob.cancelDisposableJob(disposableJobKey);手动运维接口(0.17.32已废弃)
用于在一次性job执行失败的时候,通过运维接口手动重新触发job执行。
Post 请求 ${meta-store-slave}/api/internal/job/devops/disposable/retry
body 为 json 的jobKey, 如 ["testJobKey"]
动态job
动态job触发
动态 Job 和一次性 Job 调用方式类似, 只是动态 Job 有三个子类型, 分别是 Cron, FixedDelay 和 FixedRate, 和固定声明式的类型一致.
具体调用方式就不在复述, 可以通过 Job 类的静态方法直接触发:
另外注意给 flow/func 传参的话, map 中的 key 是 flow/func 的参数名, value 是传入值
// 动态创建 Cron 表达式类型的动态 JobJob.createCronDynamicJob(..);
// 动态创建固定延迟类型的动态 JobJob.createFixedDelayDynamicJob(..);
// 动态创建固定间隔类型的动态 JobJob.createFixedRateDynamicJob(..);动态job取消
除了动态创建 Job 之外, Trantor 还提供了动态取消调度的 API.
// 取消动态 JobJob.cancelDynamicJob(dynamicJobKey);