其他分享
首页 > 其他分享> > 图解大数据 | 流式数据处理-Spark Streaming

图解大数据 | 流式数据处理-Spark Streaming

作者:互联网

作者:韩信子@ShowMeAI
教程地址http://www.showmeai.tech/tutorials/84
本文地址http://www.showmeai.tech/article-detail/179
声明:版权所有,转载请联系平台与作者并注明出处

1.Spark Streaming解读

1)Spark Streaming简介

Spark Streaming是Spark核心API的一个扩展,可以实现实时数据的可拓展,高吞吐量,容错机制的实时流处理框架。

Spark Streaming 支持的数据输入源很多,例如:Kafka、 Flume、Twitter、ZeroMQ 和简单的 TCP 套接字等等。数据输入后可以用 Spark 的高度抽象原语如:map、reduce、join、window 等进行运算。而结果也能保存在很多地方,如HDFS,数据库等。另外Spark Streaming 也能和 MLlib(机器学习)以及 Graphx 完美融合。

(1)流数据特点

(2)DStream概念

和 Spark 基于 RDD 的概念很相似,Spark Streaming 使用离散化流(discretized stream)作为抽象表示,叫作DStream。DStream 是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,而DStream 是由这些RDD 所组成的序列(因此得名“离散化”)。

(3)DStream形成步骤

定义一个RDD处理逻辑,数据按照时间切片,每次流入的数据都不一样,但是RDD的DAG逻辑是一样的,即按照时间划分成一个个batch,用同一个逻辑处理。

DStream 可以从各种输入源创建,比如 Flume、Kafka 或者 HDFS。创建出来的 DStream 支持两种操作,一种是转化操作(transformation),会生成一个新的DStream,另一种是输出操作(output operation),可以把数据写入外部系统中。DStream 提供了许多与 RDD 所支持的操作相类似的操作支持,还增加了与时间相关的新操作,比如滑动窗口。

2)Spark Streaming特点

Spark Streaming有下述一些特点:

3)Spark Streaming架构

大家知道Spark的工作机制如下:

而SparkStreaming架构由三个模块组成:

在上图中几个核心的角色和功能分别是:

4)Spark Streaming 作业提交

(1)相关组件

Spark Sreaming的作业提交包含的组件和功能分别为:

(2)具体流程

具体的作业提交流程如下:

要传入的数据会编排成block id(元数据)的形式,再加上RDD的逻辑,就生产了job scheduler,通过job manager形成job queue,以队列形式有序执行。真正的数据是以block形式传入worker,由worker上的executor通过元数据信息Block ID去HDFS上拉取对应的block数据进行执行。

Network Input Tracker传入的并不是真正的数据,而是Block IDs,相当于获取的是元数据,数据是通过worker进行接受的,也就是说Master上不管真正数据的接受情况,Master上只是能够拿到数据block的id,至于这些block做什么操作,是会放到 Job Manager去,按照顺序执行。

5)SparkStreaming工作原理

Discretized Stream 是Spark Streaming 的基础抽象,代表持续性的数据流和经过各种 Spark 原语操作后的结果数据流。在内部实现上,DStream 是一系列连续的RDD 来表示。每个RDD 含有一段时间间隔内的数据。

简单来说,SparkStreaming接受实时的数据流,把数据按照指定的时间段切成一片片小的数据块(SparkStreaming将每个小的数据块当作RDD来处理),然后把数据块传给Spark Engine处理,最终得到一批批的结果。

对于Streaming来说,它的单位是DStream,而对于SparkCore,它的单位是RDD。针对Spark开发,就是开发RDD的DAG图,而针对SparkStreaming,就是开发DStream。

DStream 代表连续的一组RDD,每个RDD都包含特定时间间隔的数据。DStream内部的操作,可以直接映射到内部RDD进行,相当于DStream是在RDD上增加一个时间的维度得到的。RDD是DStream最小的一个数据单元。DStream中对数据的操作也是按照RDD为单位来进行的。

简单来理解,SparkStreaming对于流数据的处理速度是秒级别,无法达到Storm的毫秒级别,因此也可以将Streaming看作是微批处理。

2.DStream详解

大家在上文中频繁看到Dstream的核心概念,下面我们对其做一些展开讲解。

整体上看,Spark Streaming 的处理思路:将连续的数据持久化、离散化,然后进行批量处。

对上面这句话进行分析:

1)DStream创建注意事项

Spark Streaming 原生支持一些不同的数据源。一些“核心”数据源已经被打包到 Spark Streaming 的 Maven 工件中,而其他的一些则可以通过 spark-streaming-kafka 等附加工件获取。每个接收器都以 Spark 执行器程序中一个长期运行的任务的形式运行,因此会占据分配给应用的 CPU 核心。

