XXL-JOB 分布式任务调度平台调研使用分析

对于XXL-JOB 的使用和功能介绍在其文档中非常清晰,且非常容易上手,本篇文章是把官方文档众多内容简单梳理后而成,目的在于尽可能简单清晰用通俗化的语言把XXL-JOB 工具介绍给一个小白用户。

什么是分布式任务调度

在日常开发中,可能会碰到类似「每天定时推送一些业务数据通知到用户」的场景,这时能想到最简单的解决方案时用JDK定时任务的Timer。但随业务发展,单 体应用不仅有挂掉的风险,且不一定能Hold大量的业务数据。这时最简单的方法就是加机器(横向扩展),这也带来了很多问题。

  • 分布式:怎么保证任务正确的执行:比如5台机器,同一时间只要1台机器执行
  • 故障处理:机器在执行任务的时候挂掉了怎么办?
  • 任务管理:如果有几千个任务该如何管理?
  • 日志监控:怎么知道任务的执行情况?
    等等多个问题。
    而分布式调度平台的产生就是去解决以上问题。目前比较知名的几个分布式任务调度平台有:Quartz、ElasticJob、LTS(Light Task Scheduler)、XXL-JOB.

    XXL-JOB 简介

    XXL-JOB是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。
    该工具的开发者是来自美团点评的许雪里,XXL是作者名的缩写而成。

    亮点

    文档介绍几十个特性,这里我摘选几个比较重要的亮点特性:
  • 中心化架构:和ElasticJob和LTS去中心化架构不同,XXL-JOB是中心化架构,由调度中心和执行器组成。
  • 后台管理WEB:管理页面可以管理执行器、CRUD任务、查看调度日志和任务日志
  • Rolling实时日志:支持在线查看调度结果,并且支持以Rolling方式实时查看执行器输出的完整的执行日志;
  • GLUE:提供Web IDE,支持在线开发任务逻辑代码,动态发布,实时编译生效,省略部署上线的过程。支持30个版本的历史版本回溯。
  • 任务依赖:支持配置子任务依赖,当父任务执行结束且执行成功后将会主动触发一次子任务的执行, 多个子任务用逗号分隔;
  • 路由策略:执行器集群部署时提供丰富的路由策略,包括:第一个、最后一个、轮询、随机、一致性HASH、最不经常使用、最近最久未使用、故障转移、忙碌转移等;
  • 阻塞处理策略:调度过于密集执行器来不及处理时的处理策略,策略包括:单机串行(默认)、丢弃后续调度、覆盖之前调度;
  • 全异步:任务调度流程全异步化设计实现,如异步调度、异步运行、异步回调等,有效对密集调度进行流量削峰,理论上支持任意时长任务的运行;

    运行部署的几个步骤

    文档中的「快速入门」已经非常具体了。大概分为
  1. 初始化调度数据库
  2. 部署调度中心集群:xxl-job-admin
  3. 部署执行器集群:xxl-job-core。可以直接使用,也可以将现有项目改造为执行器,只要引入xxl-job-core的Maven依赖即可。
  4. 登录调度中心访问地址,管理配置。
    注意:本篇运行部署的版本是v2.12

    架构

    -w450
    从架构图上理解,XXL-JOB分为两个部分,同时也对应两个项目。
  • 调度中心(xxl-job-admin)
    • 执行器管理
    • 任务管理
    • 日志管理
    • 其他
  • 执行器(xxl-job-core)
    • 主动注册
    • 任务执行
    • 任务回调
    • 日志服务
      同时,从架构图也可以看出来基本的工作执行流程如下:
      -w450
  1. 任务执行器根据配置的调度中心的地址,自动注册到调度中心
  2. 达到任务触发条件,调度中心下发任务
  3. 执行器基于线程池执行任务,并把执行结果放入内存队列中、把执行日志写入日志文件中
  4. 执行器的回调线程消费内存队列中的执行结果,主动上报给调度中心
  5. 当用户在调度中心查看任务日志,调度中心请求任务执行器,任务执行器读取任务日志文件并返回日志详情

    统一术语

  • 调度中心:负责管理、调度、监控任务的全周期。可以理解为所有执行器的老大,所有的执行器都有老大调遣;
  • 执行器:可以理解为具体干活的一类小弟,小弟要主动去认老大(自动注册),老大发出指令后执行(任务执行),执行后反馈任务执行结果(回调线程),同时也提供给老大查询任务执行过程记录的能力(日志服务);
  • 任务:一件事情,由一个或一组执行器执行;
  • 调度:老大交给小弟办一件事的具体描述;
    -w450

注册方式:自动注册和手动录入

