编程语言
首页 > 编程语言> > 源码分析 | Spring定时任务Quartz执行全过程源码解读

源码分析 | Spring定时任务Quartz执行全过程源码解读

作者:互联网

微信公众号:bugstack虫洞栈

微信公众号:bugstack虫洞栈 | 博客:https://bugstack.cn

沉淀、分享、成长,专注于原创专题案例,以最易学习编程的方式分享知识,让自己和他人都能有所收获。目前已完成的专题有;Netty4.x实战专题案例、用Java实现JVM、基于JavaAgent的全链路监控、手写RPC框架、架构设计专题案例[Ing]等。

你用剑????、我用刀????,好的代码都很烧????,望你不吝出招????!

一、前言介绍

在日常开发中经常会用到定时任务,用来;库表扫描发送MQ、T+n账单结算、缓存数据更新、秒杀活动状态变更,等等。因为有了Spring的Schedule极大的方便了我们对这类场景的使用。那么,除了应用你还了解它多少呢;

  1. 默认初始化多少个任务线程
  2. JobStore有几种实现,你平时用的都是哪个
  3. 一个定时任务的执行流程简述下

蒙圈了吧,是不感觉平时只是使用了,根本没关注过这些。有种冲动赶紧搜索答案吧!但只是知道答案是没有多少意义的,扛不住问不说,也不了解原理。所以,如果你想真的提升自己技能,还是要从根本搞定。

二、案例工程

为了更好的做源码分析,我们将平时用的定时任务服务单独抽离出来。工程下载,关注公众号:bugstack虫洞栈,回复:源码分析

itstack-demo-code-schedule
└── src
    ├── main
    │   ├── java
    │   │   └── org.itstack.demo
    │   │       ├── DemoTask.java
    │   │       └── JobImpl.java   
    │   └── resources	
    │       ├── props	
    │       │   └── config.properties
    │       ├── spring
    │       │   └── spring-config-schedule-task.xml
    │       ├── logback.xml
    │       └── spring-config.xml
    └── test
         └── java
             └── org.itstack.demo.test
                 ├── ApiTest.java
                 ├── MyQuartz.java				
                 └── MyTask.java

三、环境配置

  1. JDK 1.8
  2. IDEA 2019.3.1
  3. Spring 4.3.24.RELEASE
  4. quartz 2.3.2 {不同版本略有代码差异}

四、源码分析

<dependency>
    <groupId>org.quartz-scheduler</groupId>
    <artifactId>quartz</artifactId>
    <version>2.3.2</version>
</dependency>

依赖于Spring版本升级quartz选择2.3.2,同时如果你如本文案例中所示使用xml配置任务。那么会有如下更改;

Spring 3.x/org.springframework.scheduling.quart.CronTriggerBean

 <bean id="taskTrigger" class="org.springframework.scheduling.quartz.CronTriggerBean">
     <property name="jobDetail" ref="taskHandler"/>
     <property name="cronExpression" value="0/5 * * * * ?"/>
 </bean>

Spring 4.x/org.springframework.scheduling.quartz.CronTriggerFactoryBean

 <bean id="taskTrigger" class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
     <property name="jobDetail" ref="taskHandler"/>
     <property name="cronExpression" value="0/5 * * * * ?"/>
 </bean>

在正式分析前,可以看下quartz的默认配置,很多初始化动作都要从这里取得参数,同样你可以配置自己的配置文件。例如,当你的任务很多时,默认初始化的10个线程组不满足你的业务需求,就可以按需调整。

quart.properties

# Default Properties file for use by StdSchedulerFactory
# to create a Quartz Scheduler Instance, if a different
# properties file is not explicitly specified.
#

org.quartz.scheduler.instanceName: DefaultQuartzScheduler
org.quartz.scheduler.rmi.export: false
org.quartz.scheduler.rmi.proxy: false
org.quartz.scheduler.wrapJobExecutionInUserTransaction: false

org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount: 10
org.quartz.threadPool.threadPriority: 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true

org.quartz.jobStore.misfireThreshold: 60000

org.quartz.jobStore.class: org.quartz.simpl.RAMJobStore

1. 从一个简单案例开始

平时我们使用Schedule基本都是注解或者xml配置文件,但是为了可以更简单的分析代码,我们从一个简单的Demo入手,放到main函数中。

DemoTask.java & 定义一个等待被执行的任务

public class DemoTask {

    private Logger logger = LoggerFactory.getLogger(DemoTask.class);

