其他分享
首页 > 其他分享> > xxl-job的适用场景

xxl-job的适用场景

作者:互联网

前言

最近在设计一个项目,项目里面涉及到了任务创建和任务运行,这个就让我想到做一个单独的执行器服务。按照以往的经验,项目里的数据量也不会很高,那么任务的创建运行实际上单台机器就能应付,好像也没必要硬上分布式执行器吧。但是呢,虽然以往的经验如此,万一这个项目就“运气”好的爆表数据量很大呢,那后面改造就挺费事儿了,而且就算前期数据量一般,那么顶多也就是部署单节点咯。

平时上网划水也听说过xxl-job这个分布式任务调度平台,感觉也挺适合我这个项目的,所以就打算直接用这个了。clone了它的代码,然后按照教程学习了操作,之后就基本决定用它了。既然是为项目设计架构,好像我也没道理不去更深入了解下xxl-job的一些实现原理了吧,因此在学习完操作后我就开始看xxl-job的源码了。

在看xxl-job的源码时(主要是执行器部分),发现到了两个东西,一个是bug一个是设计逻辑,然后我从设计逻辑倒推了下执行器的适用场景。

 

适用场景

先说结论:个人认为xxl-job比较适合运行耗时比较长的大任务(定时大任务的估计也差不多),或者数量少耗时短的微任务;对于那种数量多或是运行时间较短的微任务可能不太适合(我的项目正好是后面这种...)。

再具体点,可以细分为两类:

以上结论是我的个人观点,如有错误请多指正。

 

说完结论,我来分析下我得出结论的依据。

首先来看ExecutorBizImpl.java的实现,这是执行器端的具体实现,.run()方法就是在接收到admin端调度任务后执行器端执行的方法。

我们看该方法的第一行,这里首先就根据jobId来获取指定任务(job)所在的执行线程(JobThread,继承自Thread)。从这里我们可以猜测,相同jobId的任务最终在单个执行器服务里面是只被分配了一个线程的。

@Override
public ReturnT<String> run(TriggerParam triggerParam) {
        // load old:jobHandler + jobThread
        JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());

        // 忽略后面的...  
}

接着我们看到该方法的最后几行,首先会根据jobThread来判断jobId指定的任务是否“正在”运行中,如果是的那就复用线程,如果不是那就通过.registJobThread()新建一个JobThread实例。之后把运行参数(triggerParam)给放到了jobThread实例的队列里(JobThread会不断从内置队列中获取运行参数来执行任务)。这几行代码也验证了我上面猜测的,那也就是说相同jobId的任务在并发执行时是会通过Queue作为中介排队运行的。

@Override
public ReturnT<String> run(TriggerParam triggerParam) {
// 忽略前面的... // replace thread (new or exists invalid) if (jobThread == null) { jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason); } // push data to queue ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam); return pushResult; } }

所以以上也就是我为什么说xxl-job执行器不适合执行jobId相同且数量多耗时长的任务了,因为后面来的任务都在Queue中排队了。

至于我为什么一直说不适合耗时短的微任务呢?首先我们先了解一个前提,那就是JobThread在不执行任务90秒后(也就是有idle timeout)会被回收掉。所以假设jobId不同,那么jobId越多就势必会创建大量线程,如果jobId不多,那么就有频繁创建和销毁线程的开销的可能了,这个好像就有点浪费资源;再假设jobId相同,如果微任务很多,那么排队执行相对来说还是比较慢的,如果微任务不多,那还是有浪费资源的可能。

 

除了上述ExecutorBizImpl.java:.run()开头和结尾的代码,它中间还有些其它逻辑,我选择一部分出来解释下。

比如这个是在判断任务job(也就是这个JobHandler)有没有在运行时被更新过,如果有,那么就要把运行中的旧job给停掉,然后运行新job。

@Override
public ReturnT<String> run(TriggerParam triggerParam) {
        // 忽略前面的...

        // valid:jobHandler + jobThread
        GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
        if (GlueTypeEnum.BEAN == glueTypeEnum) {

            // new jobhandler
            IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());

            // valid old jobThread
            if (jobThread!=null && jobHandler != newJobHandler) {
                // change handler, need kill old thread
                removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";

                jobThread = null;
                jobHandler = null;
            }

            // valid handler
            if (jobHandler == null) {
                jobHandler = newJobHandler;
                if (jobHandler == null) {
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
                }
            }

        }

        // 忽略后面的...  
}

 这里这个是job的阻塞策略,在创建任务时可选,阻塞策略其实是跟前文讲的相同jobId执行要排队有关的。当BlockStrategy是SERIAL_EXECUTION时,那么相同jobId的任务就是排队执行;当是DISCARD_LATER时,那么就抛弃后面来的job;当是COVER_EARLY时,那么就停止之前的job,开始运行新来的这个job。

@Override
public ReturnT<String> run(TriggerParam triggerParam) {
        // 忽略前面的...

        // executor block strategy
        if (jobThread != null) {
            ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
            if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
                // discard when running
                if (jobThread.isRunningOrHasQueue()) {
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
                }
            } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
                // kill running jobThread
                if (jobThread.isRunningOrHasQueue()) {
                    removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();

                    jobThread = null;
                }
            } else {
                // just queue trigger
            }
        }

        // 忽略后面的...
}

那么怎么解决不适合执行大量相同jobId任务的这个问题呢?我的思路就在于增加一种阻塞模式:单机并行(xxl-job应该也考虑过,不过没有实现,可见ExecutorBlockStrategyEnum.java中注释掉的类型)。当属于单机并行的任务在创建线程时,实际是去创建一个线程池,这样数量较多的相同jobId的任务就能并发执行了。然后这里打个广告,贴出我的魔改线程池版xxl-job的项目,跪求star

标签:场景,任务,jobId,jobThread,job,triggerParam,线程,xxl
来源: https://www.cnblogs.com/kumu/p/14671200.html