在调度中心的执行器管理页面,提供了两种执行器注册方式。
-w450
这里需要明白4个字段的含义:

  • 执行器:这里的执行器实际指的是「执行器集群」的概念,并非单个机器节点,同时这也是执行任务的基本单位,也就是说调度中心并不能「直接」指定某台机器去执行任务,但可以「间接」通过定义任务路由的方式实现;
  • AppName:「执行器集群」的唯一标志,每个执行器机器集群的唯一标示, 任务注册以 “执行器” 为最小粒度进行注册; 每个任务通过其绑定的执行器可感知对应的执行器机器列表;
  • 注册方式:分为自动和手动,如果是自动就不用填机器地址,同时执行器配置要必填调度中心地址和AppName用于主动注册;如果是手动,需要填写机器地址;
  • 机器地址:具体执行任务的机器节点。一个AppName维护着包含多个机器节点的列表;
    无论是自动注册还是手动注册,都需要用户在执行器管理页面声明一个执行器,这里我们着重说一下自动注册的配置和流程。

    主动注册配置说明:

  • 首先需要在执行器管理页面新建一个注册方式为「自动注册」的执行器;
  • 部署执行器时需要填写xxl.job.admin.addressesxxl.job.executor.appname两个配置字段表明开启自动注册,这两个字段时选填的,如果为空则表明关闭自动注册;

    执行器注册和摘除流程说明:

    任务注册的心跳周期Beat默认为30s。
  • 执行器节点启动之后会以一倍Beat进行执行器注册, 调度中心以一倍Beat进行动态任务发现;
  • “执行器” 在进行任务注册时将会周期性维护一条注册记录,即机器地址和AppName的绑定关系; “调度中心” 从而可以动态感知每个AppName在线的机器列表,具体信息见xxl_job_registry表。

-w450

  • 执行器摘除分为主动摘除和过期摘除:
    • 过期摘除:注册信息的失效时间为三倍Beat,也就是90s,执行器销毁时,将会主动上报调度中心并摘除对应的执行器机器信息,提高心跳注册的实时性;
    • 主动摘除:在WEB页面主动删除执行器。

      新建一个任务

      新建好执行器后,接下来就是定义一个任务分配给执行器执行。
      -w450

      运行模式:Bean模式和Glue模式

      如果把业务代码比作奔驰在路上的跑车,如果我们要给轮胎加一个零件,有两种方案,一是把车停下来把零件加上去后再启动,这就是Bean模式,二是不停车,车在跑的同时直接在论坛上把零件「粘」上去,这也是Glue(中文含义就是胶水)的意义来源。
      两者区别:
  • Bean模式的业务代码保存在执行器,Glue模式的业务代码保存在调度中心,运行在执行器;
  • Bean模式更新代码需呀停机,Glue模式支持通过Web IDE在线更新,实时编译和生效;
  • Glue模式提供版本回滚功能,Glue模式每次执行前都要判断为最新版本,否则需要冲洗构造任务线程;
  • Glue模式支持跨语言,Shell、Python、Nodejs

-w450
Glue Java 模式是通过 GLUE Class loader 加载源码的方式,加载源码可以注入 spring 当中其他的一些 server 组件,很方便的接触 spring 其他服务。你修改的时候,下次任务执行,会是你当前这份源码重新实例化,注入一些新的服务进行执行。

开发一个JOB

类级别:在XXL-JOB中,所以的JOB都需要继承IJobHandler并重写execute方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public abstract class IJobHandler {


/** success */
public static final ReturnT<String> SUCCESS = new ReturnT<String>(200, null);
/** fail */
public static final ReturnT<String> FAIL = new ReturnT<String>(500, null);
/** fail timeout */
public static final ReturnT<String> FAIL_TIMEOUT = new ReturnT<String>(502, null);


/**
* execute handler, invoked when executor receives a scheduling request
*
* @param param
* @return
* @throws Exception
*/
public abstract ReturnT<String> execute(String param) throws Exception;


/**
* init handler, invoked when JobThread init
*/
public void init() throws InvocationTargetException, IllegalAccessException {
// do something
}


/**
* destroy handler, invoked when JobThread destroy
*/
public void destroy() throws InvocationTargetException, IllegalAccessException {
// do something
}


}

方法级别:前提是Spring容器,直接在方法上添加@XxlJob("demoJobHandler")就行。
无论是类级别还是方法级别,执行方式格式要求为 public ReturnT<String> execute(String param)

官方提供的几种示例任务

demoJobHandler:简单示例任务,任务内部模拟耗时任务逻辑,用户可在线体验Rolling Log等功能;

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* 1、简单任务示例(Bean模式)
*/
@XxlJob("demoJobHandler")
public ReturnT<String> demoJobHandler(String param) throws Exception {
XxlJobLogger.log("XXL-JOB, Hello World."+param);

for (int i = 0; i < 5; i++) {
XxlJobLogger.log("beat at:" + i);
TimeUnit.SECONDS.sleep(2);
}
return ReturnT.SUCCESS;
}

