编程语言
首页 > 编程语言> > [笔记迁移][Spark][9]Spark源码——内核架构2

[笔记迁移][Spark][9]Spark源码——内核架构2

作者:互联网

续上篇,Spark源码——内核架构1
(4)最最最最最重要的机制:资源调度schedule()
   [1]Master总调度——Driver(on Worker)调度机制

  /**
   * Schedule the currently available resources among waiting apps. This method will be called
   * every time a new app joins or resource availability changes.
   */
  private def schedule(): Unit = {
    // Only an ALIVE master schedules anything; a standby master leaves the queues untouched.
    if (state == RecoveryState.ALIVE) {
      // Drivers take strict precedence over executors.
      // Take the registered workers that are currently ALIVE, in a random order.
      val aliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
      val aliveCount = aliveWorkers.size
      // Round-robin cursor shared by all drivers: each driver resumes where the previous one stopped.
      var cursor = 0

      // NOTE(review): drivers presumably only reach waitingDrivers when an application is
      // submitted in standalone cluster deploy mode; in client mode the driver runs in the
      // submitting process and is never scheduled by the master -- confirm against the submit path.
      // Iterate over a snapshot because waitingDrivers is modified inside the loop.
      waitingDrivers.toList.foreach { driver =>
        var placed = false
        var visited = 0

        // Probe each alive worker at most once, stopping as soon as the driver is placed.
        while (!placed && visited < aliveCount) {
          val candidate = aliveWorkers(cursor)
          visited += 1

          val fits =
            candidate.memoryFree >= driver.desc.mem && candidate.coresFree >= driver.desc.cores
          if (fits) {
            // The worker has enough free memory and cores: start the driver there
            // and take it out of the waiting queue.
            launchDriver(candidate, driver)
            waitingDrivers -= driver
            placed = true
          }
          // Advance the cursor, wrapping around at the end of the worker list.
          cursor = (cursor + 1) % aliveCount
        }
      }

      // With drivers handled, hand the remaining resources to application executors.
      startExecutorsOnWorkers()
    }
  }
  /** Returns a new collection of the same type in a randomly chosen order.
   *
   *  @return         the shuffled collection
   */
  def shuffle[T, CC[X] <: TraversableOnce[X]](xs: CC[T])(implicit bf: CanBuildFrom[CC[T], T, CC[T]]): CC[T] = {
    // Shuffle a private mutable copy so the input collection is never modified.
    val scratch = new ArrayBuffer[T] ++= xs

    // Walk the "unshuffled prefix" length from the full size down to 2. On each step a random
    // index inside the prefix is swapped with the prefix's last slot.
    var remaining = scratch.length
    while (remaining >= 2) {
      val pick = nextInt(remaining)
      val last = remaining - 1
      val held = scratch(last)
      scratch(last) = scratch(pick)
      scratch(pick) = held
      remaining -= 1
    }

    // Rebuild a collection of the caller's original type from the shuffled buffer.
    val out = bf(xs)
    out ++= scratch
    out.result()
  }
  /**
   * Assigns `driver` to `worker` and asks that worker to start it.
   *
   * @param worker the worker chosen to host the driver
   * @param driver the waiting driver being launched
   */
  private def launchDriver(worker: WorkerInfo, driver: DriverInfo): Unit = {
    logInfo("Launching driver " + driver.id + " on worker " + worker.id)

    // Link the two bookkeeping records to each other: the worker records the driver
    // (presumably also charging the driver's memory and cores to the worker -- see
    // WorkerInfo.addDriver), and the driver remembers which worker hosts it.
    worker.addDriver(driver)
    driver.worker = Some(worker)

    // Send LaunchDriver to the worker's RPC endpoint; the worker does the actual start-up.
    val request = LaunchDriver(driver.id, driver.desc)
    worker.endpoint.send(request)

    driver.state = DriverState.RUNNING
  }

    [2] Application调度机制:重中之重,核心之核心。实际上是在脚本需求的“总资源量”上调度
   两种调度算法:spreadOutApps(默认)尽量平均分配至尽可能多的Worker上,每个Worker在一次循环中分配一个;非spreadOutApps尽可能多地利用一个Worker上的资源,用完再分配至下一个Worker

  // As a temporary workaround before better ways of configuring memory, we allow users to set
  // a flag that will perform round-robin scheduling across the nodes (spreading out each app
  // among all the nodes) instead of trying to consolidate each app onto a small # of nodes.
  // Read from the "spark.deploy.spreadOut" setting; `true` is passed as the fallback value.
  private val spreadOutApps = conf .getBoolean ("spark.deploy.spreadOut" , true)
  /**
   * Schedule and launch executors on workers after drivers
   */
  private def startExecutorsOnWorkers(): Unit = {
    // Right now this is a very simple FIFO scheduler: try to fit the first app in the queue,
    // then the second app, and so on.
    waitingApps.foreach { app =>
      // An executor needs at least one core when the application did not request a specific count.
      val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)

      // If fewer cores remain than one executor needs, the leftover cores are not allocated.
      if (app.coresLeft >= coresPerExecutor) {
        // Keep only ALIVE workers with enough free memory and cores for one executor,
        // ordered by free cores in descending order.
        val usableWorkers = workers.toArray
          .filter { worker =>
            worker.state == WorkerState.ALIVE &&
              worker.memoryFree >= app.desc.memoryPerExecutorMB &&
              worker.coresFree >= coresPerExecutor
          }
          .sortBy(_.coresFree)
          .reverse

        // Decide how many cores each usable worker contributes to this application.
        val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

        // Turn each non-zero core assignment into executors on the corresponding worker.
        usableWorkers.indices.foreach { pos =>
          if (assignedCores(pos) > 0) {
            allocateWorkerResourceToExecutors(
              app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos))
          }
        }
      }
    }
  }
  /**!!!!!!!!!!!!!!!!!!!!!!!!!!
   * Schedule executors to be launched on the workers.
   * Returns an array containing number of cores assigned to each worker.
   *
   * There are two modes of launching executors. The first attempts to spread out an application's
   * executors on as many workers as possible, while the second does the opposite (i.e. launch them
   * on as few workers as possible). The former is usually better for data locality purposes and is
   * the default.
   *
   * The number of cores assigned to each executor is configurable. When this is explicitly set,
   * multiple executors from the same application may be launched on the same worker if the worker
   * has enough cores and memory. Otherwise, each executor grabs all the cores available on the
   * worker by default, in which case only one executor per application may be launched on each
   * worker during one single schedule iteration.
   * Note that when `spark.executor.cores` is not set, we may still launch multiple executors from
   * the same application on the same worker. Consider appA and appB both have one executor running
   * on worker1, and appA.coresLeft > 0, then appB is finished and release all its cores on worker1,
   * thus for the next schedule iteration, appA launches a new executor that grabs all the free
   * cores on worker1, therefore we get multiple executors from appA running on worker1.
   *
   * It is important to allocate coresPerExecutor on each worker at a time (instead of 1 core
   * at a time). Consider the following example: cluster has 4 workers with 16 cores each.
   * User requests 3 executors (spark.cores.max = 48, spark.executor.cores = 16). If 1 core is
   * allocated at a time, 12 cores from each worker would be assigned to each executor.
   * Since 12 < 16, no executors would launch [SPARK-8881].
   */
  private def scheduleExecutorsOnWorkers (
      app: ApplicationInfo,
      usableWorkers: Array[WorkerInfo],
      spreadOutApps: Boolean): Array[Int] = {
    // Computes, for each entry of usableWorkers (same index), how many cores to give this app.
    // Nothing is launched here; only the local assignment arrays are updated.
    val coresPerExecutor = app. desc. coresPerExecutor
    // Allocation granularity: the configured cores per executor, or 1 when it is not set.
    val minCoresPerExecutor = coresPerExecutor.getOrElse (1 )
    val oneExecutorPerWorker = coresPerExecutor.isEmpty  // true when cores per executor is not set
    val memoryPerExecutor = app. desc. memoryPerExecutorMB
    val numUsable = usableWorkers.length
    val assignedCores = new Array[Int](numUsable ) // Number of cores to give to each worker
    val assignedExecutors = new Array[Int](numUsable ) // Number of new executors on each worker
    // Cores available to hand out in this round: the smaller of what the app still needs and
    // the total free cores across the usable workers (asking for more than exists gains nothing).
    var coresToAssign= math.min(app.coresLeft,usableWorkers.map(_.coresFree) .sum)

    /** Return whether the specified worker can launch an executor for this app. */
    def canLaunchExecutor( pos: Int): Boolean = {
      // Reads the mutable state above (coresToAssign, assignedCores, assignedExecutors), so the
      // answer for a given pos changes as the loop below makes assignments.
      val keepScheduling = coresToAssign >= minCoresPerExecutor
      val enoughCores = usableWorkers( pos).coresFree - assignedCores( pos) >= minCoresPerExecutor
      // If we allow multiple executors per worker, then we can always launch new executors.
      // Otherwise, if there is already an executor on this worker, just give it more cores.
      val launchingNewExecutor = ! oneExecutorPerWorker || assignedExecutors(pos) == 0
      if (launchingNewExecutor) {
        // Memory already promised to executors assigned to this worker during this call.
        val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
        val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
        // Count both the executors the app already has and the ones assigned in this call.
        val underLimit = assignedExecutors .sum + app .executors .size < app.executorLimit
        keepScheduling && enoughCores && enoughMemory && underLimit
      } else {
        // We're adding cores to an existing executor, so no need
        // to check memory and executor limits
        keepScheduling && enoughCores
      }
    }

    // Keep launching executors until no more workers can accommodate any
    // more executors, or if we have reached this application's limits
    var freeWorkers = ( 0 until numUsable).filter (canLaunchExecutor )
    while ( freeWorkers.nonEmpty ) {
      freeWorkers. foreach { pos =>
        var keepScheduling = true
        while (keepScheduling && canLaunchExecutor (pos )) {
          coresToAssign -= minCoresPerExecutor
          assignedCores(pos) += minCoresPerExecutor

          // If we are launching one executor per worker, then every iteration assigns 1 core
          // to the executor. Otherwise, every iteration assigns cores to a new executor.
          if (oneExecutorPerWorker ) {
            assignedExecutors(pos ) = 1
          } else {
            assignedExecutors(pos) += 1
          }

          // Spreading out an application means spreading out its executors across as
          // many workers as possible. If we are not spreading out, then we should keep
          // scheduling executors on this worker until we use all of its resources.
          // Otherwise, just move on to the next worker.
          if (spreadOutApps ) {
            keepScheduling = false
           // Leaves the inner while loop; the enclosing foreach moves on to the next worker.
          }
        }
      }
      // Drop workers that can no longer take an executor before the next pass.
      freeWorkers = freeWorkers.filter (canLaunchExecutor )
    }
    assignedCores
  }
