键值对类型转换算子
作者:互联网
1. groupByKey
- 定义:groupByKey([numPartitions])、
- 解释:只对键值对类型RDD生效,同时返回的是一个新的RDD[(key,Iterator[Value])]
- 案例:
def groupByKeyOper(sc: SparkContext): Unit = {
println("----------------groupByKey开始------------------")
val rdd = sc.textFile("hdfs://node1:9000/wc.txt")
val flatMap: RDD[String] = rdd.flatMap((line: String) => {
line.split(" ")
})
val map = flatMap.map((_, 1))
val groupByKey: RDD[(String, Iterable[Int])] = map.groupByKey()
val ptln: String = groupByKey.collect().mkString("=")
println(ptln) // (spark,CompactBuffer(1))=(hive,CompactBuffer(1, 1, 1))=(hadoop,CompactBuffer(1, 1))=(Azkaban,CompactBuffer(1))=(Math,CompactBuffer(1))=(Chinese,CompactBuffer(1))=(English,CompactBuffer(1))=(mapreduce,CompactBuffer(1, 1, 1))=(flink,CompactBuffer(1, 1))=(kafka,CompactBuffer(1, 1))=(hbase,CompactBuffer(1, 1))=(Hadoop,CompactBuffer(1))
println("----------------groupByKey结束------------------")
}
2. reduceByKey
- 定义:reduceByKey(func, [numPartitions])
- 解释:对相同的key值的value数据通过func函数进行聚合操作(总和、最大值、最小值...)返回的是一个RDD[(Key,Value的类型---value代表的是相同key值聚合之后的结果)]
- 案例:
def reduceByKeyOper(sc: SparkContext) = {
println("----------------reduceByKey开始------------------")
val rdd = sc.textFile("hdfs://node1:9000/wc.txt")
val flatMap: RDD[String] = rdd.flatMap((line: String) => {
line.split(" ")
})
val map = flatMap.map((_, 1))
val reduceByKey: RDD[(String, Int)] = map.reduceByKey((_ + _))
println(reduceByKey.collect().mkString("=")) // (spark,1)=(hive,3)=(hadoop,2)=(Math,1)=(Azkaban,1)=(Chinese,1)=(English,1)=(mapreduce,3)=(flink,2)=(kafka,2)=(hbase,2)=(Hadoop,1)
println("----------------reduceByKey结束------------------")
}
3. aggregateByKey
- 定义:aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions])
- 解释:
(1)零值:初始值
(2)seqOp函数:对每一个分区中key值相同的value数据和零值聚合操作
(3)combOp函数:对不同分区key值相同已经通过seqOp函数计算出来的聚合结果 在和零值聚合一次
当前这个算子是一个转换算子,返回一个新的键值对类型的RDD 其中RDD中key还是原来的key value是aggregateByKey聚合之后的结果 - 案例:
def aggregateByKeyOper(sc: SparkContext) = {
println("----------------reduceByKey开始------------------")
val rdd = sc.textFile("hdfs://node1:9000/wc.txt")
val flatMap: RDD[String] = rdd.flatMap((line: String) => {
line.split(" ")
})
val map = flatMap.map((_, 1))
val agg: RDD[(String, Int)] = map.aggregateByKey(0)(
(a: Int, b: Int) => {
a + b
},
(a: Int, b: Int) => {
a + b
}
)
agg.foreach(println(_)) // (spark,1) (hive,3) (hadoop,2) (Math,1) (Azkaban,1) (Chinese,1) (English,1) (mapreduce,3) (flink,2) (kafka,2) (hbase,2) (Hadoop,1)
println("----------------reduceByKey结束------------------")
}
标签:类型转换,flatMap,CompactBuffer,String,val,RDD,键值,算子,println 来源: https://www.cnblogs.com/jsqup/p/16618549.html