Spark Learning Examples (Python): RDD Transformations
An RDD (Resilient Distributed Dataset) is a special kind of collection: it can be cached and it supports parallel operations. An RDD is divided into partitions, and each partition holds a subset of the dataset.
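To make the partition idea concrete, here is a minimal sketch (the app name and data are arbitrary) that splits a small list into three partitions and inspects them with glom(), which collects each partition's contents into a list:
from pyspark import SparkContext

if __name__ == '__main__':
    sc = SparkContext(appName="rddTransformation", master="local[*]")
    # ask for 3 partitions explicitly
    rdd = sc.parallelize([1, 2, 3, 4, 5], 3)
    print(rdd.getNumPartitions())
    # 3
    # glom() gathers each partition into a list so the split is visible
    print(rdd.glom().collect())
    # e.g. [[1], [2, 3], [4, 5]] -- the exact split is an implementation detail
    sc.stop()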
The transformation operations are:
- map(func)
- filter(func)
- flatMap(func)
- mapPartitions(func)
- sample(withReplacement, fraction, seed)
- union(otherDataset)
- intersection(otherDataset)
- distinct([numPartitions])
- groupByKey([numPartitions])
- reduceByKey(func, [numPartitions])
- aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions])
- sortByKey([ascending], [numPartitions])
- join(otherDataset, [numPartitions])
- cogroup(otherDataset, [numPartitions])
- cartesian(otherDataset)
- pipe(command, [envVars])
- coalesce(numPartitions)
- repartition(numPartitions)
- repartitionAndSortWithinPartitions(partitioner)
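Not every operation in this list gets a worked example below. As a quick, hedged sketch of two that do not (sortByKey and join), using small made-up key-value data:
from pyspark import SparkContext

if __name__ == '__main__':
    sc = SparkContext(appName="rddTransformation", master="local[*]")
    left = sc.parallelize([('b', 2), ('a', 1), ('c', 3)])
    right = sc.parallelize([('a', 'x'), ('b', 'y')])
    # sortByKey orders the (K, V) pairs by key
    print(left.sortByKey().collect())
    # [('a', 1), ('b', 2), ('c', 3)]
    # join keeps only the keys present in both RDDs and pairs their values
    print(left.join(right).collect())
    # [('a', (1, 'x')), ('b', (2, 'y'))] (order may vary)
    sc.stop()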
map: applies the given function to every element of the RDD, producing a new RDD.
from pyspark import SparkContext

def func(x):
    return x * 2

if __name__ == '__main__':
    sc = SparkContext(appName="rddTransformation", master="local[*]")
    data = [1, 2, 3, 4, 5]
    rdd = sc.parallelize(data)
    # Option 1: a lambda
    mapRdd1 = rdd.map(lambda x: x * 2)
    print(mapRdd1.collect())
    # [2, 4, 6, 8, 10]
    # Option 2: a named function
    mapRdd2 = rdd.map(func)
    print(mapRdd2.collect())
    # [2, 4, 6, 8, 10]
    sc.stop()
Diagram: map dependencies. The red box represents the whole dataset; each black box represents one RDD partition, containing that partition's data.
filter: keeps the elements that satisfy the given predicate, producing a new RDD.
from pyspark import SparkContext

if __name__ == '__main__':
    sc = SparkContext(appName="rddTransformation", master="local[*]")
    data = [1, 2, 3, 4, 5]
    rdd = sc.parallelize(data)
    filterRdd = rdd.filter(lambda x: x % 2 == 0)
    print(filterRdd.collect())
    # [2, 4]
    sc.stop()
Diagram: filter dependencies.
flatMap: similar to map, but each input element may be mapped to zero or more output elements, and the results are flattened into a single sequence.
from pyspark import SparkContext

if __name__ == '__main__':
    sc = SparkContext(appName="rddTransformation", master="local[*]")
    data = [[1, 2], [3], [4], [5]]
    rdd = sc.parallelize(data)
    print(rdd.collect())
    # [[1, 2], [3], [4], [5]]
    flatMapRdd = rdd.flatMap(lambda x: x)
    print(flatMapRdd.collect())
    # [1, 2, 3, 4, 5]
    sc.stop()
Diagram: flatMap dependencies.
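The identity lambda above only flattens nested lists. A more typical use maps each element to several outputs; here is a minimal sketch with made-up input lines:
from pyspark import SparkContext

if __name__ == '__main__':
    sc = SparkContext(appName="rddTransformation", master="local[*]")
    lines = sc.parallelize(["hello spark", "hello python"])
    # each input line maps to zero or more words, then the result is flattened
    words = lines.flatMap(lambda line: line.split(" "))
    print(words.collect())
    # ['hello', 'spark', 'hello', 'python']
    sc.stop()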
mapPartitions: a variant of map. Where map applies the function to each element, mapPartitions applies it once to the data of each partition.
from pyspark import SparkContext

def func(iterator):
    # receives an iterator over one partition's elements
    # and must return an iterable of results
    result = []
    for x in iterator:
        result.append(x * 2)
    return result

if __name__ == '__main__':
    sc = SparkContext(appName="rddTransformation", master="local[*]")
    data = [1, 2, 3, 4, 5]
    rdd = sc.parallelize(data)
    mapParRdd = rdd.mapPartitions(func)
    print(mapParRdd.collect())
    # [2, 4, 6, 8, 10]
    sc.stop()
Diagram: mapPartitions dependencies.
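Returning a list materializes one whole partition's results in memory at once. A generator variant streams them instead; this is a sketch, and func_gen is a name introduced here for illustration:
from pyspark import SparkContext

def func_gen(iterator):
    # yield elements one at a time instead of building a list,
    # which keeps memory usage flat for large partitions
    for x in iterator:
        yield x * 2

if __name__ == '__main__':
    sc = SparkContext(appName="rddTransformation", master="local[*]")
    mapParRdd = sc.parallelize([1, 2, 3, 4, 5]).mapPartitions(func_gen)
    print(mapParRdd.collect())
    # [2, 4, 6, 8, 10]
    sc.stop()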
sample: returns a randomly sampled subset of the RDD. withReplacement controls whether an element can be drawn more than once, fraction is the expected probability of keeping each element, and seed makes the sampling reproducible.
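A minimal sketch of sample in the same style as the other examples; note that fraction is a per-element probability, not an exact count, so the result size varies:
from pyspark import SparkContext

if __name__ == '__main__':
    sc = SparkContext(appName="rddTransformation", master="local[*]")
    rdd = sc.parallelize(range(10))
    # sample without replacement, keeping each element with probability 0.5
    sampleRdd = rdd.sample(False, 0.5, seed=42)
    print(sampleRdd.collect())
    # a subset of 0..9; the exact elements depend on the seed
    sc.stop()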
union: returns the union of two RDDs as a new RDD. Duplicates are not removed, so elements common to both RDDs appear twice.
from pyspark import SparkContext

if __name__ == '__main__':
    sc = SparkContext(appName="rddTransformation", master="local[*]")
    data = [1, 2, 3]
    rdd = sc.parallelize(data)
    unionRdd = rdd.union(rdd)
    print(unionRdd.collect())
    # [1, 2, 3, 1, 2, 3]
    sc.stop()
Diagram: union dependencies.
intersection: returns the intersection of the elements of two RDDs as a new RDD.
from pyspark import SparkContext

if __name__ == '__main__':
    sc = SparkContext(appName="rddTransformation", master="local[*]")
    rdd1 = sc.parallelize([1, 2, 3])
    rdd2 = sc.parallelize([2, 3, 4])
    insRdd = rdd1.intersection(rdd2)
    print(insRdd.collect())
    # [2, 3]
    sc.stop()
Diagram: intersection dependencies.
distinct: removes duplicate elements from the RDD and returns a new RDD; the optional parameter sets the number of partitions used for the deduplication.
from pyspark import SparkContext

if __name__ == '__main__':
    sc = SparkContext(appName="rddTransformation", master="local[*]")
    data = [1, 1, 2, 3, 4]
    rdd = sc.parallelize(data)
    distRdd = rdd.distinct(2)
    print(distRdd.collect())
    # [2, 4, 1, 3] (order is not guaranteed)
    sc.stop()
Diagram: distinct dependencies.
groupByKey: groups a dataset of (K, V) pairs by key, returning a dataset of (K, Iterable[V]) pairs; the optional parameter sets the number of partitions.
from pyspark import SparkContext

if __name__ == '__main__':
    sc = SparkContext(appName="rddTransformation", master="local[*]")
    data = [('a', 1), ('b', 2), ('a', 1)]
    rdd = sc.parallelize(data)
    groupRdd = rdd.groupByKey()
    # the grouped values are iterables; convert them to lists for printing
    print(groupRdd.mapValues(list).collect())
    # [('b', [2]), ('a', [1, 1])]
    sc.stop()
Diagram: groupByKey dependencies.
reduceByKey: aggregates the values of each key with the given function, returning a new dataset of (K, V) pairs; the optional parameter sets the number of partitions. When reduceByKey can express the computation, prefer it over groupByKey: reduceByKey combines values within each partition before the shuffle, so far less data crosses the network.
from pyspark import SparkContext

if __name__ == '__main__':
    sc = SparkContext(appName="rddTransformation", master="local[*]")
    data = [('a', 1), ('b', 2), ('a', 1)]
    rdd = sc.parallelize(data)
    reduceRdd = rdd.reduceByKey(lambda x, y: x + y)
    print(reduceRdd.collect())
    # [('b', 2), ('a', 2)]
    sc.stop()
Diagram: reduceByKey dependencies.
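To see why the advice above favors reduceByKey, compare it with an equivalent groupByKey formulation. This is a sketch for comparison only; it produces the same result on the same made-up data:
from pyspark import SparkContext

if __name__ == '__main__':
    sc = SparkContext(appName="rddTransformation", master="local[*]")
    rdd = sc.parallelize([('a', 1), ('b', 2), ('a', 1)])
    # groupByKey shuffles every (K, V) pair across the network and only
    # then sums; reduceByKey pre-aggregates within each partition before
    # the shuffle, so it moves less data
    groupSum = rdd.groupByKey().mapValues(sum)
    print(groupSum.collect())
    # [('b', 2), ('a', 2)]
    sc.stop()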