其他分享
首页 > 其他分享> > spark-调优(代码层面)

spark-调优(代码层面)

作者:互联网

spark-调优(代码)

在编写代码时可以进行优化

  1. 避免创建重复的RDD
  2. 尽可能复用同一个RDD
  3. 对多次使用的RDD进行持久化
  4. 尽量避免使用shuffle类算子
  5. 使用map-side预聚合的shuffle操作
  6. 使用高性能的算子
  7. 广播大变量
  8. 使用Kryo优化序列化性能
  9. 优化数据结构
  10. 使用高性能的库fastutil

1.对多次使用的RDD进行持久化

默认情况下,性能最高的当然是MEMORY_ONLY,但前提是你的内存必须足够足够大, 可以绰绰有余地存放下整个RDD的所有数据。因为不进行序列化与反序列化操作,就避 免了这部分的性能开销;对这个RDD的后续算子操作,都是基于纯内存中的数据的操作 ,不需要从磁盘文件中读取数据,性能也很高;而且不需要复制一份数据副本,并远程传 送到其他节点上。但是这里必须要注意的是,在实际的生产环境中,恐怕能够直接用这种 策略的场景还是有限的,如果RDD中数据比较多时(比如几十亿),直接用这种持久化 级别,会导致JVM的OOM内存溢出异常。

如果使用MEMORY_ONLY级别时发生了内存溢出,那么建议尝试使用 MEMORY_ONLY_SER级别。该级别会将RDD数据序列化后再保存在内存中,此时每个 partition仅仅是一个字节数组而已,大大减少了对象数量,并降低了内存占用。这种级别 比MEMORY_ONLY多出来的性能开销,主要就是序列化与反序列化的开销。但是后续算 子可以基于纯内存进行操作,因此性能总体还是比较高的。此外,可能发生的问题同上, 如果RDD中的数据量过多的话,还是可能会导致OOM内存溢出的异常。

如何选择一种最合适的持久化策略
如果纯内存的级别都无法使用,那么建议使用MEMORY_AND_DISK_SER策略,而不是 MEMORY_AND_DISK策略。因为既然到了这一步,就说明RDD的数据量很大,内存无 法完全放下。序列化后的数据比较少,可以节省内存和磁盘的空间开销。同时该策略会优 先尽量尝试将数据缓存在内存中,内存缓存不下才会写入磁盘。

通常不建议使用DISK_ONLY和后缀为_2的级别:因为完全基于磁盘文件进行数据的读写 ,会导致性能急剧降低,有时还不如重新计算一次所有RDD。后缀为_2的级别,必须将 所有数据都复制一份副本,并发送到其他节点上,数据复制以及网络传输会导致较大的性 能开销,除非是要求作业的高可用性,否则不建议使用。

package com.shujia.spark.opt

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

object Demo1Cache {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local")
      .appName("cache")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    val sc: SparkContext = spark.sparkContext

    val studentsRDD: RDD[String] = sc.textFile("data/students.txt")

    /**
     * 当对同一个rdd进行多次使用的时候可以将rdd缓存起来
     *
     */

    //缓存级别是MEMORY_ONLY
    //studentsRDD.cache()

    //内存放不下放磁盘,同时会对数据做序列化,将一个分区的数据序列化从一个字节数组
    studentsRDD.persist(StorageLevel.MEMORY_AND_DISK_SER)

    /**
     * rdd: rdd.cache
     * df : df.cache
     * sql: cache table student,  uncache table studnet
     */

    /**
     * 统计班级的的人数
     *
     */
    studentsRDD
      .map(stu => (stu.split(",")(3), 1))
      .reduceByKey(_ + _)
      .map {
        case (clazz: String, num: Int) =>
          s"$clazz\t$num"
      }
      .saveAsTextFile("data/cache/clazz_num")

    /**
     * 统计性别的人数
     *
     */
    studentsRDD
      .map(stu => (stu.split(",")(3), 1))
      .reduceByKey(_ + _)
      .map {
        case (gender: String, num: Int) =>
          s"$gender\t$num"
      }
      .saveAsTextFile("data/cache/gender_num")

    /**
     * 清空缓存
     */
    studentsRDD.unpersist()
    while (true) {
    }
  }
}