/** 
   * Allocate a worker's resources to one or more executors.
   * @param app the info of the application which the executors belong to
   * @param assignedCores number of cores on this worker for this application
   * @param coresPerExecutor number of cores per executor
   * @param worker the worker info
   */
  private def allocateWorkerResourceToExecutors(
      app: ApplicationInfo,
      assignedCores: Int,
      coresPerExecutor: Option[Int],
      worker: WorkerInfo): Unit = {
    // With an explicit per-executor core count, split the assigned cores into as many
    // executors as fit (integer division, remainder dropped). Without one, a single
    // executor takes every core assigned on this worker.
    val (executorCount, coresEach) = coresPerExecutor match {
      case Some(perExecutor) => (assignedCores / perExecutor, perExecutor)
      case None => (1, assignedCores)
    }

    (1 to executorCount).foreach { _ =>
      // Register the executor with the application, then ask the worker to start it.
      val exec = app.addExecutor(worker, coresEach)
      launchExecutor(worker, exec)
      // The application is only marked RUNNING once at least one executor was launched.
      app.state = ApplicationState.RUNNING
    }
  }

  /**
   * Creates an executor record for this application on `worker` and registers it.
   *
   * @param worker the worker that will host the executor
   * @param cores number of cores granted to the executor
   * @param useID optional id hint forwarded to `newExecutorId`
   * @return the newly registered executor description
   */
  private[master] def addExecutor(
      worker: WorkerInfo,
      cores: Int,
      useID: Option[Int] = None): ExecutorDesc = {
    val executorId = newExecutorId(useID)
    // Every executor of an application gets the application's per-executor memory setting.
    val created = new ExecutorDesc(executorId, this, worker, cores, desc.memoryPerExecutorMB)
    executors.update(created.id, created)
    // Keep the running total of cores handed to this application up to date.
    coresGranted += cores
    created
  }

