Spark 源码(七):Worker 的 receive 方法
作者:互联网
receive 方法本质上是一个很大的模式匹配,由许多 case 分支组成,每个分支处理一种消息(场景):
case msg: RegisterWorkerResponse
case SendHeartbeat
case WorkDirCleanup
case MasterChanged
case ReconnectWorker
case LaunchExecutor
case executorStateChanged: ExecutorStateChanged
case KillExecutor(masterUrl, appId, execId)
case LaunchDriver(driverId, driverDesc, resources_)
case KillDriver(driverId)
case driverStateChanged @ DriverStateChanged(driverId, state, exception)
case ReregisterWithMaster
case ApplicationFinished(id)
case DecommissionWorker
case WorkerSigPWRReceived
一. RegisterWorkerResponse 详解
/**
 * Handles the master's reply to this worker's registration request.
 *
 *  - `RegisteredWorker`: marks the worker as registered, switches to the replying master,
 *    schedules the periodic `SendHeartbeat` (and, when cleanup is enabled, `WorkDirCleanup`)
 *    self-messages, then sends the master a `WorkerLatestState` snapshot.
 *  - `RegisterWorkerFailed`: logs the failure and exits the JVM with status 1, unless the
 *    worker is already registered.
 *  - `MasterInStandby`: ignored.
 *
 * The whole body runs while holding this object's monitor.
 *
 * @param msg the registration response received from a master
 */
private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized {
  msg match {
    case RegisteredWorker(masterRef, masterWebUiUrl, masterAddress, duplicate) =>
      // This URL is only used in the log lines below.
      val addressForLogs =
        if (preferConfiguredMasterAddress) masterAddress else masterRef.address
      val preferredMasterAddress = addressForLogs.toSparkURL

      if (duplicate) {
        logWarning(s"Duplicate registration at master $preferredMasterAddress")
      }
      logInfo(s"Successfully registered with master $preferredMasterAddress")

      registered = true
      // Point this worker at the master that accepted the registration.
      changeMaster(masterRef, masterWebUiUrl, masterAddress)

      // Heartbeat timer: first tick immediately, then every HEARTBEAT_MILLIS. Each tick only
      // posts SendHeartbeat to this endpoint itself.
      forwardMessageScheduler.scheduleAtFixedRate(
        () => Utils.tryLogNonFatalError { self.send(SendHeartbeat) },
        0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)

      if (CLEANUP_ENABLED) {
        logInfo(
          s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
        // Cleanup timer: posts WorkDirCleanup to this endpoint itself every
        // CLEANUP_INTERVAL_MILLIS, with the first tick delayed by one interval.
        forwardMessageScheduler.scheduleAtFixedRate(
          () => Utils.tryLogNonFatalError { self.send(WorkDirCleanup) },
          CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
      }

      // Report the executors and drivers this worker currently tracks to the master.
      val execs = executors.values.toList.map { runner =>
        new ExecutorDescription(runner.appId, runner.execId, runner.cores, runner.state)
      }
      val latestState = WorkerLatestState(workerId, execs, drivers.keys.toSeq)
      masterRef.send(latestState)

    case RegisterWorkerFailed(message) =>
      // A failure reply is fatal only while the worker has never been registered.
      if (!registered) {
        logError("Worker registration failed: " + message)
        System.exit(1)
      }

    case MasterInStandby =>
      // Ignore. Master not yet ready.
  }
}
二. SendHeartbeat 详解
// Body of the SendHeartbeat case: a heartbeat is forwarded only while `connected` is true;
// otherwise this tick does nothing.
if (connected) {
  sendToMaster(Heartbeat(workerId, self))
}
/**
 * Sends `message` to the current master, if one is set.
 *
 * When `master` is empty the message is dropped and a warning is logged instead.
 *
 * @param message the message to forward to the master
 */
private def sendToMaster(message: Any): Unit = {
  master.fold {
    // No master reference yet: drop the message and log it.
    logWarning(
      s"Dropping $message because the connection to master has not yet been established")
  } { ref =>
    ref.send(message)
  }
}
三. WorkDirCleanup 详解
// Body of the WorkDirCleanup case.
// Directory names that must be kept: the app IDs of all tracked executors plus the IDs of all
// tracked drivers.
val appIds = (executors.values.map(_.appId) ++ drivers.values.map(_.driverId)).toSet
try {
  // The scan and deletion run on cleanupThreadExecutor, not on the thread handling the message.
  val cleanupFuture: concurrent.Future[Unit] = concurrent.Future {
    val appDirs = workDir.listFiles()
    if (appDirs == null) {
      // Name the directory that could not be listed. `appDirs` is always null on this path,
      // so including it in the message would only print "null".
      throw new IOException("ERROR: Failed to list files in " + workDir)
    }
    appDirs.filter { dir =>
      val appIdFromDir = dir.getName
      val isAppStillRunning = appIds.contains(appIdFromDir)
      // Select entries that are directories, do not belong to a running app or driver, and
      // for which doesDirectoryContainAnyNewFiles is false.
      // NOTE(review): APP_DATA_RETENTION_SECONDS is presumably the retention period after
      // which an application directory may be deleted - confirm against the config docs.
      dir.isDirectory && !isAppStillRunning &&
        !Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECONDS)
    }.foreach { dir =>
      logInfo(s"Removing directory: ${dir.getPath}")
      Utils.deleteRecursively(dir)
      if (conf.get(config.SHUFFLE_SERVICE_DB_ENABLED) &&
          conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
        // Tell the shuffle service that this application is gone.
        // NOTE(review): presumably this only removes the shuffle service's own data for the
        // app, not every file; similar cleanup logs also seem to appear when a job is
        // killed - confirm.
        shuffleService.applicationRemoved(dir.getName)
      }
    }
  }(cleanupThreadExecutor)
  // A failure inside the future is only logged.
  cleanupFuture.failed.foreach(e =>
    logError("App dir cleanup failed: " + e.getMessage, e)
  )(cleanupThreadExecutor)
} catch {
  // Submitting to a pool that has already been shut down is expected; warn only.
  case _: RejectedExecutionException if cleanupThreadExecutor.isShutdown =>
    logWarning("Failed to cleanup work dir as executor pool was shutdown")
}
四. MasterChanged 详解
// Body of the MasterChanged case. The worker adopts the new master it was told about and
// then reports the executors and drivers it currently tracks.
logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
changeMaster(masterRef, masterWebUiUrl, masterRef.address)

// One response per tracked executor: its description plus its resources.
val executorStates = executors.values.map { runner =>
  val description =
    new ExecutorDescription(runner.appId, runner.execId, runner.cores, runner.state)
  WorkerExecutorStateResponse(description, runner.resources)
}
// One response per tracked driver: its id plus its resources.
val driverStates = drivers.keys.map { driverId =>
  WorkerDriverStateResponse(driverId, drivers(driverId).resources)
}

// Send the combined snapshot to the new master.
val schedulerState =
  WorkerSchedulerStateResponse(workerId, executorStates.toList, driverStates.toSeq)
masterRef.send(schedulerState)
五. ReconnectWorker 详解
registerWithMaster() // Body of the ReconnectWorker case; per the author, registerWithMaster was covered earlier in this series.
标签:case,val,receive,send,源码,masterRef,master,spark,dir 来源: https://www.cnblogs.com/wuxiaolong4/p/16687610.html