其他分享
首页 > 其他分享> > 分布式任务调度平台SIA-TASK的架构设计与运行流程

分布式任务调度平台SIA-TASK的架构设计与运行流程

作者:互联网

一、分布式任务调度的背景

无论是互联网应用或者企业级应用,都充斥着大量的批处理任务。我们常常需要一些任务调度系统来帮助解决问题。随着微服务化架构的逐步演进,单体架构逐渐演变为分布式、微服务架构。在此背景下,很多原先的任务调度平台已经不能满足业务系统的需求,于是出现了一些基于分布式的任务调度平台。

1.1 分布式任务调度的演进

在实际业务开发过程中,很多时候我们无可避免地需要使用一些定时任务来解决问题。通常我们会有多种解决方案:使用 Crontab 或 SpringCron (当然这种情况可能机器很少而且任务简单又不是很多的情况下)。然而,当应用复杂度升高、定时任务数量增多且任务之间产生依赖关系时,Crontab 进行定时任务的管理配置就会非常混乱,严重影响工作效率。这时就会产生一系列问题:

随着互联网的发展,分布式服务架构势越来越流行。相应的也需要一个分布式任务调度系统来管理分布式架构中的定时任务。

1.2 分布式任务调度架构

分布式任务调度设计

当垂直应用越来越多,应用之间交互也会越来越复杂,通常我们采用分布式或者微服务架构,将核心业务抽取出来,形成单独的服务。一个独立的微服务群体逐渐形成稳定的服务中心,使得业务应用能更快地响应多变的市场需求。

此时,用于提高业务复用及整合的分布式服务框架成为关键。同时,由于服务独立,一般能做到定时任务独立的情况,任务的更改对于整体系统的影响小之又小。通常我们会采用任务与调度分离的方式(如上图所示),任务的执行逻辑无需关注调度与编排,同时可以保证执行器和调度的高可用,易于开发和维护。

1.3 分布式任务调度优势

在分布式服务架构的基础上,由于独立业务的数量可能很多,此时如果定时任务单独在该服务中实现,很可能会出现难以管理的情况,且避免不了由于定时任务的更改而导致的业务重启。因此,一个独立的分布式任务调度系统是很必要的,可以用来全局统筹管理所有的定时任务。同时,将任务的配置单独抽离出来,作为该分布式任务调度系统的功能,就能做到定时任务的更改不影响任何业务,也不影响整个系统:

二、分布式任务调度技术选型

2.1 分布式任务调度考虑因素

sia-task-设计图

2.2 SIA-TASK与其它分布式任务调度技术比较

SIA是宜信公司基础开发平台Simple is Awesome的简称,SIA-TASK(微服务任务调度平台)是其中的一项重要产品,SIA-TASK契合当前微服务架构模式,具有跨平台、可编排、高可用、无侵入、一致性、异步并行、动态扩展、实时监控等特点。

开源地址:https://github.com/siaorg/sia-task

我们先对比市场上主流的开源分布式任务调度框架,分析其优缺点,然后再介绍我们的技术选型。

下面我们简单对比下 SIA-TASK 与这些任务调度框架:

 任务编排任务分片跨平台高可用故障转移实时监控
SIA-TASK
Quartz × × .NET × API监控
TBSchedule × ×
Elastic-Job × ×
Saturn ×
Antares ×
Uncode-Schedule × × ×
XXL-JOB 子任务依赖 ×

可以发现,这些调度框架基本上都支持高可用、故障转移与实时监控等功能,但是对于任务编排、任务分片与跨平台等功能的支持各有侧重点。SIA-TASK 将全面支持这些功能。

三、SIA-TASK介绍

3.1 SIA-TASK技术选型

sia-task-technology

3.2 SIA-TASK设计思想