    public void execute() throws Exception{
        logger.info("定时处理用户信息任务:0/5 * * * * ?");
    }

}

MyTask.java & 测试类,将配置在xml中的代码抽离出来

public class MyTask {

    public static void main(String[] args) throws Exception {

        DemoTask demoTask = new DemoTask();

        // 定义了;执行的内容
        MethodInvokingJobDetailFactoryBean methodInvokingJobDetailFactoryBean = new MethodInvokingJobDetailFactoryBean();
        methodInvokingJobDetailFactoryBean.setTargetObject(demoTask);
        methodInvokingJobDetailFactoryBean.setTargetMethod("execute");
        methodInvokingJobDetailFactoryBean.setConcurrent(true);
        methodInvokingJobDetailFactoryBean.setName("demoTask");
        methodInvokingJobDetailFactoryBean.afterPropertiesSet();

        // 定义了;执行的计划
        CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean();
        cronTriggerFactoryBean.setJobDetail(methodInvokingJobDetailFactoryBean.getObject());
        cronTriggerFactoryBean.setCronExpression("0/5 * * * * ?");
        cronTriggerFactoryBean.setName("demoTask");
        cronTriggerFactoryBean.afterPropertiesSet();

        // 实现了;执行的功能
        SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
        schedulerFactoryBean.setTriggers(cronTriggerFactoryBean.getObject());
        schedulerFactoryBean.setAutoStartup(true);
        schedulerFactoryBean.afterPropertiesSet();

        schedulerFactoryBean.start();

        // 暂停住
        System.in.read();

    }

}

如果一切顺利,那么会有如下结果:

2020-01-04 10:47:16.369 [main] INFO  org.quartz.impl.StdSchedulerFactory[1220] - Using default implementation for ThreadExecutor
2020-01-04 10:47:16.421 [main] INFO  org.quartz.core.SchedulerSignalerImpl[61] - Initialized Scheduler Signaller of type: class org.quartz.core.SchedulerSignalerImpl
2020-01-04 10:47:16.422 [main] INFO  org.quartz.core.QuartzScheduler[229] - Quartz Scheduler v.2.3.2 created.
2020-01-04 10:47:16.423 [main] INFO  org.quartz.simpl.RAMJobStore[155] - RAMJobStore initialized.
2020-01-04 10:47:16.424 [main] INFO  org.quartz.core.QuartzScheduler[294] - Scheduler meta-data: Quartz Scheduler (v2.3.2) 'QuartzScheduler' with instanceId 'NON_CLUSTERED'
  Scheduler class: 'org.quartz.core.QuartzScheduler' - running locally.
  NOT STARTED.
  Currently in standby mode.
  Number of jobs executed: 0
  Using thread pool 'org.quartz.simpl.SimpleThreadPool' - with 10 threads.
  Using job-store 'org.quartz.simpl.RAMJobStore' - which does not support persistence. and is not clustered.

2020-01-04 10:47:16.424 [main] INFO  org.quartz.impl.StdSchedulerFactory[1374] - Quartz scheduler 'QuartzScheduler' initialized from an externally provided properties instance.
2020-01-04 10:47:16.424 [main] INFO  org.quartz.impl.StdSchedulerFactory[1378] - Quartz scheduler version: 2.3.2
2020-01-04 10:47:16.426 [main] INFO  org.quartz.core.QuartzScheduler[2293] - JobFactory set to: org.springframework.scheduling.quartz.AdaptableJobFactory@3e9b1010
2020-01-04 10:47:16.651 [main] INFO  org.quartz.core.QuartzScheduler[547] - Scheduler QuartzScheduler_$_NON_CLUSTERED started.
一月 04, 2020 10:47:16 上午 org.springframework.scheduling.quartz.SchedulerFactoryBean startScheduler
信息: Starting Quartz Scheduler now
2020-01-04 10:47:20.321 [QuartzScheduler_Worker-1] INFO  org.itstack.demo.DemoTask[11] - 定时处理用户信息任务:0/5 * * * * ?
2020-01-04 10:47:25.001 [QuartzScheduler_Worker-2] INFO  org.itstack.demo.DemoTask[11] - 定时处理用户信息任务:0/5 * * * * ?
2020-01-04 10:47:30.000 [QuartzScheduler_Worker-3] INFO  org.itstack.demo.DemoTask[11] - 定时处理用户信息任务:0/5 * * * * ?
2020-01-04 10:47:35.001 [QuartzScheduler_Worker-4] INFO  org.itstack.demo.DemoTask[11] - 定时处理用户信息任务:0/5 * * * * ?
2020-01-04 10:47:40.000 [QuartzScheduler_Worker-5] INFO  org.itstack.demo.DemoTask[11] - 定时处理用户信息任务:0/5 * * * * ?

