2021SC@SDUSC(dolphinscheduler- common)
作者:互联网
executeProcess按顺序调用了prepareProcess、runProcess、endProcess三个方法,简单来说就是初始化、执行、释放资源。 prepareProcess又按顺序调用了initTaskQueue、buildFlowDag。
initTaskQueue就是一些资源的初始化操作,比如通过流程定义ID查询到当前的任务实例。下面是其核心逻辑,可以发现,就是查询了完成的任务列表,报错且不能重试的任务列表。
List<TaskInstance> taskInstanceList = processDao.findValidTaskListByProcessId(processInstance.getId());
for(TaskInstance task : taskInstanceList){
if(task.isTaskComplete()){
completeTaskList.put(task.getName(), task);
}
if(task.getState().typeIsFailure() && !task.taskCanRetry()){
errorTaskList.put(task.getName(), task);
}
}
buildFlowDag看名字应该是生成DAG实例的,代码虽短,但调用了好几个函数,我们只重点分析最后一个函数调用。
private void buildFlowDag() throws Exception {
recoverNodeIdList = getStartTaskInstanceList(processInstance.getCommandParam());
forbiddenTaskList = DagHelper.getForbiddenTaskNodeMaps(processInstance.getProcessInstanceJson());
// generate process to get DAG info
List<String> recoveryNameList = getRecoveryNodeNameList();
List<String> startNodeNameList = parseStartNodeName(processInstance.getCommandParam());
ProcessDag processDag = generateFlowDag(processInstance.getProcessInstanceJson(),
startNodeNameList, recoveryNameList, processInstance.getTaskDependType());
if(processDag == null){
logger.error("processDag is null");
return;
}
// generate process dag
dag = DagHelper.buildDagGraph(processDag);
}
DagHelper.buildDagGraph生成了一个DAG对象实例,根据名字和注释猜测,这应该是对有向无环图的一个抽象。
/**
* the object of DAG
*/
private DAG<String,TaskNode,TaskNodeRelation> dag;
来看下DAG类的定义
/**
* analysis of DAG
* Node: node
* NodeInfo:node description information
* EdgeInfo: edge description information
*/
public class DAG<Node, NodeInfo, EdgeInfo>
DAG有三个类型参数,分别代表节点key、节点信息、边信息。
下面是TaskNode的字段
发现TaskNode的字段跟UI一一对应
TaskNodeRelation代表边的信息,字段比较少,只有startNode、endNode两个String类型的字段。这其实是DAG类的第一个类型参数,节点的key。
public static DAG<String, TaskNode, TaskNodeRelation> buildDagGraph(ProcessDag processDag) {
DAG<String,TaskNode,TaskNodeRelation> dag = new DAG<>();
/**
* add vertex
*/
if (CollectionUtils.isNotEmpty(processDag.getNodes())){
for (TaskNode node : processDag.getNodes()){
dag.addNode(node.getName(),node);
}
}
/**
* add edge
*/
if (CollectionUtils.isNotEmpty(processDag.getEdges())){
for (TaskNodeRelation edge : processDag.getEdges()){
dag.addEdge(edge.getStartNode(),edge.getEndNode());
}
}
return dag;
}
上面是buildDagGraph的源码。可以看出,增加节点时,第一个参数是TaskNode的getName。跟猜测的一样,DAG的第一个参数就是node的key,而key就是名称。
细心的读者一定发现,DAG对象是根据ProcessDag来创建的
DAG是把节点、边的一个List转化成了一个Graph。
初始化完成之后,来看一下具体如何执行流程定义的。
这个方法源码很长,我们首先从整体简要分析。
- submitPostNode(null)
- 起一个while循环,直至流程定义实例停止(成功、失败、取消、暂停、等待)
- 首先判断是否超时,超时则发送预警邮件
- 获取当前活动的任务节点的Map。key是MasterBaseTaskExecThread对象,value是Future<Boolean>。value其实是MasterBaseTaskExecThread线程的当前状态。
- 如果当前任务实例已经结束,则从Map中移除
- 如果当前任务实例成功,则put到completeTaskList且调用submitPostNode(task.getName())
- 如果当前任务实例失败,则重试;否则直接结束(比如手动停止或暂停)
- 更新当前流程定义实例的状态,进入下一个循环
标签:task,2021SC,实例,dolphinscheduler,processInstance,dag,processDag,SDUSC,DAG 来源: https://blog.csdn.net/Santan1412/article/details/121317599