调度中心整体方案【基于xxl-job的内部定制改造】
作者:互联网
文章目录
- 调度中心整体方案
调度中心整体方案
xxl-job改造
目标
我们的调度中心,一般是建立在spring环境。xxl-job的原生支持方案是需要加@XxlJob(“xxx”)注解。然而,我们目前自研的调度中心,是支持通过配置bean和bean的method(method可以加参数,但只能是String类型)来调度。配置图如下:
所以,我们尝试改造xxl-job,增加一种调度类型,使其可以直接调度bean的method,而不需要加@XxlJob(“xxx”)注解。整体实现的效果如下图:
admin页面&db修改
如上图,在任务创建和更新的地方,要增加
-
db的xxl_job_info,增加字段:
handler_method varchar(255) DEFAULT '' COMMENT '调用的bean方法'
-
任务配置->运行模式,增加一种”EXEC_LLT“类型:com.xxl.job.core.glue.GlueTypeEnum
-
如果选择了”EXEC_LLT“类型,要显示”bean方法“输入框,来输入方法
上面代码的改动,可以clone代码来查看:https://gitee.com/kelvin11/xxl-job
重点修改的代码
admin增加对”EXEC_LLT“类型的调度支持
这里主要可以从调度的入口来找在哪里修改,可以通过页面点击”执行一次“,来查看是触发的后端admin的哪个接口(answer:/jobinfo/trigger)
com.xxl.job.admin.controller.JobInfoController#triggerJob
com.xxl.job.admin.core.thread.JobTriggerPoolHelper#trigger
com.xxl.job.admin.core.trigger.XxlJobTrigger#trigger
com.xxl.job.admin.core.trigger.XxlJobTrigger#processTrigger
com.xxl.job.admin.core.trigger.XxlJobTrigger#runExecutor
com.xxl.job.core.biz.client.ExecutorBizClient#run
XxlJobRemotingUtil.postBody(addressUrl + “run”, accessToken, timeout, triggerParam, String.class);
一路下来,可以看到,其实最终admin,是通过一个http的调用,来调起executor的执行。这个调用的参数样例,如下:
http://localhost:9999/run
{
"jobId": 1,
"executorHandler": "sampleLltJob",
"handlerMethod": "execLltSample",-------这个是这次改造新增的参数
"executorParams": "hello parameters~",
"executorBlockStrategy": "SERIAL_EXECUTION",
"executorTimeout": 0,
"logId": 11,
"logDateTime": 1637041633591,
"glueType": "EXEC-LLT",
"glueSource": "",
"glueUpdatetime": 1541254891000,
"broadcastIndex": 0,
"broadcastTotal": 1
}
那么修改的核心,应该是在executor,收到请求后,如何去执行。
executor增加对”EXEC_LLT“类型的调度支持
那就找executor是如何接收请求的?我们找/run
这样的关键字,找到在这里:com.xxl.job.core.server.EmbedServer.EmbedHttpServerHandler#channelRead0,明显可以看出,这个是netty创建的一个server,默认就是配置文件配置的端口9999。
为了改造最小化,我们还是通过/run方法来接收请求,新增处理方式即可,这里就是改造的重点了。
com.xxl.job.core.biz.impl.ExecutorBizImpl#run
原先是要拿到一个IJobHandler,我们可以看到这个实现类,原先是3个,我们针对”EXEC_LLT“类型,新增加了一个实现类。
我们通过反射根据TriggerParam,找到对应的被调用类和方法,然后生成LltMethodJobHandler对象,核心代码如下:
// new jobhandler
Object target = XxlJobSpringExecutor.getApplicationContext().getBean(triggerParam.getExecutorHandler());
Method method = null;
try {
if (!StringUtils.hasText(triggerParam.getExecutorParams())) {
method = target.getClass().getMethod(triggerParam.getHandlerMethod());
}
else {
method = target.getClass().getMethod(triggerParam.getHandlerMethod(), String.class);
}
} catch (NoSuchMethodException e) {
logger.error("handler no such method", e);
}
String paramString = triggerParam.getExecutorParams();
IJobHandler newJobHandler = new LltMethodJobHandler(target, method, null, null, paramString);
我们新增的com.xxl.job.core.handler.impl.LltMethodJobHandler代码不复杂,跟原先基本一样,就是增加了反射调用,而不是通过原先注解找到的方法来执行。
package com.xxl.job.core.handler.impl;
import com.xxl.job.core.handler.IJobHandler;
import org.springframework.util.StringUtils;
import java.lang.reflect.Method;
/**
* @author liukun
*/
public class LltMethodJobHandler extends IJobHandler {
private final Object target;
private final Method method;
private Method initMethod;
private Method destroyMethod;
private String paramString;
public LltMethodJobHandler(Object target, Method method, Method initMethod, Method destroyMethod, String paramString) {
this.target = target;
this.method = method;
this.initMethod = initMethod;
this.destroyMethod = destroyMethod;
this.paramString = paramString;
}
@Override
public void execute() throws Exception {
if (!StringUtils.hasText(paramString)) {
method.invoke(target);
}
else {
method.invoke(target, paramString);
}
/*Class<?>[] paramTypes = method.getParameterTypes();
if (paramTypes.length > 0) {
method.invoke(target, new Object[paramTypes.length]); // method-param can not be primitive-types
} else {
method.invoke(target);
}*/
}
@Override
public void init() throws Exception {
if(initMethod != null) {
initMethod.invoke(target);
}
}
@Override
public void destroy() throws Exception {
if(destroyMethod != null) {
destroyMethod.invoke(target);
}
}
@Override
public String toString() {
return super.toString()+"["+ target.getClass() + "#" + method.getName() +"]";
}
}
至此,我们的admin和executor的改造就完成了。下面就是将executor打成jar包放入到项目中运行看看。
我们由于使用的docker环境,并且是使用的swam overlay网络,这里就有个疑问:executor获取的ip是什么ip?admin调用executor的时候还能正常调用吗?
部署
executor打jar包
这个比较简单,直接将xxl-job打包,然后再xxl-job-core的target目录下找到对应的jar包
executor的jar包上传到nexus私服
在dbc项目中引入
maven中引入executor
<dependency>
<groupId>com.xuxueli.hbtech</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.3.0</version>
</dependency>
增加xxl-job的properties配置
### xxl-job admin address list, such as "http://address" or "http://address01,http://address02"
xxl.job.admin.addresses=http://172.18.102.116:8080/xxl-job-admin
### xxl-job, access token
xxl.job.accessToken=
### xxl-job executor appname
xxl.job.executor.appname=dbc-test
### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null
xxl.job.executor.address=
### xxl-job executor server-info
xxl.job.executor.ip=
xxl.job.executor.port=9999
### xxl-job executor log-path
xxl.job.executor.logpath=./data/applogs/xxl-job/jobhandler
### xxl-job executor log-retention-days
xxl.job.executor.logretentiondays=30
logging.config=classpath:logback.xml
增加xxl-job的配置
package com.hbtech.dbc.common.config;
import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* xxl-job config
*
* @author liukun 2017-04-28
*/
@Configuration
public class XxlJobConfig {
private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
@Value("${xxl.job.admin.addresses}")
private String adminAddresses;
@Value("${xxl.job.accessToken}")
private String accessToken;
@Value("${xxl.job.executor.appname}")
private String appname;
@Value("${xxl.job.executor.address}")
private String address;
@Value("${xxl.job.executor.ip}")
private String ip;
@Value("${xxl.job.executor.port}")
private int port;
@Value("${xxl.job.executor.logpath}")
private String logPath;
@Value("${xxl.job.executor.logretentiondays}")
private int logRetentionDays;
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
/**
* 针对多网卡、容器内部署等情况,可借助 "spring-cloud-commons" 提供的 "InetUtils" 组件灵活定制注册IP;
*
* 1、引入依赖:
* <dependency>
* <groupId>org.springframework.cloud</groupId>
* <artifactId>spring-cloud-commons</artifactId>
* <version>${version}</version>
* </dependency>
*
* 2、配置文件,或者容器启动变量
* spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'
*
* 3、获取IP
* String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
*/
}
dbc项目打成jar包
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<fork>true</fork>
</configuration>
</plugin>
</plugins>
</build>
创建docker swarm overlay环境
我们有2台虚拟机来测试,分别是172.18.102.116和172.18.102.134。
-
防踩坑
为了防止踩坑,先都打开防火墙,并且开放一些必要的端口(2台机器都要执行)
systemctl restart firewalld.service firewall-cmd --zone=public --add-port=7946/tcp --permanent firewall-cmd --zone=public --add-port=7946/udp --permanent firewall-cmd --zone=public --add-port=4789/udp --permanent firewall-cmd --zone=public --add-port=4789/tcp --permanent firewall-cmd --zone=public --add-port=2375/tcp --permanent firewall-cmd --zone=public --add-port=2375/udp --permanent firewall-cmd --zone=public --add-port=2377/udp --permanent firewall-cmd --zone=public --add-port=2377/tcp --permanent firewall-cmd --zone=public --add-port=8080/tcp --permanent firewall-cmd --zone=public --add-port=9999/tcp --permanent firewall-cmd --reload
如果不执行,可能会遇到一些奇怪的问题,比如:
-
创建了overlay网络,在work机器上,容器应用不能加入网络
[root@server3 ~]# docker run -itd --name=busybox2 --network=testnetwork busybox /bin/sh Unable to find image 'busybox:latest' locally latest: Pulling from library/busybox e685c5c858e3: Pull complete Digest: sha256:e7157b6d7ebbe2cce5eaa8cfe8aa4fa82d173999b9f90a9ec42e57323546c353 Status: Downloaded newer image for busybox:latest 1d2f47a82445a946ea78c44058baf1fe21c5f3204bc484197139b0808842c50d docker: Error response from daemon: attaching to network failed, make sure your network options are correct and check manager logs: context deadline exceeded.
-
启动容器应用的时候,报防火墙相关问题
[root@116 xxl]# docker run -itd --name xxladmin -p 8080:8080 --network=testnetwork5 xxladmin1.0 c3d9f1a5a62c50aee388364306ba575aa0b5a59fbc85c5408b2040b7a3250ff5 docker: Error response from daemon: container c3d9f1a5a62c50aee388364306ba575aa0b5a59fbc85c5408b2040b7a3250ff5: endpoint join on GW Network failed: driver failed programming external connectivity on endpoint gateway_79cd50cb1a80 (0f8dc70541f22f02d3e89832e929dfc18ffcac3d15ed563a6f7c2b464bdb9111): (iptables failed: iptables --wait -t nat -A DOCKER -p tcp -d 0/0 --dport 8080 -j DNAT --to-destination 172.31.0.4:8080 ! -i br-167a79121a6e: iptables: No chain/target/match by that name. (exit status 1)).
-
-
在116上创建swarm manager
root@116 xxl]# docker swarm init --advertise-addr 172.18.102.116
Swarm initialized: current node (nvb5j0no5xzsz7kp9hue2by9f) is now a manager.
To add a worker to this swarm, run the following command:
docker swarm join --token SWMTKN-1-2zv0qv7nkownkvrojs3jhipjg5jnjmsod1i6ihy2l8ppw6cine-bdjj76p0xaavdnjfjha5ek8n8 172.18.102.116:2377
To add a manager to this swarm, run 'docker swarm join-token manager' and follow the instructions.
-
然后去134上执行docker swarm join命令
-
在116上创建overlay网络
docker network create -d overlay --attachable my-network5
执行后,其实在134上也会有此网络信息。
部署xxl-job-admin
上传xxl-job-admin-2.3.0.jar
创建admin的Dockerfile
FROM java:8
MAINTAINER KelvinLiu
ADD xxl-job-admin-2.3.0.jar xxl-job-admin-2.3.0.jar
EXPOSE 8080
ENTRYPOINT ["java","-jar","xxl-job-admin-2.3.0.jar"]
构建admin的docker镜像
docker build -t xxladmin1.0 .
构建admin的docker容器
docker run -itd --name xxladmin -p 8080:8080 --network=testnetwork5 xxladmin1.0
访问admin
http://172.18.102.116:8080/xxl-job-admin/
部署dbc(其中含有xxl-job-executor)
上传dbc.jar
创建dbc的Dockerfile
FROM java:8
MAINTAINER KelvinLiu
ADD dbc.jar dbc.jar
EXPOSE 9999
ENTRYPOINT ["java","-jar","dbc.jar"]
构建dbc的docker镜像
docker build -t dbc1.0 .
构建dbc的docker容器
docker run -itd --name dbc -p 9999:9999 --network=testnetwork5 dbc1.0
查看admin上executor是否自动注册
前提要先建好执行器(蓝色不用填写),不然即使dbc启动,也不会自动创建执行器。这里理解为,执行器要自己建,如果建的名称相同,当dbc启动的时候,admin会通过自动运行的线程,将dbc这个executor的ip更新进来。
其实通过上图,可以看到,自动注册的执行器机器地址,是容器的IP
执行一次
这里就不多加说明了,执行的时候,是admin的机器调用了dbc容器的ip:9999端口的,overlay网络,保证了2个容器通过IP调用可通。
标签:--,dbc,调度,admin,job,executor,xxl 来源: https://blog.csdn.net/sdfiiiiii/article/details/121394046