RDD转换算子--key-value
作者:互联网
l Key - Value类型
1) partitionBy
- 函数签名
def partitionBy(partitioner: Partitioner): RDD[(K, V)]
- 函数说明
将数据按照指定Partitioner重新进行分区。Spark默认的分区器是HashPartitioner
val rdd: RDD[(Int, String)] =
sc.makeRDD(Array((1,"aaa"),(2,"bbb"),(3,"ccc")),3)
import org.apache.spark.HashPartitioner
val rdd2: RDD[(Int, String)] =
rdd.partitionBy(new HashPartitioner(2))
2) reduceByKey
- 函数签名
def reduceByKey(func: (V, V) => V): RDD[(K, V)] //添加聚合方式
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] //聚合方式及分区数
- 函数说明
可以将数据按照相同的Key对Value进行聚合
val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 = dataRDD1.reduceByKey(_+_)
val dataRDD3 = dataRDD1.reduceByKey(_+_, 2)
3) groupByKey
- 函数签名
def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
- 函数说明
将分区的数据直接转换为相同类型的内存数组进行后续处理
val dataRDD1 =
sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 = dataRDD1.groupByKey() //默认原来分区数
val dataRDD3 = dataRDD1.groupByKey(2) //重新定义分区
val dataRDD4 = dataRDD1.groupByKey(new HashPartitioner(2)) //hash值分区
思考一个问题:reduceByKey和groupByKey的区别?
两个算子在实现相同的业务功能时,reduceByKey存在预聚和功能,所以性能比较高,推荐使用。但是,不是说一定就采用这个方法,需要根据场景来选择
4) aggregateByKey
- 函数签名
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
//第一个参数value初始值
//第二个参数中包括(分区内计算规则,分区间计算规则)
combOp: (U, U) => U): RDD[(K, U)]
- 函数说明
将数据根据不同的规则进行分区内计算和分区间计算
val dataRDD1 =
sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 =
dataRDD1.aggregateByKey(0)(_+_,_+_) //分区内 分区间计算规则相同
5) foldByKey
- 函数签名
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
//第一个参数value初始值
//第二个参数分区内计算规则和分区间计算规则相同
- 函数说明
当分区内计算规则和分区间计算规则相同时,aggregateByKey就可以简化为foldByKey
val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 = dataRDD1.foldByKey(0)(_+_)
6) combineByKey
- 函数签名
def combineByKey[C](
createCombiner: V => C, //转换第一个数据的数据结果
mergeValue: (C, V) => C, //分区内计算规则
mergeCombiners: (C, C) => C): RDD[(K, C)] //分区计算规则
- 函数说明
最通用的对key-value型rdd进行聚集操作的聚集函数(aggregation function)。类似于aggregate(),combineByKey()允许用户返回值的类型与输入不一致。
小练习:将数据List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98))求每个key的平均值
val list: List[(String, Int)] = List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98))
val input: RDD[(String, Int)] = sc.makeRDD(list, 2)
val combineRdd: RDD[(String, (Int, Int))] = input.combineByKey(
(_, 1),
(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)
思考一个问题:reduceByKey、foldByKey、aggregateByKey、combineByKey的区别?
从源码的角度来讲,四个算子的底层逻辑是相同的。
ReduceByKey不会对第一个value进行处理,分区内和分区间计算规则相同。
AggregateByKey的算子会将初始值和第一个value使用分区内计算规则进行计算。
FoldByKey的算子的分区内和分区间的计算规则相同,初始值和第一个value使用分区内计算规则。
CombineByKey的第一个参数就是对第一个value进行处理,所有无需初始值
7) sortByKey
- 函数签名
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
: RDD[(K, V)] //true为升序 false为降序
- 函数说明
在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的
val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val sortRDD1: RDD[(String, Int)] = dataRDD1.sortByKey(true)
val sortRDD1: RDD[(String, Int)] = dataRDD1.sortByKey(false)
v 小功能:设置key为自定义类User
class User extends Ordered[User]{
override def compare(that: User): Int = {
1
}
}
8) join
- 函数签名
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
- 函数说明
在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素连接在一起的(K,(V,W))的RDD
val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))
val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 5), (3, 6)))
rdd.join(rdd1).collect().foreach(println)
思考一个问题:如果key存在不相等呢?
如果key不相等,对应的数据无法连接,如果key有重复的,那么数据会多次连接
9) leftOuterJoin
- 函数签名
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
- 函数说明
类似于SQL语句的左外连接
val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val rdd: RDD[(String, (Int, Option[Int]))] = dataRDD1.leftOuterJoin(dataRDD2)
10) cogroup
- 函数签名
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
- 函数说明
在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的RDD
val dataRDD1 = sparkContext.makeRDD(List(("a",1),("a",2),("c",3)))
val dataRDD2 = sparkContext.makeRDD(List(("a",1),("c",2),("c",3)))
val value: RDD[(String, (Iterable[Int], Iterable[Int]))] =
dataRDD1.cogroup(dataRDD2)
标签:val,makeRDD,Int,value,RDD,key,dataRDD1,def 来源: https://www.cnblogs.com/fyb-392/p/13060800.html