dolphinscheduler源码解析-MasterSchedulerService
作者:互联网
dolphinscheduler 源码解析-MasterSchedulerService
文章目录
类定义
@Service
public class MasterSchedulerService extends Thread
可以看出该类继承了线程基类,那该类就可以在线程池内执行。
类属性
/**
* logger of MasterSchedulerService
*/
private static final Logger logger = LoggerFactory.getLogger(MasterSchedulerService.class);
/**
* dolphinscheduler database interface
*/
@Autowired
private ProcessService processService;
/**
* zookeeper master client
*/
@Autowired
private MasterRegistryClient masterRegistryClient;
/**
* master config
*/
@Autowired
private MasterConfig masterConfig;
/**
* alert manager
*/
@Autowired
private ProcessAlertManager processAlertManager;
/**
* netty remoting client
*/
private NettyRemotingClient nettyRemotingClient;
/**
* master exec service
*/
private ThreadPoolExecutor masterExecService;
可以看出它有一个ProcessService
这个属性集成了很多mappers类,提供数据dao服务。还有一个ProcessAlertManager
告警的管理器,另外也有netty的客户端和一个线程池。
初始化方法
@PostConstruct
public void init() {
this.masterExecService = (ThreadPoolExecutor)ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread", masterConfig.getMasterExecThreads());
NettyClientConfig clientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
}
对线程池进行赋值,并且创建一个netty的客户端。
既然是线程类,就必须有run方法,我们查看一下run方法
/**
* run of MasterSchedulerService
*/
@Override
public void run() {
logger.info("master scheduler started");
while (Stopper.isRunning()) {
try {
boolean runCheckFlag = OSUtils.checkResource(masterConfig.getMasterMaxCpuloadAvg(), masterConfig.getMasterReservedMemory());
if (!runCheckFlag) {
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
continue;
}
scheduleProcess();
} catch (Exception e) {
logger.error("master scheduler thread error", e);
}
}
}
该run方法会先检查一下资源,看是否有空闲资源,如果没有就让线程睡眠一会儿然后重新检查资源,当有了足够的资源就开始执行scheduleProcess
方法
我们向下追踪scheduleProcess
方法
private void scheduleProcess() throws Exception {
try {
masterRegistryClient.blockAcquireMutex();
int activeCount = masterExecService.getActiveCount();
// make sure to scan and delete command table in one transaction
Command command = processService.findOneCommand();
if (command != null) {
logger.info("find one command: id: {}, type: {}", command.getId(),command.getCommandType());
try {
ProcessInstance processInstance = processService.handleCommand(logger,
getLocalAddress(),
this.masterConfig.getMasterExecThreads() - activeCount, command);
if (processInstance != null) {
logger.info("start master exec thread , split DAG ...");
masterExecService.execute(
new MasterExecThread(
processInstance
, processService
, nettyRemotingClient
, processAlertManager
, masterConfig));
}
} catch (Exception e) {
logger.error("scan command error ", e);
processService.moveToErrorCommand(command, e.toString());
}
} else {
//indicate that no command ,sleep for 1s
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
}
} finally {
masterRegistryClient.releaseLock();
}
}
首先获取一个master的分布式锁
查看master的executor线程池中有多少active状态的线程。
从数据库中拿到一个command命令
然后构造一个该命令对应的实体类ProcessInstance
然后在该类的线程池内执行MasterExecThread线程【点击打开MasterExecThread】。
标签:MasterSchedulerService,dolphinscheduler,private,源码,command,master,线程,logger 来源: https://blog.csdn.net/sinat_35045195/article/details/119222093