spark---- RDD算子之Action算子
作者:互联网
Action算子
调用sc.ranjob方法,根据最后一个RDD从后往前推,触发Action就会生成DAG,切分Stage,生成TaskSet
算子: aggregate foreach foreachPartition count sum fold reduce max min take first top takeOrdered
aggregate 聚合 ,设置初始值,先局部聚合在全局聚合
package cn.doit.spark.day04.Action
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object AggregateDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("AggregateDemo").setMaster("local[*]")
val sc = new SparkContext(conf)
val arr = Array(1,2,3,4,5,6,7,8,9,10)
val rdd1: RDD[Int] = sc.parallelize(arr, 2)
//初始值为0,先局部聚合,在全局聚合
//局部聚合是在worker下的excuter, 全局聚合是在dirver端
val rdd2: Int = rdd1.aggregate(0)(_ + _ , _ + _)
println(rdd2)
sc.stop()
}
}
foreach 将数一条一条的取出来,传入一个函数,这个函数返回Unit, 比如传入一个打印的逻辑, 打印的结果在Executor端的日志中
写入数据库的正确方式:
Executor中的Task通过网路传输到Dirver中的数组中,中间可能会丢失数据,
Dirver的数组中的数据往mysql中写入通过网络传输也可能会丢失数据,而且写入效率低,小量数据问题不大.
Executor中的Task可以并发的直接往mysql中写(foreach),不经过Dirver,不通过网络传输,效率高
scala> val arr = Array(1,2,3,4,5,6,7,8,9,10)
arr: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)scala> val rdd1 = sc.parallelize(arr , 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:26scala> rdd1.foreach(e => println(e)) //这是对rdd进行foreach, 在Executer执行,需要去worker中查看
scala> rdd1.collect.foreach(e => println(e)) //这是对数组进行foreach, 在dirver中执行,会直接打印到控制台
1
2
3
4
5
6
7
8
9
10scala>
foreachPartition 以分区为单位,每一个分区就是一个Task,以后可以将数据写入到数据库中,一个分区一个连接,效率更高
package cn.doit.spark.day04.Action
import java.sql.DriverManager
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object ForeachPartitionDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("AggregateDemo").setMaster("local[*]")
val sc = new SparkContext(conf)
val arr = Array(1,2,3,4,5,6,7,8,9,10)
val rdd1: RDD[Int] = sc.parallelize(arr, 2)
//如果有多个Action,把foreachPartitionAsync放在前面
//在执行的同时也可以往下执行
rdd1.foreachPartitionAsync(iter => {
iter.foreach(e =>{
})
})
//对分区中的数据进行遍历
rdd1.foreachPartition(iter => {
//事先创建好连接对象
val conn = DriverManager.getConnection("", "", "")
//遍历迭代器中的数据
//也可以使用while遍历,foreach底层就是while遍历
iter.foreach(e => {
//复用外面定义的connection,以分区中的多条数据会使用一个连接对象,可以节省资源
})
})
}
}
count 计数,先局部,再全局
package cn.doit.spark.day04.Action
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
/*
Count是一个Action算子
Count会在每一个分区内进行局部计数,来一条+1
在Dirver端进行全局的计算
*/
object CountDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("AggregateDemo").setMaster("local[*]")
val sc = new SparkContext(conf)
val arr = Array(1,2,3,4,5,6,7,8,9,10)
val rdd1: RDD[Int] = sc.parallelize(arr, 2)
val rdd2: Long = rdd1.count()
println(rdd2)
}
}
fold sum reduce 聚合
package cn.doit.spark.day04.Action
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
/*
Sum是一个Action算子
sum会在每一个分区内进行局部求和
在Dirver端进行全局求和
*/
object SumDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("AggregateDemo").setMaster("local[*]")
val sc = new SparkContext(conf)
val arr = Array(1,2,3,4,5,6,7,8,9,10)
val rdd1: RDD[Int] = sc.parallelize(arr, 2)
//参数一初始值,局部聚合调用一次,全局聚合也调用一次
// val rdd2: Int = rdd1.fold(1000)(_ + _) //55
//先局部求和,在全局求和 sum底层调用的fold
// 局部聚合在Executor端聚合,全局聚合在Dirver端
// val rdd2: Double = rdd1.sum() //55
//reduce也是先局部聚合,在全局聚合,底层调用的reduceLeft
val rdd2: Int = arr.reduce(_ + _) //55
println(rdd2)
}
}
Min Max 先局部比较,再全局比较
package cn.doit.spark.day04.Action
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
/*
min和max是一个Action算子
会在每一个分区内进行局部两两比较大小
最后在Dirver端进行全局的比较大小
*/
object MinAndMaxDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("AggregateDemo").setMaster("local[*]")
val sc = new SparkContext(conf)
val arr = Array(10,2,9,6,4,5,7,8,3,1)
val rdd1: RDD[Int] = sc.parallelize(arr, 2)
//先局部两两进行比较,再全局进行比较
// val rdd2 = rdd1.reduce(Math.max(_, _))
// val rdd2 = rdd1.reduce(Math.min(_, _))
// val rdd2 = rdd1.max()
val rdd2 = rdd1.min()
println(rdd2)
}
}
take first take取前n个数据,会触发以到多个Action first类似于take(1),取数据集的第一个元素
package cn.doit.spark.day04.Action
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
/*
Take是一个Action算子
可能触发多个Action
每一次触发Action取一个分区的数据,不够n个到下一个分区取
*/
object TakeAndFirst {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("AggregateDemo").setMaster("local[*]")
val sc = new SparkContext(conf)
val arr = Array(10,2,9,6,4,5,7,8,3,1)
val rdd1: RDD[Int] = sc.parallelize(arr, 2)
//取前n个数据
val rdd2: Array[Int] = rdd1.take(4) // 10 2 9 6
println(rdd2.toBuffer)
//返回数据集的的第一个元素,类似于take(1)
val rdd3 = rdd1.first()
println(rdd3)
}
}
top takeOrdered top:底层调用的是takeOrdered, 调用MapPartitions先在每一个分区将放入到有界优先队列,每个分区返回的有界优先再进行++=
package cn.doit.spark.day04.Action
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
/*
Top是一个Action算子
在每一个分区内使用有界优先队列(类似一个集合,会自动帮你排序)对N个数据进行排序,溢出不符合的数据
在Dirver端在将每个分区返回的结果用有界优先队列进行全局的比大小
有界优先队列类似一个TreeSet,但是不去重
*/
object TopDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("AggregateDemo").setMaster("local[*]")
val sc = new SparkContext(conf)
val arr = Array(10,2,9,6,4,5,7,8,3,1)
val rdd1: RDD[Int] = sc.parallelize(arr, 2)
//取最大的两个元素,底层调用takeOrdered
//底层先遍历所有数据,放进优先队列中,将每个区的最大的2两个取出来,最后再进行比较,选最大的两个
val rdd2: Array[Int] = rdd1.top(2) //9 10
println(rdd2.toBuffer)
//取最小的3个元素,底层调用了mapPartitions
val rdd3 = rdd1.takeOrdered(3) // 1 2 3
println(rdd3.toBuffer)
}
}
标签:arr,rdd1,val,----,RDD,算子,Action,spark,Array 来源: https://blog.csdn.net/weixin_51077563/article/details/111766123