其他分享
首页 > 其他分享 > 键值对类型转换算子

键值对类型转换算子

作者:互联网

1. groupByKey

/** Demonstrates the groupByKey operator: collects every value that shares a
  * key into a single Iterable (no map-side combining is performed).
  *
  * Reads a word file from HDFS, splits lines into words, pairs each word
  * with 1, groups the 1s per word, and prints the grouped result.
  *
  * @param sc the active SparkContext used to read the input file
  */
def groupByKeyOper(sc: SparkContext): Unit = {
  println("----------------groupByKey开始------------------")
  // Split every input line into individual words.
  val words: RDD[String] = sc.textFile("hdfs://node1:9000/wc.txt").flatMap(_.split(" "))
  // Tag each word with a count of 1, then gather all counts per word.
  val grouped: RDD[(String, Iterable[Int])] = words.map(word => (word, 1)).groupByKey()
  // Example output: (spark,CompactBuffer(1))=(hive,CompactBuffer(1, 1, 1))=(hadoop,CompactBuffer(1, 1))=...
  println(grouped.collect().mkString("="))
  println("----------------groupByKey结束------------------")
}

2. reduceByKey

/** Demonstrates the reduceByKey operator: merges all values of each key with
  * the supplied binary function (combining locally on each partition first).
  *
  * Reads a word file from HDFS, splits lines into words, pairs each word
  * with 1, sums the counts per word, and prints the result.
  *
  * @param sc the active SparkContext used to read the input file
  */
def reduceByKeyOper(sc: SparkContext) = {
  println("----------------reduceByKey开始------------------")
  // Split every input line into individual words.
  val words: RDD[String] = sc.textFile("hdfs://node1:9000/wc.txt").flatMap(_.split(" "))
  // Word count: pair each word with 1 and sum the 1s per key.
  val counts: RDD[(String, Int)] = words.map(word => (word, 1)).reduceByKey(_ + _)
  // Example output: (spark,1)=(hive,3)=(hadoop,2)=(Math,1)=...
  println(counts.collect().mkString("="))
  println("----------------reduceByKey结束------------------")
}

3. aggregateByKey

/** Demonstrates the aggregateByKey operator: folds the values of each key
  * starting from a zero value, using one function to combine within a
  * partition (seqOp) and another to merge across partitions (combOp).
  * With zero = 0 and both functions being `_ + _`, this is a word count.
  *
  * Fix: the start/end log messages previously said "reduceByKey" — a
  * copy-paste error from the method above; they now name aggregateByKey.
  *
  * @param sc the active SparkContext used to read the input file
  */
def aggregateByKeyOper(sc: SparkContext) = {
  println("----------------aggregateByKey开始------------------")
  val rdd = sc.textFile("hdfs://node1:9000/wc.txt")
  // Split every input line into individual words.
  val flatMap: RDD[String] = rdd.flatMap((line: String) => {
    line.split(" ")
  })
  val map = flatMap.map((_, 1))
  val agg: RDD[(String, Int)] = map.aggregateByKey(0)(
    (a: Int, b: Int) => a + b, // seqOp: accumulate within a partition
    (a: Int, b: Int) => a + b  // combOp: merge partial sums across partitions
  )
  // NOTE(review): foreach(println) runs on the executors; in cluster mode the
  // output appears in executor stdout, not the driver console.
  agg.foreach(println(_)) // e.g. (spark,1) (hive,3) (hadoop,2) (Math,1) ...
  println("----------------aggregateByKey结束------------------")
}

标签:类型转换,flatMap,CompactBuffer,String,val,RDD,键值,算子,println
来源: https://www.cnblogs.com/jsqup/p/16618549.html