Process finished with exit code -1

2. 定义执行内容(MethodInvokingJobDetailFactoryBean)

// 定义了;执行的内容
MethodInvokingJobDetailFactoryBean methodInvokingJobDetailFactoryBean = new MethodInvokingJobDetailFactoryBean();
methodInvokingJobDetailFactoryBean.setTargetObject(demoTask);
methodInvokingJobDetailFactoryBean.setTargetMethod("execute");
methodInvokingJobDetailFactoryBean.setConcurrent(true);
methodInvokingJobDetailFactoryBean.setName("demoTask");
methodInvokingJobDetailFactoryBean.afterPropertiesSet();

这块内容主要将我们的任务体(即待执行任务DemoTask)交给MethodInvokingJobDetailFactoryBean管理,首先设置必要信息;

最后我们通过手动调用 afterPropertiesSet() 来模拟初始化。如果我们的类是交给 Spring 管理的,那么在实现了 InitializingBean 接口的类,在类配置信息加载后会自动执行 afterPropertiesSet() 。一般实现了 InitializingBean 接口的类,同时也会去实现 FactoryBean 接口,因为这个接口实现后就可以通过 T getObject() 获取自己自定义初始化的类。这也常常用在一些框架开发中。

MethodInvokingJobDetailFactoryBean.afterPropertiesSet()

public void afterPropertiesSet() throws ClassNotFoundException, NoSuchMethodException {
	prepare();
	// Use specific name if given, else fall back to bean name.
	String name = (this.name != null ? this.name : this.beanName);
	// Consider the concurrent flag to choose between stateful and stateless job.
	Class<?> jobClass = (this.concurrent ? MethodInvokingJob.class : StatefulMethodInvokingJob.class);
	// Build JobDetail instance.
	JobDetailImpl jdi = new JobDetailImpl();
	jdi.setName(name);
	jdi.setGroup(this.group);
	jdi.setJobClass((Class) jobClass);
	jdi.setDurability(true);
	jdi.getJobDataMap().put("methodInvoker", this);
	this.jobDetail = jdi;
	
	postProcessJobDetail(this.jobDetail);
}

3. 定义执行计划(CronTriggerFactoryBeann)

// 定义了;执行的计划
CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean();
cronTriggerFactoryBean.setJobDetail(methodInvokingJobDetailFactoryBean.getObject());
cronTriggerFactoryBean.setCronExpression("0/5 * * * * ?");
cronTriggerFactoryBean.setName("demoTask");
cronTriggerFactoryBean.afterPropertiesSet();

这一块主要定义任务的执行计划,并将任务执行内容交给 CronTriggerFactoryBean 管理,同时设置必要信息;

CronTriggerFactoryBean.afterPropertiesSet()

@Override
public void afterPropertiesSet() throws ParseException {
    
	// ... 校验属性信息
	
	CronTriggerImpl cti = new CronTriggerImpl();
	cti.setName(this.name);
	cti.setGroup(this.group);
	if (this.jobDetail != null) {
		cti.setJobKey(this.jobDetail.getKey());
	}
	cti.setJobDataMap(this.jobDataMap);
	cti.setStartTime(this.startTime);
	cti.setCronExpression(this.cronExpression);
	cti.setTimeZone(this.timeZone);
	cti.setCalendarName(this.calendarName);
	cti.setPriority(this.priority);
	cti.setMisfireInstruction(this.misfireInstruction);
	cti.setDescription(this.description);
	this.cronTrigger = cti;
}

CronTriggerFactoryBean.getObject()

@Override
public CronTrigger getObject() {
	return this.cronTrigger;
}

4. 调度执行计划(SchedulerFactoryBean)

// 调度了;执行的计划(scheduler)
SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
schedulerFactoryBean.setTriggers(cronTriggerFactoryBean.getObject());
schedulerFactoryBean.setAutoStartup(true);
schedulerFactoryBean.afterPropertiesSet();

schedulerFactoryBean.start();

这一部分如名字一样调度工厂,相当于一个指挥官,可以从全局做调度,比如监听哪些trigger已经ready、分配线程等等,同样也需要设置必要的属性信息;

这个过程较长包括:调度工厂、线程池、注册任务等等,整体核心加载流程如下;

