编程语言
首页 > 编程语言> > 第五章_Spark核心编程_Rdd_转换算子_keyValue型_aggregateByKey

第五章_Spark核心编程_Rdd_转换算子_keyValue型_aggregateByKey

作者:互联网

1. 定义

  /*
  * 1. 定义
  *     def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)
  *     (seqOp: (U, V) => U,combOp: (U, U) => U): RDD[(K, U)]
  *
  *     def aggregateByKey[U: ClassTag](zeroValue: U)
  *     (seqOp: (U, V) => U,combOp: (U, U) => U): RDD[(K, U)]
  *
  *     def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)
  *     (seqOp: (U, V) => U,combOp: (U, U) => U): RDD[(K, U)]
  *
  *
  * 2.功能
  *    将数据根据不同的规则进行 分区内计算 和 分区间计算
  *    操作流程
  *       1. 分区内 对相同的key 分组
  *          示例 : key iter(value1,value2,value3)
  *
  *       2. 根据出入的规则 seqOp: (U, V) => U 对分区内相同的key 做聚合操作
  *          示例 :  seqOp(zeroValue,value1)...
  *
  *       3. 聚合后输出每个分区的结果 key,value
  *
  *       4. 拉取每个分区 的key,value ,并对相同key 的value做reduce操作(存在Shuffle过程)
  *          示例 :  combOp(zeroValue,value1)...
  *
  *       5. 对 所有分区的key 做完reduce操作后,按照指定的 partitioner 重新对结果分区
  *          不指定分区器时,用默认分区器 HashPartitoner
  *          不指定分区个数时,用父Rdd分区个数
  *
  * 3.note
  *    zeroValue 会参与 分区内和分区间的 reduce操作
  *
  * */

2. 示例

  object aggregateTest extends App {

    val sparkconf: SparkConf = new SparkConf().setMaster("local").setAppName("distinctTest")

    val sc: SparkContext = new SparkContext(sparkconf)

    val rdd: RDD[(Int, String)] = sc.makeRDD(List((2, "x1"), (2, "x2"), (2, "x3"), (4, "x4"), (5, "x5"), (5, "x6"), (6, "x7")), 2)

    private val rdd2 = rdd.aggregateByKey("")(
      //分区内计算规则 (从左往右计算)
      //对分区内 相同key 的value 做reduce操作
      (zeroValue: String, value1: String) => {
        println(s"key:${zeroValue} value:${value1}")
        zeroValue + "-" + value1
      }

      //拉取各个分区 key-value,对相同key 的value 做reduce操作
      , (zeroValue, par_value) => zeroValue + par_value
    )

    println(s"${rdd2.collect().mkString(",")}")

    sc.stop()
  }

 

标签:zeroValue,value1,分区,reduce,value,Rdd,keyValue,key,aggregateByKey
来源: https://www.cnblogs.com/bajiaotai/p/16054066.html