其他分享
首页 > 其他分享> > SpringBoot自定义starter开发分布式任务调度实践

SpringBoot自定义starter开发分布式任务调度实践

作者:互联网

概述

需求

在前面的博客《Java定时器演进过程和生产级分布式任务调度ElasticJob代码实战》中,我们已经熟悉ElasticJob分布式任务的应用,其核心实现为elasticjob-lite-spring-boot-starter,少量配置开箱即用;还有前面也有博客文档谈谈走进Spring Boot源码学习之路和浅谈入门,了解Spring Boot的原理,没看过伙伴可以先翻看下前面的文章。SpringBoot官网已经提供非常多的starter使用,然而今天我们就来模拟封装一个简易的分布式任务调度实现定时任务选主执行和故障自动转移的starter,本篇主要重心在于基于SpringBoot官网标准start封装的模板和步骤。

相关概念

制作starter基本步骤

SpringBoot启动简述

Spring Boot 在启动的时候会做这几件事情

其实也就是 Spring Boot 在启动的时候,按照约定去读取 Spring Boot Starter 的配置信息,再根据配置信息对资源进行初始化,并注入到 Spring 容器中。这样 Spring Boot 启动完毕后,就已经准备好了一切资源,使用过程中直接注入对应 Bean 资源即可。

实践

创建项目

image-20220707103058207

image-20220707104422700

autoconfigure实现

参考GitHub基于分布式任务实现的一些代码,这里核心主要是构建一个light-job自动装配配置文件读取类和一个light-job自动装配配置类。

