编程语言
首页 > 编程语言> > Spark RDD编程(3) Key-Value类型

Spark RDD编程(3) Key-Value类型

作者:互联网

1 partitionBy:对pairRDD进行分区操作,如果原有的partionRDD和现有的partionRDD是一致的话就不进行分区, 否则会生成ShuffleRDD,即会产生shuffle过程。

val conf = new SparkConf().setMaster("local[*]").setAppName("word count")
val sc = new SparkContext(conf)

//----------------------- partitionBy -------------------------
val kvRDD: RDD[(Int, Char)] = sc.makeRDD(Array((1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')), 4)
val partitionByRDD: RDD[(Int, Char)] = kvRDD.partitionBy(new HashPartitioner(2))
val partitionByArray: Array[Array[(Int, Char)]] = partitionByRDD.glom().collect()
partitionByArray.foreach(data => println(data.mkString(",")))
//(2,b),(4,d)
//(1,a),(3,c)

 

2 groupByKey:groupByKey也是对每个key进行操作,但只生成一个sequence。

val rdd = sc.makeRDD(List("java", "scala", "java", "spark"))
val mapRDD = rdd.map((_, 1))
val groupByKeyRDD: RDD[(String, Iterable[Int])] = mapRDD.groupByKey()
groupByKeyRDD.foreach(println)
//(spark,CompactBuffer(1))
//(scala,CompactBuffer(1))
//(java,CompactBuffer(1, 1))

 

3 reduceByKey:在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,reduce任务的个数可以通过第二个可选的参数来设置。

//分组、所有区预聚合
val rdd2 = sc.makeRDD(List(("female", 1), ("male", 5), ("female", 5), ("male", 2)))
val reduceByKeyRDD: RDD[(String, Int)] = rdd2.reduceByKey((result, v) => result + v)
reduceByKeyRDD.collect().foreach(println)
//(female,6)
//(male,7)

 

reduceByKey和groupByKey的区别

1. reduceByKey:按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[k,v]。既分组又聚合。

2. groupByKey:按照key进行分组,直接进行shuffle。只分组。

3. 开发指导:reduceByKey比groupByKey,建议使用。但是需要注意是否会影响业务逻辑。

4 aggregateByKey(zeroValue:U,[partitioner: Partitioner]) (seqOp: (U, V) => U,combOp: (U, U) => U)

  在kv对的RDD中,按key将value进行分组合并,合并时,将每个value和初始值作为seq函数的参数,进行计算,返回的结果作为一个新的kv对,然后再将结果按照key进行合并,最后将每个分组的value传递给combine函数进行计算(先将前两个value进行计算,将返回结果和下一个value传给combine函数,以此类推),将key与计算结果作为一个新的kv对输出。

参数描述:

(1)zeroValue:给每一个分区中的每一个key一个初始值;

(2)seqOp:函数用于在每一个分区中用初始值逐步迭代value;

(3)combOp:函数用于合并每个分区中的结果。

案例:分区内相同key找出最大值,分区间相加

val rdd3: RDD[(String, Int)] = sc.parallelize(List(("a", 3), ("a", 2), ("c", 4), ("b", 3), ("c", 6), ("c", 8)), 2)
rdd3.glom().collect().foreach(data => println(data.mkString(",")))
//(a,3),(a,2),(c,4)
//(b,3),(c,6),(c,8)

//区内聚合,区间聚合
val aggregateByKeyRDD: RDD[(String, Int)] = rdd3.aggregateByKey(0)(_.max(_), _ + _)
aggregateByKeyRDD.foreach(println)
//(a,3)
//(c,12)
//(b,3)

 

  

 

标签:Key,val,Int,groupByKey,Value,RDD,value,key
来源: https://www.cnblogs.com/noyouth/p/13023570.html