其他分享
首页 > 其他分享> > spark---- RDD算子之Action算子

spark---- RDD算子之Action算子

作者:互联网

Action算子   

 调用sc.ranjob方法,根据最后一个RDD从后往前推,触发Action就会生成DAG,切分Stage,生成TaskSet

 算子:  aggregate  foreach  foreachPartition  count sum  fold  reduce  max  min  take  first  top  takeOrdered

aggregate  聚合   ,设置初始值,先局部聚合在全局聚合

package cn.doit.spark.day04.Action

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object AggregateDemo {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("AggregateDemo").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val arr = Array(1,2,3,4,5,6,7,8,9,10)

    val rdd1: RDD[Int] = sc.parallelize(arr, 2)

    //初始值为0,先局部聚合,在全局聚合
    //局部聚合是在worker下的excuter, 全局聚合是在dirver端
    val rdd2: Int = rdd1.aggregate(0)(_ + _ , _ + _)
    
    println(rdd2)

    sc.stop()
  }
}

foreach   将数一条一条的取出来,传入一个函数,这个函数返回Unit, 比如传入一个打印的逻辑, 打印的结果在Executor端的日志中

写入数据库的正确方式:

                                   Executor中的Task通过网路传输到Dirver中的数组中,中间可能会丢失数据,
                                   Dirver的数组中的数据往mysql中写入通过网络传输也可能会丢失数据,而且写入效率低,小量数据问题不大.
                                   Executor中的Task可以并发的直接往mysql中写(foreach),不经过Dirver,不通过网络传输,效率高

scala> val arr = Array(1,2,3,4,5,6,7,8,9,10)
arr: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> val rdd1 = sc.parallelize(arr , 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:26

scala> rdd1.foreach(e => println(e))                      //这是对rdd进行foreach, 在Executer执行,需要去worker中查看

scala> rdd1.collect.foreach(e => println(e))          //这是对数组进行foreach, 在dirver中执行,会直接打印到控制台 
1
2
3
4
5
6
7
8
9
10

scala> 

foreachPartition  以分区为单位,每一个分区就是一个Task,以后可以将数据写入到数据库中,一个分区一个连接,效率更高

package cn.doit.spark.day04.Action

import java.sql.DriverManager
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object ForeachPartitionDemo {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("AggregateDemo").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val arr = Array(1,2,3,4,5,6,7,8,9,10)

    val rdd1: RDD[Int] = sc.parallelize(arr, 2)

    //如果有多个Action,把foreachPartitionAsync放在前面
    //在执行的同时也可以往下执行
    rdd1.foreachPartitionAsync(iter => {
      iter.foreach(e =>{

      })
    })

    //对分区中的数据进行遍历
    rdd1.foreachPartition(iter => {
      //事先创建好连接对象
      val conn = DriverManager.getConnection("", "", "")
      //遍历迭代器中的数据
      //也可以使用while遍历,foreach底层就是while遍历
      iter.foreach(e => {
        //复用外面定义的connection,以分区中的多条数据会使用一个连接对象,可以节省资源

      })
    })
  }
}

count   计数,先局部,再全局

package cn.doit.spark.day04.Action

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
/*
    Count是一个Action算子
    Count会在每一个分区内进行局部计数,来一条+1
    在Dirver端进行全局的计算
 */
object CountDemo {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("AggregateDemo").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val arr = Array(1,2,3,4,5,6,7,8,9,10)

    val rdd1: RDD[Int] = sc.parallelize(arr, 2)

    val rdd2: Long = rdd1.count()
    println(rdd2)
  }
}

fold  sum  reduce  聚合

package cn.doit.spark.day04.Action

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

/*
  Sum是一个Action算子
  sum会在每一个分区内进行局部求和
  在Dirver端进行全局求和
 */
object SumDemo {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("AggregateDemo").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val arr = Array(1,2,3,4,5,6,7,8,9,10)

    val rdd1: RDD[Int] = sc.parallelize(arr, 2)

    //参数一初始值,局部聚合调用一次,全局聚合也调用一次
//    val rdd2: Int = rdd1.fold(1000)(_ + _)   //55

    //先局部求和,在全局求和  sum底层调用的fold
    // 局部聚合在Executor端聚合,全局聚合在Dirver端
//    val rdd2: Double = rdd1.sum()    //55

    //reduce也是先局部聚合,在全局聚合,底层调用的reduceLeft
    val rdd2: Int = arr.reduce(_ + _)   //55
    println(rdd2)
  }
}

Min  Max   先局部比较,再全局比较

package cn.doit.spark.day04.Action

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
/*
    min和max是一个Action算子
    会在每一个分区内进行局部两两比较大小
    最后在Dirver端进行全局的比较大小
 */
object MinAndMaxDemo {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("AggregateDemo").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val arr = Array(10,2,9,6,4,5,7,8,3,1)

    val rdd1: RDD[Int] = sc.parallelize(arr, 2)

    //先局部两两进行比较,再全局进行比较
//    val rdd2 = rdd1.reduce(Math.max(_, _))
//    val rdd2 = rdd1.reduce(Math.min(_, _))
//    val rdd2 = rdd1.max()
    val rdd2 = rdd1.min()

    println(rdd2)
  }
}

take   first    take取前n个数据,会触发以到多个Action     first类似于take(1),取数据集的第一个元素

package cn.doit.spark.day04.Action

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
/*
    Take是一个Action算子
    可能触发多个Action
    每一次触发Action取一个分区的数据,不够n个到下一个分区取
 */
object TakeAndFirst {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("AggregateDemo").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val arr = Array(10,2,9,6,4,5,7,8,3,1)

    val rdd1: RDD[Int] = sc.parallelize(arr, 2)

    //取前n个数据
    val rdd2: Array[Int] = rdd1.take(4)    // 10 2 9 6
    println(rdd2.toBuffer)

    //返回数据集的的第一个元素,类似于take(1)
    val rdd3 = rdd1.first()
    println(rdd3)
  }
}

top  takeOrdered   top:底层调用的是takeOrdered, 调用MapPartitions先在每一个分区将放入到有界优先队列,每个分区返回的有界优先再进行++=

package cn.doit.spark.day04.Action

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
/*
    Top是一个Action算子
    在每一个分区内使用有界优先队列(类似一个集合,会自动帮你排序)对N个数据进行排序,溢出不符合的数据
    在Dirver端在将每个分区返回的结果用有界优先队列进行全局的比大小
    有界优先队列类似一个TreeSet,但是不去重
 */
object TopDemo {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("AggregateDemo").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val arr = Array(10,2,9,6,4,5,7,8,3,1)

    val rdd1: RDD[Int] = sc.parallelize(arr, 2)

    //取最大的两个元素,底层调用takeOrdered
    //底层先遍历所有数据,放进优先队列中,将每个区的最大的2两个取出来,最后再进行比较,选最大的两个
    val rdd2: Array[Int] = rdd1.top(2)   //9 10
    println(rdd2.toBuffer)

    //取最小的3个元素,底层调用了mapPartitions
    val rdd3 = rdd1.takeOrdered(3)      // 1 2 3
    println(rdd3.toBuffer)
  }
}

 

标签:arr,rdd1,val,----,RDD,算子,Action,spark,Array
来源: https://blog.csdn.net/weixin_51077563/article/details/111766123