跳转到内容

调度任务

调度任务是指系统为了自动完成特定任务,在约定的特定时刻去执行任务的过程。

调度任务有两种使用形态:

  1. 一种是在线编辑声明式, 即声明固定触发逻辑, 以固定的规则执行。
  2. 另外一种是动态触发式, 即可以通过代码动态的触发一些定时任务, 比如订单超期释放库存等。动态触发又分为一次性调度任务和动态调度任务, 以满足不同的业务场景。

在线编辑声明调度任务

进入 制品中心->配置->业务域->调度任务,新增调度任务:

image-20211125144912829

其中:

字段类型规则
资源原标识string必填,50字以内的大写字母、小写字母或数字,必须大写字母开头编辑时不可修改
资源名称string必填,50个字以内的无限制字符
调用逻辑函数 functionKey逻辑函数必填,选中逻辑资源,可选范围为当前应用-版本下所有业务域下的逻辑资源
逻辑资源类型 functionType枚举必填,逻辑资源类型,可选 LogicFlow 或者 LogicFunction
调用逻辑流 logicFlowKey逻辑流,废弃,尽量使用逻辑函数必填,选择逻辑流,可选范围为当前应用-版本下所有业务域下的逻辑流
分片数量int必填,大于0,整数精度是1
触发类型dictionary枚举为表达式(Cron)固定间隔(FixedRate)固定延迟(FixedDelay)与具体配置联动,默认为Cron
表达式(Cron)string联动选择类型为Cron出现,必填,暂不做校验
固定间隔(s)int联动选择类型为FixedRate时出现,必填,大于0,整数精度是1
固定延迟(s)int联动选择类型为FixedDelay时出现,必填,大于0,整数精度是1
重试间隔(s)int大于0,整数精度是1
超时时间(s)int大于0,整数精度是1
最大重试次数int大于0,整数精度是1
描述string非必填,200个字以内的无限制格式字符

WARNING: 注意 logicFlowKey 已废弃,建议使用 functionType 和 functionKey 做对应的配置。 对应配套的ms镜像是2022年3月1日后的版本便可,否则元数据的上报和发布不会更新过来。 最新版本请点击Trantor Release Components 1.0

对应之前本地配置文件如下,具体参考 io.terminus.trantor.metaStore.meta.job.ScheduleJob.

{
"schemaVersion": 1,
"name": "定时打日志任务",
"functionType": "LogicFunction",
"functionKey": "item_AutoEchoFunc",
"config": {
"triggerType": "Cron",
"expression": "0/5 * * * * ?",
"shardingCount" : 1
}
}

切换到elasticjob调度(since 0.17.37.RELEASE)

目前job的调度模式默认为Redisson,在开发测试环境使用资源会占用少些,不过Redisson实现的调度较为不稳定,另外出问题的时候排查也较为麻烦, 0.17.37.RELEASE 之后支持使用 elasticjob 进行调度。

对应配置调整主要以下两点, 注意 业务模块运行态ms这两个runtime都要做以下两点处理

  1. 启用 elasticjob 调度, 配置环境变量 TRANTOR_ELASTICJOB_ENABLED=true
  2. 增加 zk 依赖,业务模块主要是调度job使用,运行态ms主要是管理job使用,如启停用

备注:注意运行态ms指统一工作台使用的 meta-store, 一般在dice上指 trantor-gaia-workspace 这个应用。

没开启会保留之前的redisson调度,开启后会在启动 elasticjob 调度的时候 cancel掉旧的redisson调度。

0.17.72.RELEASE 之后支持调整 trantor 任务在 elasticjob 里的 namespace, 可以实现开发测试环境共用一个 zk,需要的话可配置下。 变量为 TRANTOR_ELASTICJOB_NAMESPACE, 默认值为 trantor_job

如果需要本地启动, 直接 brew install zookeeper 便可, 其中 zk 地址的默认值为 ${ZOOKEEPER_HOSTS:${ZOOKEEPER_HOST:127.0.0.1}:${ZOOKEEPER_PORT:2181}}

如果需要线上启用,则需erda添加zk依赖,需要注意的是 zk 版本必须为 3.6.x。

下边是erda添加zk的例子,进入dice.yml页面,点击右上角编辑按钮。

图形化点击添加,搜索对应的 zookeeper addon ,以下出现两个结果,注意只要添加 3.6.x 一个便可。

添加成功点保存便可, 最后走流水线发布。

新增cronjob分片参数(since 0.17.37.RELEASE)

固定job和动态job都添加了对应的分片参数, 详见 io.terminus.trantorframework.sdk.job.ScheduleJobConfig#shardingCount

以下是固定job使用分片参数的例子, 主要关注shardingCount参数。

{
"schemaVersion": 1,
"name": "定时打日志任务",
"functionType": "LogicFunction",
"functionKey": "item_AutoEchoFunc",
"config": {
"triggerType": "Cron",
"expression": "0/5 * * * * ?",
// 该参数是新增的分片参数, 没有默认为1
"shardingCount" : 2
}
}

