spark_core_03
package com.atguigu.bigata.spark.core.rdd.builder.operator.action
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable
/**
 * @author : atom
 * @date   : 2022/2/20 20:40
 * Nine ways to write WordCount
 */
object Spark01_RDD_Operator_Action {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("action")
    val sc = new SparkContext(conf)

    // wc1(sc)
    // wc2(sc)
    // wc3(sc)
    // wc5(sc)
    // wc6(sc)
    // wc7(sc)
    // wc8(sc)
    // wc9(sc)
    /* aggregateByKey(sc)
       combineByKey(sc) */
    wc4(sc)

    sc.stop()
  }
  // TODO reduceByKey
  def wc1(sc: SparkContext): Unit = {
    val RDD = sc.parallelize(List("hello kafka", "hello spark", "hello hbase", "hello flink"), 2)
    RDD.flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .collect
      .foreach(println)
  }
  // TODO groupBy
  def wc2(sc: SparkContext): Unit = {
    val RDD = sc.parallelize(List("hello kafka", "hello spark", "hello hbase", "hello flink"), 2)
    RDD.flatMap(_.split(" "))
      .groupBy(word => word)
      .mapValues(_.size)
      .collect
      .foreach(println)
  }
  // TODO groupByKey
  def wc3(sc: SparkContext): Unit = {
    val RDD = sc.parallelize(List("hello kafka", "hello spark", "hello hbase", "hello flink"), 2)
    RDD.flatMap(_.split(" "))
      .map((_, 1))
      .groupByKey()
      .map(line => (line._1, line._2.size))
      .collect
      .foreach(println)
  }
  // TODO reduce
  def wc4(sc: SparkContext): Unit = {
    val RDD = sc.parallelize(List("hello kafka", "hello spark", "hello hbase", "hello flink"), 2)
    // Map each word to a single-entry mutable Map, then merge all the maps with reduce
    println(
      RDD.flatMap(_.split(" "))
        .map(word => mutable.Map[String, Long]((word, 1)))
        .reduce { (map1, map2) =>
          map2.foreach {
            case (word, count) =>
              val newCount = map1.getOrElse(word, 0L) + count
              map1.update(word, newCount)
          }
          map1
        }
    )
  }
  // TODO countByValue
  def wc5(sc: SparkContext): Unit = {
    val RDD = sc.parallelize(List("hello kafka", "hello spark", "hello hbase", "hello flink"), 2)
    RDD.flatMap(_.split(" "))
      .countByValue()
      .foreach(println)
  }
  // TODO countByKey
  def wc6(sc: SparkContext): Unit = {
    val RDD = sc.parallelize(List("hello kafka", "hello spark", "hello hbase", "hello flink"), 2)
    RDD.flatMap(_.split(" "))
      .map((_, 1))
      .countByKey()
      .foreach(println)
  }
  // TODO combineByKey
  def wc7(sc: SparkContext): Unit = {
    val RDD = sc.parallelize(List("hello kafka", "hello spark", "hello hbase", "hello flink"), 2)
    RDD.flatMap(_.split(" "))
      .map((_, 1))
      .combineByKey(
        v => v,                    // createCombiner: keep the first count as-is
        (a: Int, b: Int) => a + b, // mergeValue: add counts within a partition
        (a: Int, b: Int) => a + b  // mergeCombiners: add counts across partitions
      )
      .collect
      .foreach(println)
  }
  // TODO aggregateByKey: average per key
  def aggregateByKey(sc: SparkContext): Unit = {
    val RDD = sc.parallelize(List(("a", 1), ("a", 2), ("b", 3), ("b", 4), ("c", 5)), 2)
    RDD.aggregateByKey((0, 0))(
      // within a partition: accumulate (sum, count)
      (tuple, cnt) => (tuple._1 + cnt, tuple._2 + 1),
      // across partitions: add the sums and add the counts
      (t1, t2) => (t1._1 + t2._1, t1._2 + t2._2)
    ).mapValues {
      case (num, cnt) => num / cnt
    }.collect.foreach(println)
  }
  // TODO combineByKey: average per key
  def combineByKey(sc: SparkContext): Unit = {
    val RDD = sc.parallelize(List(("a", 1), ("a", 2), ("b", 3), ("b", 4), ("c", 5)), 2)
    RDD.combineByKey(
      // createCombiner: turn the first value of a key into (sum, count)
      v => (v, 1),
      // mergeValue: fold another value into (sum, count) within a partition
      (tuple: (Int, Int), v) => (tuple._1 + v, tuple._2 + 1),
      // mergeCombiners: add the (sum, count) pairs across partitions
      (t1: (Int, Int), t2: (Int, Int)) => (t1._1 + t2._1, t1._2 + t2._2)
    ).mapValues {
      case (num, cnt) => num / cnt
    }.collect
      .foreach(println)
  }
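  // With the sample data, both of the average versions above print (a,1), (b,3) and (c,5),
  // in some order: num / cnt is Int division, so use num.toDouble / cnt for a fractional average.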
  // TODO foldByKey
  def wc8(sc: SparkContext): Unit = {
    val RDD = sc.parallelize(List("hello kafka", "hello spark", "hello hbase", "hello flink"), 2)
    RDD.flatMap(_.split(" "))
      .map((_, 1))
      .foldByKey(0)(_ + _)
      .collect
      .foreach(println)
  }
  // TODO aggregateByKey
  def wc9(sc: SparkContext): Unit = {
    val RDD = sc.parallelize(List("hello kafka", "hello spark", "hello hbase", "hello flink"), 2)
    RDD.flatMap(_.split(" "))
      .map((_, 1))
      .aggregateByKey(0)(_ + _, _ + _)
      .collect
      .foreach(println)
  }
}
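All nine WordCount variants agree on the counts for the sample input: hello -> 4, and kafka, spark, hbase and flink -> 1 each; they only differ in the ordering of the pairs and in the result container (tuples, a collection.Map, or a mutable Map). A minimal self-check sketch, assuming Spark is on the classpath (WordCountCheck is just an illustrative name, not part of the object above):

import org.apache.spark.{SparkConf, SparkContext}

object WordCountCheck {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("check")
    val sc = new SparkContext(conf)
    // collectAsMap brings the (word, count) pairs back to the driver as a Map
    val counts = sc.parallelize(List("hello kafka", "hello spark", "hello hbase", "hello flink"), 2)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .collectAsMap()
    assert(counts == Map("hello" -> 4, "kafka" -> 1, "spark" -> 1, "hbase" -> 1, "flink" -> 1))
    sc.stop()
  }
}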
Source: https://www.cnblogs.com/ftwftw/p/sparkcore03.html