
Flink Source Code Analysis (3): Understanding Flink's Heartbeat Mechanism from the RM-TM Heartbeat Interaction

Author: Internet

0. Notes

Based on Flink 1.12.

1. Background

1.1 The Actor Model

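Flink's underlying RPC is implemented with Akka, a framework based on the actor model. This section gives a brief overview of the actor model.
In the actor model, everything is an actor. An actor is the basic unit of computation. Actors are fully isolated and share no memory, so there are no concurrency problems caused by shared data; each actor maintains its own state, which other actors cannot modify directly.
Overall, many actors run concurrently, and each one receives messages and reacts to them. Messages are sent to an actor asynchronously and stored in its "mailbox"; the actor processes them one at a time, in order, so no locks are needed. The sender is therefore decoupled from the messages it has sent: it does not wait for them to be handled, and the work proceeds concurrently.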
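To make the mailbox idea concrete, here is a minimal Java sketch that assumes nothing from Akka or Flink: the hypothetical `RecordingActor` owns its state, `tell` only enqueues a message, and a single worker thread drains the mailbox in arrival order, so the state needs no locks. Real Akka actors are usually run on a shared dispatcher thread pool rather than owning a thread each.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Toy actor: private state, a mailbox, and one worker thread that drains it in order. */
class RecordingActor {
    private final BlockingQueue<String> mailbox = new LinkedBlockingQueue<>();
    // Actor-owned state: only the worker thread touches it, so no locks are needed.
    private final List<String> processed = new ArrayList<>();
    private volatile boolean running = true;
    private final Thread worker;

    RecordingActor() {
        worker = new Thread(this::loop, "recording-actor");
        worker.start();
    }

    /** Asynchronous send: the caller only enqueues and returns immediately. */
    void tell(String message) {
        mailbox.offer(message);
    }

    private void loop() {
        try {
            // Keep draining until the actor is stopped AND the mailbox is empty.
            while (running || !mailbox.isEmpty()) {
                String msg = mailbox.poll(10, TimeUnit.MILLISECONDS);
                if (msg != null) {
                    processed.add(msg); // one message at a time, in arrival order
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Stops the actor after it has drained its mailbox and returns what it processed. */
    List<String> stopAndGetProcessed() {
        running = false;
        try {
            worker.join(); // join() makes the worker's writes visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(processed);
    }
}
```

The sender never touches `processed`; it only puts messages into the queue, which is the decoupling described above.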

1.2 RPC

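RPC makes a remote call look like a local one by hiding the details of the call.
Flink defines a Gateway for each component and hides the implementation details behind callbacks, which decouples the business logic from the communication layer and makes RPC calls convenient. Currently, the low-level communication of Flink's RPC requests is implemented with Akka.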
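A minimal sketch of the gateway idea, assuming an in-process toy rather than Flink's real RPC classes: the caller only sees the hypothetical `EchoGateway` interface, and a JDK `java.lang.reflect.Proxy` intercepts every call at the point where a real RPC layer would serialize the call and send it as a message. `ToyRpc`, `EchoGateway` and `EchoEndpoint` are made-up names.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical gateway: the only type the caller sees. */
interface EchoGateway {
    String echo(String msg);
}

/** Hypothetical endpoint that does the actual work on the "remote" side. */
class EchoEndpoint implements EchoGateway {
    @Override
    public String echo(String msg) {
        return "echo:" + msg;
    }
}

/** Toy in-process "RPC": a dynamic proxy sits between the caller and the endpoint. */
final class ToyRpc {
    /** Every intercepted call, rendered as "method(args)". */
    static final List<String> wireLog = new ArrayList<>();

    private ToyRpc() {}

    static <T> T connect(Class<T> gatewayType, T endpoint) {
        InvocationHandler handler = (p, method, args) -> {
            // A real RPC layer would serialize the call here and send it as a message.
            String rendered = args == null ? "" : Arrays.stream(args)
                    .map(String::valueOf)
                    .collect(Collectors.joining(","));
            wireLog.add(method.getName() + "(" + rendered + ")");
            return method.invoke(endpoint, args);
        };
        Object proxy = Proxy.newProxyInstance(
                gatewayType.getClassLoader(), new Class<?>[] {gatewayType}, handler);
        return gatewayType.cast(proxy);
    }
}
```

Because callers program only against the interface, the transport behind the proxy can change without touching the business code.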

1.2.1 RPC-Related Interfaces

2. Flink's Heartbeat Mechanism

2.1 Core Interfaces

2.1.1 HeartbeatTarget

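The interface for components that can send heartbeats and request heartbeat responses; it is an abstraction of any object capable of taking part in heartbeats.
HeartbeatTarget defines two actions:

- receiveHeartbeat(heartbeatOrigin, heartbeatPayload): receives a heartbeat, with an optional payload, from heartbeatOrigin
- requestHeartbeat(requestOrigin, heartbeatPayload): asks the target to answer with a heartbeat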

2.1.2 HeartbeatMonitor

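HeartbeatMonitor manages the heartbeat state of a HeartbeatTarget. If no heartbeat arrives within the configured time, the monitor notifies the corresponding HeartbeatListener; whenever a heartbeat arrives, it resets its timer. Its factory interface is: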

     HeartbeatMonitor<O> createHeartbeatMonitor(
                ResourceID resourceID,
                HeartbeatTarget<O> heartbeatTarget,
                ScheduledExecutor mainThreadExecutor,
                HeartbeatListener<?, O> heartbeatListener,  // handles heartbeat events such as timeouts
                long heartbeatTimeoutIntervalMs);

2.1.3 HeartbeatListener

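HeartbeatListener is the interface through which business code interacts with the HeartbeatManager; Flink components implement it to handle heartbeat results. Its three callbacks are:

- notifyHeartbeatTimeout(resourceID): called when the target identified by resourceID has timed out
- reportPayload(resourceID, payload): called with the payload carried by a received heartbeat
- retrievePayload(resourceID): returns the payload to attach to the outgoing heartbeat for resourceID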

2.1.4 HeartbeatManager

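The heartbeat manager starts and stops heartbeat monitoring of HeartbeatTargets and handles the heartbeat timeout of individual nodes.
HeartbeatManager extends HeartbeatTarget, so besides HeartbeatTarget's methods it declares four more:

- monitorTarget(resourceID, heartbeatTarget): starts monitoring the given target
- unmonitorTarget(resourceID): stops monitoring the target
- stop(): stops the heartbeat manager
- getLastHeartbeatFrom(resourceId): returns the time of the last heartbeat received from the given resource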

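Roughly, the core interfaces interact as follows: the HeartbeatManager puts each HeartbeatTarget into its set of monitored targets; when a target's heartbeat times out, the HeartbeatMonitor notifies the HeartbeatListener, and the listener implementation carries out the timeout handling.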
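The interaction above can be sketched without timers or threads. In this hypothetical Java toy, `ToyHeartbeatManager` keeps one `ToyMonitor` per resource ID, a heartbeat resets that monitor's timestamp, and an explicit `checkTimeouts(now)` call stands in for the scheduled timeout tasks. The names and signatures are simplified for illustration and are not Flink's API.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified stand-in for HeartbeatListener: only the timeout callback. */
interface ToyHeartbeatListener {
    void notifyHeartbeatTimeout(String resourceId);
}

/** Simplified stand-in for HeartbeatMonitor: when was the target last heard from? */
final class ToyMonitor {
    final String resourceId;
    long lastHeartbeat;
    boolean timedOut;

    ToyMonitor(String resourceId, long now) {
        this.resourceId = resourceId;
        this.lastHeartbeat = now; // monitoring starts the clock
    }
}

/** Simplified stand-in for a heartbeat manager: one monitor per monitored resource ID. */
final class ToyHeartbeatManager {
    private final long timeoutMs;
    private final ToyHeartbeatListener listener;
    private final Map<String, ToyMonitor> targets = new HashMap<>();

    ToyHeartbeatManager(long timeoutMs, ToyHeartbeatListener listener) {
        this.timeoutMs = timeoutMs;
        this.listener = listener;
    }

    void monitorTarget(String resourceId, long now) {
        targets.putIfAbsent(resourceId, new ToyMonitor(resourceId, now));
    }

    void unmonitorTarget(String resourceId) {
        targets.remove(resourceId);
    }

    /** A heartbeat from a monitored target resets its timer; unknown IDs are ignored. */
    void reportHeartbeat(String resourceId, long now) {
        ToyMonitor monitor = targets.get(resourceId);
        if (monitor != null && !monitor.timedOut) {
            monitor.lastHeartbeat = now;
        }
    }

    /** Stands in for the per-monitor timeout tasks: notify once per expired target. */
    void checkTimeouts(long now) {
        for (ToyMonitor monitor : targets.values()) {
            if (!monitor.timedOut && now - monitor.lastHeartbeat >= timeoutMs) {
                monitor.timedOut = true;
                listener.notifyHeartbeatTimeout(monitor.resourceId);
            }
        }
    }
}
```

For example, with a 50 ms timeout, a target that last reported at time 40 is still alive at time 60, while a target silent since time 0 is reported to the listener.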

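2.2 Implementations of the Core Interfaces

Below, we walk through the implementation classes of the core interfaces from section 2.1 to see in detail how heartbeats are processed.

2.2.1 HeartbeatManagerImpl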

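This manager keeps its own resource ID and one HeartbeatMonitor per monitored target, stored in a map keyed by resource ID. When a new heartbeat arrives, the corresponding monitor is updated (its timeout is reset); when a heartbeat times out, the HeartbeatListener is notified.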

public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> {
  // heartbeat timeout (not the heartbeat interval)
  /** Heartbeat timeout interval in milli seconds. */
  private final long heartbeatTimeoutIntervalMs;
  // listener that handles payloads and timeouts
  /** Heartbeat listener with which the heartbeat manager has been associated. */
  private final HeartbeatListener<I, O> heartbeatListener;
  // map from resource ID to its HeartbeatMonitor; monitorTarget() puts entries into this map
  /** Map containing the heartbeat monitors associated with the respective resource ID. */
  private final ConcurrentHashMap<ResourceID, HeartbeatMonitor<O>> heartbeatTargets;

  /** Running state of the heartbeat manager. */
  protected volatile boolean stopped;

  // ... remaining fields and methods omitted
}

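The main methods implemented by HeartbeatManagerImpl are the HeartbeatManager methods (monitorTarget, unmonitorTarget, stop, getLastHeartbeatFrom) and the HeartbeatTarget methods (receiveHeartbeat, requestHeartbeat). Internally, reportHeartbeat(resourceID) looks up the target's monitor and resets its timeout.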

2.2.2 HeartbeatManagerSenderImpl

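HeartbeatManagerSenderImpl extends HeartbeatManagerImpl and actively requests heartbeat responses from the heartbeatTarget objects it monitors, i.e. it initiates heartbeat requests itself. It implements the Runnable interface: its run method iterates over the HeartbeatMonitors and calls requestHeartbeat() on each one to ask the node for a heartbeat.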

  public class HeartbeatManagerSenderImpl<I, O> extends HeartbeatManagerImpl<I, O> implements Runnable {

      // heartbeatPeriod (the configured heartbeat interval) is set in the constructor, omitted here

      @Override
      public void run() {
          if (!stopped) {
              log.debug("Trigger heartbeat request.");
              for (HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets().values()) {
                  requestHeartbeat(heartbeatMonitor);
              }
              // re-schedule this task; the period (heartbeatPeriod) is configurable
              getMainThreadExecutor().schedule(this, heartbeatPeriod, TimeUnit.MILLISECONDS);
          }
      }

      // actively sends a heartbeat request to one target
      private void requestHeartbeat(HeartbeatMonitor<O> heartbeatMonitor) {
          O payload = getHeartbeatListener().retrievePayload(heartbeatMonitor.getHeartbeatTargetId());
          final HeartbeatTarget<O> heartbeatTarget = heartbeatMonitor.getHeartbeatTarget();
          // call the target's requestHeartbeat
          heartbeatTarget.requestHeartbeat(getOwnResourceID(), payload);
      }
  }
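The self-rescheduling pattern in run() can be reproduced with the JDK's ScheduledExecutorService. This is a sketch under assumptions: targets are modeled as plain callbacks, `ToyHeartbeatSender` is a made-up name, and its constructor schedules the first round immediately.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Toy "pull" sender: each round asks every target for a heartbeat, then reschedules itself. */
final class ToyHeartbeatSender implements Runnable {
    private final String ownId;
    private final ScheduledExecutorService executor;
    private final long periodMs;
    // A target is modeled as a callback that receives the requester's ID ("requestHeartbeat").
    private final List<Consumer<String>> targets = new CopyOnWriteArrayList<>();
    private volatile boolean stopped;

    ToyHeartbeatSender(String ownId, ScheduledExecutorService executor, long periodMs) {
        this.ownId = ownId;
        this.executor = executor;
        this.periodMs = periodMs;
        executor.schedule(this, 0L, TimeUnit.MILLISECONDS); // first round right away
    }

    void monitorTarget(Consumer<String> requestHeartbeat) {
        targets.add(requestHeartbeat);
    }

    void stop() {
        stopped = true;
    }

    @Override
    public void run() {
        if (!stopped) {
            for (Consumer<String> target : targets) {
                target.accept(ownId);
            }
            // Reschedule only after this round finished; stopping simply means not rescheduling.
            executor.schedule(this, periodMs, TimeUnit.MILLISECONDS);
        }
    }
}
```

Rescheduling with schedule() after each round, instead of scheduleAtFixedRate(), means slow rounds never overlap and a stopped sender just stops rescheduling itself.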

2.2.3 HeartbeatMonitorImpl

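HeartbeatMonitorImpl monitors a single heartbeat target. It does not start an executor of its own: its constructor schedules a timeout task on the ScheduledExecutor it is given, and every received heartbeat cancels that task and schedules a new one.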

public class HeartbeatMonitorImpl<O> implements HeartbeatMonitor<O>, Runnable {

    /** Resource ID of the monitored heartbeat target. */
    private final ResourceID resourceID; // ID of the monitored resource
    /** Associated heartbeat target. */
    private final HeartbeatTarget<O> heartbeatTarget; // the heartbeat target

    private final ScheduledExecutor scheduledExecutor;
    /** Listener which is notified about heartbeat timeouts. */
    private final HeartbeatListener<?, ?> heartbeatListener;

    // Also declared but omitted here: heartbeatTimeoutIntervalMs, lastHeartbeat,
    // state (an AtomicReference<State>) and futureTimeout (the pending timeout task).

    HeartbeatMonitorImpl(
            ResourceID resourceID,
            HeartbeatTarget<O> heartbeatTarget,
            ScheduledExecutor scheduledExecutor,
            HeartbeatListener<?, O> heartbeatListener,
            long heartbeatTimeoutIntervalMs) {

        this.resourceID = Preconditions.checkNotNull(resourceID);
        this.heartbeatTarget = Preconditions.checkNotNull(heartbeatTarget);
        this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor);
        this.heartbeatListener = Preconditions.checkNotNull(heartbeatListener);

        Preconditions.checkArgument(
                heartbeatTimeoutIntervalMs > 0L,
                "The heartbeat timeout interval has to be larger than 0.");
        this.heartbeatTimeoutIntervalMs = heartbeatTimeoutIntervalMs;

        lastHeartbeat = 0L;
        // schedule the first timeout task right away, at construction time
        resetHeartbeatTimeout(heartbeatTimeoutIntervalMs);
    }

    @Override
    public void run() {
        // The heartbeat has timed out if we're in state running
        if (state.compareAndSet(State.RUNNING, State.TIMEOUT)) {
            // let the heartbeatListener handle the timeout
            heartbeatListener.notifyHeartbeatTimeout(resourceID);
        }
    }

    void resetHeartbeatTimeout(long heartbeatTimeout) {
        if (state.get() == State.RUNNING) {
            cancelTimeout();
            // schedule a fresh timeout task
            futureTimeout =
                    scheduledExecutor.schedule(this, heartbeatTimeout, TimeUnit.MILLISECONDS);

            // Double check for concurrent accesses (e.g. a firing of the scheduled future)
            if (state.get() != State.RUNNING) {
                cancelTimeout();
            }
        }
    }

    // ... other methods (e.g. reportHeartbeat, cancelTimeout) omitted
}
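A runnable approximation of this timeout-reset logic, assuming the JDK's ScheduledExecutorService in place of Flink's ScheduledExecutor: each heartbeat cancels the pending timeout task and schedules a new one, and the compare-and-set on the state guarantees the timeout callback runs at most once. `ToyHeartbeatMonitor` is hypothetical, and its `onTimeout` callback stands in for notifyHeartbeatTimeout.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Toy monitor following the reset-and-CAS pattern of the excerpt above; not Flink's class. */
final class ToyHeartbeatMonitor implements Runnable {
    enum State { RUNNING, TIMEOUT, CANCELED }

    private final ScheduledExecutorService executor;
    private final long timeoutMs;
    private final Runnable onTimeout;
    private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
    private volatile ScheduledFuture<?> futureTimeout;

    ToyHeartbeatMonitor(ScheduledExecutorService executor, long timeoutMs, Runnable onTimeout) {
        if (timeoutMs <= 0L) {
            throw new IllegalArgumentException("The heartbeat timeout has to be larger than 0.");
        }
        this.executor = executor;
        this.timeoutMs = timeoutMs;
        this.onTimeout = onTimeout;
        resetTimeout(); // arm the first timeout as soon as monitoring starts
    }

    /** Called for every heartbeat received from the monitored target. */
    void reportHeartbeat() {
        resetTimeout();
    }

    /** Stops monitoring; a pending timeout will not fire. */
    void cancel() {
        if (state.compareAndSet(State.RUNNING, State.CANCELED)) {
            cancelTimeout();
        }
    }

    State state() {
        return state.get();
    }

    @Override
    public void run() {
        // Only the RUNNING -> TIMEOUT transition notifies, so the callback runs at most once.
        if (state.compareAndSet(State.RUNNING, State.TIMEOUT)) {
            onTimeout.run();
        }
    }

    private void resetTimeout() {
        if (state.get() == State.RUNNING) {
            cancelTimeout();
            futureTimeout = executor.schedule(this, timeoutMs, TimeUnit.MILLISECONDS);
            // Double check: if the state changed concurrently, drop the timer we just created.
            if (state.get() != State.RUNNING) {
                cancelTimeout();
            }
        }
    }

    private void cancelTimeout() {
        ScheduledFuture<?> pending = futureTimeout;
        if (pending != null) {
            pending.cancel(false);
        }
    }
}
```

As long as heartbeats keep arriving faster than the timeout, the monitor stays RUNNING; once they stop, the last scheduled task fires and flips the state to TIMEOUT.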

2.2.4 HeartbeatServices

HeartbeatServices creates heartbeat receivers and heartbeat senders for every component that needs heartbeat services.

  public class HeartbeatServices {

      // the fields heartbeatInterval / heartbeatTimeout and the constructor are omitted here

      /**
       * Heartbeat receiver:
       * Creates a heartbeat manager which does not actively send heartbeats.
       */
      public <I, O> HeartbeatManager<I, O> createHeartbeatManager(
              ResourceID resourceId,
              HeartbeatListener<I, O> heartbeatListener,
              ScheduledExecutor mainThreadExecutor,
              Logger log) {

          return new HeartbeatManagerImpl<>(
                  heartbeatTimeout, resourceId, heartbeatListener, mainThreadExecutor, log);
      }

      /**
       * Heartbeat sender:
       * Creates a heartbeat manager which actively sends heartbeats to monitoring targets.
       */
      public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(
              ResourceID resourceId,
              HeartbeatListener<I, O> heartbeatListener,
              ScheduledExecutor mainThreadExecutor,
              Logger log) {

          return new HeartbeatManagerSenderImpl<>(
                  heartbeatInterval,
                  heartbeatTimeout,
                  resourceId,
                  heartbeatListener,
                  mainThreadExecutor,
                  log);
      }

      // Reads the heartbeat interval and the heartbeat timeout from the configuration.
      // The constructor requires 0 < heartbeat interval <= heartbeat timeout.
      public static HeartbeatServices fromConfiguration(Configuration configuration) {
          long heartbeatInterval = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL);

          long heartbeatTimeout = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT);

          return new HeartbeatServices(heartbeatInterval, heartbeatTimeout);
      }
  }
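As a sketch of the configuration rule, assuming a plain Map<String, String> instead of Flink's Configuration class: the keys heartbeat.interval and heartbeat.timeout and their defaults (10000 ms and 50000 ms) correspond to Flink's heartbeat options, but `HeartbeatSettings` is a made-up helper. It enforces 0 < interval <= timeout and computes how many request rounds fit into one timeout window.

```java
import java.util.Map;

/** Hypothetical holder for the two heartbeat settings; not Flink's Configuration API. */
final class HeartbeatSettings {
    // Option keys and defaults (milliseconds) as documented for Flink's heartbeat options.
    static final String INTERVAL_KEY = "heartbeat.interval";
    static final String TIMEOUT_KEY = "heartbeat.timeout";
    static final long DEFAULT_INTERVAL_MS = 10_000L;
    static final long DEFAULT_TIMEOUT_MS = 50_000L;

    final long intervalMs;
    final long timeoutMs;

    HeartbeatSettings(long intervalMs, long timeoutMs) {
        // Enforce 0 < interval <= timeout.
        if (intervalMs <= 0L) {
            throw new IllegalArgumentException("The heartbeat interval must be larger than 0.");
        }
        if (timeoutMs < intervalMs) {
            throw new IllegalArgumentException("The heartbeat timeout must not be smaller than the interval.");
        }
        this.intervalMs = intervalMs;
        this.timeoutMs = timeoutMs;
    }

    static HeartbeatSettings fromMap(Map<String, String> conf) {
        long interval = Long.parseLong(conf.getOrDefault(INTERVAL_KEY, String.valueOf(DEFAULT_INTERVAL_MS)));
        long timeout = Long.parseLong(conf.getOrDefault(TIMEOUT_KEY, String.valueOf(DEFAULT_TIMEOUT_MS)));
        return new HeartbeatSettings(interval, timeout);
    }

    /** How many request rounds fit into one timeout window (integer division). */
    long roundsPerTimeout() {
        return timeoutMs / intervalMs;
    }
}
```

With the defaults, 50000 / 10000 = 5, so roughly five consecutive request rounds can go unanswered before the peer is declared dead.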

3. Interaction between RM and TM

3.1 Overview

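A Flink cluster has exactly one active ResourceManager (RM) and one or more TaskManagers (TM). They interact as follows: when a TM starts, it registers with the RM; once registration succeeds, the RM actively and periodically asks the TM to report a heartbeat. Through these heartbeats, each side learns whether the other is still alive.
From section 2.2.4 we know that HeartbeatManagerSenderImpl is the sender and HeartbeatManagerImpl is the receiver. The sender periodically requests heartbeats from its monitored targets; the receiver answers each request by sending its own heartbeat back.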
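The sender/receiver split can be illustrated with a synchronous, in-process Java toy; real Flink makes these calls asynchronously over RPC. The hypothetical `ToyResourceManager` pulls heartbeats with a request that carries no payload, and each hypothetical `ToyTaskManager` answers with a slot report that the RM records. The method names borrow Flink's for readability only, and timer resets on both sides are left out to keep the example short.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy payload the TaskManager sends back (stands in for its slot report). */
final class ToySlotReport {
    final int freeSlots;

    ToySlotReport(int freeSlots) {
        this.freeSlots = freeSlots;
    }
}

/** Receiver side: answers every heartbeat request with its own heartbeat. */
final class ToyTaskManager {
    final String id;
    private final int freeSlots;
    int requestsSeen;

    ToyTaskManager(String id, int freeSlots) {
        this.id = id;
        this.freeSlots = freeSlots;
    }

    /** RM -> TM: the request carries no payload. A real TM would also reset its RM timer here. */
    void heartbeatFromResourceManager(ToyResourceManager rm) {
        requestsSeen++;
        rm.heartbeatFromTaskManager(id, new ToySlotReport(freeSlots));
    }
}

/** Sender side: pulls heartbeats and keeps the latest payload per TaskManager. */
final class ToyResourceManager {
    final Map<String, Integer> freeSlotsByTm = new HashMap<>();
    private final List<ToyTaskManager> monitored = new ArrayList<>();

    void monitorTarget(ToyTaskManager tm) {
        monitored.add(tm);
    }

    /** One round of the periodic sender loop. */
    void requestHeartbeats() {
        for (ToyTaskManager tm : monitored) {
            tm.heartbeatFromResourceManager(this);
        }
    }

    /** TM -> RM: a real RM would reset the TM's timer, then process the payload. */
    void heartbeatFromTaskManager(String tmId, ToySlotReport report) {
        freeSlotsByTm.put(tmId, report.freeSlots);
    }
}
```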

3.2 Initialization

3.2.1 ResourceManager

  public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
          extends FencedRpcEndpoint<ResourceManagerId>
          implements ResourceManagerGateway, LeaderContender {

      // runs when the RM starts
      @Override
      public final void onStart() throws Exception {
          try {
              // start the RM services; startResourceManagerServices() in turn calls startHeartbeatServices()
              startResourceManagerServices();
          } catch (Throwable t) {
              final ResourceManagerException exception =
                      new ResourceManagerException(
                              String.format("Could not start the ResourceManager %s", getAddress()),
                              t);
              onFatalError(exception);
              throw exception;
          }
      }

      private void startHeartbeatServices() {
          // sender towards the TaskManagers
          taskManagerHeartbeatManager =
                  heartbeatServices.createHeartbeatManagerSender(
                          resourceId,
                          new TaskManagerHeartbeatListener(),
                          getMainThreadExecutor(),
                          log);

          // sender towards the JobManagers
          jobManagerHeartbeatManager =
                  heartbeatServices.createHeartbeatManagerSender(
                          resourceId,
                          new JobManagerHeartbeatListener(),
                          getMainThreadExecutor(),
                          log);
      }

      // ... other members omitted
  }

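Together with section 2.2.2, this shows that in its heartbeats with both TMs and JMs, the RM acts as the initiator of heartbeat requests, i.e. the RM actively pulls heartbeats.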

3.2.2 TaskExecutor

TaskExecutor initializes its heartbeat components when it is constructed.

  public TaskExecutor(
            RpcService rpcService,
            TaskManagerConfiguration taskManagerConfiguration,
            HighAvailabilityServices haServices,
            TaskManagerServices taskExecutorServices,
            ExternalResourceInfoProvider externalResourceInfoProvider,
            HeartbeatServices heartbeatServices,
            TaskManagerMetricGroup taskManagerMetricGroup,
            @Nullable String metricQueryServiceAddress,
            BlobCacheService blobCacheService,
            FatalErrorHandler fatalErrorHandler,
            TaskExecutorPartitionTracker partitionTracker,
            BackPressureSampleService backPressureSampleService) {
        // ... other initialization omitted
        // create a HeartbeatManagerImpl (receiver) that answers the JM's heartbeat requests
        this.jobManagerHeartbeatManager =
                createJobManagerHeartbeatManager(heartbeatServices, resourceId);
        // create a HeartbeatManagerImpl (receiver) that answers the RM's heartbeat requests
        this.resourceManagerHeartbeatManager =
                createResourceManagerHeartbeatManager(heartbeatServices, resourceId);
    }

3.3 How the TM Registers with the RM

3.3.1 Call Flow

  TaskExecutor#onStart
  |
  TaskExecutor#startTaskExecutorServices
  |
  StandaloneLeaderRetrievalService#start  // analyzed in standalone mode
  |
  | // in standalone mode the JobManager's address is already known, so the TM connects to the RM directly
  TaskExecutor.ResourceManagerLeaderListener#notifyLeaderAddress
  |
  TaskExecutor#notifyOfNewResourceManagerLeader
  |
  TaskExecutor#reconnectToResourceManager
  |
  | // this method calls TaskExecutorToResourceManagerConnection#start to connect to the RM
  TaskExecutor#connectToResourceManager
  |
  | // the callbacks set up by createNewRegistration(), called inside start(), handle a successful registration
  RegisteredRpcConnection#start
  |
  | // first connects to the RM; once connected, sends the registration request
  RetryingRegistration#startRegistration
  |
  RetryingRegistration#register
  |
  TaskExecutorToResourceManagerConnection#invokeRegistration

At this point the TM has sent its registration request, which reaches the RM through Akka RPC.

  ResourceManager#registerTaskExecutor
  |
  | // returns a RegistrationResponse; along the way it calls taskManagerHeartbeatManager.monitorTarget to monitor the TM's heartbeats
  ResourceManager#registerTaskExecutorInternal
  |
  return new TaskExecutorRegistrationSuccess(
                    registration.getInstanceID(), resourceId, clusterInformation)
  |
  | // once registration succeeds, the callback defined when start() created the registration via createNewRegistration() runs
  RegisteredRpcConnection#start
  |
  TaskExecutorToResourceManagerConnection#onRegistrationSuccess
  |
  TaskExecutor#onRegistrationSuccess
  |
  | // establishes the connection to the RM and starts monitoring the RM
  TaskExecutor#establishResourceManagerConnection
  |
  resourceManagerHeartbeatManager#monitorTarget
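The "retry until registration succeeds, then run the success callback" shape of this flow can be sketched as follows. This is an assumption-laden toy, not Flink's RetryingRegistration: attempts are synchronous, the hypothetical backoff doubles up to a cap and is only recorded rather than slept, and `onSuccess` marks the point where the TaskExecutor would start monitoring the RM.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

/** Toy retry loop; names and backoff policy are illustrative, not Flink's RetryingRegistration. */
final class ToyRetryingRegistration {
    /** Backoffs that would have been waited between attempts, in milliseconds. */
    final List<Long> backoffsMs = new ArrayList<>();
    private final long initialBackoffMs;
    private final long maxBackoffMs;
    private final int maxAttempts;

    ToyRetryingRegistration(long initialBackoffMs, long maxBackoffMs, int maxAttempts) {
        this.initialBackoffMs = initialBackoffMs;
        this.maxBackoffMs = maxBackoffMs;
        this.maxAttempts = maxAttempts;
    }

    /** Calls attempt until it returns true; on success runs onSuccess and returns true. */
    boolean register(BooleanSupplier attempt, Runnable onSuccess) {
        long backoff = initialBackoffMs;
        for (int i = 1; i <= maxAttempts; i++) {
            if (attempt.getAsBoolean()) {
                onSuccess.run(); // e.g. establish the connection and monitor the RM
                return true;
            }
            if (i < maxAttempts) {
                backoffsMs.add(backoff); // a real client would wait here before retrying
                backoff = Math.min(backoff * 2, maxBackoffMs);
            }
        }
        return false;
    }
}
```

For example, with an initial backoff of 100 ms and a cap of 300 ms, a registration that succeeds on the third attempt waits 100 ms and then 200 ms.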

3.3.2 Analysis

The following focuses on the heartbeat-related interaction.

  public void start() {
      checkState(!closed, "The RPC connection is already closed");
      checkState(
              !isConnected() && pendingRegistration == null,
              "The RPC connection is already started");
      // createNewRegistration() also defines what happens once the registration succeeds
      final RetryingRegistration<F, G, S, R> newRegistration = createNewRegistration();

      if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) {
          // start the registration
          newRegistration.startRegistration();
      } else {
          // concurrent start operation
          newRegistration.cancel();
      }
  }
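The compareAndSet on REGISTRATION_UPDATER is what makes concurrent start() calls safe: only the first call installs its registration, and any later call cancels its own. A minimal sketch of that guard with the JDK's AtomicReferenceFieldUpdater, using a hypothetical `ToyConnection` and `Registration`:

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

/** Toy connection showing the "only one start() wins" guard; not Flink's RegisteredRpcConnection. */
final class ToyConnection {
    /** Hypothetical stand-in for a retrying registration. */
    static final class Registration {
        volatile boolean started;
        volatile boolean canceled;

        void startRegistration() {
            started = true;
        }

        void cancel() {
            canceled = true;
        }
    }

    // A field updater needs a volatile, non-static field whose declared type matches exactly.
    private static final AtomicReferenceFieldUpdater<ToyConnection, Registration> REGISTRATION_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(ToyConnection.class, Registration.class, "pendingRegistration");

    private volatile Registration pendingRegistration;

    /** Returns the registration this call created, either started or canceled. */
    Registration start() {
        Registration newRegistration = new Registration();
        if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) {
            newRegistration.startRegistration();
        } else {
            // Another start() already installed a registration; discard ours.
            newRegistration.cancel();
        }
        return newRegistration;
    }
}
```

A field updater gives atomic compare-and-set on an ordinary volatile field without allocating a separate AtomicReference object per connection.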

This process involves many callbacks, which take some time to digest.

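3.4 Heartbeats between TM and RM

3.4.1 How the RM Requests Heartbeats

From the analysis of RM initialization, we know that the RM actively asks the TM to report heartbeats. The flow is: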

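  // creates a HeartbeatManagerSenderImpl
  ResourceManager#startHeartbeatServices
  |
  | // the HeartbeatManagerSenderImpl constructor (which first calls the HeartbeatManagerImpl constructor) schedules the sender task on the main-thread executor
  HeartbeatManagerSenderImpl (constructor)
  |
  | // when the task runs, HeartbeatManagerSenderImpl#run iterates over the HeartbeatMonitors and calls requestHeartbeat to ask each target for a heartbeat
  HeartbeatManagerSenderImpl#run
  |
  | // this lands in ResourceManager.TaskManagerHeartbeatListener and returns null: the RM is no component's receiver, i.e. no component requests heartbeats from the RM and expects it to answer
  getHeartbeatListener().retrievePayload
  |
  | // calls requestHeartbeat of the HeartbeatTarget that the RM registered for this TM during TM registration
  heartbeatTarget.requestHeartbeat
  |
  | // defined in ResourceManager#registerTaskExecutorInternal
  taskExecutorGateway.heartbeatFromResourceManager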

3.4.2 How the TM Handles Heartbeat Requests

The RPC call brings the request to the TM, where it is handled as follows:

  TaskExecutor#heartbeatFromResourceManager
  |
  HeartbeatManagerImpl#requestHeartbeat
  |
  HeartbeatMonitorImpl#reportHeartbeat
  |
  | // if the monitor is RUNNING, cancel the previous timeout ScheduledFuture and schedule a new timeout check
  HeartbeatMonitorImpl#resetHeartbeatTimeout
  |
  | // the request from the RM carries a null heartbeatPayload, so the TM skips reportPayload and directly answers with its own heartbeat
  HeartbeatManagerImpl#requestHeartbeat -> heartbeatTarget.receiveHeartbeat
  |
  | // builds the TM's heartbeat payload, including its slot information
  TaskExecutor.ResourceManagerHeartbeatListener#retrievePayload
  |
  | // receiveHeartbeat, defined when the TM established its connection to the RM, invokes the RM-side method
  TaskExecutor#establishResourceManagerConnection -> resourceManagerGateway.heartbeatFromTaskManager

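3.4.3 How the RM Processes Periodic Heartbeats

When the RM receives a TM's heartbeat, it does two things: it resets the timeout of its monitor for that TM, and it processes the information the TM reported.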

  ResourceManager#heartbeatFromTaskManager
  |
  HeartbeatManagerImpl#receiveHeartbeat
  |
  | // as on the TM side, the monitor's timeout is reset
  HeartbeatManagerImpl#reportHeartbeat -> HeartbeatMonitorImpl#reportHeartbeat
  |
  | // processes the reported slot information
  ResourceManager.TaskManagerHeartbeatListener#reportPayload

For the details of the periodic heartbeat, see the comments in the flows above.


Source: https://www.cnblogs.com/love-yh/p/15037809.html