light-job-spring-boot-starter-autoconfigure模块添加Pom依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.4</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.itxs</groupId>
    <artifactId>light-job-spring-boot-starter-autoconfigure</artifactId>
    <version>1.0</version>
    <packaging>jar</packaging>
    <name>light-job-spring-boot-starter-autoconfigure</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <jdk.version>1.8</jdk.version>
        <spring-boot.version>2.6.4</spring-boot.version>
        <zookeeper.version>3.4.6</zookeeper.version>
        <commons-lang3.version>3.4</commons-lang3.version>
        <quartz.version>2.3.2</quartz.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-tx</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context-support</artifactId>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>javax.servlet-api</artifactId>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>${commons-lang3.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>${zookeeper.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
        </dependency>

        <dependency>
            <groupId>org.quartz-scheduler</groupId>
            <artifactId>quartz</artifactId>
            <version>${quartz.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.quartz-scheduler</groupId>
            <artifactId>quartz-jobs</artifactId>
            <version>${quartz.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

</project>

创建LightJobProperties读取配置文件

package com.itxs.lightjob.config;

import com.itxs.lightjob.zk.ZKManager.KEYS;
import org.apache.commons.lang3.StringUtils;
import org.springframework.boot.context.properties.ConfigurationProperties;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

@ConfigurationProperties(prefix = "light.job",ignoreInvalidFields = true)
public class LightJobProperties {

	private String enabled;
	private String zkConnect;
	private String rootPath = "/light/job";
	private int zkSessionTimeout = 60000;
	private String zkUsername;
	private String zkPassword;
	private List<String> ipBlackList;
	
	
	private List<String> targetBean;
	private List<String> targetMethod;
	private List<String> cronExpression;
	private List<String> startTime;
	private List<String> period;
	private List<String> delay;
	private List<String> params;
	private List<String> type;
	private List<String> extKeySuffix;
	private List<String> beforeMethod;
	private List<String> afterMethod;
	private List<String> threadNum;
	
	
	public Map<String, String> getConfig(){
		Map<String, String> properties = new HashMap<String, String>();
		properties.put(KEYS.zkConnectString.key, zkConnect);
		if(StringUtils.isNotBlank(rootPath)){
			properties.put(KEYS.rootPath.key, rootPath);
		}
		if(zkSessionTimeout > 0){
			properties.put(KEYS.zkSessionTimeout.key, zkSessionTimeout+"");
		}
		if(StringUtils.isNotBlank(zkUsername)){
			properties.put(KEYS.userName.key, zkUsername);
		}
		if(StringUtils.isNotBlank(zkPassword)){
			properties.put(KEYS.password.key, zkPassword);
		}
		StringBuilder sb = new StringBuilder();
		if(ipBlackList != null && ipBlackList.size() > 0){
			for(String ip:ipBlackList){
				sb.append(ip).append(",");
			}
			sb.substring(0,sb.lastIndexOf(","));
		}
		properties.put(KEYS.ipBlacklist.key, sb.toString());
		return properties;
	}

	public String getEnabled() {
		return enabled;
	}

	public void setEnabled(String enabled) {
		this.enabled = enabled;
	}

	public String getZkConnect() {
		return zkConnect;
	}
	public void setZkConnect(String zkConnect) {
		this.zkConnect = zkConnect;
	}
	public String getRootPath() {
		return rootPath;
	}
	public void setRootPath(String rootPath) {
		this.rootPath = rootPath;
	}
	public int getZkSessionTimeout() {
		return zkSessionTimeout;
	}
	public void setZkSessionTimeout(int zkSessionTimeout) {
		this.zkSessionTimeout = zkSessionTimeout;
	}
	public String getZkUsername() {
		return zkUsername;
	}
	public void setZkUsername(String zkUsername) {
		this.zkUsername = zkUsername;
	}
	public String getZkPassword() {
		return zkPassword;
	}
	public void setZkPassword(String zkPassword) {
		this.zkPassword = zkPassword;
	}
	public List<String> getIpBlackList() {
		return ipBlackList;
	}
	public void setIpBlackList(List<String> ipBlackList) {
		this.ipBlackList = ipBlackList;
	}


	public List<String> getTargetBean() {
		return targetBean;
	}


	public void setTargetBean(List<String> targetBean) {
		this.targetBean = targetBean;
	}


	public List<String> getTargetMethod() {
		return targetMethod;
	}


	public void setTargetMethod(List<String> targetMethod) {
		this.targetMethod = targetMethod;
	}


	public List<String> getCronExpression() {
		return cronExpression;
	}


	public void setCronExpression(List<String> cronExpression) {
		this.cronExpression = cronExpression;
	}


	public List<String> getStartTime() {
		return startTime;
	}


	public void setStartTime(List<String> startTime) {
		this.startTime = startTime;
	}


	public List<String> getPeriod() {
		return period;
	}


	public void setPeriod(List<String> period) {
		this.period = period;
	}


	public List<String> getDelay() {
		return delay;
	}


	public void setDelay(List<String> delay) {
		this.delay = delay;
	}


	public List<String> getParams() {
		return params;
	}


	public void setParams(List<String> params) {
		this.params = params;
	}


	public List<String> getType() {
		return type;
	}


	public void setType(List<String> type) {
		this.type = type;
	}


	public List<String> getExtKeySuffix() {
		return extKeySuffix;
	}


	public void setExtKeySuffix(List<String> extKeySuffix) {
		this.extKeySuffix = extKeySuffix;
	}


	public List<String> getBeforeMethod() {
		return beforeMethod;
	}


	public void setBeforeMethod(List<String> beforeMethod) {
		this.beforeMethod = beforeMethod;
	}


	public List<String> getAfterMethod() {
		return afterMethod;
	}


	public void setAfterMethod(List<String> afterMethod) {
		this.afterMethod = afterMethod;
	}

	public List<String> getThreadNum() {
		return threadNum;
	}


	public void setThreadNum(List<String> threadNum) {
		this.threadNum = threadNum;
	}
}

创建自动装配类LightJobAutoConfiguration.java

package com.itxs.lightjob.config;


import com.itxs.lightjob.ZKScheduleManager;
import com.itxs.lightjob.core.TaskDefine;
import com.itxs.lightjob.util.ScheduleUtil;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

import java.text.ParseException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

@Configuration
@EnableConfigurationProperties({LightJobProperties.class})
@ConditionalOnProperty(value = "light.job.enabled", havingValue = "true")
@ComponentScan()
public class LightJobAutoConfiguration {
	
	private static final Logger LOGGER = LoggerFactory.getLogger(LightJobAutoConfiguration.class);
	
	@Autowired
	private LightJobProperties uncodeScheduleConfig;
	
	@Bean(name = "zkScheduleManager", initMethod="init")
	public ZKScheduleManager commonMapper(){
		ZKScheduleManager zkScheduleManager = new ZKScheduleManager();
		zkScheduleManager.setZkConfig(uncodeScheduleConfig.getConfig());
		List<TaskDefine> list = initAllTask();
		zkScheduleManager.setInitTaskDefines(list);
		LOGGER.info("=====>ZKScheduleManager inited..");
		return zkScheduleManager;
	}
	
	private List<TaskDefine> initAllTask(){
		List<TaskDefine> list = new ArrayList<TaskDefine>();
		int total = 0;
		if(uncodeScheduleConfig.getTargetBean() != null){
			total = uncodeScheduleConfig.getTargetBean().size();
		}
		for(int i = 0; i < total; i++){
			TaskDefine taskDefine = new TaskDefine();
			if(uncodeScheduleConfig.getTargetBean() != null){
				 String value = uncodeScheduleConfig.getTargetBean().get(i);
				if(StringUtils.isNotBlank(value)){
					taskDefine.setTargetBean(value);
				}
			}
			if(uncodeScheduleConfig.getTargetMethod() != null){
				 String value = uncodeScheduleConfig.getTargetMethod().get(i);
				if(StringUtils.isNotBlank(value)){
					taskDefine.setTargetMethod(value);
				}
			}
			if(uncodeScheduleConfig.getCronExpression() != null){
				 String value = uncodeScheduleConfig.getCronExpression().get(i);
				if(StringUtils.isNotBlank(value)){
					taskDefine.setCronExpression(value);
				}
			}
			if(uncodeScheduleConfig.getStartTime() != null){
				 String value = uncodeScheduleConfig.getStartTime().get(i);
				if(StringUtils.isNotBlank(value)){
					Date time = null;
					try {
						time = ScheduleUtil.transferStringToDate(value);
					} catch (ParseException e) {
						e.printStackTrace();
					}
					if(time != null){
						taskDefine.setStartTime(time);
					}
				}
			}
			if(uncodeScheduleConfig.getPeriod() != null){
				 String value = uncodeScheduleConfig.getPeriod().get(i);
				if(StringUtils.isNotBlank(value)){
					taskDefine.setPeriod(Long.valueOf(value));
				}
			}
			if(uncodeScheduleConfig.getDelay() != null){
				 String value = uncodeScheduleConfig.getDelay().get(i);
				if(StringUtils.isNotBlank(value)){
					taskDefine.setDelay(Long.valueOf(value));
				}
			}
			
			if(uncodeScheduleConfig.getParams() != null){
				 String value = uncodeScheduleConfig.getParams().get(i);
				if(StringUtils.isNotBlank(value)){
					taskDefine.setParams(value);
				}
			}
			
			if(uncodeScheduleConfig.getType() != null){
				 String value = uncodeScheduleConfig.getType().get(i);
				if(StringUtils.isNotBlank(value)){
					taskDefine.setType(value);
				}
			}
			
			if(uncodeScheduleConfig.getExtKeySuffix() != null){
				 String value = uncodeScheduleConfig.getExtKeySuffix().get(i);
				if(StringUtils.isNotBlank(value)){
					taskDefine.setExtKeySuffix(value);
				}
			}
			if(uncodeScheduleConfig.getBeforeMethod() != null){
				 String value = uncodeScheduleConfig.getBeforeMethod().get(i);
				if(StringUtils.isNotBlank(value)){
					taskDefine.setBeforeMethod(value);
				}
			}
			if(uncodeScheduleConfig.getAfterMethod() != null){
				 String value = uncodeScheduleConfig.getAfterMethod().get(i);
				if(StringUtils.isNotBlank(value)){
					taskDefine.setAfterMethod(value);
				}
			}
			if(uncodeScheduleConfig.getThreadNum() != null){
				 String value = uncodeScheduleConfig.getThreadNum().get(i);
				if(StringUtils.isNotBlank(value)){
					taskDefine.setThreadNum(Integer.valueOf(value));
				}
			}
			list.add(taskDefine);
		}
		return list;
	}
}

然后在resources目录下的META-INF目录下创建spring.factories文件,跟SpringBoot其他starter一样,输出自动装配类的全类名;springboot项目默认只会扫描本项目下的带@Configuration注解的类,如果自定义starter,不在本工程中,是无法加载的,所以要配置META-INF/spring.factories配置文件。配置了META-INF/spring.factories配置文件是springboot实现starter的关键点,springboot的这种配置加载方式是一种类SPI(Service Provider Interface)的方式,SPI可以在META-INF/services配置接口扩展的实现类,springboot中原理类似,只是名称换成了spring.factories而已。

# Auto Configure
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.itxs.lightjob.config.LightJobAutoConfiguration

其他还有自动装配类的具体实现代码文件,如下面目录,主要利用zookeeper做分布式协调如分布式选主,执行maven install打包和安装到本地maven仓库。

image-20220707142623981

light-job-spring-boot-starter

light-job-spring-boot-starter是不做实现,主要管理依赖,Pom文件内容如下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.itxs</groupId>
    <artifactId>light-job-spring-boot-starter</artifactId>
    <version>1.0</version>
    <packaging>jar</packaging>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>com.itxs</groupId>
            <artifactId>light-job-spring-boot-starter-autoconfigure</artifactId>
            <version>1.0</version>
        </dependency>
    </dependencies>
</project>

最后我们执行maven install打包和安装到本地maven仓库。

调用示例

示例工程中加入light-job-spring-boot-starter依赖,这里选择前面文章示例的库存微服务模块中添加

        <dependency>
            <groupId>com.itxs</groupId>
            <artifactId>light-job-spring-boot-starter</artifactId>
            <version>1.0</version>
        </dependency>

创建演示任务并放到Spring容器里管理

package cn.itxs.ecom.storage.job;

import org.springframework.stereotype.Component;

@Component
public class DemoTask {

    public void execute() {
        System.out.println("===========execute start!=========");
        System.out.println("===========do job!=========");
        System.out.println("===========execute end !=========");
    }
}

配置文件增加

light:
  job:
    enabled: true
    zk-connect: 192.168.4.27:2181,192.168.4.28:2181,192.168.4.29:2181
    root-path: /ecom/storage
    zk-session-timeout: 60000
    target-bean:
      - demoTask
    target-method:
      - execute
    period:
      - 1000
    cron-expression:
      - 0/10 * * * * ?

启动三个库存微服务模块,在第1个库存微服务模块看到demoTask任务已经根据配置每十秒在运行

image-20220707145207648

关闭第1个库存微服务模块程序后,通过zookeeper重新选举一个节点定时执行,从下面看选择第3个库存微服务模块每十秒实行

image-20220707145317534

Redis读取配置赋值lightjob

zookeeper地址配置可以放到配置中心如Nacos,如果目前我们配置数据是放在Redis中,可以通过System.setProperty设置系统变量的方式来实现,先注释zk-connect的配置,这是启动程序就会报错

image-20220707150301012

RedisConfig配置类中增加实现BeanPostProcessor接口实现其postProcessAfterInitialization方法,在bean初始化后读取redis值设置环境变量值。

package cn.itxs.ecom.storage.config;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
@Slf4j
public class RedisConfig implements BeanPostProcessor{
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();
        template.setConnectionFactory(redisConnectionFactory);
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        //om.activateDefaultTyping(LaissezFaireSubTypeValidator.instance , ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        // key采用String的序列化方式
        template.setKeySerializer(stringRedisSerializer);
        // hash的key也采用String的序列化方式
        template.setHashKeySerializer(stringRedisSerializer);
        // value序列化方式采用jackson
        //template.setValueSerializer(jackson2JsonRedisSerializer);
        template.setValueSerializer(stringRedisSerializer);
        // hash的value序列化方式采用jackson
        //template.setHashValueSerializer(jackson2JsonRedisSerializer);
        template.setHashValueSerializer(stringRedisSerializer);
        template.afterPropertiesSet();
        return template;
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException
    {
        //在redisTemplate Bean初始化之后设置light.job.zk-connect为公共集群的zk地址
        if (beanName.equals("redisTemplate")){
            log.info("postProcessAfterInitialization match beanName {}",beanName);
            try {
                RedisTemplate redisObj = (RedisTemplate) bean;
                String zkConnect = (String)redisObj.opsForHash().get("clusterinfo", "zookeeper-server");
                if (StringUtils.isNotBlank(zkConnect)) {
                    log.info("postProcessAfterInitialization get zkConnect ={}", zkConnect);
                    System.setProperty("light.job.zk-connect", zkConnect);
                    log.info("System.setProperty light.job.zk-connect={}", zkConnect);
                }
            } catch (Exception e) {
                log.error("postProcessAfterInitialization operate redisTemplate {} failed", e);
            }
        }
        return null;
    }
}

启动后可以看到正常每十秒执行定时任务

**本人博客网站 **IT小神 www.itxiaoshen.com

标签:SpringBoot,自定义,List,value,org,import,任务调度,public,String
来源: https://www.cnblogs.com/itxiaoshen/p/16456694.html