其他分享
首页 > 其他分享> > Spark优化

Spark优化

作者:互联网

1.从多个kafka topic中接收数据,可以用多个Reciver接收,然后合并在一起进行处理

 

 2.receiver’s block interval(接收器的块间隔),这由configuration parameter(配置参数) 的 spark.streaming.blockInterval 决定。对于大多数 receivers(接收器),接收到的数据 coalesced(合并)在一起存储在 Spark 内存之前的 blocks of data(数据块).

每个 receiver(接收器)每 batch(批次)的任务数量将是大约(batch interval(批间隔)/ block interval(块间隔))

例如,200 ms的 block interval(块间隔)每 2 秒 batches(批次)创建 10 个 tasks(任务)
如果你集群里有40个cpu核,那么10个任务只能用10个核,还有30个核空闲着,所以,要增大批次间隔或调小block interval
这时,我把block interval调整到100ms,再把批次间隔调整到4秒,这样正好等于40核,但是要有个富余量,可以把批次间隔调整为3.5秒

推荐的 block interval(块间隔)最小值约为 50ms,低于此任务启动开销可能是一个问题。

3.cpu conf

spark.default.parallelism  建议设置为集群中cpu总核数

4.使用kryo替换spark的默认序列化器

//spark需要序列化的地方
A、receiver接收器
B、persist持久化
C、reduceBy。。。。时 (有shuffle时)
D、广播变量时

conf
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") //设置kryo为序列化器
.registerKryoClasses(Array(classOf[String],classOf[Array[(Long, Long, String)]],classOf[Student])) //设置要序列化的类

5.数据批次Job执行的时间不要大于batchsize的时间

6.调节堆内存 transmission和persist内存使用占比

spark中,堆内存又被划分成了两块儿,一块儿是专门用来给RDD的cache、persist操作进行RDD数据缓存用的;另外一块儿,就是我们刚才所说的,用来给spark算子函数的运行使用的,存放函数中自己创建的对象。默认情况下,给RDD cache操作的内存占比是0.6,即60%的内存都给了cache操作了。但是问题是,如果某些情况下cache占用的内存并不需要占用那么大,这个时候可以将其内存占比适当降低。怎么判断在什么时候调整RDD cache的内存占用比呢?其实通过Spark监控平台就可以看到Spark作业的运行情况了,如果发现task频繁的gc,就可以去调整cache的内存占用比了。通过SparkConf.set("spark.storage.memoryFraction","0.6")来设定。

//如果你程序中不用cache或者cache的数据很小,就可以减小这个占比
SparkConf.set("spark.storage.memoryFraction","0.6")

7.调节堆内存 shuffle和persist的内存使用占比

spark.shuffle.memoryFraction
参数说明:该参数用于设置shuffle过程中一个task拉取到上个stage的task的输出后,进行聚合操作时能够使用的Executor内存的比例,默认是0.2。也就是说,Executor默认只有20%的内存用来进行该操作。shuffle操作在进行聚合时,如果发现使用的内存超出了这个20%的限制,那么多余的数据就会溢写到磁盘文件中去,此时就会极大地降低性能。
参数调优建议:如果Spark作业中的RDD持久化操作较少,shuffle操作较多时,建议降低持久化操作的内存占比,提高shuffle操作的内存占比比例,避免shuffle过程中数据过多时内存不够用,必须溢写到磁盘上,降低了性能。此外,如果发现作业由于频繁的gc导致运行缓慢,意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。

8.

spark内存分为

可用内存等于executor内存-System Reserved内存(300M)

Storeage -- 可用内存的 * 60% * 50% 用于cache
Execution --可用内存的 * 60% * 50% 用于task的运行
Ohter --可用内存 * 40% 用于存储spark内部元数据和用户定义的数据结构
System Reserved 300M 与other的作用相同

Unified Memory统一内存 = 可用内存 * 60%
Storeage和Execution内存默认平分Unified Memory统一内存
Storeage和Execution内存可以动态占用,但是Storeage内存是不可驱逐内存,一旦存储就不能被占用

Unified Memory的大小可以通过spark.memory.fraction参数来调节,默认0.6

SparkUI中显示的Storeage内存表示的是Unified Memory统一内存

9.调节堆外内存

重要:堆外内存只提供给Storeage和Execution内存区使用,没用other区和System Reserved区

为了进一步优化内存的使用以及提高 Shuffle 时排序的效率,Spark 引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,存储经过序列化的二进制数据。利用 JDK Unsafe API(从 Spark 2.0 开始,在管理堆外的存储内存时不再基于 Tachyon,而是与堆外的执行内存一样,基于 JDK Unsafe API 实现[3]),Spark 可以直接操作系统堆外内存,减少了不必要的内存开销,以及频繁的 GC 扫描和回收,提升了处理性能。堆外内存可以被精确地申请和释放,而且序列化的数据占用的空间可以被精确计算,所以相比堆内内存来说降低了管理的难度,也降低了误差。

在默认情况下堆外内存并不启用,可通过配置 spark.memory.offHeap.enabled 参数启用,并由 spark.memory.offHeap.size 参数设定堆外空间的大小。除了没有 other 空间,堆外内存与堆内内存的划分方式相同,所有运行中的并发任务共享存储内存和执行内存。

10.连接等待时长的调整

由于JVM内存过小,导致频繁的Minor gc,有时候更会触犯full gc,一旦出发full gc;此时所有程序暂停,导致无法建立网络连接;spark默认的网络连接的超时时长是60s;如果卡住60s都无法建立连接的话,那么就宣告失败了。

bin/spark-submit \
--conf spark.network.timeout=300s \

标签:shuffle,堆外,cache,内存,spark,优化,Spark
来源: https://www.cnblogs.com/canlovegolove/p/15214119.html