Spark RDD编程(3) Key-Value类型
作者:互联网
1 partitionBy:对pairRDD进行分区操作,如果原有的partionRDD和现有的partionRDD是一致的话就不进行分区, 否则会生成ShuffleRDD,即会产生shuffle过程。
val conf = new SparkConf().setMaster("local[*]").setAppName("word count") val sc = new SparkContext(conf) //----------------------- partitionBy ------------------------- val kvRDD: RDD[(Int, Char)] = sc.makeRDD(Array((1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')), 4) val partitionByRDD: RDD[(Int, Char)] = kvRDD.partitionBy(new HashPartitioner(2)) val partitionByArray: Array[Array[(Int, Char)]] = partitionByRDD.glom().collect() partitionByArray.foreach(data => println(data.mkString(","))) //(2,b),(4,d) //(1,a),(3,c)
2 groupByKey:groupByKey也是对每个key进行操作,但只生成一个sequence。
val rdd = sc.makeRDD(List("java", "scala", "java", "spark")) val mapRDD = rdd.map((_, 1)) val groupByKeyRDD: RDD[(String, Iterable[Int])] = mapRDD.groupByKey() groupByKeyRDD.foreach(println) //(spark,CompactBuffer(1)) //(scala,CompactBuffer(1)) //(java,CompactBuffer(1, 1))
3 reduceByKey:在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,reduce任务的个数可以通过第二个可选的参数来设置。
//分组、所有区预聚合 val rdd2 = sc.makeRDD(List(("female", 1), ("male", 5), ("female", 5), ("male", 2))) val reduceByKeyRDD: RDD[(String, Int)] = rdd2.reduceByKey((result, v) => result + v) reduceByKeyRDD.collect().foreach(println) //(female,6) //(male,7)
reduceByKey和groupByKey的区别
1. reduceByKey:按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[k,v]。既分组又聚合。
2. groupByKey:按照key进行分组,直接进行shuffle。只分组。
3. 开发指导:reduceByKey比groupByKey,建议使用。但是需要注意是否会影响业务逻辑。
4 aggregateByKey(zeroValue:U,[partitioner: Partitioner]) (seqOp: (U, V) => U,combOp: (U, U) => U)
在kv对的RDD中,按key将value进行分组合并,合并时,将每个value和初始值作为seq函数的参数,进行计算,返回的结果作为一个新的kv对,然后再将结果按照key进行合并,最后将每个分组的value传递给combine函数进行计算(先将前两个value进行计算,将返回结果和下一个value传给combine函数,以此类推),将key与计算结果作为一个新的kv对输出。
参数描述:
(1)zeroValue:给每一个分区中的每一个key一个初始值;
(2)seqOp:函数用于在每一个分区中用初始值逐步迭代value;
(3)combOp:函数用于合并每个分区中的结果。
案例:分区内相同key找出最大值,分区间相加
val rdd3: RDD[(String, Int)] = sc.parallelize(List(("a", 3), ("a", 2), ("c", 4), ("b", 3), ("c", 6), ("c", 8)), 2) rdd3.glom().collect().foreach(data => println(data.mkString(","))) //(a,3),(a,2),(c,4) //(b,3),(c,6),(c,8) //区内聚合,区间聚合 val aggregateByKeyRDD: RDD[(String, Int)] = rdd3.aggregateByKey(0)(_.max(_), _ + _) aggregateByKeyRDD.foreach(println) //(a,3) //(c,12) //(b,3)
标签:Key,val,Int,groupByKey,Value,RDD,value,key 来源: https://www.cnblogs.com/noyouth/p/13023570.html