2.使用高性能的算子

  1. 使用reduceByKey/aggregateByKey替代groupByKey
  2. 使用mapPartitions替代普通map Transformation算子
  3. 使用foreachPartitions替代foreach Action算子
  4. 使用filter之后进行coalesce操作
  5. 使用repartitionAndSortWithinPartitions替代repartition与sort类操 作 代码
  6. repartition:coalesce(numPartitions,true) 增多分区使用这个
  7. coalesce(numPartitions,false) 减少分区 没有shuffle只是合并 partition

2.1aggregateByKey案例:

package com.shujia.spark.opt
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object Demo2AggregateByKey {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local")
      .appName("Demo2AggregateByKey")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    val sc: SparkContext = spark.sparkContext

    val studentsRDD: RDD[String] = sc.textFile("data/students.txt")

    val clazzKvDS: RDD[(String, Int)] = studentsRDD.map(stu => (stu.split(",")(4), 1))

    /**
     * aggregateByKey: 需要两个函数,一个是map端预聚合的函数,一个reduce端汇总的函数
     * reduceByKey map端和reduce端聚合函数是一样,
     * 如果map端和reduce端要写不一样的聚合函数可以使用aggregateByKey
     *
     */
    val countRDD: RDD[(String, Int)] = clazzKvDS.aggregateByKey(0)(
      (u: Int, i: Int) => u + i,//在map端做聚合函数
      (u1: Int, u2: Int) => u1 + u2//在reduce端做聚合的函数
    )
    countRDD.foreach(println)
  }
}

2.2 mapPartitions案例

package com.shujia.spark.opt
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

import java.text.SimpleDateFormat
import java.util.Date

object Demo3MapPartition {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local")
      .appName("cache")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    val sc: SparkContext = spark.sparkContext

    val dataRDD: RDD[String] = sc.textFile("data/ant_user_low_carbon.txt")

    val kvRDD: RDD[(String, String, String)] = dataRDD.mapPartitions(iter => {
      iter.map(line => {
        //如果只是简单的拆分数据,使用map和mappartition没有区别
        val split: Array[String] = line.split("\t")
        (split(0), split(1), split(2))
      })
    })

    val resultRDD: RDD[(String, Long, String)] = kvRDD.mapPartitions(iter => {
      /**
       *
       * 可以将一些初始化的代码房子mapPartitions中,减少占用的内存空间
       */
      //将时间字段转换成时间戳
      //在这里创建的对象,是一个分区创建一个
      val format = new SimpleDateFormat("yyyy/MM/dd")

      iter.map {
        case (id: String, sdate: String, p: String) =>
          val dateObj: Date = format.parse(sdate)
          val ts: Long = dateObj.getTime
          (id, ts, p)
      }
    })
    resultRDD.foreach(println)
  }
}

2.3 foreachPartitions案例

package com.shujia.spark.opt
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import java.sql.{Connection, DriverManager, PreparedStatement}

