其他分享
首页 > 其他分享> > Spark第三篇:pyspark下的key-value函数

Spark第三篇:pyspark下的key-value函数

作者:互联网

partitionBy

目的:对源数据进行重新分区,

def partitionBy(self, numPartitions, partitionFunc=portable_hash):

其中只需要指定numPartitions就可以了

reduceByKey

目的:可以将数据按照相同的Key对Value进行聚合
示例代码

from pyspark import SparkConf, SparkContext

# 创建local表示只用单线程,loacal[*]表示用电脑全部的cpu核
conf = SparkConf().setMaster("local[*]").setAppName("lichao-wordcount")
sc = SparkContext(conf=conf)
# 输入的数据
data = [("a",3),("a",5),("a",1),("b",4)]
rdd = sc.parallelize(data)
#聚合函数,进行两两聚合
redrdd = rdd.reduceByKey(lambda x,y:x+y)

resultColl = redrdd.collect()
for line in resultColl:
    print(line)

结果

('b', 4)
('a', 9)

groupByKey

目的:将数据源的数据根据key对value进行分组
示例代码

from pyspark import SparkConf, SparkContext

# 创建local表示只用单线程,loacal[*]表示用电脑全部的cpu核
conf = SparkConf().setMaster("local[*]").setAppName("lichao-wordcount")
sc = SparkContext(conf=conf)
# 输入的数据
data = [("a",3),("a",5),("a",1),("b",4)]
rdd = sc.parallelize(data)
#分组函数,keu相同的进行两两分组
grprdd = rdd.groupByKey()

resultColl = grprdd.collect()
for line in resultColl:
    print(line)

结果

('b', <pyspark.resultiterable.ResultIterable object at 0x00000269CE39EAC8>)
('a', <pyspark.resultiterable.ResultIterable object at 0x00000269CE39EA88>)

分析
从shuffle的角度:reduceByKey和groupByKey都存在shuffle的操作,但是reduceByKey可以在shuffle前对分区内相同key的数据进行预聚合(combine)功能,这样会减少落盘的数据量,而groupByKey只是进行分组,不存在数据量减少的问题,reduceByKey性能比较高。
从功能的角度:reduceByKey其实包含分组和聚合的功能。GroupByKey只能分组,不能聚合,所以在分组聚合的场合下,推荐使用reduceByKey,如果仅仅是分组而不需要聚合。那么还是只能使用groupByKey

aggregateByKey

目的:将数据根据不同的规则进行分区内计算和分区间计算,其中还需要指定初始参数zeroValue,

    def aggregateByKey(self, zeroValue, seqFunc, combFunc, numPartitions=None,
                       partitionFunc=portable_hash):

对比:既可以指定分区内的计算规则,也可以指定分区间的计算规则,groupByKey中,分区内的分组操作,分区间也是分组操作,可以说是一种特例。

示例代码:分区内进行分类,分区间进行聚合


# groupBYkey和rreducebykey分区内的逻辑和分区间逻辑一定是一样的,但是如果分区内要进行最大值比较,分区间要进行相加聚合,那就不行
# agg可以完成分区内和分区间计算逻辑的不同的效果

from pyspark import SparkConf, SparkContext
import math

# 创建local表示只用单线程,loacal[*]表示用电脑全部的cpu核
conf = SparkConf().setMaster("local[*]").setAppName("lichao-wordcount")
sc = SparkContext(conf=conf)
# 输入的数据
data = [("a",3),("a",5),("a",1),("b",4)]
rdd = sc.parallelize(data)
#agg分组聚合函数
#参数1:初始值,因为最初的时候只读进来一个数字,如果没有初始值,那就不能进行最大值最小值的比较
#参数2:分区内计算逻辑的函数
#参数3:分区间计算逻辑的函数
aggrdd = rdd.aggregateByKey(0,lambda x,y:max(x,y),lambda x,y:x+y)

resultColl = aggrdd.collect()
for line in resultColl:
    print(line)
    

结果

('b', 4)
('a', 9)

foldByKey

目的:指定规则对数据源的分区内和分区间进行操作,但是分区内和分区间必须一样
函数说明:用法和aggregateByKey类似

    def foldByKey(self, zeroValue, func, numPartitions=None, partitionFunc=portable_hash):

sortByKey

目的:一句键值对数据进行排序

 def sortByKey(self, ascending=True, numPartitions=None, keyfunc=lambda x: x):

标签:聚合,key,pyspark,区内,value,reduceByKey,分组,conf,local
来源: https://blog.csdn.net/weixin_41885239/article/details/116709875