微信公众号:bugstack虫洞栈 & 调度工程初始化流程

SchedulerFactoryBean.afterPropertiesSet()

public void afterPropertiesSet() throws Exception {
	if (this.dataSource == null && this.nonTransactionalDataSource != null) {
		this.dataSource = this.nonTransactionalDataSource;
	}
	if (this.applicationContext != null && this.resourceLoader == null) {
		this.resourceLoader = this.applicationContext;
	}
	// Initialize the Scheduler instance...
	this.scheduler = prepareScheduler(prepareSchedulerFactory());
	try {
		registerListeners();
		registerJobsAndTriggers();
	}
	catch (Exception ex) {
		try {
			this.scheduler.shutdown(true);
		}
		catch (Exception ex2) {
			logger.debug("Scheduler shutdown exception after registration failure", ex2);
		}
		throw ex;
	}
}

SchedulerAccessor.addTriggerToScheduler() & SchedulerAccessor 是SchedulerFactoryBean的父类

private boolean addTriggerToScheduler(Trigger trigger) throws SchedulerException {
	boolean triggerExists = (getScheduler().getTrigger(trigger.getKey()) != null);
	if (triggerExists && !this.overwriteExistingJobs) {
		return false;
	}
	// Check if the Trigger is aware of an associated JobDetail.
	JobDetail jobDetail = (JobDetail) trigger.getJobDataMap().remove("jobDetail");
	if (triggerExists) {
		if (jobDetail != null && !this.jobDetails.contains(jobDetail) && addJobToScheduler(jobDetail)) {
			this.jobDetails.add(jobDetail);
		}
		try {
			getScheduler().rescheduleJob(trigger.getKey(), trigger);
		}
		catch (ObjectAlreadyExistsException ex) {
			if (logger.isDebugEnabled()) {
				logger.debug("Unexpectedly encountered existing trigger on rescheduling, assumably due to " +
						"cluster race condition: " + ex.getMessage() + " - can safely be ignored");
			}
		}
	}
	else {
		try {
			if (jobDetail != null && !this.jobDetails.contains(jobDetail) &&
					(this.overwriteExistingJobs || getScheduler().getJobDetail(jobDetail.getKey()) == null)) {
				getScheduler().scheduleJob(jobDetail, trigger);
				this.jobDetails.add(jobDetail);
			}
			else {
				getScheduler().scheduleJob(trigger);
			}
		}
		catch (ObjectAlreadyExistsException ex) {
			if (logger.isDebugEnabled()) {
				logger.debug("Unexpectedly encountered existing trigger on job scheduling, assumably due to " +
						"cluster race condition: " + ex.getMessage() + " - can safely be ignored");
			}
			if (this.overwriteExistingJobs) {
				getScheduler().rescheduleJob(trigger.getKey(), trigger);
			}
		}
	}
	return true;
}

5. 启动定时任务

案例中使用硬编码方式调用 schedulerFactoryBean.start() 启动线程服务。线程的协作通过Object sigLock来实现,关于sigLock.wait()方法都在QuartzSchedulerThread的run方法里面,所以sigLock唤醒的是只有线程QuartzSchedulerThread。核心流程如下;

微信公众号:bugstack虫洞栈 & 调度启动流程

这个启动过程中,核心的代码类,如下;

QuartzScheduler.start() & 启动

public void start() throws SchedulerException {

    if (shuttingDown|| closed) {
        throw new SchedulerException(
                "The Scheduler cannot be restarted after shutdown() has been called.");
    }
	
    // QTZ-212 : calling new schedulerStarting() method on the listeners
    // right after entering start()
    notifySchedulerListenersStarting();
    
	if (initialStart == null) {
        initialStart = new Date();
        this.resources.getJobStore().schedulerStarted();            
        startPlugins();
    } else {
        resources.getJobStore().schedulerResumed();
    }
	
    // 唤醒线程
	schedThread.togglePause(false);
	
    getLog().info(
            "Scheduler " + resources.getUniqueIdentifier() + " started.");
    
    notifySchedulerListenersStarted();
}

QuartzSchedulerThread.run() & 执行过程

