三.spark优化参数
作者:互联网
spark优化参数
--设置spark shuffle分区数量参考: excutor-cores * 3
set("spark.sql.shuffle.partitions", "36")
--1.broadcastHashJOin
--默认小表小于10M自动进行广播join
set("spark.sql.autoBroadcastJoinThreshold","10m")
--1.可强制使用广播join SQL Hint暗示方式
select /*+ BROADCASTJOIN(sc) */
select /*+ BROADCAST(sc) */
select /*+ MAPJOIN(sc) */
--2.使用API方式
import org.apache.spark.sql.functions._
broadcast(sc)
.join(csc,Seq("courseid"))
.select("courseid")
--2.shufflehashjoin 的hint
select /*+ SHUFFLE_REPLICATE_NL(school) */
--3.SortMergeJoin的hint
select /*+ SHUFFLE_MERGE(school) */
select /*+ MERGEJOIN(school) */
select /*+ MERGE(school) */
--调增小文件的读取,避免大量数据分片、task元数据信息对Driver造成压力
spark.sql.files.maxPartitionBytes=128MB --设置分区大小
spark.files.openCostInBytes=4194304 --设置文件开销大小最好为小文件大小, 默认 4m
--设置map端输出流buffer(改变并不明显)
set("spark.shuffle.file.buffer", "64") --能修改 map端输出流buffer默认为32k
set("spark.shuffle.spill.batchSize", "20000") -- 不可修改 溢写批次条数
set("spark.shuffle.spill.initialMemoryThreshold", "104857600")--不可修改溢写批次大小为5M
--设置reduce端拉取大小以及拉取次数(改变并不明显)
set("spark.reducer.maxSizeInFlight", "96m") -- reduce缓冲区,默认48m
set("spark.shuffle.io.maxRetries", "6") -- 重试次数,默认3次
set("spark.shuffle.io.retryWait", "60s") -- 重试的间隔,默认5s
---本地化级别:进程本地化、节点本地化、机架本地化、any。从左到右按待时间不满足时,依次降级.
set("spark.locality.wait", "6s") --默认3s.全局设置本地化级别等待时间
set("spark.locality.wait.process", "60s") --进程本地化级别等待时间
set("spark.locality.wait.node", "30s") --节点本地化级别等待时间
set("spark.locality.wait.rack", "20s") --机架本地化级别等待时间
--设置节点超时时间,避免GC时间过长,把excutor干掉
--conf spark.core.connection.ack.wait.timeout=300s ##默认120s
--开启使用堆外内存
--conf spark.memory.offHeap.enabled=true
--conf spark.memory.offHeap.size=1g
--设置堆外内存开销
--conf spark.executor.memoryOverhead=1G ## max(384,200)
--conf spark.driver.memoryOverhead =1G ## max(384,200)
---------spark3推出了 AQE(Adaptive Query Execution),即自适应查询执行。
--spark动态合并分区
set("spark.sql.autoBroadcastJoinThreshold", "-1") --为演示效果关闭自动join
set("spark.sql.adaptive.enabled", "true") --开启自适应总开关
set("spark.sql.adaptive.coalescePartitions.enabled", "true") --开启合并分区
set("spark.sql.adaptive.coalescePartitions.initialPartitionNum","1000") --开启初始化分区数,默认为分区数
set("spark.sql.adaptive.coalescePartitions.minPartitionNum","10") --合并最小分区数
set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "20mb") --每个合并后分区最大的大小
set("spark.dynamicAllocation.enabled","true") -- 动态申请资源 (不建议开)
set("spark.dynamicAllocation.shuffleTracking.enabled","true") -- shuffle动态跟踪(查看动态申请资源)
--spark动态切换join策略
spark.sql.adaptive.localShuffleReader.enabled=true --当不需要shuffle分区时,尝试使用本地shuffle
--设置数据倾斜的数据均衡
set("spark.sql.adaptive.skewJoin.enable","true") --打开数据倾斜开关
set("spark.sql.adaptive.skewJoin.skewedPartitionFactor","2") --如果数据块 大于中位数*此因子 则为数据倾斜
set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes","20mb") --当单个数据块大于此值则数据倾斜(和上面同时满足则认为数据倾斜
set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "8mb") --建议shuffle和并后分区最大的大小
--解决数据倾斜:对于大key
1.distribute by cast(rand() * 5 as int)
2.先对数据进行分组,对key进行加盐聚合,再去盐聚合。
标签:set,shuffle,--,参数,sql,spark,优化,adaptive 来源: https://blog.csdn.net/ShiHao_Li/article/details/122073518