// Master-side description of one executor. In this file it is created by
// ApplicationInfo.addExecutor and consumed by launchExecutor.
// NOTE(review): the class body is elided in this excerpt; only the constructor and the
// closing brace are shown.
private[master] class ExecutorDesc(
    val id: Int, // executor id within its application
    val application: ApplicationInfo, // the application this executor belongs to
    val worker: WorkerInfo, // the worker hosting this executor
    val cores: Int, // cores granted to this executor
    val memory: Int) // memory for this executor; callers pass memoryPerExecutorMB, i.e. megabytes
  }
  
/**
 * Starts `exec` on `worker` and notifies the owning application's driver.
 *
 * @param worker the worker that will run the executor
 * @param exec the executor description created for the application
 */
private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
  logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)

  // Record the executor on the worker before any message is sent.
  worker.addExecutor(exec)

  val owningApp = exec.application
  // Ask the worker to start the executor process ...
  worker.endpoint.send(
    LaunchExecutor(masterUrl, owningApp.id, exec.id, owningApp.desc, exec.cores, exec.memory))
  // ... and tell the application's driver that an executor was added for it.
  owningApp.driver.send(
    ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}

文章目录

6. Worker

Worker

// Worker RPC endpoint of the standalone deploy mode. The handlers quoted in this file show it
// reacting to LaunchDriver / LaunchExecutor requests and forwarding state changes to the master.
// NOTE(review): only the constructor is quoted here; the class body is elided.
private[deploy] class Worker(
    override val rpcEnv : RpcEnv,
    webUiPort: Int,
    cores: Int, // presumably the total cores this worker offers -- confirm in the class body
    memory: Int, // presumably the total memory this worker offers -- confirm unit in the class body
    masterRpcAddresses: Array[ RpcAddress],
    endpointName: String,
    workDirPath: String = null , // optional; defaults to null when the caller does not pass one
    val conf: SparkConf,
    val securityMgr: SecurityManager)
  extends ThreadSafeRpcEndpoint with Logging