object Demo4foreachPartitions {
  def main(args: Array[String]): Unit = {

    val startTIme: Long = System.currentTimeMillis()

    val spark: SparkSession = SparkSession
      .builder()
      .master("local")
      .appName("cache")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    val sc: SparkContext = spark.sparkContext

    val studentsRDD: RDD[String] = sc.textFile("data/students.txt")

    /**
     * 将rdd的数据保存到mysql中
     *
     */

    /**
     * foreach: 每一条数据都需要创建一个网络链接
     * 不能将网络链接放在算子外(网络链接不能在网络中传输)
     *
     */
    /* studentsRDD.foreach(stu => {
       val split: Array[String] = stu.split(",")
       //1、加载启动
       Class.forName("com.mysql.jdbc.Driver")
       val start: Long = System.currentTimeMillis()
       //2、创建链接
       val con: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata?useUnicode=true&characterEncoding=UTF-8", "root", "123456")
       val end: Long = System.currentTimeMillis()
       println(s"创建数据库的链接用了:${end - start}")
       //3、编写插入数据的sql
       val stat: PreparedStatement = con.prepareStatement("insert into students(id,name,age,gender,clazz) values(?,?,?,?,?)")
       //4、设置列值
       stat.setLong(1, split(0).toLong)
       stat.setString(2, split(1))
       stat.setLong(3, split(2).toLong)
       stat.setString(4, split(3))
       stat.setString(5, split(4))

       //5、执行插入
       stat.execute()

       //6、关闭链接
       stat.close()
       con.close()
     })*/

    /**
     * foreachPartition: 一次遍历一个分区的数据
     * 如果需要将rdd的数据保存到外部数据库中,比如mysql,hbase,redis, 需要使用foreachPartition
     */
    studentsRDD.foreachPartition(iter => {

      //1、加载启动
      Class.forName("com.mysql.jdbc.Driver")
      val start: Long = System.currentTimeMillis()
      //2、创建链接
      val con: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata?useUnicode=true&characterEncoding=UTF-8", "root", "123456")
      val end: Long = System.currentTimeMillis()
      println(s"创建数据库的链接用了:${end - start}")
      //3、编写插入数据的sql
      val stat: PreparedStatement = con.prepareStatement("insert into students(id,name,age,gender,clazz) values(?,?,?,?,?)")

      iter.foreach(stu => {
        val split: Array[String] = stu.split(",")
        //4、设置列值
        stat.setLong(1, split(0).toLong)
        stat.setString(2, split(1))
        stat.setLong(3, split(2).toLong)
        stat.setString(4, split(3))
        stat.setString(5, split(4))

        //5、执行插入
        stat.execute()
      })
      //6、关闭链接
      stat.close()
      con.close()
    })

    val endTIme: Long = System.currentTimeMillis()

    println(s"共用了:${endTIme - startTIme}")
  }
}

2.4 repartition(重分区)案例

package com.shujia.spark.opt
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object Demo5RePartition {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local")
      .appName("cache")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    val sc: SparkContext = spark.sparkContext

    val studentsRDD: RDD[String] = sc.textFile("data/students.txt")

    println(s"studentsRDD分区数据:${studentsRDD.getNumPartitions}")

    /**
     * repartition: 对rdd重分区,返回一个新的rdd,  会产生shuffle
     * repartition可以用于增加分区和减少分区,
     * 增加分区可以增加并行度,在资源充足的情况下, 效率更高
     * 减少分区可以减少产生的小文件的数量
     *
     */
    val rePartRDD: RDD[String] = studentsRDD.repartition(10)

    println(s"rePartRDD分区数据:${rePartRDD.getNumPartitions}")

    /**
     * coalesceL 重分区,,可以设置是否产生shuffle
     * 如果指定shuffle为true,可以用于增加分区和减少分区
     * 如果指定shuffle为false,只能用于减少分区
     *
     */

    val coalesceRDD: RDD[String] = rePartRDD.coalesce(100, shuffle = true)
    println(s"coalesceRDD分区数据:${coalesceRDD.getNumPartitions}")

    /**
     * 当处理好的数据需要保存到磁盘的时候,如果产生了很多的小文件,可以使用coalesce合并小文件
     * 合并的标准:保证合并之后的每一个文件的大小在128M左右
     *
     * 比如数据保存的数据是10G, 最好的情况是合并为80个
     *
     * shuffle = false: 不产生shuffle,效率更好
     *
     */
    coalesceRDD
      .coalesce(1, shuffle = false) //合并小文件
      .saveAsTextFile("data/coalesce")
  }
}

3.广播大变量

开发过程中,会遇到需要在算子函数中使用外部变量的场景(尤其是大变量,比如 100M以上的大集合),那么此时就应该使用Spark的广播(Broadcast)功能来提 升性能

函数中使用到外部变量时,默认情况下,Spark会将该变量复制多个副本,通过网络 传输到task中,此时每个task都有一个变量副本。如果变量本身比较大的话(比如 100M,甚至1G),那么大量的变量副本在网络中传输的性能开销,以及在各个节 点的Executor中占用过多内存导致的频繁GC(垃圾回收),都会极大地影响性能

如果使用的外部变量比较大,建议使用Spark的广播功能,对该变量进行广播。广播 后的变量,会保证每个Executor的内存中,只驻留一份变量副本,而Executor中的 task执行时共享该Executor中的那份变量副本。这样的话,可以大大减少变量副本 的数量,从而减少网络传输的性能开销,并减少对Executor内存的占用开销,降低 GC的频率

