其他分享
首页 > 其他分享> > RDD转换算子--key-value

RDD转换算子--key-value

作者:互联网

Key - Value类型

1)     partitionBy

def partitionBy(partitioner: Partitioner): RDD[(K, V)]

将数据按照指定Partitioner重新进行分区。Spark默认的分区器是HashPartitioner

val rdd: RDD[(Int, String)] =

    sc.makeRDD(Array((1,"aaa"),(2,"bbb"),(3,"ccc")),3)

import org.apache.spark.HashPartitioner

val rdd2: RDD[(Int, String)] =

    rdd.partitionBy(new HashPartitioner(2))

2)     reduceByKey

def reduceByKey(func: (V, V) => V): RDD[(K, V)]   //添加聚合方式

def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]  //聚合方式及分区数

可以将数据按照相同的Key对Value进行聚合

val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))

val dataRDD2 = dataRDD1.reduceByKey(_+_)

val dataRDD3 = dataRDD1.reduceByKey(_+_, 2)

3)     groupByKey

def groupByKey(): RDD[(K, Iterable[V])]

def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]

def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]

将分区的数据直接转换为相同类型的内存数组进行后续处理

val dataRDD1 =

    sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))

val dataRDD2 = dataRDD1.groupByKey()  //默认原来分区数

val dataRDD3 = dataRDD1.groupByKey(2)  //重新定义分区

val dataRDD4 = dataRDD1.groupByKey(new HashPartitioner(2))  //hash值分区

思考一个问题:reduceByKey和groupByKey的区别?

两个算子在实现相同的业务功能时,reduceByKey存在预聚和功能,所以性能比较高,推荐使用。但是,不是说一定就采用这个方法,需要根据场景来选择

4)     aggregateByKey

def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,  

//第一个参数value初始值  

//第二个参数中包括(分区内计算规则,分区间计算规则)

  combOp: (U, U) => U): RDD[(K, U)]

将数据根据不同的规则进行分区内计算和分区间计算

val dataRDD1 =

    sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))

val dataRDD2 =

    dataRDD1.aggregateByKey(0)(_+_,_+_)   //分区内  分区间计算规则相同

5)     foldByKey

def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]  

//第一个参数value初始值

//第二个参数分区内计算规则和分区间计算规则相同

当分区内计算规则和分区间计算规则相同时,aggregateByKey就可以简化为foldByKey

val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))

val dataRDD2 = dataRDD1.foldByKey(0)(_+_)

6)     combineByKey

def combineByKey[C](

  createCombiner: V => C,   //转换第一个数据的数据结果

  mergeValue: (C, V) => C,   //分区内计算规则

  mergeCombiners: (C, C) => C): RDD[(K, C)]  //分区计算规则

最通用的对key-value型rdd进行聚集操作的聚集函数(aggregation function)。类似于aggregate(),combineByKey()允许用户返回值的类型与输入不一致。

小练习:将数据List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98))求每个key的平均值

val list: List[(String, Int)] = List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98))

val input: RDD[(String, Int)] = sc.makeRDD(list, 2)

 

val combineRdd: RDD[(String, (Int, Int))] = input.combineByKey(

    (_, 1),

    (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),

    (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)

)

思考一个问题:reduceByKey、foldByKey、aggregateByKey、combineByKey的区别?

从源码的角度来讲,四个算子的底层逻辑是相同的。

ReduceByKey不会对第一个value进行处理,分区内和分区间计算规则相同。

AggregateByKey的算子会将初始值和第一个value使用分区内计算规则进行计算。

FoldByKey的算子的分区内和分区间的计算规则相同,初始值和第一个value使用分区内计算规则。

CombineByKey的第一个参数就是对第一个value进行处理,所有无需初始值

7)     sortByKey

def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)

  : RDD[(K, V)]  //true为升序  false为降序

在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的

val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))

val sortRDD1: RDD[(String, Int)] = dataRDD1.sortByKey(true)

val sortRDD1: RDD[(String, Int)] = dataRDD1.sortByKey(false)

v  小功能:设置key为自定义类User

class User extends Ordered[User]{

    override def compare(that: User): Int = {

        1

    }

}

8)     join

def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]

在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素连接在一起的(K,(V,W))的RDD

val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))

val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 5), (3, 6)))

rdd.join(rdd1).collect().foreach(println)

思考一个问题:如果key存在不相等呢?

如果key不相等,对应的数据无法连接,如果key有重复的,那么数据会多次连接

9)     leftOuterJoin

def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]

类似于SQL语句的左外连接

val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))

val dataRDD2 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))

 

val rdd: RDD[(String, (Int, Option[Int]))] = dataRDD1.leftOuterJoin(dataRDD2)

 

10)   cogroup

def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]

在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的RDD

val dataRDD1 = sparkContext.makeRDD(List(("a",1),("a",2),("c",3)))

val dataRDD2 = sparkContext.makeRDD(List(("a",1),("c",2),("c",3)))

 

val value: RDD[(String, (Iterable[Int], Iterable[Int]))] =

 

dataRDD1.cogroup(dataRDD2)

标签:val,makeRDD,Int,value,RDD,key,dataRDD1,def
来源: https://www.cnblogs.com/fyb-392/p/13060800.html