此外,我们还需要有可用的 CPU 核心来处理数据。这意味着如果要运行多个接收器,就必须至少有和接收器数目相同的核心数,还要加上用来完成计算所需要的核心数。例如,如果我们想要在流计算应用中运行 10 个接收器,那么至少需要为应用分配 11 个 CPU 核心。所以如果在本地模式运行,不要使用local 或者 local。

2)DStream转换

(1)TransFormation算子与输出

DStream 上的原语与 RDD 的类似,分为Transformations(转换)和Output Operations(输出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及各种Window 相关的原语。

① TransFormation

② Output

DStream = [rdd1, rdd2, …, rddn]
RDD两类算子:transformation、action
DStream两类算子:transformation、output

(2)无状态转换

无状态转化操作就是把简单的 RDD 转化操作应用到每个批次上,也就是转化 DStream 中的每一个 RDD。部分无状态转化操作列在了下表中。注意,针对键值对的 DStream 转化操作(比如 reduceByKey())要添加 import StreamingContext._才能在 Scala 中使用。

函数名称 目的 Scala示例 用来操作 DStream[T] 的用户自定义函数的函数签名
map ( ) 对 DStream 中的每个元素应用给定函数,返回由各元素输出的元素组成的DStream ds.map(x => x + 1) f : (T) -> U
flatMap( ) 对 DStream 中的每个元素应用给定函数,返回由各元素输出的迭代器组成的 DStream ds.flatMap(x => x.split (“ ”) ) f : T -> Iterable [U]
filter( ) 返回由给定 DStream 中通过筛选的元素组成的 DStream ds.filter(x => x! = 1 ) f : T -> Boolean
repartition( ) 改变 DStream 的分区数 ds.repartition(10 ) N / A
reduceByKey( ) 将每个批次中键相同的记录归约 ds.reduceByKey((x, y) => x + y) f : T , T -> T
groupByKey( ) 将每个批次中的记录根据键分组 ds.groupByKey( ) N / A

需要注意的是,尽管这些函数看起来像作用在整个流上一样,但事实上每个DStream 在内部是由许多RDD(批次)组成,且无状态转化操作是分别应用到每个 RDD 上的。例如,reduceByKey()会归约每个时间区间中的数据,但不会归约不同区间之间的数据。

无状态转化操作也能在多个 DStream 间整合数据,不过也是在各个时间区间内。例如,键 值对DStream 拥有和RDD 一样的与连接相关的转化操作,也就是cogroup()、join()、leftOuterJoin() 等。我们可以在DStream 上使用这些操作,这样就对每个批次分别执行了对应的RDD 操作。

我们还可以像在常规的 Spark 中一样使用 DStream 的 union() 操作将它和另一个 DStream 的内容合并起来,也可以使用 StreamingContext.union()来合并多个流。

(3)有状态转换

① UpdateStateByKey (全局统计量)

UpdateStateByKey 原语用于记录历史记录,有时,我们需要在DStream 中跨批次维护状态(例如流计算中累加wordcount)。针对这种情况,updateStateByKey()为我们提供了对一个状态变量的访问,用于键值对形式的DStream。

给定一个由(键,事件)对构成的 DStream,并传递一个指定如何根据新的事件更新每个键对应状态的函数,它可以构建出一个新的 DStream,其内部数据为(键,状态) 对。

为使用这个功能,你需要做下面两步:

如果要使用updateStateByKey算子,就必须设置一个checkpoint目录,开启checkpoint机制,这样的话才能把每个key对应的state除了在内存中有,在磁盘上也checkpoint一份。因为要长期保存一份key的state的话,那么spark streaming是要求必须用checkpoint的,以避免内存数据的丢失。

主要解决:比如说在双十一统计一天销量和成交金额,这些计算需要全量汇总,对数据进行累加,就需要避免数据在内存中丢失,造成不准确

② Window Operations

Window Operations 有点类似于 Storm 中的 State,可以设置窗口的大小和滑动窗口的间隔来动态的获取当前Steaming 的允许状态。

基于窗口的操作会在一个比 StreamingContext 的批次间隔更长的时间范围内,通过整合多个批次(在窗口内的批次)的结果,计算出整个窗口的结果。

简单来说,Streaming的Window Operations是Spark提供的一组窗口操作,通过滑动窗口的技术,对大规模数据的增量更新进行统计分析,即定时进行一段时间内的数据处理。

所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长,两者都必须是 StreamContext 的批次间隔的整数倍。

窗口时长控制每次计算最近的多少个批次的数据,其实就是最近的 windowDuration/batchInterval 个批次。如果有一个以 10 秒为批次间隔的源 DStream,要创建一个最近 30 秒的时间窗口(即最近 3 个批次),就应当把 windowDuration 设为 30 秒。而滑动步长的默认值与批次间隔相等,用来控制对新的 DStream 进行计算的间隔。如果源 DStream 批次间隔为 10 秒,并且我们只希望每两个批次计算一次窗口结果, 就应该把滑动步长设置为 20 秒。

滑动窗口的长度必须是滑动时间间隔的整数倍。因为RDD是DStream上最小的数据单元不可切分。如果不是整数倍,会出现一个RDD被切分的情况,程序会报错。

3)DStream Graph

