[Notes Migration][Spark][9] Spark Source Code: Kernel Architecture 2
Continued from the previous post, Spark Source Code: Kernel Architecture 1.
(4) The most important mechanism of all: resource scheduling via schedule()
[1] Master's top-level scheduling: the Driver (on Worker) scheduling mechanism
/**
* Schedule the currently available resources among waiting apps. This method will be called
* every time a new app joins or resource availability changes.
*/
private def schedule(): Unit = {
// First check the Master's state; if it is not ALIVE, return immediately. In other words, a Standby Master never schedules apps or any other resources
if (state != RecoveryState.ALIVE) { return }
// Drivers take strict precedence over executors
// Take the previously registered workers whose state is ALIVE from workers: HashSet[WorkerInfo], in a randomly shuffled order (Random.shuffle)
val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
val numWorkersAlive = shuffledAliveWorkers.size
var curPos = 0
// Schedule drivers first
// *When does a driver get registered and therefore scheduled here?
// *Only standalone cluster deploy mode (spark-submit --deploy-mode cluster against a standalone Master) registers the driver with the Master; in client modes (standalone client, yarn-client) the driver is started locally in the submitting process, so it is never registered and the Master never schedules it. (A standalone sketch of this round-robin loop follows right after schedule().)
for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers: ArrayBuffer[DriverInfo]
// We assign workers to each waiting driver in a round-robin fashion. For each driver, we
// start from the last worker that was assigned a driver, and continue onwards until we have
// explored all alive workers.
var launched = false
var numWorkersVisited = 0
// Keep looping as long as there are ALIVE workers not yet visited and this driver has not been launched
while (numWorkersVisited < numWorkersAlive && !launched) {
val worker = shuffledAliveWorkers(curPos)
numWorkersVisited += 1
if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
// If the current worker has enough free memory and cores for the driver, launch the driver on this worker
launchDriver(worker, driver)
// Remove the launched driver from the waiting queue
waitingDrivers -= driver
// Exit the while loop on the next check
launched = true
}
// Advance the pointer, wrapping around the alive workers
curPos = (curPos + 1) % numWorkersAlive
}
}
startExecutorsOnWorkers()
}
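To make the round-robin placement above concrete, here is a minimal, self-contained sketch (assumed names such as WorkerSlot, DriverReq and placeDrivers, not Spark code) that simulates how each waiting driver walks the shuffled alive workers until one has enough memory and cores:

import scala.util.Random

object DriverPlacementSketch {
  // Hypothetical stand-ins for WorkerInfo and DriverDescription
  case class WorkerSlot(id: String, var memoryFree: Int, var coresFree: Int)
  case class DriverReq(id: String, mem: Int, cores: Int)

  def placeDrivers(workers: Seq[WorkerSlot], waiting: Seq[DriverReq]): Map[String, String] = {
    val shuffled = Random.shuffle(workers)      // same idea as shuffling the alive workers
    val numAlive = shuffled.size
    var curPos = 0
    var placements = Map.empty[String, String]  // driverId -> workerId
    for (driver <- waiting) {
      var launched = false
      var visited = 0
      while (visited < numAlive && !launched) { // stop once every alive worker has been tried
        val w = shuffled(curPos)
        visited += 1
        if (w.memoryFree >= driver.mem && w.coresFree >= driver.cores) {
          w.memoryFree -= driver.mem            // "launchDriver": reserve the resources on this worker
          w.coresFree -= driver.cores
          placements += (driver.id -> w.id)
          launched = true
        }
        curPos = (curPos + 1) % numAlive        // round-robin pointer
      }
    }
    placements
  }

  def main(args: Array[String]): Unit = {
    val workers = Seq(WorkerSlot("w1", 4096, 4), WorkerSlot("w2", 8192, 8))
    val drivers = Seq(DriverReq("d1", 2048, 2), DriverReq("d2", 6144, 4))
    println(placeDrivers(workers, drivers))     // e.g. Map(d1 -> w1, d2 -> w2), depending on the shuffle
  }
}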
/** Returns a new collection of the same type in a randomly chosen order.
*
* @return the shuffled collection
*/
def shuffle[T, CC[X] <: TraversableOnce[X]](xs: CC[T])(implicit bf: CanBuildFrom[CC[T], T, CC[T]]): CC[T] = {
val buf = new ArrayBuffer[T] ++= xs
def swap(i1: Int, i2: Int) {
val tmp = buf(i1)
buf(i1) = buf(i2)
buf(i2) = tmp
}
for (n <- buf.length to 2 by -1) {
val k = nextInt(n)
swap(n - 1, k)
}
(bf(xs) ++= buf).result()
}
private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
logInfo("Launching driver " + driver.id + " on worker " + worker.id)
// Add the driver to the worker's in-memory cache
// and add the driver's required resources (memory and CPU cores) to the worker's used resources
worker.addDriver(driver)
// Also record the worker in the driver's own in-memory state
driver.worker = Some(worker)
// Send a LaunchDriver message to the worker's RPC endpoint so that the worker launches the driver
worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
driver.state = DriverState.RUNNING
}
[2] Application scheduling mechanism: the core of the core. It actually schedules against the "total amount of resources" requested by the submit script.
Two scheduling algorithms: spreadOutApps (the default) spreads an app's cores across as many workers as possible, giving each worker one allocation per pass of the loop; the non-spreadOut mode uses up as much of one worker's resources as possible before moving on to the next worker.
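Before reading the real code, here is a minimal, self-contained sketch of the core-assignment loop under the two modes (the names assign, coresToGrant and canGive are made up for illustration; this is not Spark code):

object CoreAssignmentSketch {
  // coresFree: free cores per usable worker; returns the cores assigned to each position
  def assign(coresFree: Array[Int], coresToGrant: Int,
             coresPerExecutor: Int, spreadOut: Boolean): Array[Int] = {
    val assigned = Array.fill(coresFree.length)(0)
    var remaining = coresToGrant
    def canGive(pos: Int): Boolean =
      remaining >= coresPerExecutor && coresFree(pos) - assigned(pos) >= coresPerExecutor
    var free = coresFree.indices.filter(canGive)
    while (free.nonEmpty) {
      free.foreach { pos =>
        var keep = true
        while (keep && canGive(pos)) {
          remaining -= coresPerExecutor
          assigned(pos) += coresPerExecutor
          if (spreadOut) keep = false   // one chunk per worker per pass, then move to the next worker
        }
      }
      free = free.filter(canGive)
    }
    assigned
  }

  def main(args: Array[String]): Unit = {
    val workers = Array(8, 8, 8)        // three workers with 8 free cores each, executor size 2
    println(assign(workers, 12, 2, spreadOut = true).toSeq)   // List(4, 4, 4): spread evenly
    println(assign(workers, 12, 2, spreadOut = false).toSeq)  // List(8, 4, 0): saturate one worker first
  }
}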
// As a temporary workaround before better ways of configuring memory, we allow users to set
// a flag that will perform round-robin scheduling across the nodes (spreading out each app
// among all the nodes) instead of trying to consolidate each app onto a small # of nodes.
private val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)
/**
* Schedule and launch executors on workers after drivers
*/
private def startExecutorsOnWorkers(): Unit = {
// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
// in the queue, then the second app, etc.
for (app <- waitingApps) {
// Launching an executor requires at least one CPU core
val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
// If the cores left are fewer than coresPerExecutor, the remaining cores will not be allocated
if (app.coresLeft >= coresPerExecutor) {
// Filter out workers that don't have enough resources to launch an executor
// Then sort them by the number of free CPU cores in descending order
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
.filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
worker.coresFree >= coresPerExecutor)
.sortBy(_.coresFree).reverse
val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
// Now that we've decided how many cores to allocate on each worker, let's allocate them
for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
allocateWorkerResourceToExecutors(
app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos))
}
}
}
}
/**
* Schedule executors to be launched on the workers.
* Returns an array containing number of cores assigned to each worker.
*
* There are two modes of launching executors. The first attempts to spread out an application's
* executors on as many workers as possible, while the second does the opposite (i.e. launch them
* on as few workers as possible). The former is usually better for data locality purposes and is
* the default.
*
* The number of cores assigned to each executor is configurable. When this is explicitly set,
* multiple executors from the same application may be launched on the same worker if the worker
* has enough cores and memory. Otherwise, each executor grabs all the cores available on the
* worker by default, in which case only one executor per application may be launched on each
* worker during one single schedule iteration.
* Note that when `spark.executor.cores` is not set, we may still launch multiple executors from
* the same application on the same worker. Consider appA and appB both have one executor running
* on worker1, and appA.coresLeft > 0, then appB is finished and release all its cores on worker1,
* thus for the next schedule iteration, appA launches a new executor that grabs all the free
* cores on worker1, therefore we get multiple executors from appA running on worker1.
*
* It is important to allocate coresPerExecutor on each worker at a time (instead of 1 core
* at a time). Consider the following example: cluster has 4 workers with 16 cores each.
* User requests 3 executors (spark.cores.max = 48, spark.executor.cores = 16). If 1 core is
* allocated at a time, 12 cores from each worker would be assigned to each executor.
* Since 12 < 16, no executors would launch [SPARK-8881].
*/
private def scheduleExecutorsOnWorkers(
app: ApplicationInfo,
usableWorkers: Array[WorkerInfo],
spreadOutApps: Boolean): Array[Int] = {
val coresPerExecutor = app.desc.coresPerExecutor
val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
val oneExecutorPerWorker = coresPerExecutor.isEmpty // Boolean
val memoryPerExecutor = app.desc.memoryPerExecutorMB
val numUsable = usableWorkers.length
val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker
// Number of cores available to assign: the minimum of the app's remaining cores and the total free cores across usable workers (granting more than requested is pointless, granting more than available is impossible)
var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
/** Return whether the specified worker can launch an executor for this app. */
def canLaunchExecutor(pos: Int): Boolean = {
val keepScheduling = coresToAssign >= minCoresPerExecutor
val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor
// If we allow multiple executors per worker, then we can always launch new executors.
// Otherwise, if there is already an executor on this worker, just give it more cores.
val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
if (launchingNewExecutor) {
val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
keepScheduling && enoughCores && enoughMemory && underLimit
} else {
// We're adding cores to an existing executor, so no need
// to check memory and executor limits
keepScheduling && enoughCores
}
}
// Keep launching executors until no more workers can accommodate any
// more executors, or if we have reached this application's limits
var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)
while (freeWorkers.nonEmpty) {
freeWorkers.foreach { pos =>
var keepScheduling = true
while (keepScheduling && canLaunchExecutor(pos)) {
coresToAssign -= minCoresPerExecutor
assignedCores(pos) += minCoresPerExecutor
// If we are launching one executor per worker, then every iteration assigns 1 core
// to the executor. Otherwise, every iteration assigns cores to a new executor.
if (oneExecutorPerWorker) {
assignedExecutors(pos) = 1
} else {
assignedExecutors(pos) += 1
}
// Spreading out an application means spreading out its executors across as
// many workers as possible. If we are not spreading out, then we should keep
// scheduling executors on this worker until we use all of its resources.
// Otherwise, just move on to the next worker.
if (spreadOutApps) {
keepScheduling = false
// exit the inner while loop and move on to the next worker in the foreach
}
}
}
freeWorkers = freeWorkers.filter(canLaunchExecutor)
}
assignedCores
}
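As a rough check of the [SPARK-8881] example in the comment above, the hypothetical assign helper from the earlier sketch reproduces the arithmetic (4 workers with 16 cores each, spark.cores.max = 48, spark.executor.cores = 16):

// Allocating coresPerExecutor (16) cores at a time: three workers end up with a full 16-core executor.
assign(Array(16, 16, 16, 16), coresToGrant = 48, coresPerExecutor = 16, spreadOut = true)
// => Array(16, 16, 16, 0)

// Allocating 1 core at a time instead: every worker ends up with 12 cores, none reaches 16,
// so no 16-core executor could ever launch; this is exactly what SPARK-8881 fixed.
assign(Array(16, 16, 16, 16), coresToGrant = 48, coresPerExecutor = 1, spreadOut = true)
// => Array(12, 12, 12, 12)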
/**
* Allocate a worker's resources to one or more executors.
* @param app the info of the application which the executors belong to
* @param assignedCores number of cores on this worker for this application
* @param coresPerExecutor number of cores per executor
* @param worker the worker info
*/
private def allocateWorkerResourceToExecutors(
app: ApplicationInfo,
assignedCores: Int,
coresPerExecutor: Option[Int],
worker: WorkerInfo): Unit = {
// If the number of cores per executor is specified, we divide the cores assigned
// to this worker evenly among the executors with no remainder.
// Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
for (i <- 1 to numExecutors) {
val exec = app.addExecutor(worker, coresToAssign)
launchExecutor(worker, exec)
app.state = ApplicationState.RUNNING
}
}
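The split above is plain integer arithmetic; a minimal illustration with assumed values (not Spark code):

// spark.executor.cores is set to 4 and this worker was assigned 12 cores:
// the worker gets 12 / 4 = 3 executors with 4 cores each
val assignedCores = 12
val coresPerExecutor: Option[Int] = Some(4)
val numExecutors  = coresPerExecutor.map(assignedCores / _).getOrElse(1)  // 3
val coresToAssign = coresPerExecutor.getOrElse(assignedCores)             // 4

// spark.executor.cores is NOT set: a single executor grabs all 12 assigned cores
val numExecutorsDefault  = (None: Option[Int]).map(assignedCores / _).getOrElse(1)  // 1
val coresToAssignDefault = (None: Option[Int]).getOrElse(assignedCores)             // 12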
private[master] def addExecutor(
worker: WorkerInfo,
cores: Int,
useID: Option[Int] = None): ExecutorDesc = {
val exec = new ExecutorDesc(newExecutorId(useID), this, worker, cores, desc.memoryPerExecutorMB)
executors(exec.id) = exec
coresGranted += cores
exec
}
private[master] class ExecutorDesc(
val id: Int,
val application: ApplicationInfo,
val worker: WorkerInfo,
val cores: Int,
val memory: Int) {
// ... (rest of the class body elided)
}
private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
worker.addExecutor(exec)
worker.endpoint.send(LaunchExecutor(masterUrl,
exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
exec.application.driver.send(
ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}
6. Worker
private[deploy] class Worker(
override val rpcEnv: RpcEnv,
webUiPort: Int,
cores: Int,
memory: Int,
masterRpcAddresses: Array[RpcAddress],
endpointName: String,
workDirPath: String = null,
val conf: SparkConf,
val securityMgr: SecurityManager)
extends ThreadSafeRpcEndpoint with Logging
(1) Launching a driver on the Worker
case LaunchDriver(driverId, driverDesc) =>
logInfo(s"Asked to launch driver $driverId")
val driver = new DriverRunner(
conf,
driverId,
workDir,
sparkHome,
driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
self,
workerUri,
securityMgr)
drivers(driverId) = driver // drivers: HashMap
driver.start()
coresUsed += driverDesc.cores
memoryUsed += driverDesc.mem
/**
* Manages the execution of one driver, including automatically restarting the driver on failure.
* This is currently only used in standalone cluster deploy mode.
*/
private[deploy] class DriverRunner(
conf: SparkConf,
val driverId: String,
val workDir: File,
val sparkHome: File,
val driverDesc: DriverDescription,
val worker: RpcEndpointRef,
val workerUrl: String,
val securityManager: SecurityManager)
extends Logging
// driver.start()
/** Starts a thread to run and manage the driver. */
private[worker] def start() = {
new Thread("DriverRunner for " + driverId) { // java.lang.Thread
override def run() {
var shutdownHook: AnyRef = null
try {
shutdownHook = ShutdownHookManager.addShutdownHook { () =>
logInfo(s"Worker shutting down, killing driver $driverId")
kill()
}
// prepare driver jars and run driver
val exitCode = prepareAndRunDriver()
// set final state depending on if forcibly killed and process exit code
finalState = if (exitCode == 0) {
Some(DriverState.FINISHED)
} else if (killed) {
Some(DriverState.KILLED)
} else {
Some(DriverState.FAILED)
}
} catch {
case e: Exception =>
kill()
finalState = Some(DriverState.ERROR)
finalException = Some(e)
} finally {
if (shutdownHook != null) {
ShutdownHookManager.removeShutdownHook(shutdownHook)
}
}
// *KEY: notify the worker of the final driver state and any exception
worker.send(DriverStateChanged(driverId, finalState.get, finalException))
}
}.start()
}
private[worker] def prepareAndRunDriver(): Int = {
val driverDir = createWorkingDirectory()
val localJarFilename = downloadUserJar(driverDir)
def substituteVariables(argument: String): String = argument match {
case "{{WORKER_URL}}" => workerUrl
case "{{USER_JAR}}" => localJarFilename
case other => other
}
// TODO: If we add ability to submit multiple jars they should also be added here
val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,
driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)
runDriver(builder, driverDir, driverDesc.supervise)
}
/**
* Creates the working directory for this driver.
* Will throw an exception if there are errors preparing the directory.
*/
private def createWorkingDirectory(): File = {
val driverDir = new File(workDir, driverId)
if (!driverDir.exists() && !driverDir.mkdirs()) {
throw new IOException("Failed to create directory " + driverDir)
}
driverDir
}
/**
* Download the user jar into the supplied directory and return its local path.
* Will throw an exception if there are errors downloading the jar.
*/
private def downloadUserJar(driverDir: File): String = {
val jarFileName = new URI(driverDesc.jarUrl).getPath.split("/").last
val localJarFile = new File(driverDir, jarFileName)
if (!localJarFile.exists()) { // May already exist if running multiple workers on one node
logInfo(s"Copying user jar ${driverDesc.jarUrl} to $localJarFile")
Utils.fetchFile (
driverDesc.jarUrl,
driverDir,
conf,
securityManager,
SparkHadoopUtil.get.newConfiguration(conf),
System.currentTimeMillis(),
useCache = false)
if (!localJarFile.exists()) { // Verify copy succeeded
throw new IOException(
s"Can not find expected jar $jarFileName which should have been loaded in $driverDir")
}
}
localJarFile.getAbsolutePath
}
private def runDriver(builder: ProcessBuilder, baseDir: File, supervise: Boolean): Int = {
builder.directory(baseDir)
def initialize(process: Process): Unit = {
// Redirect stdout and stderr to files
val stdout = new File(baseDir, "stdout")
CommandUtils.redirectStream(process.getInputStream, stdout)
val stderr = new File(baseDir, "stderr")
val formattedCommand = builder.command.asScala.mkString("\"", "\" \"", "\"")
val header = "Launch Command: %s\n%s\n\n".format(formattedCommand, "=" * 40)
Files.append(header, stderr, StandardCharsets.UTF_8)
CommandUtils.redirectStream(process.getErrorStream, stderr)
}
runCommandWithRetry(ProcessBuilderLike(builder), initialize, supervise)
// The core underneath is exitCode = process.get.waitFor(): start the driver process and wait for it to exit (a stripped-down sketch follows below)
}
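To make that "start the process and wait" core concrete, here is a hypothetical, stripped-down sketch of what runDriver ultimately boils down to (plain JDK ProcessBuilder, no supervise/retry handling; RunDriverSketch and runOnce are invented names):

import java.io.File

object RunDriverSketch {
  def runOnce(command: Seq[String], baseDir: File): Int = {
    val builder = new ProcessBuilder(command: _*)
    builder.directory(baseDir)                            // run inside the driver's working directory
    builder.redirectOutput(new File(baseDir, "stdout"))   // redirect stdout/stderr to files, as the real code does
    builder.redirectError(new File(baseDir, "stderr"))
    val process = builder.start()
    process.waitFor()                                     // block until the child JVM exits; this exit code decides the DriverState
  }
}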
// Worker handles the DriverStateChanged message
private[worker] def handleDriverStateChanged(driverStateChanged: DriverStateChanged): Unit = {
val driverId = driverStateChanged.driverId
val exception = driverStateChanged.exception
val state = driverStateChanged.state
state match {
case DriverState.ERROR =>
logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
case DriverState.FAILED =>
logWarning(s"Driver $driverId exited with failure")
case DriverState.FINISHED =>
logInfo(s"Driver $driverId exited successfully")
case DriverState.KILLED =>
logInfo(s"Driver $driverId was killed by user")
case _ =>
logDebug(s"Driver $driverId changed state to $state")
}
// After the driver finishes, the DriverRunner thread sends the final state to the Worker; the Worker forwards this message to the Master, which then handles the state change
sendToMaster(driverStateChanged)
// Remove the driver from the local cache
val driver = drivers.remove(driverId).get
// Add the driver to the finished queue
finishedDrivers(driverId) = driver
trimFinishedDriversIfNecessary()
// Release the resources
memoryUsed -= driver.driverDesc.mem
coresUsed -= driver.driverDesc.cores
}
(2) Launching an executor on the Worker
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
if (masterUrl != activeMasterUrl) {
logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
} else {
try {
logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
// Create the executor's working directory
val executorDir = new File(workDir, appId + "/" + execId)
if (!executorDir.mkdirs()) {
throw new IOException("Failed to create directory " + executorDir)
}
// Create local dirs for the executor. These are passed to the executor via the
// SPARK_EXECUTOR_DIRS environment variable, and deleted by the Worker when the
// application finishes.
val appLocalDirs = appDirectories.getOrElse(appId, {
val localRootDirs = Utils.getOrCreateLocalRootDirs(conf)
val dirs = localRootDirs.flatMap { dir =>
try {
val appDir = Utils.createDirectory(dir, namePrefix = "executor")
Utils.chmod700(appDir)
Some(appDir.getAbsolutePath())
} catch {
case e: IOException =>
logWarning(s"${e.getMessage}. Ignoring this directory.")
None
}
}.toSeq
if (dirs.isEmpty) {
throw new IOException("No subfolder can be created in " +
s"${localRootDirs.mkString(",")}.")
}
dirs
})
appDirectories(appId) = appLocalDirs
val manager = new ExecutorRunner(
appId,
execId,
appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
cores_,
memory_,
self,
workerId,
host,
webUi.boundPort,
publicAddress,
sparkHome,
executorDir,
workerUri,
conf,
appLocalDirs, ExecutorState.RUNNING)
// Add the ExecutorRunner to the local cache
executors(appId + "/" + execId) = manager
// Start the ExecutorRunner
manager.start()
coresUsed += cores_
memoryUsed += memory_
// Send an ExecutorStateChanged message to the Master
sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
} catch {
case e: Exception =>
logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
if (executors.contains(appId + "/" + execId)) {
executors(appId + "/" + execId).kill()
executors -= appId + "/" + execId
}
sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
Some(e.toString), None))
}
private[worker] def start() {
workerThread = new Thread("ExecutorRunner for " + fullId) {
override def run() { fetchAndRunExecutor() }
}
workerThread.start()
// Shutdown hook that kills actors on shutdown.
shutdownHook = ShutdownHookManager.addShutdownHook { () =>
// It's possible that we arrive here before calling `fetchAndRunExecutor`, then `state` will
// be `ExecutorState.RUNNING`. In this case, we should set `state` to `FAILED`.
if (state == ExecutorState.RUNNING) {
state = ExecutorState.FAILED
}
killProcess(Some("Worker shutting down")) }
}
/**
* Download and run the executor described in our ApplicationDescription
*/
private def fetchAndRunExecutor() {
try {
// Launch the process
val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
memory, sparkHome.getAbsolutePath, substituteVariables)
val command = builder.command()
val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
logInfo(s"Launch command: $formattedCommand")
builder.directory(executorDir)
builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
// In case we are running this from within the Spark Shell, avoid creating a "scala"
// parent process for the executor command
builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")
// Add webUI log urls
val baseUrl =
if (conf.getBoolean("spark.ui.reverseProxy", false)) {
s"/proxy/$workerId/logPage/?appId=$appId&executorId=$execId&logType="
} else {
s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
}
builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")
process = builder.start()
val header = "Spark Executor Command: %s\n%s\n\n".format(
formattedCommand, "=" * 40)
// Redirect its stdout and stderr to files
val stdout = new File(executorDir, "stdout")
stdoutAppender = FileAppender(process.getInputStream, stdout, conf)
val stderr = new File(executorDir, "stderr")
Files.write(header, stderr, StandardCharsets.UTF_8)
stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
// Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
// or with nonzero exit code
val exitCode = process.waitFor()
state = ExecutorState.EXITED
val message = "Command exited with code " + exitCode
// Send an ExecutorStateChanged message to the Worker
worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))
} catch {
case interrupted: InterruptedException =>
logInfo("Runner thread for executor " + fullId + " interrupted")
state = ExecutorState.KILLED
killProcess(None)
case e: Exception =>
logError("Error running executor", e)
state = ExecutorState.FAILED
killProcess(Some(e.toString))
}
}
private[worker] def handleExecutorStateChanged(executorStateChanged: ExecutorStateChanged):
Unit = {
// Forward the ExecutorStateChanged message directly to the Master
sendToMaster(executorStateChanged)
val state = executorStateChanged.state
if (ExecutorState.isFinished(state)) {
val appId = executorStateChanged.appId
val fullId = appId + "/" + executorStateChanged.execId
val message = executorStateChanged.message
val exitStatus = executorStateChanged.exitStatus
executors.get(fullId) match {
case Some(executor) =>
logInfo("Executor " + fullId + " finished with state " + state +
message.map(" message " + _).getOrElse("") +
exitStatus.map(" exitStatus " + _).getOrElse(""))
// Remove the executor from the in-memory cache
executors -= fullId
finishedExecutors(fullId) = executor
trimFinishedExecutorsIfNecessary()
coresUsed -= executor.cores
memoryUsed -= executor.memory
case None =>
logInfo("Unknown Executor " + fullId + " finished with state " + state +
message.map(" message " + _).getOrElse("") +
exitStatus.map(" exitStatus " + _).getOrElse(""))
}
maybeCleanupApplication(appId)
}
}
Source: https://blog.csdn.net/weixin_38240095/article/details/96271629