执行的时候获取分片参数,执行分片逻辑,比如10w的用户,分开两个实例每个实例跑5w这样子。

@FunctionImpl
@Slf4j
public class AutoEchoFuncImpl implements AutoEchoFunc {
@Override
public void execute() {
int shardingItem = ShardingJobContext.getShardingItem();
int shardingCount = ShardingJobContext.getShardingCount();
log.info("{}/{} auto echo hello world!!",
shardingItem , shardingCount-1);
// 分布式任务拆分逻辑,每个实例处理自己的分片数据
// 比如这里 item=0 的, 处理用户id为 2,4,6,8.. 的数据
// select * from user where id % shardingCount = shardingItem;
}
}

一次性 Job

一次性job触发

示例如下: 本例是最简单的单次调度示例, 如果有复杂场景, 比如设置超时等, 可以Job 的其他 API

// 任务需要触发的 Func 声明如下
@Function
public interface RefreshItemFunc {
// 其中会有一个入参是 ItemBO, 参数名是 item
void execute(ItemBO item);
}
@FunctionImpl(name = "XXX默认实现")
public class TriggerJobFuncImpl implements TriggerJobFunc {
@Override
public void execute(ItemBO item) {
IntResult id = DS.create(item);
// do something... business code...
// jobKey, 需要保证唯一, 如果不唯一会覆盖同 Key 任务
String jobKey = "XXXJob" + id.getValue();
// 创建任务后延迟多久触发, 单位是秒
// 本例是延迟一天, 则计算出一天所需要的秒数
long delay = 60 * 60 * 24;
// 调用 flow/func 时传入数据, 类型是一个 Map<String, Object>
// Key 是 flow/func 参数名, value 是传入值
Map<String, Object> data = new HashMap<>();
// 如例中参数名是 item, 则在 Map 中 put 一个 key 为 item 的 ItemBO 对象, 来传递相关参数
// 传入的值必须是可被序列化的, 否则会有问题
data.put("item", item);
// 调用 Job.createDisposableJob 方法创建一次性任务
Job.createDisposableJob(jobKey, FunctionType.LogicFunction, "item_RefreshItemFunc", delay, data);
}
}

一次性job调整(since 0.17.32)

一次性job从redisson调度换成了mq调度,所以要跑一次性job的话,需要额外配置开关和MQ参数,其中 MQ_SERVER_ADDRESS 是必填的,具体如下:

变量名称配置方含义备注
TRANTOR_DISPOSABLE_JOB_ENABLED业务服务是否启用一次性job非必填,默认true
MQ_CLIENT_TYPE业务服务mq类型非必填,ROCKETMQ | ONS
默认ROCKETMQ
MQ_SERVER_ADDRESS业务服务mq地址必填
MQ_PRODUCER_GROUP业务服务生产者id非必填
DISPOSABLE_JOB_CONSUMER_GROUP业务服务一次性job消费者id非必填,默认 GID_AsyncJob_${trantor.mainModule}
DISPOSABLE_JOB_CONSUMER_TAG业务服务一次性job消费者tag非必填,默认为 mainModule || embedModule 格式
ALIYUN_ACCESSKEY业务服务阿里云accessKeymq为ons 必填
ALIYUN_SECRETKEY业务服务阿里云secretKeymq为ons 必填

注意WANRING:

使用rocketMq的话,topic和consumerGroup会默认自己创建,所以不用太关心。

使用阿里云的ons的话,topic和consumerGroup 需要额外在ons控制台创建

  • 其中topic为固定值: trantor_disposable_job
  • consumerGroup默认值: GID_AsyncJob_${trantor.mainModule}, 如果自定义可以通过 DISPOSABLE_JOB_CONSUMER_GROUP 变量去配置

另外 0.17.72.RELEASE 之后consumerGroup 加上了自动创建逻辑,需要自动配置的话可以加这些参数::

变量名含义
ALIYUN_ACCESSKEYaccessKey
ALIYUN_SECRETKEYsecretKey
ALIYUN_INSTANCEIDMQ实例ID
ALIYUN_REGIONIDMQ接入点, 如 mq-internet-access, 具体参考

最后切换到ons后,业务范围的trigger消息消费也会受到影响的,所以data-store也要一起切换过来。

延迟的说明

根据mq产品选择的不同,一次性job的 delay 执行也会受影响, 超过该延迟时间时间的任务 trantor 会做持久化后再重新投递,具体参考长延迟的支持

  • 阿里云 ons 是可以支持40天内的 delay , 不过收费。

  • 开源的 RocketMQ 只能有以下时间间隔的 delay,最大2个钟。另外注意如果处于中间值这种,我们实现上会取偏后的,如15s会延后到30s跑。

具体参考 io.terminus.common.rocketmq.common.DelayTimeLevelEnum