DStream Graph是一系列transformation操作的抽象,例如:

c = a.join(b), d = c.filter() 时, 它们的 DAG 逻辑关系是a/b → c,c → d,但在 Spark Streaming 在进行物理记录时却是反向的 a/b ← c, c ← d, 目的是为了追溯。

(1)DStreamGraph

Dstream之间的转换所形成的的依赖关系全部保存在DStreamGraph中, DStreamGraph对于后期生成RDD Graph至关重要。
DStreamGraph有点像简洁版的DAG scheduler,负责根据某个时间间隔生成一序列 JobSet,以及按照依赖关系序列化

代码是一直在跑的,每隔一定时间就会形成一个RDD。

(2)DStream与RDD对比与理解

DStream.map(RDD => RDD.map)

随看时间的流逝,基于 Dstream Graph不断的生成 RDD Graph也就是DAG的方式产生Job,并通过 Jobscheduler 的线程池提交给 SparkCluster不断的执行。

每个时间间隔会积累一定的数据,这些数据可以看成由 event 组成(假设以 kafka 或者Flume为例),时间间隔是固定的,在时间间隔内的数据就是固定的。也就是RDD是由一个时间间隔内所有数据构成。时间维度的不同,导致每次处理的数据量及内容不同。

3.Spark Streaming应用代码示例

我们先来看一看一个简单的 Spark Streaming 程序的样子。假如我们想要计算从一个监听 TCP socket 的数据服务器接收到的文本数据(text data)中的字数,我们可以按照如下步骤进行:

① 首先, 我们导入StreamingContext,这是所有流功能的主要入口点。 我们创建了一个带有 2 个执行线程和间歇时间为 1 秒的本地 StreamingContext。

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# 创建一个具有两个工作线程(working thread)并且批次间隔为 1 秒的本地 StreamingContext .
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)

② 使用该 context,我们创建一个代表从 TCP 源流数据的DStream,指定主机名(例如 localhost)和端口(例如 9999)。

# 创建一个将要连接到 hostname:port 的 DStream,如 localhost:9999 
lines = ssc.socketTextStream("localhost", 9999)

③ 上述lines DStream 表示将要从数据服务器接收到的数据流。在这个离散流(DStream)中的每一条记录都是一行文本(text)。接下来,我们希望通过空格字符拆分这些数据,把每一行切分为单词。

# 将每一行拆分成单词
words = lines.flatMap(lambda line: line.split(" "))

④ flatMap 是一种一对多的DStream操作,它会通过在源DStream中根据每个记录生成多个新纪录的形式创建一个新的DStream。在这种情况下,每一行都将被拆分成多个单词和代表单词DStream的单词流。下一步,我们想要计算这些单词:

# 计算每一个 batch(批次)中的每一个 word(单词)
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

# 在控制台打印出在这个DStream中生成的每个 RDD 的前十个元素
wordCounts.print()

上述单词DStream进行了进一步的映射(一对一的转换)为一个 (word, 1) paris 的DStream,这个 DStream 然后被reduce来获得数据中每个批次的单词频率。最后,wordCounts.print() 将会打印一些每秒生成的统计结果。

⑤ 注意当这些行被执行的时候, Spark Streaming 仅仅设置了计算,只有在启动时才会执行,并没有开始真正地处理。为了在所有的转换都已经设置好之后开始处理,我们在最后调用:

ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate

该部分完整的代码可以在 Spark Streaming 示例 NetworkWordCount 中找到。

如果你已经 下载 并且 构建 Spark, 您可以使用如下方式来运行该示例. 你首先需要运行 Netcat(一个在大多数类 Unix 系统中的小工具)作为我们使用的数据服务器。

$ nc -lk 9999

然后,在另一个不同的终端,你可以通过执行如下命令来运行该示例:

$ ./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999

然后,在运行在 netcat 服务器上的终端输入的任何行(lines),都将被计算,并且每一秒都显示在屏幕上,它看起来就像下面这样:

# TERMINAL 1:
# Running Netcat
$ nc -lk 9999
hello world

# TERMINAL 2: RUNNING network_wordcount.py
$ ./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999
...
-------------------------------------------
Time: 2014-10-14 15:25:21
-------------------------------------------
(hello,1)
(world,1)
...

4.参考资料

ShowMeAI相关文章推荐

ShowMeAI系列教程推荐

标签:图解,RDD,流式,Streaming,Spark,DStream,数据
来源: https://www.cnblogs.com/showmeai/p/15983070.html