数据库
首页 > 数据库> > Spark SQL 3.0 自适应执行优化引擎

Spark SQL 3.0 自适应执行优化引擎

作者:互联网

在本篇文章中,笔者将给大家带来 Spark SQL 中关于自适应执行引擎(Spark Adaptive Execution)的内容。

在之前的文章中,笔者介绍过 Flink SQL,目前 Flink 社区在积极地更新迭代 Flink SQL 功能和优化性能,尤其 Flink 1.10.0 版本的发布,在增强流式 SQL 处理能力的同时也具备了成熟的批处理能力。但是在 SQL 功能完整性和生产环境的实践应用等方面,Spark SQL 还是更胜一筹,至于 SQL 批处理方面性能优劣,则需要笔者亲自去实践。

不过,在超大规模集群和海量数据集上,Spark SQL 目前仍然在稳定性和性能方面遇到一些挑战。为了应对这些挑战,Spark 社区进行了改进并引入了自适应执行引擎,它可以在运行时动态地处理任务并行度、join 策略优化和数据倾斜,确保使用运行时统计信息选择最佳执行计划。笔者参考 Haifeng Chen 分享的主题《 Spark Adaptive Execution Unleash the Power of Spark SQL 》,再结合实际情况进行梳理。

挑战

我们首先来看一下,Spark SQL 在实际生产案例中遇到的一些挑战。

挑战 1:并行度问题

在日常的 Spark SQL 开发中,我们通过设置 spark.sql.shuffle.partitions 参数来调整 partition 数量,默认值是200。即 Shuffle partition 数量需要手动调整才可以获得相对理想的性能。

虽然我们可以设置 shuffle partition 数量,但是无法给出一个对所有任务来说都是最优的值,因为每个 task 处理的的数据量以及 shuffle 策略也可能不同。

Shuffle partition 太大或太小都会带来问题:

目前 shuffle partition 数量无法根据每个任务动态调整,只能针对不同的任务进行多次的优化调整,才能得到较为合理的值,但是往往作业的数据量是逐日累增的,所以之前优化的值可能不再适合后续的作业。

因此理想情况下,为了获取最佳的性能,Spark 能够实现在作业执行过程中根据数据量大小动态设置合适的 shuffle partition 数量。

总结一下并行度问题带来的挑战:

挑战 2:Join 策略选择问题

针对不同数据量大小的场景,Spark 支持三种 join 策略以获取最佳的性能:

既然 Spark 有三种 join 策略,那么实际会带来哪些挑战:

因此,很多时候,运行的作业可能没有选择最有效的 join 执行策略。

挑战 3:数据倾斜

数据倾斜是指某一个 partition 的数据量远远大于其它 partition 的数据,导致该任务的运行时间远远大于其它任务,因此导致整个 SQL 的运行效率变差。

我们使用的 MapReduce、Spark 和 Flink 都会存在数据倾斜的问题,而且在实际需求开发中(比如使用 join 和 group by 操作),数据倾斜问题也是出现频率比较高的,大部分作业卡在 99% 进度的罪魁祸首。

数据倾斜引起的原因很多,比如:

简单总结一下产生数据倾斜的问题:

在 Spark SQL 实践中,处理数据倾斜的常见手段有:

但是上面这些解决方案都是针对单一任务进行调优,没有一个解决方案可以有效的解决所有的数据倾斜问题。

Spark Adaptive Execution

Spark SQL Execution 介绍

笔者简单说一下,SQL 语句首先通过 Parser 模块被解析为语法树,称为 Unresolved Logical Plan,接着 Unresolved Logical Plan 通过 Analyzer 模块借助于 Catalog 中的表信息解析为 Logical Plan,然后 Optimizer 再通过各种优化策略进行深入优化,得到 Optimized Logical Plan,Planner 模块再将优化后的逻辑计划根据预先设定的映射逻辑转换为 Physical Plan,最后物理执行计划做 RDD 计算,提交 Spark 集群运算,最终向用户返回数据。

Adaptive Execution 想法

基于社区的工作,Intel 大数据技术团队创建了 Adaptive Execution 项目,对 Adaptive Execution 做了重新的设计,实现了一个更为灵活的自适性执行框架,来解决主要的性能问题。

Adaptive Execution 项目的想法是: 

当一个 stage 的 map 任务在 runtime 完成时,我们利用 map 输出大小信息,对并行度、join 策略和倾斜处理进行相应的调整。

Adaptive Execution 框架

并行度优化

通过使用 map 输出大小的信息,我们可以在运行时对并行度进行调整。 

如上图所示,假设我们设置初始 shuffle partition 数量为 8,在 map stage 结束之后,可以看到每一个 Partition(1-8)的大小分别是20M、30M、10M、20M、35M、45M、10M 和 70M。假设设置每一个 reducer 处理的目标数据量(target input size)是 64M,那么在运行时,我们实际使用 4 个 reducer,即第一个 reducer 处理 Partition 1-3,共 60M,第二个 reducer 处理 Partition 4-5,共 55M,第三个 reducer 处理 Partition 6-7,共 55M,第四个 reducer 处理 Partition 8,即 70M。整个作业需要 4 个 task 运行,而不是 8 个 task。

一般情况下,一个 partition 是由一个 task 来处理的。经过优化,我们可以安排一个 task 处理多个 partition,这样,我们就可以保证各个分区相对均衡,不会存在大量数据量很小的 partition。

开启 Adaptive Execution 特性的方式:

配置:

Join 策略优化

通过使用 map 输出大小的信息,我们可以在运行时对 join 策略进行调整。 

在 Shuffle Write 之后,观察两个 Stage 输出的数据量。如果有一个 Stage 数据量明显比较小,可以转换成 Broadcast Hash Join,这样就可以动态的去调整执行计划。

将 Sort Merge Join 转化成 Broadcast Hash Join,此时 join 读取数据是直接从本地读取,没有数据通过网络传输,避开了网络IO的开销,性能会高很多。

开启方式:

配置:

倾斜数据处理

开启自动调整数据倾斜功能后,在作业执行过程中,Spark 会自动找出出现倾斜的 partiiton,然后用多个 task 来处理该 partition,之后再将这些 task 的处理结果进行合并。

开启方式:

性能提升

参考

         

              你若喜欢,点个在看哦

标签:倾斜,SQL,partition,3.0,sql,Spark,spark
来源: https://blog.csdn.net/qq_36039236/article/details/122637524