public enum DelayTimeLevelEnum {
LEVEL_ZERO(0, 0L, "0s"),
LEVEL_ONE(1, 1000L, "1s"),
LEVEL_TWO(2, 5000L, "5s"),
LEVEL_THREE(3, 10000L, "10s"),
LEVEL_FOUR(4, 30000L, "30s"),
LEVEL_FIVE(5, 60000L, "1m"),
LEVEL_SIX(6, 120000L, "2m"),
LEVEL_SEVEN(7, 180000L, "3m"),
LEVEL_EIGHT(8, 240000L, "4m"),
LEVEL_NINE(9, 300000L, "5m"),
LEVEL_TEN(10, 360000L, "6m"),
LEVEL_ELEVEN(11, 420000L, "7m"),
LEVEL_TWELVE(12, 480000L, "8m"),
LEVEL_THIRTEEN(13, 540000L, "9m"),
LEVEL_FOURTEEN(14, 600000L, "10m"),
LEVEL_FIFTEEN(15, 1200000L, "20m"),
LEVEL_SIXTEEN(16, 1800000L, "30m"),
LEVEL_SEVENTEEN(17, 3600000L, "1h"),
LEVEL_EIGHTEEN(18, 7200000L, "2h");
}

长延迟的支持(0.17/0.18/1.0最新版本)

mq不支持的长延迟任务,会默认持久化到 base__async_job 中,后续通过 elasticjob 将任务重新扫描扫描出来进行投递, 所以要支持长延迟任务得开启 elasticjob, 具体参考 [定时任务使用elasticjob](#切换到elasticjob调度(since 0.17.37.RELEASE))

长延迟任务的数据参考:

select * from base__async_job;

以下对相关字段做具体介绍:

public class AsyncJob extends BaseModel<Long> {
@Field(name = "唯一Key")
private String key;
@Field(name = "任务所属业务域")
private String moduleKey;
@Field(name = "异步任务名称")
private String name;
/**
* 绑定的逻辑流资源 LogicFlow Key
* @deprecated using {@link #functionKey}, 因 logicFlow 在 1.0 中定义更偏在线编排
*/
@Deprecated
private String logicFlowKey;
/**
* 逻辑流或逻辑函数的接口key, 必填
*/
private String functionKey;
/**
* 逻辑资源类型, 只能使用flow或者func, 必填, 默认为逻辑流(兼容旧的情况)
*/
private FunctionType functionType = FunctionType.LogicFlow;
/**
* 触发延迟, 单位是 秒
*/
@Field(name = "延迟时间(秒)")
private Long delay;
/**
* 期望投递时间
*/
@Field(name = "期望时间")
private Date pushAt;
/**
* 实际投递实际那
*/
@Field(name = "投递时间")
private Date pushedAt;
/**
* 异步任务的参数, 会被传入到 LogicFlow 里, 作为入参
* 其中 Map 的 Key 是 Flow 的参数名, value 是对应的参数
*/
@Field(name = "任务参数", type = FieldType.Json)
private Map<String, Object> data;
/**
* 投递成功之后记录的消息id, 根据msgId可以去消息的控制台来排查消息没有消费的问题
*/
@Field(name = "消息id")
private String msgId;
@Field(name = "是否已投递", defaultValue = "false", nullable = false)
private Boolean pushed = false;
/**
* 记录投递失败的重试次数, 避免失败过多一直死循环,目前尝试重新投递的次数为7
*/
@Field(name = "重试次数", defaultValue = "0", nullable = false)
private Integer retryTime = 0;
}

一次性job取消(0.17.32已废弃)

注意: 一次性job 从 redisson 调度换成了 mq 调度, 取消的意义是不大的,所以这个接口在后续版本中会废弃。

// 取消一次性 Job
Job.cancelDisposableJob(disposableJobKey);

手动运维接口(0.17.32已废弃)

用于在一次性job执行失败的时候,通过运维接口手动重新触发job执行。

Post 请求 ${meta-store-slave}/api/internal/job/devops/disposable/retry

body 为 json 的jobKey, 如 ["testJobKey"]

动态job

动态job触发

动态 Job 和一次性 Job 调用方式类似, 只是动态 Job 有三个子类型, 分别是 Cron, FixedDelay 和 FixedRate, 和固定声明式的类型一致.

具体调用方式就不在复述, 可以通过 Job 类的静态方法直接触发:

另外注意给 flow/func 传参的话, map 中的 key 是 flow/func 的参数名, value 是传入值

// 动态创建 Cron 表达式类型的动态 Job
Job.createCronDynamicJob(..);
// 动态创建固定延迟类型的动态 Job
Job.createFixedDelayDynamicJob(..);
// 动态创建固定间隔类型的动态 Job
Job.createFixedRateDynamicJob(..);

动态job取消

除了动态创建 Job 之外, Trantor 还提供了动态取消调度的 API.

// 取消动态 Job
Job.cancelDynamicJob(dynamicJobKey);