@Override
public void run() {
    int acquiresFailed = 0;
	
	// 只有调用了halt()方法,才会退出这个死循环
    while (!halted.get()) {
        try {
			
			// 一、如果是暂停状态,则循环超时等待1000毫秒

            // wait a bit, if reading from job store is consistently failing (e.g. DB is down or restarting)..
           
		    // 阻塞直到有空闲的线程可用并返回可用的数量
            int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
            if(availThreadCount > 0) {
			
                List<OperableTrigger> triggers;
                long now = System.currentTimeMillis();
                clearSignaledSchedulingChange();
                
				try {
					// 二、获取acquire状态的Trigger列表,也就是即将执行的任务
                    triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                            now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBat
                    acquiresFailed = 0;
                    if (log.isDebugEnabled())
                        log.debug("batch acquisition of " + (triggers == null ? 0 : triggers
                } catch(){//...}
				
                if (triggers != null && !triggers.isEmpty()) {
                    
					// 三:获取List第一个Trigger的下次触发时刻
					long triggerTime = triggers.get(0).getNextFireTime().getTime();
                    
					// 四:获取任务触发集合
					List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
					
					// 五:设置Triggers为'executing'状态
					qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                    
					// 六:创建JobRunShell
					qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
					
					// 七:执行Job
					qsRsrcs.getThreadPool().runInThread(shell)
					
                    continue; // while (!halted)
                }
            } else { // if(availThreadCount > 0)
                // should never happen, if threadPool.blockForAvailableThreads() follows con
                continue; // while (!halted)
            }
			
            
        } catch(RuntimeException re) {
            getLog().error("Runtime error occurred in main trigger firing loop.", re);
        }
    }
    
    qs = null;
    qsRsrcs = null;
}

JobRunShell.run() & 从上面WorkerThread.run(),调用到这里执行

public void run() {
    qs.addInternalSchedulerListener(this);

    try {
        OperableTrigger trigger = (OperableTrigger) jec.getTrigger();
        JobDetail jobDetail = jec.getJobDetail();

        do {
            // ...

            long startTime = System.currentTimeMillis();
            long endTime = startTime;

            // execute the job
            try {
                log.debug("Calling execute on job " + jobDetail.getKey());
                
				// 执行业务代码,也就是我们的task
				job.execute(jec);
                
				endTime = System.currentTimeMillis();
            } catch (JobExecutionException jee) {
                endTime = System.currentTimeMillis();
                jobExEx = jee;
                getLog().info("Job " + jobDetail.getKey() +
                        " threw a JobExecutionException: ", jobExEx);
            } catch (Throwable e) {
                endTime = System.currentTimeMillis();
                getLog().error("Job " + jobDetail.getKey() +
                        " threw an unhandled Exception: ", e);
                SchedulerException se = new SchedulerException(
                        "Job threw an unhandled exception.", e);
                qs.notifySchedulerListenersError("Job ("
                        + jec.getJobDetail().getKey()
                        + " threw an exception.", se);
                jobExEx = new JobExecutionException(se, false);
            }

            jec.setJobRunTime(endTime - startTime);

            // 其他代码
        } while (true);

    } finally {
        qs.removeInternalSchedulerListener(this);
    }
}

QuartzJobBean.execte() & 继续往下走

public final void execute(JobExecutionContext context) throws JobExecutionException {
	try {
		BeanWrapper bw = PropertyAccessorFactory.forBeanPropertyAccess(this);
		MutablePropertyValues pvs = new MutablePropertyValues();
		pvs.addPropertyValues(context.getScheduler().getContext());
		pvs.addPropertyValues(context.getMergedJobDataMap());
		bw.setPropertyValues(pvs, true);
	}
	catch (SchedulerException ex) {
		throw new JobExecutionException(ex);
	}
	executeInternal(context);
}

MethodInvokingJobDetailFactoryBean->MethodInvokingJob.executeInternal(JobExecutionContext context)

protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
	try {
		// 反射执行业务代码
		context.setResult(this.methodInvoker.invoke());
	}
	catch (InvocationTargetException ex) {
		if (ex.getTargetException() instanceof JobExecutionException) {
			// -> JobExecutionException, to be logged at info level by Quartz
			throw (JobExecutionException) ex.getTargetException();
		}
		else {
			// -> "unhandled exception", to be logged at error level by Quartz
			throw new JobMethodInvocationFailedException(this.methodInvoker, ex.getTargetException());
		}
	}
	catch (Exception ex) {
		// -> "unhandled exception", to be logged at error level by Quartz
		throw new JobMethodInvocationFailedException(this.methodInvoker, ex);
	}
}

五、综上总结


微信公众号:bugstack虫洞栈 & 欢迎关注并获取源码

标签:Quartz,Spring,源码,new,org,null,quartz,jobDetail
来源: https://blog.51cto.com/u_14943622/2844498