SIA-TASK借鉴微服务设计思想,获取分布在每个执行器节点上的任务(Task)元数据,进行汇报,上传注册中心。利用在线可编辑方式支持任务在线编排、动态修改任务时钟;使用 Http 协议作为交互传输协议。数据交互格式统一使用Json。用户通过编排器(下文会做介绍)进行操作,触发事件,调度器接收事件,由调度中心进行时钟解析,执行任务流程,进行任务通知。

3.3 SIA-TASK基本概念

SIA-TASK 采用任务和调度分离的方式,业务的执行任务逻辑和调度逻辑完全分离。系统组成共涉及以下几个核心概念:

3.4 SIA-TASK系统架构

SIA-TASK 可以分为三大模块(调度中心、编排中心和执行器)、两大组件(持久化存储和注册中心)。这三大模块和两大组件的作用如下:

SIA-TASK 使用 SpringBoot 体系作为架构选型,基于Quartz及Zookeeper进行二次开发,支持相应的特性功能,SIA-TASK 的逻辑架构图如下图所示:

逻辑架构图

3.5 SIA-TASK模块说明

3.5.1 任务调度中心

任务调度中心负责任务调度,管理调度信息,按照调度配置发出调度请求,自身不承担业务代码。调度系统与任务解耦,提高了系统可用性和稳定性,同时调度系统性能不再受限于任务模块;支持可视化、简单且动态地管理调度信息,包括任务新建,更新,删除和任务报警等,所有上述操作都会实时生效,同时支持监控调度结果以及执行日志,支持执行器故障恢复。

3.5.2 任务编排中心

任务编排中心是分布式调度中心支持在线任务模型编排的组件;依托于UI可进行web端任务编排。

我们可以通过上述基础模型来编排一些复杂的调度模型,例如:

调度模型

SIA-TASK的UI编排界面:

UI编排界面

编排结束后查看task的编排信息如下图所示:

编排信息

同时,编排中心还提供首页统计数据查看、调度监控、Job管理、Task管理以及日志管理功能。

3.5.3 任务执行器

负责接收调度请求并执行任务逻辑。任务模块专注于任务的执行等操作,开发和维护更加简单和高效;

执行器支持两种类型:

(1) 如果使用 sia-task-hunter,支持SpringBoot项目和Spring项目, 引入 sia-task-hunter,任务(Task)抓取客户端。合规的HTTP接口(称之为Task)任务会自动被抓取并上传注册中心;

(2) 如果不使用 sia-task-hunter,只需提供任务可调用的HTTP接口,此时需要业务手动录入,且自行控制该任务的并发调用控制。

3.5.4 任务注册中心(Zookeeper)

分布式框架采用Zookeeper作为注册中心。

注册中心

(1) 任务注册

调度中心和执行集群都以Zookeeper作为注册中心,所有数据以节点及节点内容的形式注册,通过定时汇报主机状态保持存活在Zookeeper上。

(2) 元数据存储

注册中心不仅仅提供注册服务,并且存储每个执行器的信息(包括执行器实例信息,执行器上传的Task元数据,以及任务运行时的一些临时状态数据)。

(3) 事件发布

基于Zookeeper事件推送机制,进行任务的发布,通过平衡算法保证调度器任务抢占的分布均衡。

(4) 负载均衡

保证调度器获取执行Job的个数均衡,避免单一节点压力。

3.5.5 持久化存储(DB)

这里采用MySQL作为数据持久化解决方案。

除了Task动态元数据保存在注册中心之外,其他相关的元数据都存入MySQL,包括但不限于:手动录入的Task、配置的Job信息、编排的Task依赖信息、调度日志、业务人员操作日志、Task执行日志等。

3.6 SIA-TASK关键运行流程

3.6.1 任务发布流程

任务发布流程

(1) 用户可以通过UI进行Job创建。可以选择Job类型,设置预警邮箱,设置Job描述。然后为创建的Job进行任务Task编排。

(2) Job创建完毕并且设置Task编排关系后可进行任务发布,通过UI对相应的Job进行操作(激活,执行一次,停止以及删除操作)。

