其他分享
首页 > 其他分享> > spark streaming初始化过程

spark streaming初始化过程

作者:互联网

原文链接:https://www.jianshu.com/p/376a1d093bf8

Spark Streaming是一种构建在Spark上的实时计算框架。Spark Streaming应用以Spark应用的方式提交到Spark平台,其组件以长期批处理任务的形式在Spark平台运行。这些任务主要负责接收实时数据流及定期产生批作业并提交至Spark集群,本文要说明的是以下几个功能模块运行前的准备工作。

数据接收
Job 生成
流量控制
动态资源伸缩

下面我们以WordCount程序为例分析Spark Streaming运行环境的初始化过程。
val conf = new SparkConf().setAppName(“wordCount”).setMaster(“local[4]”)
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(10))
val lines = ssc.socketTextStream(“localhost”, 8585, StorageLevel.MEMORY_ONLY)
val words = lines.flatMap(.split(" ")).map(w => (w,1))
val wordCount = words.reduceByKey(
+_)
wordCount.print
ssc.start()
ssc.awaitTermination()

以下流程,皆以上述WordCount源码为例。
1、StreamingContext的初始化过程
StreamingContext是Spark Streaming应用的执行环境,其定义很多Streaming功能的入口,如:它提供从多种数据源创建DStream的方法等。
在创建Streaming应用时,首先应创建StreamingContext(WordCount应用可知),伴随StreamingContext的创建将会创建以下主要组件:
1.1 DStreamGraph
DStreamGraph的主要功能是记录InputDStream及OutputStream及从InputDStream中抽取出ReceiverInputStreams。因为DStream之间的依赖关系类似于RDD,并在任务执行时转换成RDD,因此,可以认为DStream Graph与RDD Graph存在对应关系. 即:DStreamGraph以批处理间隔为周期转换成RDDGraph.

ReceiverInputStreams: 包含用于接收数据的Receiver信息,并在启动Receiver时提供相关信息
OutputStream:每个OutputStream会在批作业生成时,生成一个Job.

1.2 JobScheduler
JobScheduler是Spark Streaming中最核心的组件,其负载Streaming各功作组件的启动。

数据接收
Job 生成
流量控制
动态资源伸缩
以及负责生成的批Job的调度及状态管理工作。

2、 DStream的创建与转换
StreamingContext初始化完毕后,通过调用其提供的创建InputDStream的方法创建SocketInputDStream.
SocketInputDStream的继承关系为:
SocketInputDStream->ReceiverInputDStream->InputDStream->DStream.
在InputDStream中 提供如下功
ssc.graph.addInputStream(this)

JAVA中初始化子类时,会先初始化其父类。所以在创建SocketInputDStream时,会先初始化InputDStream,在InputDStream中实现将自身加入DStreamGraph中,以标识其为输入数据源。
DStream中算子的转换,类似于RDD中的转换,都是延迟计算,仅形成pipeline链。当上述应用遇到print(Output算子)时,会将DStream转换为ForEachDStream,并调register方法作为OutputStream注册到DStreamGraph的outputStreams列表,以待生成Job。
print算子实现方法如下:
/**

/**

*/
private def foreachRDD(
foreachFunc: (RDD[T], Time) => Unit,
displayInnerRDDOps: Boolean): Unit = {
new ForEachDStream(this,
context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
}

ForEachDStream 不同于其它DStream的地方为其重写了generateJob方法,以使DStream Graph操作转换成RDD Graph操作,并生成Job.
3、SparkContext启动
/**

在此方法中,最核心的代码是以线程的方式启动JobScheduler,从而开启各功能组件。
3.1 JobScheduler的启动
JobScheduler主要负责以下几种任务:

数据接收相关组件的初始化及启动
ReceiverTracker的初始化及启动。ReceiverTracker负责管理Receiver,包括Receiver的启停,状态维护 等。
Job生成相关组件的启动
JobGenerator的启动。JobGenerator负责以BatchInterval为周期生成Job.
Streaming监听的注册与启动
作业监听
反压机制
BackPressure机制,通过RateController控制数据摄取速率。
Executor DynamicAllocation 的启动
Executor 动态伸缩管理, 动态增加或减少Executor,来达到使用系统稳定运行 或减少资源开销的目的。
Job的调度及状态维护。

JobScheduler的start方法的代码如下所示:
def start(): Unit = synchronized {
if (eventLoop != null) return // scheduler has already been started

logDebug("Starting JobScheduler")
eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
  override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

  override protected def one rror(e: Throwable): Unit = reportError("Error in job scheduler", e)
}
eventLoop.start()

// attach rate controllers of input streams to receive batch completion updates
for {
  inputDStream <- ssc.graph.getInputStreams
  rateController <- inputDStream.rateController
} ssc.addStreamingListener(rateController)

listenerBus.start()
receiverTracker = new ReceiverTracker(ssc)
inputInfoTracker = new InputInfoTracker(ssc)

val executorAllocClient: ExecutorAllocationClient = ssc.sparkContext.schedulerBackend match {
  case b: ExecutorAllocationClient => b.asInstanceOf[ExecutorAllocationClient]
  case _ => null
}

executorAllocationManager = ExecutorAllocationManager.createIfEnabled(
  executorAllocClient,
  receiverTracker,
  ssc.conf,
  ssc.graph.batchDuration.milliseconds,
  clock)
executorAllocationManager.foreach(ssc.addStreamingListener)
receiverTracker.start()
jobGenerator.start()
executorAllocationManager.foreach(_.start())
logInfo("Started JobScheduler")

}

代码中存在的 eventLoop: EventLoop[JobSchedulerEvent]对象,用以接收和处理事件。调用者通过调用其post方法向事件队列注册事件。EventLoop开始执行时,会开启一deamon线程用于处理队列中的事件。EventLoop是一个抽象类,JobScheduler中初始化EventLoop时实现了其OnReceive方法。该方法中指定接收的事件由processEvent(event)方法处理。
小结
JobScheduler是Spark Streaming中核心的组件,在其开始执行时,会开启数据接收相关组件及Job生成相关组件,从而使数据准备和数据计算两个流程开始工作。
另外,其还负责BackPressure, Executor DynamicAllocation 等优化机制的启动工作。
下面的章节,将对数据准备和数据计算阶段的流程进行分析,以及BackPressure, Executor DynamicAllocation 机制进行分析。

作者:barrenlake
链接:https://www.jianshu.com/p/376a1d093bf8
来源:简书
简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。

标签:初始化,JobScheduler,Streaming,streaming,Job,StreamingContext,spark,DStream,Spark
来源: https://blog.csdn.net/HB_PRI/article/details/100063196