quartz定时任务集群版
作者:互联网
开篇说明
- 如果在这里获得过启发和思考,希望点赞支持!对于内容有不同的看法欢迎来信交流。
- 技术栈 >> java
- 邮箱 >> 15673219519@163.com
描述
- 之前项目刚刚开始简易的通过实现单机版。 quartz定时任务,两张表实现数据持久化
- 由于项目持续了大半年的迭代更新,需要执行的定时任务增多。并且服务中还有其他业务,单机版本的定时任务影响了服务的集群搭建。所以,着手对其改进。
- 目标是保留单机版中两张表的使用,来对定时任务执行计划在页面控制,以及可查看每个任务的执行结果等信息。
我的思路
- 通过quartz官方提供的11张数据表作为数据存储,来实现集群服务的数据共享。
- 自定义表
schedule_job
作为任务的初始化,以及后续对执行计划的修改,自定义表schedule_log
作为执行记录的存储。此处与单机版一致。 - 定时任务监听器ScheduleJobListener.java记录任务的执行结果。
- 注意:与单机版不同的是,集群版的数据存储不在内存中而是在官方提供的11各表中。由于将启动多个服务,故,在初始化任务时需要判断该定时任务是否已经存在。(单机时内存中的数据每次停机均会清空,而集群中数据库中的数据却会一值保留,除非重新创建表)。因此我们需要修改ScheduleJobUtil工具类,以及初始化时候的逻辑。
- 此外我们还需要对添加一些数据源的配置。
第一步:添加依赖
<!--quartz依赖-->
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.2.3</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
</dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz-jobs</artifactId>
<version>2.2.3</version>
</dependency>
第二步:创建数据表
- 自定义的两个表,参考开篇中的单机版博客
- 11个表均由官方提供
-- 定时任务所需的数据表
DROP TABLE IF EXISTS QRTZ_FIRED_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_PAUSED_TRIGGER_GRPS;
DROP TABLE IF EXISTS QRTZ_SCHEDULER_STATE;
DROP TABLE IF EXISTS QRTZ_LOCKS;
DROP TABLE IF EXISTS QRTZ_SIMPLE_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_SIMPROP_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_CRON_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_BLOB_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_JOB_DETAILS;
DROP TABLE IF EXISTS QRTZ_CALENDARS;
CREATE TABLE QRTZ_JOB_DETAILS(
SCHED_NAME VARCHAR(120) NOT NULL,
JOB_NAME VARCHAR(200) NOT NULL,
JOB_GROUP VARCHAR(200) NOT NULL,
DESCRIPTION VARCHAR(250) NULL,
JOB_CLASS_NAME VARCHAR(250) NOT NULL,
IS_DURABLE VARCHAR(1) NOT NULL,
IS_NONCONCURRENT VARCHAR(1) NOT NULL,
IS_UPDATE_DATA VARCHAR(1) NOT NULL,
REQUESTS_RECOVERY VARCHAR(1) NOT NULL,
JOB_DATA BLOB NULL,
PRIMARY KEY (SCHED_NAME,JOB_NAME,JOB_GROUP))
ENGINE=InnoDB;
CREATE TABLE QRTZ_TRIGGERS (
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(200) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
JOB_NAME VARCHAR(200) NOT NULL,
JOB_GROUP VARCHAR(200) NOT NULL,
DESCRIPTION VARCHAR(250) NULL,
NEXT_FIRE_TIME BIGINT(13) NULL,
PREV_FIRE_TIME BIGINT(13) NULL,
PRIORITY INTEGER NULL,
TRIGGER_STATE VARCHAR(16) NOT NULL,
TRIGGER_TYPE VARCHAR(8) NOT NULL,
START_TIME BIGINT(13) NOT NULL,
END_TIME BIGINT(13) NULL,
CALENDAR_NAME VARCHAR(200) NULL,
MISFIRE_INSTR SMALLINT(2) NULL,
JOB_DATA BLOB NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)
REFERENCES QRTZ_JOB_DETAILS(SCHED_NAME,JOB_NAME,JOB_GROUP))
ENGINE=InnoDB;
CREATE TABLE QRTZ_SIMPLE_TRIGGERS (
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(200) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
REPEAT_COUNT BIGINT(7) NOT NULL,
REPEAT_INTERVAL BIGINT(12) NOT NULL,
TIMES_TRIGGERED BIGINT(10) NOT NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP))
ENGINE=InnoDB;
CREATE TABLE QRTZ_CRON_TRIGGERS (
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(200) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
CRON_EXPRESSION VARCHAR(120) NOT NULL,
TIME_ZONE_ID VARCHAR(80),
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP))
ENGINE=InnoDB;
CREATE TABLE QRTZ_SIMPROP_TRIGGERS
(
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(200) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
STR_PROP_1 VARCHAR(512) NULL,
STR_PROP_2 VARCHAR(512) NULL,
STR_PROP_3 VARCHAR(512) NULL,
INT_PROP_1 INT NULL,
INT_PROP_2 INT NULL,
LONG_PROP_1 BIGINT NULL,
LONG_PROP_2 BIGINT NULL,
DEC_PROP_1 NUMERIC(13,4) NULL,
DEC_PROP_2 NUMERIC(13,4) NULL,
BOOL_PROP_1 VARCHAR(1) NULL,
BOOL_PROP_2 VARCHAR(1) NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP))
ENGINE=InnoDB;
CREATE TABLE QRTZ_BLOB_TRIGGERS (
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(200) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
BLOB_DATA BLOB NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
INDEX (SCHED_NAME,TRIGGER_NAME, TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP))
ENGINE=InnoDB;
CREATE TABLE QRTZ_CALENDARS (
SCHED_NAME VARCHAR(120) NOT NULL,
CALENDAR_NAME VARCHAR(200) NOT NULL,
CALENDAR BLOB NOT NULL,
PRIMARY KEY (SCHED_NAME,CALENDAR_NAME))
ENGINE=InnoDB;
CREATE TABLE QRTZ_PAUSED_TRIGGER_GRPS (
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_GROUP))
ENGINE=InnoDB;
CREATE TABLE QRTZ_FIRED_TRIGGERS (
SCHED_NAME VARCHAR(120) NOT NULL,
ENTRY_ID VARCHAR(95) NOT NULL,
TRIGGER_NAME VARCHAR(200) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
INSTANCE_NAME VARCHAR(200) NOT NULL,
FIRED_TIME BIGINT(13) NOT NULL,
SCHED_TIME BIGINT(13) NOT NULL,
PRIORITY INTEGER NOT NULL,
STATE VARCHAR(16) NOT NULL,
JOB_NAME VARCHAR(200) NULL,
JOB_GROUP VARCHAR(200) NULL,
IS_NONCONCURRENT VARCHAR(1) NULL,
REQUESTS_RECOVERY VARCHAR(1) NULL,
PRIMARY KEY (SCHED_NAME,ENTRY_ID))
ENGINE=InnoDB;
CREATE TABLE QRTZ_SCHEDULER_STATE (
SCHED_NAME VARCHAR(120) NOT NULL,
INSTANCE_NAME VARCHAR(200) NOT NULL,
LAST_CHECKIN_TIME BIGINT(13) NOT NULL,
CHECKIN_INTERVAL BIGINT(13) NOT NULL,
PRIMARY KEY (SCHED_NAME,INSTANCE_NAME))
ENGINE=InnoDB;
CREATE TABLE QRTZ_LOCKS (
SCHED_NAME VARCHAR(120) NOT NULL,
LOCK_NAME VARCHAR(40) NOT NULL,
PRIMARY KEY (SCHED_NAME,LOCK_NAME))
ENGINE=InnoDB;
CREATE INDEX IDX_QRTZ_J_REQ_RECOVERY ON QRTZ_JOB_DETAILS(SCHED_NAME,REQUESTS_RECOVERY);
CREATE INDEX IDX_QRTZ_J_GRP ON QRTZ_JOB_DETAILS(SCHED_NAME,JOB_GROUP);
CREATE INDEX IDX_QRTZ_T_J ON QRTZ_TRIGGERS(SCHED_NAME,JOB_NAME,JOB_GROUP);
CREATE INDEX IDX_QRTZ_T_JG ON QRTZ_TRIGGERS(SCHED_NAME,JOB_GROUP);
CREATE INDEX IDX_QRTZ_T_C ON QRTZ_TRIGGERS(SCHED_NAME,CALENDAR_NAME);
CREATE INDEX IDX_QRTZ_T_G ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_GROUP);
CREATE INDEX IDX_QRTZ_T_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_STATE);
CREATE INDEX IDX_QRTZ_T_N_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP,TRIGGER_STATE);
CREATE INDEX IDX_QRTZ_T_N_G_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_GROUP,TRIGGER_STATE);
CREATE INDEX IDX_QRTZ_T_NEXT_FIRE_TIME ON QRTZ_TRIGGERS(SCHED_NAME,NEXT_FIRE_TIME);
CREATE INDEX IDX_QRTZ_T_NFT_ST ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_STATE,NEXT_FIRE_TIME);
CREATE INDEX IDX_QRTZ_T_NFT_MISFIRE ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME);
CREATE INDEX IDX_QRTZ_T_NFT_ST_MISFIRE ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME,TRIGGER_STATE);
CREATE INDEX IDX_QRTZ_T_NFT_ST_MISFIRE_GRP ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME,TRIGGER_GROUP,TRIGGER_STATE);
CREATE INDEX IDX_QRTZ_FT_TRIG_INST_NAME ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,INSTANCE_NAME);
CREATE INDEX IDX_QRTZ_FT_INST_JOB_REQ_RCVRY ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,INSTANCE_NAME,REQUESTS_RECOVERY);
CREATE INDEX IDX_QRTZ_FT_J_G ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,JOB_NAME,JOB_GROUP);
CREATE INDEX IDX_QRTZ_FT_JG ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,JOB_GROUP);
CREATE INDEX IDX_QRTZ_FT_T_G ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP);
CREATE INDEX IDX_QRTZ_FT_TG ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,TRIGGER_GROUP);
commit;
第三步:java代码实现
- 添加配置 quartz.properties
#quartz集群配置
#调度标识名 集群中每一个实例都必须使用相同的名称
org.quartz.scheduler.instanceName=DefaultQuartzScheduler
#ID设置为自动获取 每一个必须不同
org.quartz.scheduler.instanceId=AUTO
org.quartz.scheduler.makeSchedulerThreadDaemon=true
#线程池的实现类(一般使用SimpleThreadPool即可满足需求)
org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool
#指定在线程池里面创建的线程是否是守护线程
org.quartz.threadPool.makeThreadsDaemons=true
#指定线程数,至少为1(无默认值)
org.quartz.threadPool.threadCount:20
#设置线程的优先级(最大为java.lang.Thread.MAX_PRIORITY 10,最小为Thread.MIN_PRIORITY 1,默认为5)
org.quartz.threadPool.threadPriority:5
#数据保存方式为数据库持久化
org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
#数据库代理类,一般org.quartz.impl.jdbcjobstore.StdJDBCDelegate可以满足大部分数据库
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
#表的前缀,默认QRTZ_
org.quartz.jobStore.tablePrefix=QRTZ_
#是否加入集群
org.quartz.jobStore.isClustered=true
# 信息保存时间 默认值60秒
org.quartz.jobStore.misfireThreshold=25000
- ScheduleJobListener.java 定时任务监听器:通过监听的方式记录 记录定时任务的执行记录。
import com.qykj.admin.service.schedule.JobService;
import com.qykj.core.constants.schedule.JobLogStatusEnum;
import com.qykj.core.util.DateUtil;
import com.qykj.core.util.SpringUtils;
import org.apache.logging.log4j.ThreadContext;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.JobListener;
import org.springframework.stereotype.Component;
import java.util.UUID;
/**
* =====================================================================================================================
* jiangshaoneng <15673219519.@163.com> 2021/11/20 14:18
*
* 定时任务监听器:通过监听的方式记录 记录定时任务的执行记录。
* =====================================================================================================================
*/
@Component
public class ScheduleJobListener implements JobListener{
private JobService jobService;
// 用于保存 logId
private ThreadLocal<String> threadLocalLogId = new ThreadLocal<>();
// startTime
private ThreadLocal<Long> threadLocalStartTime = new ThreadLocal<>();
@Override
public String getName() {
return "myJobListener";
}
@Override
public void jobToBeExecuted(JobExecutionContext jobExecutionContext) {
ThreadContext.put("traceID", UUID.randomUUID().toString().replace("-",""));
if(jobService == null){
jobService = SpringUtils.getBean(JobService.class);
}
JobDetail jobDetail = jobExecutionContext.getJobDetail();
String name = jobDetail.getKey().getName();
String logId = jobService.saveScheduleLog(name, JobLogStatusEnum.RUNNING.getCode(), "");
threadLocalLogId.set(logId); // 把执行记录放到 threadLocal,提供给执行结束后取此结果
threadLocalStartTime.set(DateUtil.getCurrentTime());
}
@Override
public void jobExecutionVetoed(JobExecutionContext jobExecutionContext) {
String logId = threadLocalLogId.get(); // 执行记录logId
long runTime = DateUtil.getCurrentTime() - threadLocalStartTime.get();
jobService.updateScheduleLog(logId, JobLogStatusEnum.ERROR.getCode(), "执行失败",runTime);
}
@Override
public void jobWasExecuted(JobExecutionContext jobExecutionContext, JobExecutionException e) {
String logId = threadLocalLogId.get(); // 执行记录logId
long runTime = DateUtil.getCurrentTime() - threadLocalStartTime.get();
if(e == null){ // 没有异常修改记录为成功
jobService.updateScheduleLog(logId, JobLogStatusEnum.SUCCESS.getCode(), "执行成功",runTime);
}else{ // 存在异常修改记录失败,并且把异常信息保存到数据库中
jobService.updateScheduleLog(logId, JobLogStatusEnum.ERROR.getCode(), e.toString(), runTime);
}
}
}
- ScheduleConfig.java 定时任务配置类,配置数据源,监听器等信息
import com.qykj.admin.job.ScheduleJobListener;
import com.qykj.admin.job.ScheduleJobUtil;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.PropertiesFactoryBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import javax.sql.DataSource;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.Executor;
import static org.quartz.impl.matchers.GroupMatcher.jobGroupEquals;
/**
* =====================================================================================================================
* jiangshaoneng <15673219519.@163.com> 22021/11/20 14:18
*
* 定时任务配置类。
* =====================================================================================================================
*/
@Configuration
public class SchedulerConfig {
@Qualifier("writeDataSource")
@Autowired
private DataSource dataSource;
@Bean
public Scheduler scheduler() throws IOException{
Scheduler scheduler = schedulerFactoryBean().getScheduler();
try {
// 默认将 MyJobListener 绑定到 JOB_GROUP_NAME,在MyJobListener通过监听的方式记录 记录定时任务的执行记录
scheduler.getListenerManager()
.addJobListener(new ScheduleJobListener(), jobGroupEquals(ScheduleJobUtil.JOB_GROUP_NAME));
} catch (SchedulerException e) {
e.printStackTrace();
}
return scheduler;
}
@Bean
public SchedulerFactoryBean schedulerFactoryBean() throws IOException{
SchedulerFactoryBean factory = new SchedulerFactoryBean();
factory.setSchedulerName("cluster_scheduler");
factory.setDataSource(dataSource);
factory.setApplicationContextSchedulerContextKey("application");
factory.setQuartzProperties(quartzProperties());
factory.setTaskExecutor(schedulerThreadPool());
factory.setStartupDelay(0);
return factory;
}
@Bean
public Properties quartzProperties() throws IOException {
PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
propertiesFactoryBean.setLocation(new ClassPathResource("/quartz.properties"));
propertiesFactoryBean.afterPropertiesSet();
return propertiesFactoryBean.getObject();
}
@Bean
public Executor schedulerThreadPool(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors());
executor.setQueueCapacity(Runtime.getRuntime().availableProcessors());
return executor;
}
}
- ScheduleJobUtils.java 定时任务工具类,包括任务的增删改查,已配置数据库的情况,因此所有操作都是针对数据库中的操作。(jar包已经集成,只需调用对应方法即可,无需我们实现数据库操作)
import lombok.extern.slf4j.Slf4j;
import org.quartz.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* =====================================================================================================================
* jiangshaoneng <15673219519.@163.com> 2021/11/20 14:18
*
* 任务调度的工具类,包括定时任务的新增修改删除等。本系统暂时对任务组,触发器组名暂时均使用同一个。
* 如:JOB_GROUP_NAME,TRIGGER_GROUP_NAME
* =====================================================================================================================
*/
@Slf4j
@Component
public class ScheduleJobUtil {
@Autowired
private Scheduler scheduler;
public static String JOB_GROUP_NAME = "DEFAULT_JOB_GROUP";
private static String TRIGGER_GROUP_NAME = "DEFAULT_TRIGGER_GROUP";
public ScheduleJobUtil(){
}
/**
* @Description: 添加一个定时任务,使用默认的任务组名,触发器名,触发器组名
* @param jobName 任务名
* @param cls 任务
* @param cron 时间设置,参考quartz说明文档
* @throws SchedulerException
*/
public void addJob(String jobName, Class cls, String cron, JobDataMap dataMap) throws SchedulerException {
TriggerKey triggerKey = TriggerKey.triggerKey(jobName, TRIGGER_GROUP_NAME);
Trigger trigger = scheduler.getTrigger(triggerKey);
if(trigger != null){
log.info("添加任务:{},{},{} 已存在", jobName, cls, cron);
return;
}
// 用于描叙Job实现类及其他的一些静态信息,构建一个作业实例
JobDetail jobDetail = JobBuilder.newJob(cls).withIdentity(jobName, JOB_GROUP_NAME).build();
// 构建一个触发器,规定触发的规则
trigger = TriggerBuilder.newTrigger()// 创建一个新的TriggerBuilder来规范一个触发器
.withIdentity(jobName, TRIGGER_GROUP_NAME)// 给触发器起一个名字和组名
.startNow()// 立即执行
.withSchedule(CronScheduleBuilder.cronSchedule(cron)) // 触发器的执行时间
.usingJobData(dataMap) // 定时器一些简单的数据
.build();// 产生触发器
scheduler.scheduleJob(jobDetail, trigger);
log.info("添加任务:{},{},{}", jobName, cls, cron);
// 启动
if (!scheduler.isShutdown()) {
scheduler.start();
}
}
/**
* @Description: 添加一个定时任务
*
* @param jobName 任务名
* @param jobGroupName 任务组名
* @param triggerName 触发器名
* @param triggerGroupName 触发器组名
* @param cls 任务
* @param cron 时间设置,参考quartz说明文档
*/
public void addJob(String jobName, String jobGroupName, String triggerName, String triggerGroupName, Class cls, String cron) throws SchedulerException {
TriggerKey triggerKey = TriggerKey.triggerKey(jobName, TRIGGER_GROUP_NAME);
Trigger trigger = scheduler.getTrigger(triggerKey);
if(trigger != null){
log.info("添加任务:{},{},{},{},{},{} 已存在",jobName,jobGroupName,triggerName,triggerGroupName,cls,cron);
return;
}
// 用于描叙Job实现类及其他的一些静态信息,构建一个作业实例
JobDetail jobDetail = JobBuilder.newJob(cls).withIdentity(jobName, jobGroupName).build();
// 构建一个触发器,规定触发的规则
trigger = TriggerBuilder.newTrigger()// 创建一个新的TriggerBuilder来规范一个触发器
.withIdentity(jobName, triggerGroupName)// 给触发器起一个名字和组名
.startNow()// 立即执行
.withSchedule(CronScheduleBuilder.cronSchedule(cron)) // 触发器的执行时间
.build();// 产生触发器
scheduler.scheduleJob(jobDetail, trigger);
log.info("添加任务:{},{},{},{},{},{}",jobName,jobGroupName,triggerName,triggerGroupName,cls,cron);
// 启动
if (!scheduler.isShutdown()) {
scheduler.start();
}
}
/**
* @Description: 修改一个任务的触发时间(使用默认的任务组名,触发器名,触发器组名)
*
* @param jobName
* @param cron
* @throws SchedulerException
*/
public void modifyJobTime(String jobName, String cron, JobDataMap dataMap) throws SchedulerException {
TriggerKey triggerKey = new TriggerKey(jobName, TRIGGER_GROUP_NAME);
CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
if (trigger == null) {
log.info("修改任务失败,此任务不存在:{},{}", jobName, cron);
return;
}
String oldTime = trigger.getCronExpression();
if (!oldTime.equalsIgnoreCase(cron)) {
JobDetail jobDetail = scheduler.getJobDetail(new JobKey(jobName, JOB_GROUP_NAME));
Class objJobClass = jobDetail.getJobClass();
removeJob(jobName);
addJob(jobName, objJobClass, cron, dataMap);
log.info("修改任务:{},{}",jobName,cron);
}
}
/**
* @Description: 修改或添加一个任务,jobName 存在时修改任务,不存在时则新增任务
*
*/
public void modifyOrAddJobTime(String jobName, Class cls, String cron, JobDataMap dataMap) throws SchedulerException {
TriggerKey triggerKey = new TriggerKey(jobName, TRIGGER_GROUP_NAME);
CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
if (trigger == null) {
addJob(jobName, cls, cron, dataMap);
log.info("添加任务:{},{}",jobName,cron);
}else{
String oldTime = trigger.getCronExpression();
JobDataMap oldJobDataMap = trigger.getJobDataMap();
if (!oldTime.equalsIgnoreCase(cron) || !oldJobDataMap.equals(dataMap)) {
JobDetail jobDetail = scheduler.getJobDetail(new JobKey(jobName, JOB_GROUP_NAME));
Class objJobClass = jobDetail.getJobClass();
removeJob(jobName);
addJob(jobName, objJobClass, cron, dataMap);
log.info("修改任务:{},{}",jobName,cron);
}else {
log.info("任务存在:{},{}",jobName,cron);
}
}
}
/**
* @Description: 移除一个任务(使用默认的任务组名,触发器名,触发器组名)
* @param jobName
* @throws SchedulerException
*/
public void removeJob(String jobName) throws SchedulerException {
JobKey jobKey = new JobKey(jobName, TRIGGER_GROUP_NAME);
// 停止触发器
scheduler.pauseJob(jobKey);
scheduler.unscheduleJob(new TriggerKey(jobName, TRIGGER_GROUP_NAME));// 移除触发器
scheduler.deleteJob(jobKey);// 删除任务
log.info("移除任务:{}",jobName);
}
/**
* 移除任务
*
* @param jobName
* @param jobGroupName
* @param triggerName
* @param triggerGroupName
* @throws SchedulerException
*/
public void removeJob(String jobName, String jobGroupName, String triggerName, String triggerGroupName) throws SchedulerException {
JobKey jobKey = new JobKey(jobName, jobGroupName);
// 停止触发器
scheduler.pauseJob(jobKey);
scheduler.unscheduleJob(new TriggerKey(jobName, triggerGroupName));// 移除触发器
scheduler.deleteJob(jobKey);// 删除任务
log.info("移除任务:{},{},{},{}",jobName,jobGroupName,triggerName,triggerGroupName);
}
/**
* 启动所有任务
* @throws SchedulerException
*/
public void startJobs() throws SchedulerException {
scheduler.start();
log.info("启动所有任务");
}
/**
* 关闭所有定时任务
* @throws SchedulerException
*/
public void shutdownJobs() throws SchedulerException {
if (!scheduler.isShutdown()) {
scheduler.shutdown();
log.info("关闭所有任务");
}
}
}
- ScheduleJobInitListener.java 监听项目启动时初始化定时任务
import com.qykj.admin.service.schedule.JobService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;
/**
* =====================================================================================================================
* jiangshaoneng <15673219519.@163.com> 2021/11/20 14:18
*
* 启动项目时,启动数据库配置启动的定时任务。可支持集权部署
* =====================================================================================================================
*/
@Slf4j
@Component
public class ScheduleJobInitListener implements ApplicationListener<ContextRefreshedEvent>{
@Autowired
private JobService jobService;
@Override
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
log.info("开始初始化定时任务 ...");
jobService.initScheduleJob();
log.info("初始化定时任务成功 ...");
}
}
- ScheduleService.java 定时任务初始化,修改等逻辑
import com.qykj.admin.job.ScheduleJobUtil;
import com.qykj.core.domain.entity.schedule.ScheduleJob;
import com.qykj.core.domain.entity.schedule.ScheduleLog;
import com.qykj.core.exception.AppException;
import com.qykj.core.exception.ErrorCode;
import com.qykj.core.util.JsonUtils;
import com.qykj.repo.impl.schedule.ScheduleJobRepoImpl;
import com.qykj.repo.impl.schedule.ScheduleLogRepoImpl;
import org.quartz.JobDataMap;
import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Service
public class JobService {
private static final Logger logger = LoggerFactory.getLogger(JobService.class);
@Autowired
private ScheduleJobRepoImpl scheduleJobRepo;
@Autowired
private ScheduleLogRepoImpl scheduleLogRepo;
@Autowired
private ScheduleJobUtil scheduleJobUtil;
/**
* 查看所有任务列表
*/
public List<ScheduleJob> getAllJob(){
return scheduleJobRepo.getList();
}
/**
* 查看某个任务的执行详情
*/
public List<ScheduleLog> getJobScheduleDetail(int jobId){
return scheduleLogRepo.getListByJobId(jobId);
}
/**
* 查看某个任务的详情
*/
public ScheduleJob getJobDetail(int jobId){
return scheduleJobRepo.getScheduleJobById(jobId);
}
/**
* 修改任务
*/
public Map<String,Object> updateJob(ScheduleJob scheduleJob){
Map<String,Object> result = new HashMap<>();
try {
Class<?> clszz = Class.forName(scheduleJob.getClazz());
Integer status = scheduleJob.getStatus();
if(status == 1){ // 修改或者添加任务
Map<String, String> map = (Map<String, String>) JsonUtils.json2Map(scheduleJob.getJobDataJson());
JobDataMap dataMap = new JobDataMap(map); // 配置定时器需要的数据
scheduleJobUtil.modifyOrAddJobTime(scheduleJob.getName(), clszz, scheduleJob.getCron(), dataMap);
}else if(status == 2){ // 停止任务
scheduleJobUtil.removeJob(scheduleJob.getName());
}else{
return result;
}
}catch (SchedulerException e){
logger.error("修改定时任务异常:{}", e.toString());
throw new AppException(ErrorCode.SYS_ERROR);
}catch (ClassNotFoundException e){
logger.error("修改定时任务异常,无法找到指定的任务类");
throw new AppException(ErrorCode.SYS_PARAMS_ERROR.code(), "无法找到指定的任务类");
}
scheduleJobRepo.updateScheduleJob(scheduleJob);
return result;
}
/**
* 启动数据库中配置的定时任务
*/
public void initScheduleJob(){
// 移除不需要执行的定时任务
List<ScheduleJob> disableList = scheduleJobRepo.getDisableList();
for (ScheduleJob scheduleJob: disableList){
try {
scheduleJobUtil.removeJob(scheduleJob.getName());
} catch (SchedulerException e) {
logger.error("移除定时任务:{}异常:{}", scheduleJob.getName(), e.toString());
}
}
// 添加需要执行的定时任务
List<ScheduleJob> enableList = scheduleJobRepo.getEnableList();
for (ScheduleJob scheduleJob: enableList){
Map<String, String> map = new HashMap<>();
try {
map = (Map<String, String>) JsonUtils.json2Map(scheduleJob.getJobDataJson());
}catch (Exception e){
logger.error("定时任务:{},DataJson格式不合法", scheduleJob.getName());
}
try {
JobDataMap dataMap = new JobDataMap(map); // 配置定时器需要的数据
Class<?> clszz = Class.forName(scheduleJob.getClazz());
scheduleJobUtil.modifyOrAddJobTime(scheduleJob.getName(), clszz, scheduleJob.getCron(), dataMap);
} catch (SchedulerException e){
logger.error("初始化启动定时任务:{},异常:{}", scheduleJob.getName(), e.toString());
continue;
} catch (ClassNotFoundException e){
logger.error("初始化启动定时任务异常,无法找到指定的任务类");
continue;
}
}
}
/**
* 新增一条执行日志
*/
public String saveScheduleLog(String name, int status, String logInfo){
ScheduleJob scheduleJob = scheduleJobRepo.getScheduleJobByName(name);
ScheduleLog scheduleLog = new ScheduleLog(scheduleJob,status,logInfo);
scheduleLogRepo.saveScheduleLog(scheduleLog);
return scheduleLog.getId();
}
/**
* 更新一条执行日志
*/
public void updateScheduleLog(String id, int status, String logInfo, long runTime){
scheduleLogRepo.updateScheduleLog(id,status,logInfo,runTime);
}
}
- ScheduleController.java 提供页面操作的接口
import com.qykj.admin.aspect.AuthPermissions;
import com.qykj.admin.service.schedule.JobService;
import com.qykj.core.domain.entity.schedule.ScheduleJob;
import com.qykj.core.domain.entity.schedule.ScheduleLog;
import com.qykj.core.view.MSG;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.List;
@RestController
@RequestMapping("/v2/schedule/job")
@Api(tags = "Schedule:001-定时任务控制管理")
public class JobControllerV2 {
@Autowired
private JobService jobService;
@ApiOperation(value = "查看所有任务列表")
@GetMapping("/list")
public MSG<List<ScheduleJob>> getJobList(){
List<ScheduleJob> allJob = jobService.getAllJob();
return MSG.SUCCESS(allJob);
}
@ApiOperation(value = "查看某个任务的执行详情")
@GetMapping("/schedule/detail")
public MSG<List<ScheduleLog>> getJobScheduleDetail(@RequestParam("jobId") int jobId){
List<ScheduleLog> jobScheduleDetail = jobService.getJobScheduleDetail(jobId);
return MSG.SUCCESS(jobScheduleDetail);
}
@ApiOperation(value = "查看某个任务的详情")
@GetMapping("/detail")
public MSG<ScheduleJob> getJobDetail(@RequestParam("jobId") int jobId){
ScheduleJob jobDetail = jobService.getJobDetail(jobId);
return MSG.SUCCESS(jobDetail);
}
@ApiOperation(value = "修改任务")
@PostMapping("/update")
public MSG updateJob(@RequestBody ScheduleJob job){
jobService.updateJob(job);
return MSG.SUCCESS();
}
}
标签:quartz,GROUP,NAME,TRIGGER,集群,QRTZ,import,定时,NULL 来源: https://blog.csdn.net/qq_39529562/article/details/121531851