Spark Shuffle和Mapreduce Shuffle
作者:互联网
Spark Shuffle和Mapreduce Shuffle的区别
MR Shuffle
MR shuffle
Spark Shuffle中包括Hash Shuffle(优化和未优化)、sortShuffle、BypassMergeSortShuffle
MR Shuffle包括Map Shuffle和Reduce Shuffle
//MR Shuffle
Map端Shuffle从Map方法之后开始:环形缓冲区刷写、分区排序(分区标记在进入环形缓冲区之前已经打上、排序在刷写之前发生)、combine预聚合、归并排序、压缩
Reduce端Shuffle从Reducer复制数据开始:copy(Reducer从Mapper端拉取数据)、sort(对数据再做合并和归并排序,分组后进入reduce方法)、reduce(reduce用户处理逻辑)
//Spark Shuffle
主要分为write和read两个部分:
RDD中每一个分区就对应一个shuffleMapTask;
每一个shuffleMapTask会为每一个resultTask生成一个bucket;(优化后可共用,通过索引文件的方式来区分不同MapTask对应的数据)
Spark Shuffle
未优化的HashShuffle:
1、上图有2个CPU,可以同时运行两个ShuffleMapTask
2、每个task将写buket缓冲区,缓冲区的数量和reduce任务的数量相等
3、 每个buket缓冲区会生成一个对应ShuffleBlockFile
4、ShuffleMapTask 如何决定数据被写到哪个缓冲区呢?这个就是跟partition算法有关系,这个分区算法可以是hash的,也可以是range的 ;
5、最终产生的ShuffleBlockFile会有多少呢?就是ShuffleMapTask 数量乘以reduce的数量,这个是非常巨大的
优化后的HashShuffle:(consolidation机制)(
在同一核CPU执行先后执行的ShuffleMapTask可以共用bucket缓冲区,然后写到同一份ShuffleFile里去,上图所示的ShuffleFile实际上是用多个ShuffleBlock构成,那么,那么每个worker最终生成的文件数量,变成了cpu核数乘以reduce任务的数量,大大缩减了文件量。
优点:减缓了小文件过多的问题;
sortShuffle:
通过排序建立索引,相比较于hashShuffle,它只有一个临时文件,不管有多少个reduceTask都只有一个临时文件;
在该模式下,数据会先写入一个数据结构,聚合算子写入Map,一边通过Map局部聚合,一遍写入内存。Join算子写入ArrayList直接写入内存中。然后需要判断是否达到阈值,如果达到就会将内存数据结构的数据写入到磁盘,清空内存数据结构。
在溢写磁盘前,先根据key进行排序,排序过后的数据,会分批写入到磁盘文件中。默认批次为10000条,数据会以每批一万条写入到磁盘文件。写入磁盘文件通过缓冲区溢写的方式,每次溢写都会产生一个磁盘文件,也就是说一个task过程会产生多个临时文件。
最后在每个task中,将所有的临时文件合并,这就是merge过程,此过程将所有临时文件读取出来,一次写入到最终文件。意味着一个task的所有数据都在这一个文件中。同时单独写一份索引文件,标识下游各个task的数据在文件中的索引,start offset和end offset。
这样算来如果第一个stage 50个task,每个Executor执行一个task,那么无论下游有几个task,就需要50个磁盘文件。(磁盘文件数只与MapTask数目有关、一个MapTask对应一个磁盘文件)
BypassMergeSortShuffle:
//bypass机制运行条件:
shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。
不是聚合类的shuffle算子(比如reduceByKey)。
在这种机制下,当前stage的task会为每个下游的task都创建临时磁盘文件。将数据按照key值进行hash,然后根据hash值,将key写入对应的磁盘文件中(个人觉得这也相当于一次另类的排序,将相同的key放在一起了)。最终,同样会将所有临时文件依次合并成一个磁盘文件,建立索引。
该机制与未优化的hashshuffle相比,没有那么多磁盘文件,下游task的read操作相对性能会更好。
该机制与sortshuffle的普通机制相比,在readtask不多的情况下,首先写的机制是不同,其次不会进行排序。这样就可以节约一部分性能开销。
// 写机制不同,没有排序过程;
//另外这个Sort-Based Shuffle跟Executor核数没有关系,即跟并发度没有关系,它是每一个ShuffleMapTask都会产生一个data文件和index文件,所谓合并也只是将该ShuffleMapTask的各个partition对应的分区文件合并到data文件而已。所以这个就需要个Hash-BasedShuffle的consolidation机制区别开来。(consolidation机制最后文件数是和CPU数和ResultTask数目有关)
Shuffle Read:
Shuffle读由reduce这边发起,它需要先到临时文件中读,一般这个临时文件和reduce不在一台节点上,它需要跨网络去读。但也不排除在一台服务器。不论如何它需要知道临时文件的位置,
这个是谁来告诉它的呢?它有一个BlockManager的类。这里就知道将来是从本地文件中读取,还是需要从远程服务器上读取。
读进来后再做join或者combine的运算。
这些临时文件的位置就记录在Map结构中。
可以这样理解分区partition是RDD存储数据的地方,实际是个逻辑单位,真正要取数据时,它就调用BlockManage去读,它是以数据块的方式来读。
比如一次读取32k还是64k。它不是一条一条读,一条一条读肯定性能低。它读时首先是看本地还是远程,如果是本地就直接读这个文件了,
如果是远程,它就是发起一次socket请求,创建一个socket链接。然后发起一次远程调用,告诉远程的读取程序,读取哪些数据。读到的内容再通过socket传过来。
标签:文件,task,Shuffle,写入,Mapreduce,临时文件,磁盘,Spark 来源: https://blog.csdn.net/JumpZard/article/details/119322618