(1)在Worker上启动Driver相关

 case LaunchDriver(driverId, driverDesc) =>
      // Handles a LaunchDriver request: build a DriverRunner for the driver, remember it,
      // start it, and charge the driver's resources to this worker.
      logInfo(s"Asked to launch driver $driverId")
      val driver = new DriverRunner(
        conf,
        driverId,
        workDir,
        sparkHome,
        // The launch command is passed through maybeUpdateSSLSettings before being handed over.
        driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
        self,
        workerUri,
        securityMgr)
      // Keep the runner indexed by driver id so later state changes can look it up.
      drivers(driverId) = driver
      driver.start()
      // Account for the cores and memory now occupied by this driver.
      coresUsed += driverDesc.cores
      memoryUsed += driverDesc.mem
/**
 * Manages the execution of one driver, including automatically restarting the driver on failure.
 * This is currently only used in standalone cluster deploy mode.
 */
// NOTE(review): only the constructor is quoted here; the methods that follow in this file
// (start, prepareAndRunDriver, ...) belong to this class.
private[deploy] class DriverRunner(
    conf: SparkConf, // passed to the jar download in downloadUserJar
    val driverId: String, // names the runner thread and the driver's working directory
    val workDir: File, // parent directory under which the driver's working directory is created
    val sparkHome: File, // its absolute path is passed to the process builder
    val driverDesc: DriverDescription, // supplies jarUrl, command, mem and supervise
    val worker: RpcEndpointRef, // receives DriverStateChanged when the driver finishes
    val workerUrl: String, // substituted for the {{WORKER_URL}} placeholder in the command
    val securityManager: SecurityManager) // passed to the process builder and the jar download
  extends Logging
