
spark_core_03


package com.atguigu.bigata.spark.core.rdd.builder.operator.action

import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

/**
 * @author :atom
 * @date :2022/2/20 20:40
 * Nine ways to write wordCount
 */
object Spark01_RDD_Operator_Action {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("action")
    val sc = new SparkContext(conf)


    //    wc1(sc)
    //    wc2(sc)
    //    wc3(sc)
    //    wc5(sc)

    //    wc6(sc)
    //    wc7(sc)
    //    wc8(sc)
    //    wc9(sc)

 /*   aggregateByKey(sc)
    combineByKey(sc)*/
    wc4(sc)
    sc.stop()

  }

  //TODO reduceByKey
  def wc1(sc: SparkContext): Unit = {
    val RDD = sc.parallelize(List("hello kafka", "hello spark", "hello hbase", "hello flink"), 2)
    RDD.flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .collect
      .foreach(println)
  }

  //TODO GroupBy
  def wc2(sc: SparkContext): Unit = {
    val RDD = sc.parallelize(List("hello kafka", "hello spark", "hello hbase", "hello flink"), 2)
    RDD.flatMap(_.split(" "))
      .groupBy(word => word)
      .mapValues(_.size)
      .collect
      .foreach(println)
  }

  //TODO GroupByKey
  def wc3(sc: SparkContext): Unit = {
    val RDD = sc.parallelize(List("hello kafka", "hello spark", "hello hbase", "hello flink"), 2)
    RDD.flatMap(_.split(" "))
      .map((_, 1))
      .groupByKey()
      .map(
        line => {
          (line._1, line._2.size)
        }
      ).collect
      .foreach(println)

  }

  //TODO reduce
  def wc4(sc: SparkContext): Unit = {
    val RDD = sc.parallelize(List("hello kafka", "hello spark", "hello hbase", "hello flink"), 2)
    // Wrap each word in a single-entry mutable Map, then merge all the maps with reduce.
    val wordCount = RDD.flatMap(_.split(" "))
      .map(word => mutable.Map[String, Long]((word, 1)))
      .reduce {
        (map1, map2) => {
          // Fold map2 into map1, accumulating the count for each word.
          map2.foreach {
            case (word, count) =>
              val newCount = map1.getOrElse(word, 0L) + count
              map1.update(word, newCount)
          }
          map1
        }
      }
    println(wordCount)
  }

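  //TODO countByValue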
  def wc5(sc: SparkContext): Unit = {
    val RDD = sc.parallelize(List("hello kafka", "hello spark", "hello hbase", "hello flink"), 2)
    RDD.flatMap(_.split(" "))
      .countByValue()
      .foreach(println)

  }

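  //TODO countByKey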
  def wc6(sc: SparkContext): Unit = {
    val RDD = sc.parallelize(List("hello kafka", "hello spark", "hello hbase", "hello flink"), 2)
    RDD.flatMap(_.split(" "))
      .map((_, 1))
      .countByKey()
      .foreach(println)
  }

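  //TODO combineByKey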
  def wc7(sc: SparkContext): Unit = {
    val RDD = sc.parallelize(List("hello kafka", "hello spark", "hello hbase", "hello flink"), 2)
    RDD.flatMap(_.split(" ")).map((_, 1)).combineByKey(
      line => line,
      (a: Int, b: Int) => a + b,
      (a: Int, b: Int) => a + b
    ).collect
      .foreach(println)
  }

  //TODO aggregateByKey - average value per key
  def aggregateByKey(sc: SparkContext): Unit = {
    val RDD = sc.parallelize(List(("a", 1), ("a", 2), ("b", 3), ("b", 4), ("c", 5)), 2)
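    // Accumulator (0, 0) holds (sum, count) per key: seqOp adds each value and bumps the count,
    // combOp merges the partial (sum, count) pairs, and the average below uses integer division.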
    RDD.aggregateByKey((0, 0))(
      (tuple, cnt) => {
        (tuple._1 + cnt, tuple._2 + 1)
      },
      (t1, t2) => {
        (t1._1 + t2._1, t1._2 + t2._2)
      }
    ).mapValues {
      case (num, cnt) => {
        num / cnt
      }
    }.collect.foreach(println)
  }

  //TODO combineByKey - average value per key
  def combineByKey(sc: SparkContext): Unit = {
    val RDD = sc.parallelize(List(("a", 1), ("a", 2), ("b", 3), ("b", 4), ("c", 5)), 2)
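    // createCombiner turns the first value of a key into (value, 1); the merge functions keep a
    // running (sum, count) pair, and the average below uses integer division.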
    RDD.combineByKey(
      v => (v, 1),
      (tuple: (Int, Int), v) => {
        (tuple._1 + v, tuple._2 + 1)
      },
      (t1: (Int, Int), t2: (Int, Int)) => {
        (t1._1 + t2._1, t1._2 + t2._2)
      }
    ).mapValues {
      case (num, cnt) => {
        num / cnt
      }
    }.collect
      .foreach(println)
  }


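  //TODO foldByKey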
  def wc8(sc: SparkContext): Unit = {
    val RDD = sc.parallelize(List("hello kafka", "hello spark", "hello hbase", "hello flink"), 2)
    RDD.flatMap(_.split(" "))
      .map((_, 1))
      .foldByKey(0)(_ + _)
      .collect
      .foreach(println)

  }

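  //TODO aggregateByKey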
  def wc9(sc: SparkContext): Unit = {
    val RDD = sc.parallelize(List("hello kafka", "hello spark", "hello hbase", "hello flink"), 2)
    RDD.flatMap(_.split(" "))
      .map((_, 1))
      .aggregateByKey(0)(_ + _, _ + _)
      .collect
      .foreach(println)
  }


}

Source: https://www.cnblogs.com/ftwftw/p/sparkcore03.html