Spark第三篇:pyspark下的key-value函数
作者:互联网
partitionBy
目的:对源数据进行重新分区,
def partitionBy(self, numPartitions, partitionFunc=portable_hash):
其中只需要指定numPartitions就可以了
reduceByKey
目的:可以将数据按照相同的Key对Value进行聚合
示例代码:
from pyspark import SparkConf, SparkContext
# 创建local表示只用单线程,loacal[*]表示用电脑全部的cpu核
conf = SparkConf().setMaster("local[*]").setAppName("lichao-wordcount")
sc = SparkContext(conf=conf)
# 输入的数据
data = [("a",3),("a",5),("a",1),("b",4)]
rdd = sc.parallelize(data)
#聚合函数,进行两两聚合
redrdd = rdd.reduceByKey(lambda x,y:x+y)
resultColl = redrdd.collect()
for line in resultColl:
print(line)
结果:
('b', 4)
('a', 9)
groupByKey
目的:将数据源的数据根据key对value进行分组
示例代码:
from pyspark import SparkConf, SparkContext
# 创建local表示只用单线程,loacal[*]表示用电脑全部的cpu核
conf = SparkConf().setMaster("local[*]").setAppName("lichao-wordcount")
sc = SparkContext(conf=conf)
# 输入的数据
data = [("a",3),("a",5),("a",1),("b",4)]
rdd = sc.parallelize(data)
#分组函数,keu相同的进行两两分组
grprdd = rdd.groupByKey()
resultColl = grprdd.collect()
for line in resultColl:
print(line)
结果:
('b', <pyspark.resultiterable.ResultIterable object at 0x00000269CE39EAC8>)
('a', <pyspark.resultiterable.ResultIterable object at 0x00000269CE39EA88>)
分析:
从shuffle的角度:reduceByKey和groupByKey都存在shuffle的操作,但是reduceByKey可以在shuffle前对分区内相同key的数据进行预聚合(combine)功能,这样会减少落盘的数据量,而groupByKey只是进行分组,不存在数据量减少的问题,reduceByKey性能比较高。
从功能的角度:reduceByKey其实包含分组和聚合的功能。GroupByKey只能分组,不能聚合,所以在分组聚合的场合下,推荐使用reduceByKey,如果仅仅是分组而不需要聚合。那么还是只能使用groupByKey
aggregateByKey
目的:将数据根据不同的规则进行分区内计算和分区间计算,其中还需要指定初始参数zeroValue,
def aggregateByKey(self, zeroValue, seqFunc, combFunc, numPartitions=None,
partitionFunc=portable_hash):
对比:既可以指定分区内的计算规则,也可以指定分区间的计算规则,groupByKey中,分区内的分组操作,分区间也是分组操作,可以说是一种特例。
示例代码:分区内进行分类,分区间进行聚合
# groupBYkey和rreducebykey分区内的逻辑和分区间逻辑一定是一样的,但是如果分区内要进行最大值比较,分区间要进行相加聚合,那就不行
# agg可以完成分区内和分区间计算逻辑的不同的效果
from pyspark import SparkConf, SparkContext
import math
# 创建local表示只用单线程,loacal[*]表示用电脑全部的cpu核
conf = SparkConf().setMaster("local[*]").setAppName("lichao-wordcount")
sc = SparkContext(conf=conf)
# 输入的数据
data = [("a",3),("a",5),("a",1),("b",4)]
rdd = sc.parallelize(data)
#agg分组聚合函数
#参数1:初始值,因为最初的时候只读进来一个数字,如果没有初始值,那就不能进行最大值最小值的比较
#参数2:分区内计算逻辑的函数
#参数3:分区间计算逻辑的函数
aggrdd = rdd.aggregateByKey(0,lambda x,y:max(x,y),lambda x,y:x+y)
resultColl = aggrdd.collect()
for line in resultColl:
print(line)
结果:
('b', 4)
('a', 9)
foldByKey
目的:指定规则对数据源的分区内和分区间进行操作,但是分区内和分区间必须一样
函数说明:用法和aggregateByKey类似
def foldByKey(self, zeroValue, func, numPartitions=None, partitionFunc=portable_hash):
sortByKey
目的:一句键值对数据进行排序
def sortByKey(self, ascending=True, numPartitions=None, keyfunc=lambda x: x):
标签:聚合,key,pyspark,区内,value,reduceByKey,分组,conf,local 来源: https://blog.csdn.net/weixin_41885239/article/details/116709875