// Spark优化_代码优化_Map端预聚合算子 -> combineByKey
// 作者: 互联网
package other
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ListBuffer
/**
 * Demo of `combineByKey`, the key-wise aggregation operator that pre-aggregates values
 * on the map side (inside each partition) before the shuffle.
 *
 * `combineByKey` is driven by three functions:
 *  - `createCombiner`: builds a combiner from the first value of a key seen in a partition;
 *  - `mergeValue`: folds each further value of that key, from the same partition, into the
 *    combiner (this is the map-side pre-aggregation);
 *  - `mergeCombiners`: merges the per-partition combiners of a key after the shuffle.
 *
 * @Author yqq
 * @Date 2021/12/24 19:37
 * @Version 1.0
 */
object CombineByKeyTest {

  /** Starts a combiner from the first value of a key in a partition: `24` becomes `"hello24"`. */
  private val createCombiner: Int => String = value => s"hello$value"

  /** Appends another value of the same key from the same partition: `("hello24", 26)` becomes `"hello24#26"`. */
  private val mergeValue: (String, Int) => String = (combiner, value) => s"$combiner#$value"

  /** Joins combiners of the same key that were built in different partitions, separated by `@`. */
  private val mergeCombiners: (String, String) => String = (left, right) => s"$left@$right"

  /**
   * Debugging aid that replaces the previously commented-out loop. It labels every element with
   * the index of the partition holding it, e.g. `partition index = 0,value = (kobe,24)`.
   */
  private def partitionLayout(rdd: RDD[(String, Int)]): RDD[String] =
    rdd.mapPartitionsWithIndex { (index, iter) =>
      iter.map(tp => s"partition index = $index,value = $tp")
    }

  def main(args: Array[String]): Unit = {
    val context = new SparkContext(
      new SparkConf()
        .setMaster("local")
        .setAppName("l")
    )
    // Stop the SparkContext even if the job fails, so its resources are always released.
    try {
      context.setLogLevel("Error")
      val rdd1: RDD[(String, Int)] = context.parallelize(List[(String, Int)](
        ("kobe", 24),
        ("james", 25),
        ("kobe", 26),
        ("bob", 27),
        ("kobe", 28),
        ("james", 29)
      ), numSlices = 2)

      /*
       * Partition layout (print it with `partitionLayout(rdd1).collect().foreach(println)`)
       * and the map-side combine result of each partition:
       *   partition 0: (kobe,24) (james,25) (kobe,26) ==> (kobe,hello24#26) (james,hello25)
       *   partition 1: (bob,27) (kobe,28) (james,29)  ==> (bob,hello27) (kobe,hello28) (james,hello29)
       * After the shuffle, mergeCombiners joins the partial results per key, e.g.
       *   (kobe,hello24#26@hello28) (james,hello25@hello29) (bob,hello27)
       * Neither the order of the output pairs nor the order of the @-joined parts is guaranteed.
       */
      rdd1.combineByKey(createCombiner, mergeValue, mergeCombiners)
        // collect() brings the results to the driver so they are printed there; a plain
        // rdd.foreach(println) runs on executors, whose stdout is not the driver's outside local mode.
        .collect()
        .foreach(println)
    } finally {
      context.stop()
    }
  }
}
// 标签: Map, String, index, james, partition, value, 代码优化, kobe, combineByKey
// 来源: https://blog.csdn.net/manba_yqq/article/details/122135401