其他分享
首页 > 其他分享> > 6-spark_streaming

6-spark_streaming

作者:互联网

学习目标

1、sparkStreaming概述

1.1 SparkStreaming是什么

之前我们接触的spark-core和spark-sql都是处理属于离线批处理任务,数据一般都是在固定位置上,通常我们写好一个脚本,每天定时去处理数据,计算,保存数据结果。这类任务通常是T+1(一天一个任务),对实时性要求不高。

但在企业中存在很多实时性处理的需求,例如:双十一的京东阿里,通常会做一个实时的数据大屏,显示实时订单。这种情况下,对数据实时性要求较高,仅仅能够容忍到延迟1分钟或几秒钟。

实时计算框架对比

Storm

Spark

对比:

1.2 SparkStreaming的组件

2、Spark Streaming编码实践

Spark Streaming编码步骤:

利用Spark Streaming实现WordCount

需求:监听某个端口上的网络数据,实时统计出现的不同单词个数。

1,需要安装一个nc工具:sudo yum install -y nc

2,执行指令:nc -lk 9999 -v

import os
JAVA_HOME = '/usr/local/java/jdk1.8.0_131'
PYSPARK_PYTHON = "/usr/local/python3/python"
SPARK_HOME = "/bigdata/spark-2.1.2-bin-hadoop2.3"
os.environ["JAVA_HOME"] = JAVA_HOME
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
# os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
# os.environ["SPARK_HOME"] = SPARK_HOME

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == '__main__':
    sc = SparkContext("local[2]", appName="NetworkWordCount")
    # 参数2:指定执行计算的时间间隔
    ssc = StreamingContext(sc, 1)
    # 监听ip,端口上的上的数据
    lines = ssc.socketTextStream('localhost', 9999)
    # 将数据按空格进行拆分为多个单词
    words = lines.flatMap(lambda line: line.split(" "))
    # 将单词转换为(单词,1)的形式
    pairs = words.map(lambda word: (word, 1))
    # 统计单词个数
    wordCounts = pairs.reduceByKey(lambda x, y: x + y)
    # 打印结果信息,会使得前面的transformation操作执行
    wordCounts.pprint()
    # 启动StreamingContext
    ssc.start()
    # 等待计算结束
    ssc.awaitTermination()

3、Spark Streaming的状态操作

在Spark Streaming中存在两种状态操作

使用有状态的transformation,需要开启Checkpoint

3.1 updateStateByKey

Spark Streaming实现的是一个实时批处理操作,每隔一段时间将数据进行打包,封装成RDD,是无状态的。

无状态:指的是每个时间片段的数据之间是没有关联的。

需求:想要将一个大时间段(1天),即多个小时间段的数据内的数据持续进行累积操作

一般超过一天都是用RDD或Spark SQL来进行离线批处理

如果没有UpdateStateByKey,我们需要将每一秒的数据计算好放入mysql中取,再用mysql来进行统计计算

Spark Streaming中提供这种状态保护机制,即updateStateByKey

步骤:

举例:词统计。

案例:updateStateByKey

需求:监听网络端口的数据,获取到每个批次的出现的单词数量,并且需要把每个批次的信息保留下来

代码

import os
# 配置spark driver和pyspark运行时,所使用的python解释器路径
PYSPARK_PYTHON = "/home/hadoop/miniconda3/envs/datapy365spark23/bin/python"
JAVA_HOME='/home/hadoop/app/jdk1.8.0_191'
SPARK_HOME = "/home/hadoop/app/spark-2.3.0-bin-2.6.0-cdh5.7.0"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
os.environ['JAVA_HOME']=JAVA_HOME
os.environ["SPARK_HOME"] = SPARK_HOME
from pyspark.streaming import StreamingContext
from pyspark.sql.session import SparkSession
​
# 创建SparkContext
spark = SparkSession.builder.master("local[2]").getOrCreate()
sc = spark.sparkContext
​
ssc = StreamingContext(sc, 3)
#开启检查点
ssc.checkpoint("checkpoint")
​
#定义state更新函数
def updateFunc(new_values, last_sum):
    return sum(new_values) + (last_sum or 0)
​
lines = ssc.socketTextStream("localhost", 9999)
# 对数据以空格进行拆分,分为多个单词
counts = lines.flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .updateStateByKey(updateFunc=updateFunc)#应用updateStateByKey函数
    
counts.pprint()
​
ssc.start()
ssc.awaitTermination()

 

3.2 Windows

每隔G秒,统计最近L秒的数据

 

 

操作细节

相关函数

reduceByKeyAndWindow(func,invFunc,windowLength,slideInterval,[num,Tasks])

func:正向操作,类似于updateStateByKey

invFunc:反向操作

 

例如在热词时,在上一个窗口中可能是热词,这个一个窗口中可能不是热词,就需要在这个窗口中把该次剔除掉

典型案例:热点搜索词滑动统计,每隔10秒,统计最近60秒钟的搜索词的搜索频次,并打印出最靠前的3个搜索词出现次数。

 

 

案例

监听网络端口的数据,每隔3秒统计前6秒出现的单词数量

import os
# 配置spark driver和pyspark运行时,所使用的python解释器路径
PYSPARK_PYTHON = "/home/hadoop/miniconda3/envs/datapy365spark23/bin/python"
JAVA_HOME='/home/hadoop/app/jdk1.8.0_191'
SPARK_HOME = "/home/hadoop/app/spark-2.3.0-bin-2.6.0-cdh5.7.0"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
os.environ['JAVA_HOME']=JAVA_HOME
os.environ["SPARK_HOME"] = SPARK_HOME
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql.session import SparkSession
​
def get_countryname(line):
    country_name = line.strip()
​
    if country_name == 'usa':
        output = 'USA'
    elif country_name == 'ind':
        output = 'India'
    elif country_name == 'aus':
        output = 'Australia'
    else:
        output = 'Unknown'
​
    return (output, 1)
​
if __name__ == "__main__":
    #定义处理的时间间隔
    batch_interval = 1 # base time unit (in seconds)
    #定义窗口长度
    window_length = 6 * batch_interval
    #定义滑动时间间隔
    frequency = 3 * batch_interval
​
    #获取StreamingContext
    spark = SparkSession.builder.master("local[2]").getOrCreate()
    sc = spark.sparkContext
    ssc = StreamingContext(sc, batch_interval)
    
    #需要设置检查点
    ssc.checkpoint("checkpoint")
​
    lines = ssc.socketTextStream('localhost', 9999)
    addFunc = lambda x, y: x + y
    invAddFunc = lambda x, y: x - y
    #调用reduceByKeyAndWindow,来进行窗口函数的调用
    window_counts = lines.map(get_countryname) \
        .reduceByKeyAndWindow(addFunc, invAddFunc, window_length, frequency)
    #输出处理结果信息
    window_counts.pprint()
​
    ssc.start()
    ssc.awaitTermination()

 



 

标签:PYSPARK,PYTHON,streaming,Streaming,HOME,spark,os,Spark
来源: https://www.cnblogs.com/Live-up-to-your-youth/p/15770494.html