其他分享
首页 > 其他分享> > Spark-RDD,算子

Spark-RDD,算子

作者:互联网

Spark内核

RDD

ResilientDistributedDataset (弹性分布式数据集 )

五大特性:
A list of partitions
A function for computing each split
A list of dependencies on other RDDs
Optionally, a Partitioner for key-value RDDs
Optionally, a list of preferred locations to compute each split on

RDD五大特性

1、RDD由一组分区组成。读取文件默认一个block块

对应一个分区后面rdd的分区数和前面rdd-样

2、函数实际上是作用在每个分区上的,一个分区由一个task处理, 有多少个分区就有多少个task写代码是基于RDD写的代码,最终运行时,算

子是作用在分区上的

3、RDD之间有依赖关系,宽依赖和窄依赖,有shuffle是 宽依赖,没有shuffle窄依赖宽依赖前是 个Stage, 宽依赖后是个stage(map或者reduce端)

4、分区类的算子只能作用在kv格式的rdd上,groupByKey reduceByKey5、Spark为task的计算提供了最佳的计算位置,移动计算而不是移动数据

关于RDD的分区数

testFile 可以读取文件夹,但是文件夹不能包括子文件夹

RDD 的分区数,分区数越高,并行度越高,在资源重足的情况下效率越高
* 1. 读取文件,默认等于切片的数量
* 2. 读取文件可以设置最新的分区数量-minPartitions.
*
* 3. 控制RDD的分区数,只能爱切片的基础上,增加分区数,不能减少分区数
* 4. 宽依赖算子分区数默认等于前一个RDD,也可以设置分区数

package com.core
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo2Partition {
  def main(args: Array[String]): Unit = {
    //创建spark对象
    val conf = new SparkConf()
    //创建spark任务名称
    conf.setAppName("Demo2Partition")
    //设置spark的任务为本地执行
    conf.setMaster("local")
    //创建spark上下文的对象,作为程序的入口
    val sc = new SparkContext(conf)

    /**
     * testFile 可以读取文件夹,但是文件夹不能包括子文件夹
     */

    /**
     * RDD 的分区数,分区数越高,并行度越高,在资源重足的情况下效率越高
     * 1. 读取文件,默认等于切片的数量
     * 2. 读取文件可以设置最新的分区数量-minPartitions.
     *
     * 3. 控制RDD的分区数,只能爱切片的基础上,增加分区数,不能减少分区数
     * 4. 宽依赖算子分区数默认等于前一个RDD,也可以设置分区数
     *
     */

    //读取文件
    val linesRDD: RDD[String] = sc.textFile("data/students.txt",7)

    //查看分区数
    println(s"linesRDD的分区数为:${linesRDD.getNumPartitions}")

    val wordsRDD: RDD[String] = linesRDD.flatMap(lines => lines.split(","))

    //查看分区数
    println(s"wordsRDD的分区数为:${wordsRDD.getNumPartitions}")

    val kvRDD: RDD[(String, Iterable[String])] = wordsRDD.groupBy(w => w)

    //查看分区数
    println(s"kvRDD的分区数为:${kvRDD.getNumPartitions}")


    val wordCount: RDD[(String, Int)] = kvRDD.map(kv => (kv._1, kv._2.size))

    //查看分区数
    println(s"wordCount的分区数为:${wordCount.getNumPartitions}")

    wordCount.saveAsTextFile("data/wc")
  }

}

运行结果如下

接下来就是重点了-算子

map

map:将rdd数据一条一条传递给后面的函数,函数的返回值构建成一个新的rdd

map:算子不会改变总的数据长度

package com.core
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo3Map {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("Demo3Map")
    conf.setMaster("local")

    val sc = new SparkContext(conf)

    //读取学生表数据
    val lineRDD: RDD[String] = sc.textFile("data/students.txt")

    /**
     * map:将rdd数据一条一条传递给后面的函数,函数的返回值构建成一个新的rdd
     * map:算子不会改变总的数据长度
     */

    val nameRDD: RDD[String] = lineRDD.map(line => line.split(",")(1))

    nameRDD.foreach(println)

  }

}

filter-过滤数据

Filter: 将RDD的数据一条一条的传递给函数,如果返回true就保留数据,如果为false就过滤数据

Filter:会减少RDD的行数

package com.core
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo4Filter {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("Demo4Filter")
    conf.setMaster("local")

    val sc = new SparkContext(conf)

    val lineRDD: RDD[String] = sc.textFile("data/students.txt")

    /**
     * Filter: 将RDD的数据一条一条的传递给函数,如果返回true就保留数据,如果为false就过滤数据
     *
     * Filter: 会减少RDD的行数
     *
     */


    val filterRDD: RDD[String] = lineRDD.filter(line =>{
      val age: Int = line.split(",")(2).toInt
      age.equals(22)
    } )

    filterRDD.foreach(println)
  }

}

flatmap - 展开

package com.core

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo5FlatMap {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("flatmap")
    conf.setMaster("local")

    val sc = new SparkContext(conf)

    //读取文件
    val lineRDD: RDD[String] = sc.textFile("data/word.txt")

    /**
     *flatMap:一条一条的将RDD的数据返回给后面的函数,函数的返回值必须是一个集合,最后会将集合展开构建成一个新的RDD
     *
     */
    val wordsRDD: RDD[String] = lineRDD.flatMap(line => line.split(","))

    wordsRDD.foreach(println)
  }

}

sample-抽样

package com.core
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo6Sample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("map")
    conf.setMaster("local")

    val sc = new SparkContext(conf)


    //读取学生表的数据
    val studentsRDD: RDD[String] = sc.textFile("data/students.txt")

    /**
     * sample : 可以从数据中抽样一部分数据
     *
     */
    val sample: RDD[String] = studentsRDD.sample(withReplacement = true, 0.1)

    sample.foreach(println)
  }

}

标签:String,val,分区,RDD,conf,算子,Spark,spark
来源: https://www.cnblogs.com/atao-BigData/p/16468704.html