//driver.start()
  /** Starts a thread to run and manage the driver. */
  private[worker] def start(): Unit = {
    // All of the work happens on a dedicated thread so the caller returns immediately.
    new Thread("DriverRunner for " + driverId) {
      override def run(): Unit = {
        var shutdownHook: AnyRef = null
        try {
          // Make sure the driver is killed if the worker JVM shuts down while it is running.
          shutdownHook = ShutdownHookManager.addShutdownHook { () =>
            logInfo(s"Worker shutting down, killing driver $driverId")
            kill()
          }

          // Prepare driver jars and run the driver; returns the driver's exit code.
          val exitCode = prepareAndRunDriver()

          // Set the final state: a zero exit code wins, then a forced kill, otherwise failure.
          finalState = if (exitCode == 0) {
            Some(DriverState.FINISHED)
          } else if (killed) {
            Some(DriverState.KILLED)
          } else {
            Some(DriverState.FAILED)
          }
        } catch {
          case e: Exception =>
            kill()
            finalState = Some(DriverState.ERROR)
            finalException = Some(e)
        } finally {
          // The hook is only needed while the driver may still be running.
          if (shutdownHook != null) {
            ShutdownHookManager.removeShutdownHook(shutdownHook)
          }
        }

        // Notify the worker of the final driver state and any exception. finalState is always
        // set here: both the normal path and the Exception handler assign it.
        worker.send(DriverStateChanged(driverId, finalState.get, finalException))
      }
    }.start()
  }


  /**
   * Prepares the driver's working directory and user jar, builds the launch command and runs it.
   *
   * @return the exit code produced by `runDriver`
   */
  private[worker] def prepareAndRunDriver(): Int = {
    val driverDir = createWorkingDirectory()
    val localJarFilename = downloadUserJar(driverDir)

    // Replaces the two known placeholders in a command argument; anything else passes through.
    def substituteVariables(argument: String): String =
      if (argument == "{{WORKER_URL}}") workerUrl
      else if (argument == "{{USER_JAR}}") localJarFilename
      else argument

    // TODO: If we add ability to submit multiple jars they should also be added here
    val builder = CommandUtils.buildProcessBuilder(
      driverDesc.command,
      securityManager,
      driverDesc.mem,
      sparkHome.getAbsolutePath,
      substituteVariables)

    runDriver(builder, driverDir, driverDesc.supervise)
  }

  /**
   * Creates the working directory for this driver.
   * Will throw an exception if there are errors preparing the directory.
   */
  private def createWorkingDirectory(): File = {
    // The directory is named after the driver id and lives under the worker's work dir.
    val dir = new File(workDir, driverId)
    // An existing directory is reused; otherwise it is created together with missing parents.
    val ready = dir.exists() || dir.mkdirs()
    if (!ready) {
      throw new IOException("Failed to create directory " + dir)
    }
    dir
  }

  /**
   * Download the user jar into the supplied directory and return its local path.
   * Will throw an exception if there are errors downloading the jar.
   */
  private def downloadUserJar(driverDir: File): String = {
    // The local file keeps the last path segment of the jar URL as its name.
    val jarFileName = new URI(driverDesc.jarUrl).getPath.split("/").last
    val localJarFile = new File(driverDir, jarFileName)
    if (!localJarFile.exists()) { // May already exist if running multiple workers on one node
      logInfo(s"Copying user jar ${driverDesc.jarUrl} to $localJarFile")
      Utils.fetchFile(
        driverDesc.jarUrl,
        driverDir,
        conf,
        securityManager,
        SparkHadoopUtil.get.newConfiguration(conf),
        System.currentTimeMillis(),
        useCache = false)
      if (!localJarFile.exists()) { // Verify copy succeeded
        throw new IOException(
          s"Can not find expected jar $jarFileName which should have been loaded in $driverDir")
      }
    }
    localJarFile.getAbsolutePath
  }

  /**
   * Runs the driver command in `baseDir`, redirecting its output to files in that directory.
   *
   * @param builder process builder holding the driver launch command
   * @param baseDir working directory of the driver; also receives the `stdout`/`stderr` files
   * @param supervise forwarded to `runCommandWithRetry`
   * @return the value returned by `runCommandWithRetry`
   */
  private def runDriver(builder: ProcessBuilder, baseDir: File, supervise: Boolean): Int = {
    builder.directory(baseDir)

    // Callback handed to runCommandWithRetry to wire up the output of a started process.
    def initialize(process: Process): Unit = {
      // Redirect stdout and stderr to files
      val stdout = new File(baseDir, "stdout")
      CommandUtils.redirectStream(process.getInputStream, stdout)

      val stderr = new File(baseDir, "stderr")
      // Render the command with every argument wrapped in double quotes: "a" "b" "c"
      val formattedCommand = builder.command.asScala.mkString("\"", "\" \"", "\"")
      // Write the launch command and a separator line to stderr before the process output.
      val header = "Launch Command: %s\n%s\n\n".format(formattedCommand, "=" * 40)
      Files.append(header, stderr, StandardCharsets.UTF_8)
      CommandUtils.redirectStream(process.getErrorStream, stderr)
    }

    // NOTE(review): per the original author's note, runCommandWithRetry ultimately starts the
    // driver process and blocks on process.waitFor() for its exit code -- confirm in that method.
    runCommandWithRetry(ProcessBuilderLike(builder), initialize, supervise)
  }