shardingJobHandler:分片示例任务,任务内部模拟处理分片参数,可参考熟悉分片任务;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* 2、分片广播任务
*/
@XxlJob("shardingJobHandler")
public ReturnT<String> shardingJobHandler(String param) throws Exception {

// 分片参数
ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
XxlJobLogger.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardingVO.getIndex(), shardingVO.getTotal());

// 业务逻辑
for (int i = 0; i < shardingVO.getTotal(); i++) {
if (i == shardingVO.getIndex()) {
XxlJobLogger.log("第 {} 片, 命中分片开始处理", i);
} else {
XxlJobLogger.log("第 {} 片, 忽略", i);
}
}

return ReturnT.SUCCESS;
}

httpJobHandler:通用HTTP任务Handler;业务方只需要提供HTTP链接即可,不限制语言、平台;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
/**
* 4、跨平台Http任务
*/
@XxlJob("httpJobHandler")
public ReturnT<String> httpJobHandler(String param) throws Exception {

// request
HttpURLConnection connection = null;
BufferedReader bufferedReader = null;
try {
// connection
URL realUrl = new URL(param);
connection = (HttpURLConnection) realUrl.openConnection();

// connection setting
connection.setRequestMethod("GET");
connection.setDoOutput(true);
connection.setDoInput(true);
connection.setUseCaches(false);
connection.setReadTimeout(5 * 1000);
connection.setConnectTimeout(3 * 1000);
connection.setRequestProperty("connection", "Keep-Alive");
connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8");
connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8");

// do connection
connection.connect();

//Map<String, List<String>> map = connection.getHeaderFields();

// valid StatusCode
int statusCode = connection.getResponseCode();
if (statusCode != 200) {
throw new RuntimeException("Http Request StatusCode(" + statusCode + ") Invalid.");
}

// result
bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"));
StringBuilder result = new StringBuilder();
String line;
while ((line = bufferedReader.readLine()) != null) {
result.append(line);
}
String responseMsg = result.toString();

XxlJobLogger.log(responseMsg);
return ReturnT.SUCCESS;
} catch (Exception e) {
XxlJobLogger.log(e);
return ReturnT.FAIL;
} finally {
try {
if (bufferedReader != null) {
bufferedReader.close();
}
if (connection != null) {
connection.disconnect();
}
} catch (Exception e2) {
XxlJobLogger.log(e2);
}
}

}

commandJobHandler:通用命令行任务Handler;业务方只需要提供命令行即可;如 “pwd”命令;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
* 3、命令行任务
*/
@XxlJob("commandJobHandler")
public ReturnT<String> commandJobHandler(String param) throws Exception {
String command = param;
int exitValue = -1;

BufferedReader bufferedReader = null;
try {
// command process
Process process = Runtime.getRuntime().exec(command);
BufferedInputStream bufferedInputStream = new BufferedInputStream(process.getInputStream());
bufferedReader = new BufferedReader(new InputStreamReader(bufferedInputStream));

// command log
String line;
while ((line = bufferedReader.readLine()) != null) {
XxlJobLogger.log(line);
}

// command exit
process.waitFor();
exitValue = process.exitValue();
} catch (Exception e) {
XxlJobLogger.log(e);
} finally {
if (bufferedReader != null) {
bufferedReader.close();
}
}

if (exitValue == 0) {
return IJobHandler.SUCCESS;
} else {
return new ReturnT<String>(IJobHandler.FAIL.getCode(), "command exit value("+exitValue+") is failed");
}
}

路由策略