(3) 用户的Task任务可以是通过抓取器抓取的,亦可以使用UI手动创建。

  从图中可以看到,ConcurrentHashMap离不开Segment,Segment是ConcurrentHashMap的一个静态内部类,可以看到Segment继承了重入锁ReentrantLock,要想访问Segment片段,线程必须获得同步锁,结构如下:
  
  static final class Segment<K,V> extends ReentrantLock implements Serializable {
  
  //尝试获取锁的最多尝试次数,即自旋次数
  
  static final int MAX_SCAN_RETRIES =
  
  Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
  
  //HashEntry数组,也就是键值对数组
  
  transient volatile HashEntry<K, V>[] table;
  
  //元素的个数
  
  transient int count;
  
  //segment中发生改变元素的操作的次数,如put/remove
  
  transient int modCount;
  
  //当table大小超过阈值时,对table进行扩容,值为capacity *loadFactor
  
  transient int threshold;
  
  //加载因子
  
  final float loadFactor;
  
  Segment(float lf, int threshold, HashEntry<K, V>[] tab) {
  
  this.loadFactor = lf;
  
  this.threshold = threshold;
  
  this.table = tab;
  
  }
  
  }
  
    键值对HashEntry是ConcurrentHashMap的基本数据结构,多个HashEntry可以形成链表用于解决hash冲突。
  
  static final class HashEntry<K,V> {
  
  //hash值
  
  final int hash;
  
  //键
  
  final K key;
  
  //值
  
  volatile V value;
  
  //下一个键值对
  
  volatile HashEntry<K, V> next;
  
  HashEntry(int hash, K key, V value, HashEntry<K, V> next) {
  
  this.hash = hash;
  
  this.key = key;
  
  this.value = value;
  
  this.next = next;
  
  }
  
  }
  
    ConcurrentHashMap成员变量和构造方法如下:
  
  public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
  
  implements ConcurrentMap<K, V>, Serializable {
  
  private static final long serialVersionUID = 7249069246763182397L;
  
  //默认的初始容量
  
  static final int DEFAULT_INITIAL_CAPACITY = 16;
  
  //默认加载因子
  
  static final float DEFAULT_LOAD_FACTOR = 0.75f;
  
  //默认的并发度,也就是默认的Segment数组长度
  
  static final int DEFAULT_CONCURRENCY_LEVEL = 16;
  
  //最大容量,ConcurrentMap最大容量
  
  static final int MAXIMUM_CAPACITY = 1 << 30;
  
  //每个segment中table数组的长度,必须是2^n,最小为2
  
  static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
  
  //允许最大segment数量,用于限定concurrencyLevel的边界,必须是2^n
  
  static final int MAX_SEGMENTS = 1 << 16; // slightly conservative
  
  //非锁定情况下调用size和contains方法的重试次数,避免由于table连续被修改导致无限重试
  
  static final int RETRIES_BEFORE_LOCK = 2;
  
  //计算segment位置的掩码值
  
  final int segmentMask;
  
  //用于计算算segment位置时,hash参与运算的位数
  
  final int segmentShift;
  
  //Segment数组
  
  final Segment<K,V>[] segments;
  
  public ConcurrentHashMap(int initialCapacity,
  
  float loadFactor, int concurrencyLevel) {
  
  //参数校验
  
  if (!(loadFactor > 0) || initialCapacity <www.yuntianyuL.cn 0 || concurrencyLevel <= 0)
  
  throw new IllegalArgumentException();
  
  if (concurrencyLevel > MAX_SEGMENTS)
  
  concurrencyLevel = MAX_SEGMENTS;
  
  // Find power-of-two sizes best matching arguments
  
  //找到一个大于等于传入的concurrencyLevel的2^n数,且与concurrencyLevel最接近
  
  //ssize作为Segment数组
  
  int sshift = 0;
  
  int ssize = 1;
  
  while (ssize <www.yuntianyul.com concurrencyLevel) {
  
  ++sshift;
  
  ssize <<= 1;
  
  }
  
  this.segmentShift = 32 - sshift;
  
  this.segmentMask = ssize - 1;
  
  if (initialCapacity > MAXIMUM_CAPACITY)
  
  initialCapacity = MAXIMUM_CAPACITY;
  
  // 计算每个segment中table的容量
  
  int c = initialCapacity / ssize;
  
  if (c * ssize < initialCapacity)
  
  ++c;
  
  int cap = MIN_SEGMENT_TABLE_CAPACITY;
  
  // 确保cap是2^n
  
  while (cap < c)
  
  cap <<= 1;
  
  // create segments and segments[0]
  
  // 创建segments并初始化第一个segment数组,其余的segment延迟初始化
  
  Segment<K,V> s0 =
  
  new Segment<K,V>(loadFactor, (www.qilinchengdl.cn int)(cap * loadFactor),
  
  (HashEntry<K,V>[])new HashEntry[cap]);
  
  Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
  
  UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
  
  this.segments = ss;
  
  }
  
  }
  
  concurrencyLevel 参数表示期望并发的修改 ConcurrentHashMap 的线程数量,用于决定 Segment 的数量,通过算法可以知道就是找到最接近传入的concurrencyLevel的2的幂次方。而segmentMask 和 segmentShift看上去有点难以理解,作用主要是根据key的hash值做计算定位在哪个Segment片段。
  
  对于哈希表而言,最重要的方法就是put和get了,下面分别来分析这两个方法的实现:
  
  put(K key, V value)
  
    put方法实际上只有两步:1.根据键的值定位键值对在那个segment片段 2.调用Segment的put方法
  
  public V put(K key, V value) {
  
  Segment<K,V> s;
  
  if (value ==www.cmylli.com null)
  
  throw new NullPointerException();
  
  //计算键的hash值
  
  int hash = hash(key);
  
  //通过hash值运算把键值对定位到segment[j]片段上
  
  int j = (hash >>> segmentShift) & segmentMask;
  
  //检查segment[j]是否已经初始化了,没有的话调用ensureSegment初始化segment[j]
  
  if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
  
  (segments, (j <www.chengmyuLegw.cn< SSHIFT) + SBASE)) == null) // in ensureSegment
  
  s = ensureSegment(j);
  
  //向片段中插入键值对
  
  return s.put(key, hash, value, false);
  
  }
  
  ensureSegment(int k)
  
    我们从ConcurrentHashMap的构造函数可以发现Segment数组只初始化了Segment[0],其余的Segment是用到了在初始化,用了延迟加载的策略,而延迟加载调用的就是ensureSegment方法
  
  private Segment<K,V> ensureSegment(int k) {
  
  final Segment<K,V>[] ss = this.segments;
  
  long u = (k << SSHIFT) + SBASE; // raw offset
  
  Segment<K,V> seg;
  
  //按照segment[0]的HashEntry数组长度和加载因子初始化Segment[k]
  
  if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
  
  Segment<K,V> proto = ss[0]; // use segment 0 as prototype
  
  int cap = proto.table.length;
  
  float lf = proto.loadFactor;
  
  int threshold = (int)(cap www.cmylept.cn* lf);
  
  HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
  
  if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
  
  == null) { // recheck
  
  Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
  
  while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
  
  == null) {
  
  if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
  
  break;
  
  }
  
  }
  
  }
  
  return seg;
  
  }
  
  put(K key, int hash, V value, boolean onlyIfAbsent)
  
    调用Segment的put方法插入键值对到Segment的HashEntry数组
  
  final V put(K key, int hash, V value, boolean onlyIfAbsent) {
  
  //Segment继承ReentrantLock,尝试获取独占锁
  
  HashEntry<K,V> node =www.ztyLegw.cn tryLock() ? null :
  
  scanAndLockForPut(key, hash, value);
  
  V oldValue;
  
  try {
  
  HashEntry<K,V>[] tab = table;
  
  //定位键值对在HashEntry数组上的位置
  
  int index = (tab.length - 1) & hash;
  
  //获取这个位置的第一个键值对
  
  HashEntry<K,V> first = entryAt(tab, index);
  
  for (HashEntry<K,V> e = first;;) {
  
  if (e != null) {//此处有链表结构,一直循环到e==null
  
  K k;
  
  //存在与待插入键值对相同的键,则替换value
  
  if ((k = e.key) == key ||
  
  (e.hash == hash && key.equals(k))) {
  
  oldValue = e.value;
  
  if (!onlyIfAbsent) {www.tcgjgw.com//onlyIfAbsent默认为false
  
  e.value = value;
  
  ++modCount;
  
  }
  
  break;
  
  }
  
  e = e.next;
  
  }
  
  else {
  
  //node不为null,设置node的next为first,node为当前链表的头节点
  
  if (node != null)
  
  node.setNext(first);
  
  //node为null,创建头节点,指定next为first,node为当前链表的头节点
  
  else
  
  node = new HashEntry<K,V>(hash, key, value, first);
  
  int c = count + 1;
  
  //扩容条件 (www.feironggw.cn)entry数量大于阈值 (2) 当前数组tab长度小于最大容量。满足以上条件就扩容
  
  if (c > threshold && tab.length < MAXIMUM_CAPACITY)
  
  //扩容
  
  rehash(node);
  
  else
  
  //tab的index位置设置为node,
  
  setEntryAt(tab, index, node);
  
  ++modCount;
  
  count = c;
  
  oldValue = null;

3.6.2 执行流程

执行流程

(1) Job创建完成之后,可以选择激活触发定时任务;

(2) Job到达预订时间后,调度中心触发Job,然后按照预定的Task编排逻辑通过http通知Task执行器进行执行,并异步监听任务执行结果;

(3) 若执行结果成功,则判断是否存在后置Task,若存在,则继续下一次调度,若不存在,则说明该Job执行完毕,结束本次调用;若执行结果失败,则触发故障恢复策略:立即停止、忽略本次失败、多次尝试、转到其它执行器执行。

3.6.3 状态流转

Job在整个生命周期内存在四种状态,分别是:已停止(NULL)、准备中(READY)、开始运行(RUNNING)、异常停止(STOP),状态流转及流转条件如下图所示。

状态流转

3.7 SIA-TASK模块设计

SIA-TASK 的物理网络拓扑图如下所示:

网络拓扑图

SIA-TASK 的模块间交互设计思路:

(1) 通过编排中心创建Task任务或通过Hunter自动抓取,并将 Task 信息异步保存到DB;创建Job并激活,在zookeeper中创建JobKey。

(2) 调度中心会监听zookeeper中JobKey创建事件,然后抢占创建的Job,抢占成功后加入quartz定时任务,当时间到达即触发Job运行。调度中心异步调用执行器服务执行Job中的 Task (可能存在多个 Task ,遵循 Task 失败策略),并将结果返回到调度中心。

(3) 将Job执行状态随时在zookeeper上更改,通过编排中心的查询接口可以进行查询。

(4) Job执行结束后,等待下一次执行。

3.7.1 任务编排中心设计

编排中心可以与DB和zookeeper进行数据交互,其主要功能可分为三方面:

编排中心首页监控展示如下:

首页监控

3.7.2 任务调度中心设计

调度中心主要与DB、ZK和执行器进行交互,其主要功能可分为以下几个方面:

3.7.3 任务执行器设计

执行器可以与ZK和调度中心进行交互,其主要功能可分为两个方面:

执行器 Task示例:

@OnlineTask(description = "在线任务示例",enableSerial=true)
@RequestMapping(value = "/example", method = { RequestMethod.POST }, produces = "application/json;charset=UTF-8")
@CrossOrigin(methods = { RequestMethod.POST }, origins = "*")
@ResponseBody
public String example(@RequestBody String json) {   
    /**
     * TODO:客户端业务逻辑处理
     */
    Map<String, String> info = new HashMap<String, String>();
    info.put("status", "success");
    info.put("result", "as you need");
    return JSONHelper.toString(info);
}

由此可见,任务 Task 编写非常简单。

3.8 SIA-TASK高可用设计

分布式服务一般都要考虑高可用方案,同样 SIA-TASK 为了保证高可用,针对不同的服务组件进行了不同维度增强。

3.8.1 任务编排中心的高可用

SIA-TASK 通过前后端分离、服务拆分等措施实现了编排中心的高可用。当集群中某实例失效后,不会影响集群的其它实例,因此无需特殊操作即可使用集群中其它的可用编排中心。

3.8.2 任务调度中心的高可用

3.8.2.1 异常转移

如果调度中心集群中的某个实例节点服务宕机后,这个实例节点上的所有Job会平滑迁移到集群中可用的实例上,不会造成定时任务的执行缺失,同时,当崩溃后的实例修复成功重新接入该集群时,会继续抢占Job提供服务。

3.8.2.2 配置线程池

调度采用线程池方式实现,避免单线程因阻塞而引起任务调度延迟。程池里的线程数,默认值是10,当执行任务会并发执行多个耗时任务时,要根据业务特点选择线程池的大小。

org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool org.quartz.threadPool.threadCount = 60
org.quartz.threadPool.threadPriority = 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true

SIA-TASK 根据quartz自身提供的threadPool再次进行线程池的利用。进行线程池重新定义,针对每个Job去分配一个独有的线程池。线程池的大小可根据Job自身编排的 Task 个数的大小进行动态伸缩,从而保证每个Job的调度线程完全独立,不在会因为编排 Task 个数的陡增而耗尽线程资源。同时提供线程池资源的回收逻辑,在Job进行永久性终止时回收为期分配的线程池资源。

public static ExecutorService getExecutorService(String JobKey) {

    ExecutorService exec = executorPool.get(JobKey);
     if (exec == null) {
        LOGGER.info(Constants.LOG_PREFIX + "Initialize thread pool for running Jobs,Job is {}",JobKey);
      exec = Executors.newCachedThreadPool();
      executorPool.putIfAbsent(JobKey, exec);
      exec = executorPool.get(JobKey);
  }
    return exec;
}
3.8.2.3 全日志跟踪

SIA-TASK 针对Job的整个调度生命周期进行全面跟踪,利用AOP进行日志增强,调度中心每触发一次Job调度就会进行日志记录。同时针对Job编排的 Task 执行也会进行记录任务日志。

日志分为Job日志和 Task 日志:

3.8.2.4 异步封装
public interface RestTemplate {

/**
 * 异步Post方法 * @param request
 * @param responseType
 * @param uriVariables
 * @param <T>
 * @return
 */
 <T> ListenableFuture<ResponseEntity<T>> postAsyncForEntity(Request request, Class<T> responseType, Object... uriVariables); }
3.8.2.5 自定义调度器资源池

调度器资源池

SIA-TASK 从物理资源角度设计了调度资源池,出于一些特殊情况的考量我们针对调度器进行了池化;调度器可以通过不同的操作进行状态的转变,从而进行能力的转化。

3.8.3 任务执行器的高可用

四、总结

至此对微服务任务调度平台 SIA-TASK 做了一个简要的介绍,包括设计背景、架构设计以及产品组件功能与特性。微服务任务调度平台 SIA-TASK 基本上解决了当前的业务需求,提供简单高效的编排调度服务。SIA-TASK 会持续迭代,提供更为完善的服务。之后也会提供相关技术文档和使用文档。

标签:架构设计,SIA,TASK,调度,任务,Task,Job,任务调度
来源: https://www.cnblogs.com/qwangxiao/p/10972288.html