// Worker.handleDriverStateChanged
  /**
   * Handles the final state reported for a driver: logs it, forwards it to the master,
   * moves the driver to the finished set and releases its resources.
   *
   * @param driverStateChanged the state change reported for a driver of this worker
   */
  private[worker] def handleDriverStateChanged(driverStateChanged: DriverStateChanged): Unit = {
    val driverId = driverStateChanged.driverId
    val exception = driverStateChanged.exception
    val state = driverStateChanged.state
    state match {
      case DriverState.ERROR =>
        logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
      case DriverState.FAILED =>
        logWarning(s"Driver $driverId exited with failure")
      case DriverState.FINISHED =>
        logInfo(s"Driver $driverId exited successfully")
      case DriverState.KILLED =>
        logInfo(s"Driver $driverId was killed by user")
      case _ =>
        logDebug(s"Driver $driverId changed state to $state")
    }
    // Forward the same message to the master so it can update its own view of the driver.
    sendToMaster(driverStateChanged)
    // Move the runner from the active drivers to the finished ones.
    val driver = drivers.remove(driverId).get
    finishedDrivers(driverId) = driver
    trimFinishedDriversIfNecessary()
    // Release the memory and cores that were charged to this worker for the driver.
    memoryUsed -= driver.driverDesc.mem
    coresUsed -= driver.driverDesc.cores
  }