广播大变量发送方式:Executor一开始并没有广播变量,而是task运行需要用到广 播变量,会找executor的blockManager要,bloackManager找Driver里面的 blockManagerMaster要

package com.shujia.spark.opt
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

object Demo6MapJoin {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local")
      .appName("cache")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    val studentDF: DataFrame = spark.read
      .format("csv")
      .option("sep", ",")
      .schema("id STRING,name STRING,age INT,gender STRING,clazz STRING")
      .load("data/students.txt")

    val scoreDF: DataFrame = spark.read
      .format("csv")
      .option("sep", ",")
      .schema("id STRING,cid STRING,score DOUBLE")
      .load("data/score.txt")

    /**
     * studentDF.hint("broadcast"): 将小表广播出去
     *
     * 当一个大表关联小表的时候,可以将小表广播出去,使用mapjoin
     * mapjoin 不会产生shuffle,可以提高关联的效率,小表一般要在1G以内
     *
     * mapjoin 会产生两个job
     * 1、第一个job是将小表的数据拉取到Driver端,从Driver端广播到Executor端
     * 2、关联的job
     *
     */
    val joinDF: DataFrame = scoreDF.join(studentDF.hint("broadcast"), "id")

    joinDF.show()
    while (true) {
    }
  }
}

4.使用Kryo优化序列化性能(一般重要)

在Spark中,主要有三个地方涉及到了序列化:

  1. 在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输
  2. 将自定义的类型作为RDD的泛型类型时(比如JavaRDD,SXT是自定义类型),所有自 定义类型对象,都会进行序列化。因此这种情况下,也要求自定义的类必须实现 Serializable接口。
  3. 使用可序列化的持久化策略时(比如MEMORY_ONLY_SER),Spark会将RDD中的每个 partition都序列化成一个大的字节数组。
package com.shujia.spark.opt
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

object Demo7Kyyo {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local")
      .appName("cache")
      .config("spark.sql.shuffle.partitions", 1)
      //序列化方式
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      //指定注册序列化的类,自定义
      .config("spark.kryo.registrator", "com.shujia.spark.opt.Demo8KryoRegister")
      .getOrCreate()

    val sc: SparkContext = spark.sparkContext

    val studentsRDD: RDD[String] = sc.textFile("data/students.txt")

    /**
     * 将rdd中一行数据转换成Student的对象
     *
     */
    val stuRDD: RDD[Student] = studentsRDD.map(stu => {
      val split: Array[String] = stu.split(",")
      Student(split(0), split(1), split(2).toInt, split(3), split(4))
    })

    /**
     * 不做使用序列化,数据是280K
     * 使用默认的序列化的方式: 数据是55K
     * 使用kryo进行序列化: 数据大小:43k
     *
     *
     * spark sql 默认已经使用了kryo进行序列化, rdd没有使用,需要自己实现
     *
     */

    stuRDD.persist(StorageLevel.MEMORY_ONLY_SER)

    stuRDD
      .map(stu => (stu.clazz, 1))
      .reduceByKey(_ + _)
      .map {
        case (clazz: String, num: Int) =>
          s"$clazz\t$num"
      }
      .foreach(println)

    /**
     * 统计性别的人数
     *
     */
    stuRDD
      .map(stu => (stu.gender, 1))
      .reduceByKey(_ + _)
      .map {
        case (gender: String, num: Int) =>
          s"$gender\t$num"
      }
      .foreach(println)

    while (true) {
    }
  }
  case class Student(id: String, name: String, age: Int, gender: String, clazz: String)
}
上述的代码需要的工具类: KryoRegistrator
package com.shujia.spark.opt
import com.esotericsoftware.kryo.Kryo
import com.shujia.spark.opt.Demo7Kyyo.Student
import org.apache.spark.serializer.KryoRegistrator

class Demo8KryoRegister extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    /**
     * 在这个方法中将需要使用kryo进行序列化的类做一个注册
     *
     */
    kryo.register(classOf[Student])
    kryo.register(classOf[String])
    kryo.register(classOf[Int])
  }
}

标签:String,val,代码,RDD,调优,split,import,spark
来源: https://www.cnblogs.com/atao-BigData/p/16503631.html