spark教程(四)-python基础编程
作者:互联网
hadoop 是 java 开发的,原生支持 java;spark 是 scala 开发的,原生支持 scala;
spark 还支持 java、python、R,本文只介绍 python
spark 1.x 和 spark 2.x 用法略有不同,spark 1.x 的用法大部分也适用于 spark 2.x
Pyspark
python + spark,简单来说,想用 python 操作 spark,就必须用 pyspark 模块
RDD
spark 最重要的一个概念叫 RDD,Resilient Distributed Dataset,弹性分布式数据集
RDD 可以从 hadoop 获取数据,也可以从其他地方获取数据,也可以从一种 RDD 转换成 另一种 RDD;
Python 编程基本语法
1. 首先创建 SparkSession
在 spark1.x 中是创建 SparkContext
在 spark2.x 中创建 SparkSession
2. 然后创建 RDD
spark 是以 RDD 概念为中心运行的,RDD 是一个容错的、可以被并行操作的元素集合。
创建 RDD 有两种方式:
1. 在驱动程序中并行化一个已经存在的集合 【内存中的数据】
2. 从外部存储系统引入数据,生成 RDD 【外部存储介质中的数据,注意 spark 本身没有存储功能】
// 这个存储系统可以是一个共享文件系统,如 hdfs、hbase
详见我的博客 RDD 认知
3. 操作 RDD
RDD 的操作有两种方式:转换 和 行动,而且 转换 是 惰性的
可以根据 是否有返回 判断是哪个操作,行动 有返回值,转换无返回值
详见官网 RDD
3.1 RDD 缓存
我们可以把 RDD 缓存到 内存中, 这其实就是 行动 操作
distFile = sc.textFile('README.md') m = distFile.map(lambda x: len(x)) # map 是 转换 操作,并不立即执行 m.cache() # 把 map 的输出缓存到内存中,其实 cache 就是 执行 操作
或者 m.persist()
3.2 转换 操作
惰性,无返回值
map(func[, preservesPartitioning=False]):把一个序列中的元素逐个送入 map,经 func 处理后,返回一个新的 序列
rdd = sc.parallelize([2, 3, 4]) rdd.map(lambda x: x + 1).collect() # [3, 4, 5]
filter(func):类似 map,func 是个过滤函数
rdd = sc.parallelize([2, 3, 4]) rdd.map(lambda x: x > 3).collect() # [False, False, True]
flatMap(func[, preservesPartitioning=False]):也类似 map,只是 它会把 每次经过 func 处理的结果进行 合并,输入和输出的 list 长度可能不同
rdd = sc.parallelize([2, 3, 4]) rdd.flatMap(lambda x: range(1, x)).collect() # [1, 1, 2, 1, 2, 3] # range(1, 2): 1 # range(1, 3): 1, 2 # range(1, 4): 1, 2, 3 ### vs map rdd.map(lambda x: range(1, x)).collect() # [[1], [1, 2], [1, 2, 3]]
mapPartitions(func [, preservesPartitioning=False]) :map的一个变种,map 是把序列的单个元素送入 func ,而 mapPartitions 是把 序列分区后 每个 分区 整体送入 func
rdd = sc.parallelize([1,2,3,4,5], 3) # 分 3 个区 def f(iterator): yield sum(iterator) # 必须是生成器,即 yield,不能 return rdd.mapPartitions(f).collect() # [1, 5, 9]
mapPartitionsWithIndex(func [, preservesPartitioning=False]) :func 有两个参数,分片的序号 和 迭代器,返回 分片序号,也必须是 迭代器
rdd = sc.parallelize(range(15), 13) # 分 13 个区 def f(splitIndex, iterator): yield splitIndex rdd.mapPartitionsWithIndex(f).collect() # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
cartesian(otherDataset):利用两个 序列 生成 笛卡尔內积 的数据集
x = sc.parallelize([1,2,3]) y = sc.parallelize([4,5]) x.cartesian(y).collect() # [(1, 4), (1, 5), (2, 4), (2, 5), (3, 4), (3, 5)]
以下方法只适用 key-value 数据
mapValues(func):根据 func 处理 value
rdd = sc.parallelize([(1, [1,2,3]), (3, ['a', 'b'])]) rdd.mapValues(len).collect() # [(1, 3), (3, 2)] 计算 value 的长度
reduceByKey(func [, numPartitions=None, partitionFunc=<function portable_hash at 0x7fa664f3cb90>]):针对 k-v 对的处理方法,把 key 相同的 value 进行 reduce,然后重新组成 key-reduce 对
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) def f(x, y): return x + y rdd.reduceByKey(f).collect() # [('a', 2), ('b', 1)]
sortByKey([ascending=True, numPartitions=None, keyfunc=<function <lambda> at 0x7fa665048c80>]):根据 key 进行排序,默认升序,numPartitions 代表分区数,keyfunc 是处理 key 的,在 排序过程中对 key 进行处理
tmp = [('a', 4), ('b', 3), ('c', 2), ('D', 1)] sc.parallelize(tmp).sortByKey(True, 1).collect() # 升序[('D', 1), ('a', 4), ('b', 3), ('c', 2)] 1代表分区数 sc.parallelize(tmp).sortByKey(True, 2, keyfunc=lambda k:k.lower()).collect() # 升序[('a', 4), ('b', 3), ('c', 2), ('D', 1)] D跑到后面了 sc.parallelize(tmp).sortByKey(False, 2, keyfunc=lambda k:k.lower()).collect()# 降序[('D', 1), ('c', 2), ('b', 3), ('a', 4)]
keyfunc 只在 排序过程中起作用,在输出时 keyfunc 不起作用
join(otherDataset [, numPartitions=None]):将 两个 k-v RDD 中 共有的 key 的 value 交叉组合
x = sc.parallelize([("a", 1), ("b", 4)]) y = sc.parallelize([("a", 2), ("a", 3)]) x.join(y).collect() # [('a', (1, 2)), ('a', (1, 3))]
3.3 行动 操作
有返回值
collect:返回 RDD 中的数据
count:返回 RDD 中元素个数
first:返回 RDD 中第一个元素
max. min.sum:不解释
take(n):返回 RDD 中 前 n 个元素
takeOrdered(n [, key=None]):对 RDD 先进行排序,然后取排序后的 前 n 个数据,key 表示先经过 keyfunc 处理后再进行排序,最终返回的还是原数据
sc.parallelize([9,7,3,2,6,4]).takeOrdered(3) # [2, 3, 4] sc.parallelize([9,7,3,2,6,4]).takeOrdered(3, key=lambda x: -x) # [9, 7, 6] ## 过程如下 # 9, 7, 3, 2, 6, 4 ## 原数据 # -9, -7, -3, -2, -6, -4 ## 经过 keyfunc 处理后的数据 # -9, -7, -6, -4, -3, -2 ## 对处理后的数据升序排序 # -9, -7, -6 ## 取前3个 # 9, 7, 6 ## 对应到原数据
也就是说,keyfunc 只在排序时起作用,在输出时不起作用
foreach(func):运行 func 函数 并行处理 RDD 的所有元素
sc.parallelize([1, 2, 3, 4, 5]).foreach(print) # 并行打印,不按顺序输出 # 1 # 2 # 4 # 5 # 3
reduce(func):把 RDD 中前两个元素送入 func,得到一个 value,把这个 value 和 下一个元素 送入 func,直至最后一个元素
sc.parallelize([1,2,3,4,5]).reduce(lambda x, y: x + y) # 15 求和
fold:与 reduce 类似,fold 是有一个 基数,然后 把每个元素 和 基数 送入 func,然后替换该基数,循环,直到最后一个元素
x = sc.parallelize([1,2,3]) neutral_zero_value = 0 # 0 for sum, 1 for multiplication y = x.fold(neutral_zero_value, lambda obj, accumulated: accumulated + obj) # computes cumulative sum print(x.collect()) # [1,2,3] print(y) # 6
aggregate:对每个分区进行聚合,然后聚合每个分区的聚合结果,详见我的博客 aggregate
countByValue:统计相同元素的个数
sc.parallelize([1,2,3,1,2,5,3,2,3,2]).countByValue().items() # [(1, 2), (2, 4), (3, 3), (5, 1)] # 输入 k-v 不按 value 统计,按 k-v 统计 sc.parallelize([('a', 1), ('b', 1)]).countByValue().items() # [(('a', 1), 1), (('b', 1), 1)]
saveAsTextFile(path [, compressionCodecClass=None]):把 RDD 存储到文件系统中
counts.saveAsTextFile('/usr/lib/spark/out')
输入必须是 路径,且该路径不能事先存在
以下方法只适用 key-value 数据
countByKey:统计相同 key 的个数,返回 key-count
sc.parallelize([("a",1), ("b",1), ("a", 3)]).countByKey() # defaultdict(<type 'int'>, {'a': 2, 'b': 1}) dictdata= sc.parallelize([("a",1), ("b",1), ("a", 3)]).countByKey() dictdata.items() # [('a', 2), ('b', 1)]
Python 脚本
如何运行 python 脚本?如何 在 python 中 调用 spark?,这两个问题答案相同。
首先需要配置 /etc/profile
# python can call pyspark directly export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/pyspark:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH
python 的搜索路径 ,加上 spark 中 python 和 pyspark,以及 py4j-0.10.4-src.zip,他的作用是 负责 python 和 java 之间的 转换。
python 脚本 test1.py
from __future__ import print_function from pyspark import * import os print(os.environ['SPARK_HOME']) print(os.environ['HADOOP_HOME']) if __name__ == '__main__': sc = SparkContext("spark://hadoop10:7077") rdd = sc.parallelize("hello Pyspark world".split(' ')) counts = rdd.map(lambda word: (word, 1)) \ .reduceByKey(lambda a, b: a + b) counts.saveAsTextFile('/usr/lib/spark/out') counts.foreach(print) sc.stop()
命令行执行
bin/spark-submit test1.py
或者 之间运行 py 文件
python test1.py
脚本模式 通过 http://192.168.10.10:8080/ 查看任务
参考资料:
https://www.cnblogs.com/yangzhang-home/p/6056133.html 快速入门
https://blog.csdn.net/kl28978113/article/details/80361452 较全教程
http://spark.apache.org/docs/latest/ spark 2.4.4 官网
http://spark.apache.org/docs/latest/api/python/index.html spark 2.4.4 python API
https://www.cnblogs.com/Vito2008/p/5216324.html
https://blog.csdn.net/proplume/article/details/79798289
https://www.iteblog.com/archives/1396.html#aggregate RDD 操作 API
https://www.cnblogs.com/yxpblog/p/5269314.html RDD 操作 API
标签:parallelize,python,编程,RDD,func,sc,spark 来源: https://www.cnblogs.com/yanshw/p/11620204.html