其他分享
首页 > 其他分享> > pyspark steaming常规语句及操作

pyspark steaming常规语句及操作

作者:互联网

参考官网:http://spark.apache.org/docs/latest/streaming-programming-guide.html

pyspark steaming 流批处理,类strom、flink、kafak stream;核心抽象是Dstream,一个系列的rdd组成

案例:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import os
os.environ["PYSPARK_PYTHON"]="/Users/lonng/opt/anaconda3/python.app/Contents/MacOS/python"

# Create a local StreamingContext with two working thread and batch interval of 1 second
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)

# Create a DStream that will connect to hostname:port, like localhost:9999
lines = ssc.socketTextStream("localhost", 9999)

# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))

# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()

ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate

#
# from pyspark import SparkContext
# from pyspark.streaming import StreamingContext
#
# sc = SparkContext(master, appName)
# ssc = StreamingContext(sc, 1)

1、命令行先运行 nc -lk 9999
2、运行上面代码,可以直接idea工具里pycharm运行
3、可以web看过程 http://localhost:4040/streaming/

在这里插入图片描述
现在程序就1秒每次批处理进行监听;现在在刚打开的 nc -lk 9999 窗口可以一行一行的传入数据,spark streaming安装刚程序编写的处理逻辑进行处理数据了,然后web里也能看到记录

初心fly 发布了177 篇原创文章 · 获赞 48 · 访问量 8万+ 私信 关注

标签:语句,pyspark,steaming,each,streaming,StreamingContext,import,ssc
来源: https://blog.csdn.net/weixin_42357472/article/details/103935690