(2)在Worker上启动Executor相关

 case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
   // Handles a LaunchExecutor request. Requests from anything but the active master are
   // ignored; otherwise the executor's directories are prepared and an ExecutorRunner is started.
   if (masterUrl != activeMasterUrl) {
     logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
   } else {
     try {
       logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))

       // Create the executor's working directory
       val executorDir = new File(workDir, appId + "/" + execId)
       if (!executorDir.mkdirs()) {
         throw new IOException("Failed to create directory " + executorDir)
       }

       // Create local dirs for the executor. These are passed to the executor via the
       // SPARK_EXECUTOR_DIRS environment variable, and deleted by the Worker when the
       // application finishes.
       val appLocalDirs = appDirectories.getOrElse(appId, {
         val localRootDirs = Utils.getOrCreateLocalRootDirs(conf)
         // A root dir that cannot host a sub-directory is skipped with a warning.
         val dirs = localRootDirs.flatMap { dir =>
           try {
             val appDir = Utils.createDirectory(dir, namePrefix = "executor")
             Utils.chmod700(appDir)
             Some(appDir.getAbsolutePath())
           } catch {
             case e: IOException =>
               logWarning(s"${e.getMessage}. Ignoring this directory.")
               None
           }
         }.toSeq
         // With no usable local dir at all the executor cannot be launched.
         if (dirs.isEmpty) {
           throw new IOException("No subfolder can be created in " +
             s"${localRootDirs.mkString(",")}.")
         }
         dirs
       })
       // Remember the dirs so later executors of the same application reuse them.
       appDirectories(appId) = appLocalDirs

       val manager = new ExecutorRunner(
         appId,
         execId,
         appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
         cores_,
         memory_,
         self,
         workerId,
         host,
         webUi.boundPort,
         publicAddress,
         sparkHome,
         executorDir,
         workerUri,
         conf,
         appLocalDirs, ExecutorState.RUNNING)

       // Track the runner under "<appId>/<execId>", start it, and charge its resources.
       executors(appId + "/" + execId) = manager
       manager.start()
       coresUsed += cores_
       memoryUsed += memory_
       // Report the executor's current state to the master.
       sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
     } catch {
       case e: Exception =>
         logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
         // Undo the registration if the failure happened after the runner was tracked.
         if (executors.contains(appId + "/" + execId)) {
           executors(appId + "/" + execId).kill()
           executors -= appId + "/" + execId
         }
         sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
           Some(e.toString), None))
     }
   }
  /**
   * Starts the thread that runs the executor and registers a shutdown hook that kills
   * the executor process.
   */
  private[worker] def start(): Unit = {
    workerThread = new Thread("ExecutorRunner for " + fullId) {
      override def run(): Unit = fetchAndRunExecutor()
    }
    workerThread.start()

    // Shutdown hook that kills actors on shutdown.
    shutdownHook = ShutdownHookManager.addShutdownHook { () =>
      // It's possible that we arrive here before calling `fetchAndRunExecutor`, then `state`
      // will be `ExecutorState.RUNNING`. In this case, we should set `state` to `FAILED`.
      if (state == ExecutorState.RUNNING) {
        state = ExecutorState.FAILED
      }
      killProcess(Some("Worker shutting down"))
    }
  }

  /**
   * Download and run the executor described in our ApplicationDescription
   */
  private def fetchAndRunExecutor(): Unit = {
    try {
      // Build the launch command for the executor process.
      val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
        memory, sparkHome.getAbsolutePath, substituteVariables)
      val command = builder.command()
      // Render the command with every argument wrapped in double quotes: "a" "b" "c"
      val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
      logInfo(s"Launch command: $formattedCommand")

      builder.directory(executorDir)
      builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
      // In case we are running this from within the Spark Shell, avoid creating a "scala"
      // parent process for the executor command
      builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")

      // Add webUI log urls; the prefix depends on whether the UI reverse proxy is enabled.
      val baseUrl =
        if (conf.getBoolean("spark.ui.reverseProxy", false)) {
          s"/proxy/$workerId/logPage/?appId=$appId&executorId=$execId&logType="
        } else {
          s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
        }
      builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
      builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")

      process = builder.start()
      val header = "Spark Executor Command: %s\n%s\n\n".format(
        formattedCommand, "=" * 40)

      // Redirect its stdout and stderr to files
      val stdout = new File(executorDir, "stdout")
      stdoutAppender = FileAppender(process.getInputStream, stdout, conf)

      val stderr = new File(executorDir, "stderr")
      // The launch command is written to stderr before the process output is appended.
      Files.write(header, stderr, StandardCharsets.UTF_8)
      stderrAppender = FileAppender(process.getErrorStream, stderr, conf)

      // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
      // or with nonzero exit code
      val exitCode = process.waitFor()
      state = ExecutorState.EXITED
      val message = "Command exited with code " + exitCode
      // Report the exit to the worker through an ExecutorStateChanged message.
      worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))
    } catch {
      case interrupted: InterruptedException =>
        logInfo("Runner thread for executor " + fullId + " interrupted")
        state = ExecutorState.KILLED
        killProcess(None)
      case e: Exception =>
        logError("Error running executor", e)
        state = ExecutorState.FAILED
        killProcess(Some(e.toString))
    }
  }
  /**
   * Forwards an executor state change to the master and, when the executor has finished,
   * moves it to the finished set and releases its resources.
   *
   * @param executorStateChanged the state change reported for an executor of this worker
   */
  private[worker] def handleExecutorStateChanged(executorStateChanged: ExecutorStateChanged):
    Unit = {
    // The master is always told first, whatever the new state is.
    sendToMaster(executorStateChanged)

    val state = executorStateChanged.state
    if (ExecutorState.isFinished(state)) {
      val appId = executorStateChanged.appId
      val fullId = appId + "/" + executorStateChanged.execId

      // Common tail of both log lines: the state plus the optional message and exit status.
      def outcome: String =
        " finished with state " + state +
          executorStateChanged.message.map(" message " + _).getOrElse("") +
          executorStateChanged.exitStatus.map(" exitStatus " + _).getOrElse("")

      executors.get(fullId) match {
        case None =>
          logInfo("Unknown Executor " + fullId + outcome)
        case Some(executor) =>
          logInfo("Executor " + fullId + outcome)
          // Move the runner from the active executors to the finished ones.
          executors -= fullId
          finishedExecutors(fullId) = executor
          trimFinishedExecutorsIfNecessary()
          // Release the cores and memory that were charged for this executor.
          coresUsed -= executor.cores
          memoryUsed -= executor.memory
      }
      maybeCleanupApplication(appId)
    }
  }

标签:val,driver,worker,源码,内核,executor,cores,Spark,Worker
来源: https://blog.csdn.net/weixin_38240095/article/details/96271629