其他分享
首页 > 其他分享> > Spark(RDD)

Spark(RDD)

作者:互联网

RDD

1.所谓的RDD,其实就是一个数据结构,类似于链表中的Node

2.RDD中有适合并行计算的分区操作

3.RDD中封装了最小的计算单元,目的是更适合重复使用

4.Spark的计算主要就是通过组合RDD的操作,完成业务需求

1.从集合(内存)中创建RDD

从集合中创建RDD,Spark主要提供了两个方法:parallelize和makeRDD

val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
        conf.set("spark.default.parallelism", "4")
        val sc = new SparkContext(conf)
      val rdd: RDD[Int] = sc.makeRdd(List(1,2,3,4),2)
      val rdd: RDD[Int] = sc.textFile("input",2)
      sc.stop()

2.RDD转换算子

RDD根据数据处理方式的不同将算子整体上分为Value类型、双Value类型和Key-Value类型

value类型

1)map

将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换。
val dataRDD: RDD[Int] = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD1: RDD[Int] = dataRDD.map(
    num => {
        num * 2
    }
)
val dataRDD2: RDD[String] = dataRDD1.map(
    num => {
        "" + num
    }
)

2)mapPartitions

将待处理的数据以分区为单位发送到计算节点进行处理,map 是一个数据一个数据的处理,mapPartitions是一个分区为处理单位
// TODO 算子 - 转换 -
        val rdd = sc.makeRDD(List(1,2,3,4), 2)
        // 获取每个数据分区的最大值
        // 【1,2】【3,4】
        val rdd1 = rdd.mapPartitions(
            list => {
                val max = list.max
                List(max).iterator
            }
        )
        rdd1.collect.foreach(println)

3)mapPartitionsWithIndex

将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,在处理时同时可以获取当前分区索引。
 val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6),2)

    val rdd2: RDD[(Int, Int)] = rdd.mapPartitionsWithIndex {
      case (index, datas) => {

        datas.map((_,index))
      }
    }

4)flatMap

将处理的数据进行扁平化后再进行映射处理,
val rdd = sc.makeRDD(
            List(
                "Hello Scala", "Hello Spark"
            )
        )
        val rdd1 = sc.makeRDD(
            List(
                List(1,2), List(3,4)
            )
        )

        // 整体 => 个体
        //val rdd2 = rdd.flatMap(_.split(" "))
        val rdd2 = rdd.flatMap(
            str => { // 整体(1)
                // 容器(个体(N))
                str.split(" ")
            }
        )

        val rdd3 = rdd1.flatMap(
            list => {
                list
            }
        )

val rdd = sc.makeRDD(
            List(List(1,2),3,List(4,5))
        )

        val rdd1 = rdd.flatMap {
            case list : List[_] => list
            case other => List(other)
        }

        rdd1.collect.foreach(println)

5)glom

将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变 
相当于聚合,将相同分区的数据放到一个数组中
val rdd : RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6), 2)

        val rdd1: RDD[Array[Int]] = rdd.glom()

        rdd1.collect().foreach(a => println(a.mkString(",")))

 val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6),2)
    val rdd2: RDD[Array[Int]] = rdd.glom()
     val rdd3: RDD[Int] = rdd2.map(_.max)
    println(rdd3.collect().sum)

6)groupBy 可以实现wordcount

默认情况下,数据处理后,所在分区不会发生改变
Spark要求,一个组的数据必须在一个分区中
一个分区的数据被打乱重新和其他分区的数据组合在一起,这个操作称之为shuffle
shuffle操作不允许在内存中等待,必须落盘
从服务器日志数据apache.log中获取每个时间段访问量

val rdd: RDD[String] = sc.textFile("data/apache.log")
   //(10,1),(11,1),(10,1)
    val rdd2: RDD[(String, Int)] = rdd.map(line => {
      val strings: Array[String] = line.split(" ")
      val str: String = strings(3)
      val strings1: Array[String] = str.split(":")
      (strings1(1), 1)
    })
    val rdd3: RDD[(String, Iterable[(String, Int)])] = rdd2.groupBy(_._1)
    //(10,list((10,1),(10,1)))
    val rdd4: RDD[(String, Int)] = rdd3.mapValues(_.size)
    rdd4.collect().foreach(println)

7)filter

将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。

当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。

 val rdd : RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6))

        // filter算子可以按照指定的规则对每一条数据进行筛选过滤
        // 数据处理结果为true,表示数据保留,如果为false,数据就丢弃
        val rdd1 = rdd.filter(
            num => num % 2 == 1    //等于1的留下  其他的舍弃
        )

        rdd1.collect.foreach(println)

8)sample

// 抽取数据,采样数据
        // 第一个参数表示抽取数据的方式:true. 抽取放回,false. 抽取不放回
        // 第二个参数和第一个参数有关系
        //     如果抽取不放回的场合:参数表示每条数据被抽取的几率
        //     如果抽取放回的场合:参数表示每条数据希望被重复抽取的次数
        // 第三个参数是【随机数】种子
        //     随机数不随机,所谓的随机数,其实是通过随机算法获取的一个数
        //     3 = xxxxx(10)
        //     7 = xxxxx(3)
        //val rdd1: RDD[Int] = rdd.sample(false, 0.5)
        //val rdd1: RDD[Int] = rdd.sample(true, 2)
        val rdd1: RDD[Int] = rdd.sample(false, 0.5, 2)
        rdd1.collect.foreach(println)

9) coalesce

 // TODO 算子 - 转换 - 缩减分区
        val rdd : RDD[Int] = sc.makeRDD(
            List(1,2,3,4,5,6), 3
        )

        // 缩减 (合并), 默认情况下,缩减分区不会shuffle
        //val rdd1: RDD[Int] = rdd.coalesce(2)
        // 这种方式在某些情况下,无法解决数据倾斜问题,所以还可以在缩减分区的同时,进行数据的shuffle操作
        val rdd2: RDD[Int] = rdd.coalesce(2, true)   //true代表数据的shuffle操作  将数据打乱和其他分区的数据组合在一起

        rdd.saveAsTextFile("output")
        rdd2.saveAsTextFile("output1")

10) distinct

底层有shuffle

 // TODO 算子 - 转换
        val rdd : RDD[Int] = sc.makeRDD(
            List(1,1,1)
        )

        // map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)
        // 【1,1,1】
        // 【(1, null),(1, null),(1, null)】
        // 【null, null, null】
        // 【null, null】
        // 【(1, null)】
        // 【1】
        val rdd1: RDD[Int] = rdd.distinct()
        rdd1.collect.foreach(println)

        //List(1,1,1,1,1).distinct

11) repartition

标签:rdd1,val,rdd,Int,List,RDD,Spark
来源: https://www.cnblogs.com/xiao-bu/p/14843007.html