当执行器集群部署时,提供丰富的路由策略,包括;

  • FIRST(第一个):固定选择第一个机器;
  • LAST(最后一个):固定选择最后一个机器;
  • ROUND(轮询)
  • RANDOM(随机):随机选择在线的机器;
  • CONSISTENT_HASH(一致性HASH):每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。
  • LEAST_FREQUENTLY_USED(最不经常使用):使用频率最低的机器优先被选举;
  • LEAST_RECENTLY_USED(最近最久未使用):最久为使用的机器优先被选举;
  • FAILOVER(故障转移):按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度;
  • BUSYOVER(忙碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度;
  • SHARDING_BROADCAST(分片广播):广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务;

-w450
从任务触发到任务执行在调度中心阶段有几个步骤

  • 匹配执行器
  • 从注册中心加载在线机器节点列表
  • 根据路由策略去匹配一组机器
  • 通知这个机器去执行任务
    基本前面几个路由策略比较容易理解,这里我们着重说一下后面三个路由策略。

    故障转移和忙碌转移

    -w450
    调度器在调度时会请求执行器是否处于处于故障或者忙碌状态,对应着ExecutorBiz的开放出来的两个方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public interface ExecutorBiz {

/**
* beat
* @return
*/
public ReturnT<String> beat();

/**
* idle beat
*
* @param jobId
* @return
*/
public ReturnT<String> idleBeat(int jobId);


}

分片广播

有时候一台机器处理不够,需要多台机器分片处理。
-w450
可以参照上面的分片广播任务查看如何使用。

阻塞处理策略

-w450

当我们有一些耗时任务,触发的频率超过它的执行器所执行的那些速度的时候,如上图,红色的触发请求进来,但是前面的还在堆积着执行,这时候怎么办?第一条就是默认的单机串行,会把请求入队列,等前面的执行完了之后,挨个把所有的触发的任务全都执行掉。第二个就是丢弃后续的调度,红色的进来了,发现前面已经有了,或者是当前已经 JOB 运行了,直接把后面的标记失败,不进行后面的执行了。最后一个就是覆盖之前调度的,它发现前面队列里面的数据或者任务执行的情况下,把队列清空,把清空的数据全都标记失败,然后把执行的 JOB 也标记失败,让自己来运行。

触发规则

现在提供的触发规则主要有 3 种。

  • 第一种是 Cron 表达式,每一个任务需要配一个 Cron 表达式。
  • 第二种是任务依赖,你可以为每一个任务配置一个子任务,当副任务执行完成之后,可以触发子任务,这样关联的方式进行触发执行。
  • 第三种就是事件触发,其实就是类似于 Mq 的场景,代码里面有一个业务逻辑,触发了一个任务执行。

    任务状态

  • 成功(SUCCESS)
  • 失败 (FAIL)
  • 超时 (FAIL_TIMEOUT)
  • 进行中

    调度日志和任务日志

    调度日志:调度平台调度任务的日志,如果任务失败会保留出失败的日志。
    -w450
    -w450
    -w450

任务日志:具体任务执行的日志,下面那张图是一个日志页面展示(貌似是读不到日志),真实应该展示的应该是下面的日志内容。
-w450

1
2
3
4
5
6
7
2020-02-05 23:29:54 [com.xxl.job.core.thread.JobThread#run]-[124]-[Thread-6] <br>----------- xxl-job job execute start -----------<br>----------- Param:
2020-02-05 23:29:54 [com.xuxueli.executor.sample.frameless.jobhandler.ShardingJobHandler#execute]-[20]-[Thread-6] 分片参数:当前分片序号 = 0, 总分片数 = 2
2020-02-05 23:29:54 [com.xuxueli.executor.sample.frameless.jobhandler.ShardingJobHandler#execute]-[25]-[Thread-6] 第 0 片, 命中分片开始处理
2020-02-05 23:29:54 [com.xuxueli.executor.sample.frameless.jobhandler.ShardingJobHandler#execute]-[27]-[Thread-6] 第 1 片, 忽略
2020-02-05 23:29:54 [com.xxl.job.core.thread.JobThread#run]-[164]-[Thread-6] <br>----------- xxl-job job execute end(finish) -----------<br>----------- ReturnT:ReturnT [code=200, msg=null, content=null]
2020-02-05 23:29:55 [com.xxl.job.core.thread.TriggerCallbackThread#callbackLog]-[190]-[xxl-job, executor TriggerCallbackThread] <br>----------- xxl-job job callback finish.

用户管理

-w450
两种角色:

  • 管理员:拥有所有的执行器的权限
  • 普通用户:由管理分配指定的执行器的权限

    目前发现的几个小问题

    报表页面统计简单

    运行报表页面只有任务数量、调度次数、执行器数量的统计,无法根据不同执行器展现特定执行器的数据报表。

    不同用户的报表页面数据没有隔离

    -w450

-w450
对于allen用户,报表页面的数据和管理员登录后所展示的数据一样。
但执行器管理和任务管理以及调度日志,是做了数据隔离的。

手动输入执行器总显示Online

-w450
对于手动录入的执行器,总显示Online。
这样的好处是,用户可以手动控制上线的机器,只要修改机器列表即可。

是否可以覆盖需求

-w450

  1. 是否可以指定节点运行?
    A:XXL-JOB运行的基本单位是一个执行器集群,但是可以通过指定不同的路由策略达到指定机器节点的目的。
  2. JOB可以被高频调用
    A:参照压测报告
    XXLJOB有过持续20小时,80W次调度100%成功率的测试报告。
  3. 支持大数据量的任务调度
    A:XXL-JOB支持分片广播,是支持大数据量的任务调度的,数据量的多少应该取决于执行器集群的数量吧
  4. 调度策略简单明了?
    A:简单明了
  5. 提供API
    API接口
    提供了调度中心和执行器的API。