其他分享
首页 > 其他分享> > PySpark之Spark的内核调度

PySpark之Spark的内核调度

作者:互联网

一、RDD依赖

一、为什么要设计宽窄依赖

二、窄依赖

三、Shuffle依赖(宽依赖)

四、如何区分宽窄依赖

区分RDD之间的依赖为宽依赖还是窄依赖,主要在于父RDD分区数据与子RDD分区数据关系

二、DAG和Stage

一、什么是DAG

在这里插入图片描述

二、DAG如何划分Stage?

三、Spark Shuffle

一、Spark的Shuffle简介

四、HashShuffle详解

一、Shuffle阶段划分:

二、未经优化的hashShuffleManager:

在这里插入图片描述

三、经过优化的hashShuffleManager:

四、数量对比

五、SortShuffleManager详解

SortShuffleManager的运行机制主要分成两种,一种是普通运行机制,另一种是bypass运行机制。当shuffle write task的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数的值时(默认为200),就会启用bypass机制。

在这里插入图片描述

一、SortShuffle的普通机制

二、Sort shuffle的bypass机制

在这里插入图片描述

三、总结

六、Spark Shuffle的配置选项(配置调优)

一、spark 的shuffle调优

主要是调整缓冲的大小,拉取次数重试重试次数与等待时间,内存比例分配,是否进行排序操作等等

二、spark.shuffle.file.buffer

三、spark.reducer.maxSizeInFlight:

四、spark.shuffle.io.maxRetries and spark.shuffle.io.retryWait:

五、spark.shuffle.memoryFraction:

六、spark.shuffle.manager

七、spark.shuffle.sort.bypassMergeThreshold

七、job调度流程

在这里插入图片描述在这里插入图片描述
在这里插入图片描述

八、Spark并行度

一、资源并行度与数据并行度

二、设置Task数量

将Task数量设置成与Application总CPU Core 数量相同(理想情况,150个core,分配150 Task)官方推荐,Task数量,设置成Application总CPU Core数量的2~3倍(150个cpu core,设置task数量为300~500)与理想情况不同的是:有些Task会运行快一点,比如50s就完了,有些Task可能会慢一点,要一分半才运行完,所以如果你的Task数量,刚好设置的跟CPU Core数量相同,也可能会导致资源的浪费,比如150 Task,10个先运行完了,剩余140个还在运行,但是这个时候,就有10个CPU Core空闲出来了,导致浪费。如果设置2~3倍,那么一个Task运行完以后,另外一个Task马上补上来,尽量让CPU Core不要空闲。

三、设置Application的并行度

参数spark.defalut.parallelism默认是没有值的,如果设置了值,是在shuffle的过程才会起作用

在这里插入图片描述

if __name__ == '__main__':
    print('PySpark First Program')
    # 输入数据
    data = ["hello", "world", "hello", "world"]
    conf = SparkConf().setAppName("miniProject").setMaster("local[*]")
    conf.set("spark.defalut.parallelism", 4)
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    # sc = SparkContext.getOrCreate(conf)
    sc = SparkContext(conf=conf)
    # 将collection的data转为spark中的rdd并进行操作
    rdd = sc.parallelize(data)
    # rdd = sc.textFile("file:///export/pyfolder1/pyspark-chapter02_3.8/data/word.txt") \
    #    .flatMap(lambda line: line.split(" "))
    print("rdd numpartitions:", rdd.getNumPartitions())
    # 执行map转化操作以及reduceByKey的聚合操作
    res_rdd = rdd.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
    # 并行度决定了可以同时处理多少个分区
    print("shuffle numpartitions:", res_rdd.getNumPartitions())
    print('停止 PySpark SparkSession 对象')
    sc.stop()

九、Spark中的CombineByKey

combineByKey是Spark中一个比较核心的高级且底层函数,其他一些高阶键值对函数底层都是用它实现的。诸如 groupByKey,reduceByKey等等

如下解释下3个重要的函数参数:

案例一:实现将相同Key的Value进行合并,使用groupBy很容易实现

# -*- coding: utf-8 -*-
# Program function:外部集合转为RDD

from pyspark import SparkConf, SparkContext
import re

# 1-准备环境
conf = SparkConf().setAppName("collection").setMaster("local[*]")
sc = SparkContext(conf=conf)
sc.setLogLevel("WARN")

x = sc.parallelize([("a", 1), ("b", 1), ("a", 2)])
def to_list(a):
    return [a]

def append(a, b):
    a.append(b)
    return a

def extend(a, b):
    a.extend(b)
    return a


print(sorted(x.combineByKey(to_list, append, extend).collect()))
#[('a', [1, 2]), ('b', [1])]

案例二:求平均分的案例代码

# -*- coding: utf-8 -*-
# Program function:外部集合转为RDD

from pyspark import SparkConf, SparkContext
import re

# 1-准备环境
conf = SparkConf().setAppName("collection").setMaster("local[*]")
sc = SparkContext(conf=conf)
sc.setLogLevel("WARN")

x = sc.parallelize([("Fred", 88), ("Fred", 95), ("Fred", 91), ("Wilma", 93), ("Wilma", 95), ("Wilma", 98)])


# (v)=>(v,1),得到的是(88,1),因为这是combineByKey是按照key处理value操作,
# acc:(Int,Int)代表的是(88,1),其中acc._1代表的是88,acc._2代表1值,v代表是同为Fred名称的95的数值,
# 所以acc._1+v=88+95,即相同Key的Value相加结果,第三个参数是分区间的相同key的value进行累加,
# 得到Fred的88+95+91,Wilma累加和为93+95+98。
def createCombiner(a):
    return [a, 1]
def mergeValue(a, b):
    return [a[0] + b, a[1] + 1]
def mergeCombiners(a, b):
    return [a[0] + b[0], a[1] + b[1]]
resultKey = x.combineByKey(createCombiner, mergeValue, mergeCombiners)
print(sorted(resultKey.collect()))
# [('Fred', [274, 3]), ('Wilma', [286, 3])]
print(resultKey.map(lambda score: (score[0], int(score[1][0]) / int(score[1][1]))).collect())
# [('Fred', 91.33333333333333), ('Wilma', 95.33333333333333)]
#lambda表达式版本
resultKey = x.combineByKey(lambda x:[x,1], lambda x,y:[x[0]+y,x[1]+1], lambda x,y:[x[0]+y[0],x[1]+y[1]])
print(sorted(resultKey.collect()))

标签:task,shuffle,Shuffle,PySpark,RDD,内核,磁盘,Spark,Stage
来源: https://blog.csdn.net/feizuiku0116/article/details/122839342