spark-调优(代码层面)
作者:互联网
spark-调优(代码)
在编写代码时可以进行优化
- 避免创建重复的RDD
- 尽可能复用同一个RDD
- 对多次使用的RDD进行持久化
- 尽量避免使用shuffle类算子
- 使用map-side预聚合的shuffle操作
- 使用高性能的算子
- 广播大变量
- 使用Kryo优化序列化性能
- 优化数据结构
- 使用高性能的库fastutil
1.对多次使用的RDD进行持久化
默认情况下,性能最高的当然是MEMORY_ONLY,但前提是你的内存必须足够足够大, 可以绰绰有余地存放下整个RDD的所有数据。因为不进行序列化与反序列化操作,就避 免了这部分的性能开销;对这个RDD的后续算子操作,都是基于纯内存中的数据的操作 ,不需要从磁盘文件中读取数据,性能也很高;而且不需要复制一份数据副本,并远程传 送到其他节点上。但是这里必须要注意的是,在实际的生产环境中,恐怕能够直接用这种 策略的场景还是有限的,如果RDD中数据比较多时(比如几十亿),直接用这种持久化 级别,会导致JVM的OOM内存溢出异常。
如果使用MEMORY_ONLY级别时发生了内存溢出,那么建议尝试使用 MEMORY_ONLY_SER级别。该级别会将RDD数据序列化后再保存在内存中,此时每个 partition仅仅是一个字节数组而已,大大减少了对象数量,并降低了内存占用。这种级别 比MEMORY_ONLY多出来的性能开销,主要就是序列化与反序列化的开销。但是后续算 子可以基于纯内存进行操作,因此性能总体还是比较高的。此外,可能发生的问题同上, 如果RDD中的数据量过多的话,还是可能会导致OOM内存溢出的异常。
如何选择一种最合适的持久化策略
如果纯内存的级别都无法使用,那么建议使用MEMORY_AND_DISK_SER策略,而不是 MEMORY_AND_DISK策略。因为既然到了这一步,就说明RDD的数据量很大,内存无 法完全放下。序列化后的数据比较少,可以节省内存和磁盘的空间开销。同时该策略会优 先尽量尝试将数据缓存在内存中,内存缓存不下才会写入磁盘。通常不建议使用DISK_ONLY和后缀为_2的级别:因为完全基于磁盘文件进行数据的读写 ,会导致性能急剧降低,有时还不如重新计算一次所有RDD。后缀为_2的级别,必须将 所有数据都复制一份副本,并发送到其他节点上,数据复制以及网络传输会导致较大的性 能开销,除非是要求作业的高可用性,否则不建议使用。
package com.shujia.spark.opt
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
object Demo1Cache {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.master("local")
.appName("cache")
.config("spark.sql.shuffle.partitions", 1)
.getOrCreate()
val sc: SparkContext = spark.sparkContext
val studentsRDD: RDD[String] = sc.textFile("data/students.txt")
/**
* 当对同一个rdd进行多次使用的时候可以将rdd缓存起来
*
*/
//缓存级别是MEMORY_ONLY
//studentsRDD.cache()
//内存放不下放磁盘,同时会对数据做序列化,将一个分区的数据序列化从一个字节数组
studentsRDD.persist(StorageLevel.MEMORY_AND_DISK_SER)
/**
* rdd: rdd.cache
* df : df.cache
* sql: cache table student, uncache table studnet
*/
/**
* 统计班级的的人数
*
*/
studentsRDD
.map(stu => (stu.split(",")(3), 1))
.reduceByKey(_ + _)
.map {
case (clazz: String, num: Int) =>
s"$clazz\t$num"
}
.saveAsTextFile("data/cache/clazz_num")
/**
* 统计性别的人数
*
*/
studentsRDD
.map(stu => (stu.split(",")(3), 1))
.reduceByKey(_ + _)
.map {
case (gender: String, num: Int) =>
s"$gender\t$num"
}
.saveAsTextFile("data/cache/gender_num")
/**
* 清空缓存
*/
studentsRDD.unpersist()
while (true) {
}
}
}
2.使用高性能的算子
- 使用reduceByKey/aggregateByKey替代groupByKey
- 使用mapPartitions替代普通map Transformation算子
- 使用foreachPartitions替代foreach Action算子
- 使用filter之后进行coalesce操作
- 使用repartitionAndSortWithinPartitions替代repartition与sort类操 作 代码
- repartition:coalesce(numPartitions,true) 增多分区使用这个
- coalesce(numPartitions,false) 减少分区 没有shuffle只是合并 partition
2.1aggregateByKey案例:
package com.shujia.spark.opt
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
object Demo2AggregateByKey {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.master("local")
.appName("Demo2AggregateByKey")
.config("spark.sql.shuffle.partitions", 1)
.getOrCreate()
val sc: SparkContext = spark.sparkContext
val studentsRDD: RDD[String] = sc.textFile("data/students.txt")
val clazzKvDS: RDD[(String, Int)] = studentsRDD.map(stu => (stu.split(",")(4), 1))
/**
* aggregateByKey: 需要两个函数,一个是map端预聚合的函数,一个reduce端汇总的函数
* reduceByKey map端和reduce端聚合函数是一样,
* 如果map端和reduce端要写不一样的聚合函数可以使用aggregateByKey
*
*/
val countRDD: RDD[(String, Int)] = clazzKvDS.aggregateByKey(0)(
(u: Int, i: Int) => u + i,//在map端做聚合函数
(u1: Int, u2: Int) => u1 + u2//在reduce端做聚合的函数
)
countRDD.foreach(println)
}
}
2.2 mapPartitions案例
package com.shujia.spark.opt
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import java.text.SimpleDateFormat
import java.util.Date
object Demo3MapPartition {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.master("local")
.appName("cache")
.config("spark.sql.shuffle.partitions", 1)
.getOrCreate()
val sc: SparkContext = spark.sparkContext
val dataRDD: RDD[String] = sc.textFile("data/ant_user_low_carbon.txt")
val kvRDD: RDD[(String, String, String)] = dataRDD.mapPartitions(iter => {
iter.map(line => {
//如果只是简单的拆分数据,使用map和mappartition没有区别
val split: Array[String] = line.split("\t")
(split(0), split(1), split(2))
})
})
val resultRDD: RDD[(String, Long, String)] = kvRDD.mapPartitions(iter => {
/**
*
* 可以将一些初始化的代码房子mapPartitions中,减少占用的内存空间
*/
//将时间字段转换成时间戳
//在这里创建的对象,是一个分区创建一个
val format = new SimpleDateFormat("yyyy/MM/dd")
iter.map {
case (id: String, sdate: String, p: String) =>
val dateObj: Date = format.parse(sdate)
val ts: Long = dateObj.getTime
(id, ts, p)
}
})
resultRDD.foreach(println)
}
}
2.3 foreachPartitions案例
package com.shujia.spark.opt
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import java.sql.{Connection, DriverManager, PreparedStatement}
object Demo4foreachPartitions {
def main(args: Array[String]): Unit = {
val startTIme: Long = System.currentTimeMillis()
val spark: SparkSession = SparkSession
.builder()
.master("local")
.appName("cache")
.config("spark.sql.shuffle.partitions", 1)
.getOrCreate()
val sc: SparkContext = spark.sparkContext
val studentsRDD: RDD[String] = sc.textFile("data/students.txt")
/**
* 将rdd的数据保存到mysql中
*
*/
/**
* foreach: 每一条数据都需要创建一个网络链接
* 不能将网络链接放在算子外(网络链接不能在网络中传输)
*
*/
/* studentsRDD.foreach(stu => {
val split: Array[String] = stu.split(",")
//1、加载启动
Class.forName("com.mysql.jdbc.Driver")
val start: Long = System.currentTimeMillis()
//2、创建链接
val con: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata?useUnicode=true&characterEncoding=UTF-8", "root", "123456")
val end: Long = System.currentTimeMillis()
println(s"创建数据库的链接用了:${end - start}")
//3、编写插入数据的sql
val stat: PreparedStatement = con.prepareStatement("insert into students(id,name,age,gender,clazz) values(?,?,?,?,?)")
//4、设置列值
stat.setLong(1, split(0).toLong)
stat.setString(2, split(1))
stat.setLong(3, split(2).toLong)
stat.setString(4, split(3))
stat.setString(5, split(4))
//5、执行插入
stat.execute()
//6、关闭链接
stat.close()
con.close()
})*/
/**
* foreachPartition: 一次遍历一个分区的数据
* 如果需要将rdd的数据保存到外部数据库中,比如mysql,hbase,redis, 需要使用foreachPartition
*/
studentsRDD.foreachPartition(iter => {
//1、加载启动
Class.forName("com.mysql.jdbc.Driver")
val start: Long = System.currentTimeMillis()
//2、创建链接
val con: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata?useUnicode=true&characterEncoding=UTF-8", "root", "123456")
val end: Long = System.currentTimeMillis()
println(s"创建数据库的链接用了:${end - start}")
//3、编写插入数据的sql
val stat: PreparedStatement = con.prepareStatement("insert into students(id,name,age,gender,clazz) values(?,?,?,?,?)")
iter.foreach(stu => {
val split: Array[String] = stu.split(",")
//4、设置列值
stat.setLong(1, split(0).toLong)
stat.setString(2, split(1))
stat.setLong(3, split(2).toLong)
stat.setString(4, split(3))
stat.setString(5, split(4))
//5、执行插入
stat.execute()
})
//6、关闭链接
stat.close()
con.close()
})
val endTIme: Long = System.currentTimeMillis()
println(s"共用了:${endTIme - startTIme}")
}
}
2.4 repartition(重分区)案例
package com.shujia.spark.opt
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
object Demo5RePartition {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.master("local")
.appName("cache")
.config("spark.sql.shuffle.partitions", 1)
.getOrCreate()
val sc: SparkContext = spark.sparkContext
val studentsRDD: RDD[String] = sc.textFile("data/students.txt")
println(s"studentsRDD分区数据:${studentsRDD.getNumPartitions}")
/**
* repartition: 对rdd重分区,返回一个新的rdd, 会产生shuffle
* repartition可以用于增加分区和减少分区,
* 增加分区可以增加并行度,在资源充足的情况下, 效率更高
* 减少分区可以减少产生的小文件的数量
*
*/
val rePartRDD: RDD[String] = studentsRDD.repartition(10)
println(s"rePartRDD分区数据:${rePartRDD.getNumPartitions}")
/**
* coalesceL 重分区,,可以设置是否产生shuffle
* 如果指定shuffle为true,可以用于增加分区和减少分区
* 如果指定shuffle为false,只能用于减少分区
*
*/
val coalesceRDD: RDD[String] = rePartRDD.coalesce(100, shuffle = true)
println(s"coalesceRDD分区数据:${coalesceRDD.getNumPartitions}")
/**
* 当处理好的数据需要保存到磁盘的时候,如果产生了很多的小文件,可以使用coalesce合并小文件
* 合并的标准:保证合并之后的每一个文件的大小在128M左右
*
* 比如数据保存的数据是10G, 最好的情况是合并为80个
*
* shuffle = false: 不产生shuffle,效率更好
*
*/
coalesceRDD
.coalesce(1, shuffle = false) //合并小文件
.saveAsTextFile("data/coalesce")
}
}
3.广播大变量
开发过程中,会遇到需要在算子函数中使用外部变量的场景(尤其是大变量,比如 100M以上的大集合),那么此时就应该使用Spark的广播(Broadcast)功能来提 升性能
函数中使用到外部变量时,默认情况下,Spark会将该变量复制多个副本,通过网络 传输到task中,此时每个task都有一个变量副本。如果变量本身比较大的话(比如 100M,甚至1G),那么大量的变量副本在网络中传输的性能开销,以及在各个节 点的Executor中占用过多内存导致的频繁GC(垃圾回收),都会极大地影响性能
如果使用的外部变量比较大,建议使用Spark的广播功能,对该变量进行广播。广播 后的变量,会保证每个Executor的内存中,只驻留一份变量副本,而Executor中的 task执行时共享该Executor中的那份变量副本。这样的话,可以大大减少变量副本 的数量,从而减少网络传输的性能开销,并减少对Executor内存的占用开销,降低 GC的频率
广播大变量发送方式:Executor一开始并没有广播变量,而是task运行需要用到广 播变量,会找executor的blockManager要,bloackManager找Driver里面的 blockManagerMaster要
package com.shujia.spark.opt
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
object Demo6MapJoin {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.master("local")
.appName("cache")
.config("spark.sql.shuffle.partitions", 1)
.getOrCreate()
val studentDF: DataFrame = spark.read
.format("csv")
.option("sep", ",")
.schema("id STRING,name STRING,age INT,gender STRING,clazz STRING")
.load("data/students.txt")
val scoreDF: DataFrame = spark.read
.format("csv")
.option("sep", ",")
.schema("id STRING,cid STRING,score DOUBLE")
.load("data/score.txt")
/**
* studentDF.hint("broadcast"): 将小表广播出去
*
* 当一个大表关联小表的时候,可以将小表广播出去,使用mapjoin
* mapjoin 不会产生shuffle,可以提高关联的效率,小表一般要在1G以内
*
* mapjoin 会产生两个job
* 1、第一个job是将小表的数据拉取到Driver端,从Driver端广播到Executor端
* 2、关联的job
*
*/
val joinDF: DataFrame = scoreDF.join(studentDF.hint("broadcast"), "id")
joinDF.show()
while (true) {
}
}
}
4.使用Kryo优化序列化性能(一般重要)
在Spark中,主要有三个地方涉及到了序列化:
- 在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输
- 将自定义的类型作为RDD的泛型类型时(比如JavaRDD,SXT是自定义类型),所有自 定义类型对象,都会进行序列化。因此这种情况下,也要求自定义的类必须实现 Serializable接口。
- 使用可序列化的持久化策略时(比如MEMORY_ONLY_SER),Spark会将RDD中的每个 partition都序列化成一个大的字节数组。
package com.shujia.spark.opt
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
object Demo7Kyyo {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.master("local")
.appName("cache")
.config("spark.sql.shuffle.partitions", 1)
//序列化方式
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
//指定注册序列化的类,自定义
.config("spark.kryo.registrator", "com.shujia.spark.opt.Demo8KryoRegister")
.getOrCreate()
val sc: SparkContext = spark.sparkContext
val studentsRDD: RDD[String] = sc.textFile("data/students.txt")
/**
* 将rdd中一行数据转换成Student的对象
*
*/
val stuRDD: RDD[Student] = studentsRDD.map(stu => {
val split: Array[String] = stu.split(",")
Student(split(0), split(1), split(2).toInt, split(3), split(4))
})
/**
* 不做使用序列化,数据是280K
* 使用默认的序列化的方式: 数据是55K
* 使用kryo进行序列化: 数据大小:43k
*
*
* spark sql 默认已经使用了kryo进行序列化, rdd没有使用,需要自己实现
*
*/
stuRDD.persist(StorageLevel.MEMORY_ONLY_SER)
stuRDD
.map(stu => (stu.clazz, 1))
.reduceByKey(_ + _)
.map {
case (clazz: String, num: Int) =>
s"$clazz\t$num"
}
.foreach(println)
/**
* 统计性别的人数
*
*/
stuRDD
.map(stu => (stu.gender, 1))
.reduceByKey(_ + _)
.map {
case (gender: String, num: Int) =>
s"$gender\t$num"
}
.foreach(println)
while (true) {
}
}
case class Student(id: String, name: String, age: Int, gender: String, clazz: String)
}
上述的代码需要的工具类: KryoRegistrator
package com.shujia.spark.opt
import com.esotericsoftware.kryo.Kryo
import com.shujia.spark.opt.Demo7Kyyo.Student
import org.apache.spark.serializer.KryoRegistrator
class Demo8KryoRegister extends KryoRegistrator {
override def registerClasses(kryo: Kryo): Unit = {
/**
* 在这个方法中将需要使用kryo进行序列化的类做一个注册
*
*/
kryo.register(classOf[Student])
kryo.register(classOf[String])
kryo.register(classOf[Int])
}
}
标签:String,val,代码,RDD,调优,split,import,spark 来源: https://www.cnblogs.com/atao-BigData/p/16503631.html