SparkShuffle机制 - ⽀持⾼效聚合和排序的数据结构
作者:互联网
.
- 一 .前言
- 二 .AppendOnlyMap的原理
- 三 .ExternalAppendOnlyMap
- 四 .PartitionedAppendOnlyMap
- 五 .PartitionedPairBuffer
- 六 . 与Hadoop MapReduce的Shuffle机制对⽐
一 .前言
为了提⾼聚合和排序性能,Spark为Shuffle Write/Read的聚合和排序过程设计了3种数据结构,
这⼏种数据结构的基本思想是在内存中对record进⾏聚合和排序,如果存放不下,则进⾏扩容,如果还存放不下,就将数据排序后spill到磁盘上,最后将磁盘和内存中的数据进⾏聚合、排序,得到最终结果 .
仔细观察Shuffle Write/Read过程,我们会发现Shuffle机制中使⽤的
数据结构的两个特征:
⼀是 : 只需要⽀持record的插⼊和更新操作,不需要⽀持删除操作,这样我们可以对数据结构进⾏优化,减少内存消耗;
⼆是 : 只有内存放不下时才需要spill到磁盘上,因此数据结构设计以内存为主,磁盘为辅。Spark中的PartitionedAppendOnlyMap和ExternalAppendOnlyMap都基于AppendOnlyMap实现。
二 .AppendOnlyMap的原理
AppendOnlyMap实际上是⼀个只⽀持record添加和对Value进⾏更新的HashMap。与Java HashMap采⽤“数组+链表”实现不同,AppendOnlyMap只使⽤数组来存储元素,根据元素的Hash值确定存储位置,如果存储元素时发⽣Hash值冲突,则使⽤⼆次地址探测法(Quadratic probing)来解决Hash值冲突。
对于每个新来的<K,V>record,先使⽤Hash(K)计算其存放位置,如果存放位置为空,就把record存放到该位置。
如果该位置已经被占⽤,就使⽤⼆次探测法来找下⼀个空闲位置。如下图,插入数据<K6,V6>record来说,第1次找到的位置Hash(K6)已被K2占⽤。按照⼆次探测法向后递增1个record位置,也就是Hash(K6)+1×2,发现位置已被K3占⽤,然后向后递增4个record位置(指数递增,Hash(K6)+2×2),发现位置没有被占⽤,放进去即可
假设又新来了⼀个<K6,V7>record,需要与刚存放进AppendOnlyMap中的<K6,V6>进⾏聚合,聚合函数为func(),
那么⾸先查找K6所在的位置,查找过程与刚才的插⼊过程类似,经过3次查找取出<K6,V6>record中的V6,进⾏Vʹ=func(V6,V7)运算,最后将Vʹ写⼊V6的位置。
扩容:
AppendOnlyMap使⽤数组来实现的问题是,如果插⼊的record太多,则很快会被填满。
Spark的解决⽅案是,如果AppendOnlyMap的利⽤率达到70%,那么就扩张⼀倍,扩张意味着原来的Hash()失效,因此对所有Key进⾏rehash,重新排列每个Key的位置。
排序:
由于AppendOnlyMap采⽤了数组作为底层存储结构,可以⽀持快速排序等排序算法。实现⽅法,如下图所⽰,先将数组中所有的<K,V>record转移到数组的前端,⽤begin和end来标⽰起始位置,然后调⽤排序算法对[begin,end]中的record进⾏排序。对于需要
按Key进⾏排序的操作,如sortByKey(),可以按照Key值进⾏排序;对于其他操作,只按照Key的Hash值进⾏排序即可。
输出:
迭代AppendOnlyMap数组中的record,从前到后扫描输出即可。
三 .ExternalAppendOnlyMap
AppendOnlyMap的优点是能够将聚合和排序功能很好地结合在⼀起,缺点是只能使⽤内存,难以适⽤于内存空间不⾜的情况。
为了解决这个问题,Spark基于AppendOnlyMap设计实现了基于内存+磁盘的ExternalAppendOnlyMap,⽤于Shuffle Read端⼤规模数据聚合。同时,由于Shuffle Write端聚合需要考虑partitionId,Spark也设计了带有partitionId的ExternalAppendOnlyMap,名为PartitionedAppendOnlyHashMap。
ExternalAppendOnlyMap的⼯作原理是,先持有⼀个AppendOnlyMap来不断接收和聚合新来的record,AppendOnlyMap快被装满时检查⼀下内存剩余空间是否可以扩展,可直接在内存中扩展,不可对AppendOnlyMap中的record进⾏排序,然后将record都spill到磁盘上。因为record不断到来,可能会多次填满AppendOnlyMap,所以这个spill过程可以出现多次,最终形成多个spill⽂件。等record都处理完,此时AppendOnlyMap中可能还留存⼀些聚合后的record,磁盘上也有多个spill⽂件。
因为这些数据都经过了部分聚合,还需要进⾏全局聚合(merge)。因此,ExternalAppendOnlyMap的最后⼀步是将内存中AppendOnlyMap的数据与磁盘上spill⽂件中的数据进⾏全局聚合,得到最终结果。
3.1. 如何获知当前AppendOnlyMap的⼤⼩?因为AppendOnlyMap中不断添加和更新record,其⼤⼩是动态变化的,什么时候会超过内存界限是难以确定的。
虽然我们知道AppendOnlyMap中持有的数组的长度和⼤⼩,但数组⾥⾯存放的是Key和Value的引⽤,并不是它们的实际对象(object)
⼤⼩,⽽且Value会不断被更新,实际⼤⼩不断变化。因此,想准确得到AppendOnlyMap的⼤⼩⽐较困难。⼀种简单的解决⽅法是在每次插⼊record或对现有record的Value进⾏更新后,都扫描⼀下AppendOnlyMap中存放的record,计算每个record的实际对象⼤⼩并相加,但这样会⾮常耗时。⽽且⼀般AppendOnlyMap会插⼊⼏万甚⾄⼏百万个record,如果每个record进⼊AppendOnlyMap都计算⼀遍,则开销会很⼤。
Spark设计了⼀个增量式的⾼效估算算法,在每个record插⼊或更新时根据历史统计值和当前变化量直接估算当前AppendOnlyMap的⼤⼩,算法的复杂度是O(1),开销很⼩。在record插⼊和聚合过程中会定期对当前AppendOnlyMap中的record进⾏抽样,然后精确计算这些record的总⼤⼩、总个数、更新个数及平均值等,并作为历史统计值。进⾏抽样是因为AppendOnlyMap中的record可能有上万个,难以对每个都精确计算。之后,每当有record插⼊或更新时,会根据历史统计值和历史平均的变化值,增量估算AppendOnlyMap的总⼩,详见Spark源码中的SizeTracker.estimateSize()⽅法。抽样也会定期进⾏,更新统计值以获得更⾼的精度。
3.2. 如何设计spill的⽂件结构,使得可以⽀持⾼效的全局聚合?
当AppendOnlyMap达到内存限制时,会将record排序后写⼊磁盘中。
排序是为了⽅便下⼀步全局聚合(聚合内存和磁盘上的record)时可以采⽤更⾼效的merge-sort(外部排序+聚合)。
那么,问题是根据什么对record进⾏排序的?
⾃然想到的是根据record的Key进⾏排序的,但是这就要求操作定义Key的排序⽅法,如sortByKey()等操作定义了按照Key进⾏的排序。⼤部分操作,如groupByKey(),并没有定义Key的排序⽅法,也不需要输出结果是按照Key进⾏排序的。在这种情况下,Spark采⽤按照Key的Hash值进⾏排序的⽅法,这样既可以进⾏merge-sort,又不要求操作定义Key排序的⽅法。然⽽,这种⽅法的问题是会出现Hash值冲突,也就是不同的Key具有相同的Hash值。为了解决这个问题,Spark在merge-sort的同时会⽐较Key的Hash值是否相等,以及Key的实际值是否相等。
解决了spill时如何对record进⾏排序的问题后,每当AppendOnlyMap超过内存限制,就会将其内部的record排序后spill到磁盘上,如下图所⽰,AppendOnlyMap被填满了4次,也被spill到磁盘上4次。
3.3. 怎样进⾏全局聚合?
由于最终的spill⽂件和内存中的AppendOnlyMap都是经过部分聚合后的结果,其中可能存在相同Key的record,因此还需要
⼀个全局聚合阶段将AppendOnlyMap中的record与spill⽂件中的record进⾏聚合,得到最终聚合后的结果。
全局聚合的⽅法就是建⽴⼀个最⼩堆或最⼤堆,每次从各个spill⽂件中读取前⼏个具有相同Key(或者相同Key的Hash值)的record,然后与AppendOnlyMap中的record进⾏聚合,并输出聚合后的结果。 在下图中,在全局聚合时,Spark分别从4个spill⽂件中提取第1个<K,V>record,与还留在AppendOnlyMap中的第1个record组成最⼩堆,然后不断从最⼩堆中提取具有相同Key的record进⾏聚合(merge)。然后,Spark继续读取spill⽂件及AppendOnlyMap中的record填充最⼩堆,直到所有record处理完成。由于每个spill⽂件中的record是经过排序的,按顺序读取和聚合可以保证能够对每个record得到全局聚合的结果。
ExternalAppendOnlyMap是⼀个⾼性能的HashMap,只⽀持数据插⼊和更新,但可以同时利⽤内存和磁盘对⼤规模数据进⾏聚合
和排序,满⾜了Shuffle Read阶段数据聚合、排序的需求。
四 .PartitionedAppendOnlyMap
PartitionedAppendOnlyMap⽤于在Shuffle Write端对record进⾏聚合(combine)。PartitionedAppendOnlyMap的功能和实现与ExternalAppendOnlyMap的功能和实现基本⼀样,唯⼀区别是PartitionedAppendOnlyMap中的Key是“PartitionId+Key”,这样既可以根据partitionId进⾏排序(⾯向不需要按Key进⾏排序的操作),也可以根据partitionId+Key进⾏排序(⾯向需要按Key进⾏排序的操作),从⽽在Shuffle Write阶段可以进⾏聚合、排序和分区。
五 .PartitionedPairBuffer
PartitionedPairBuffer本质上是⼀个基于内存+磁盘的Array,随着数据添加,不断地扩容,当到达内存限制时,就将Array中的数据按照partitionId或partitionId+Key进⾏排序,然后spill到磁盘上,该过程可以进⾏多次,最后对内存中和磁盘上的数据进⾏全局排序,输出或者提供给下⼀个操作。
六 . 与Hadoop MapReduce的Shuffle机制对⽐
6.1. Hadoop MapReduce原理
Hadoop MapReduce有明显的两个阶段,即map stage和reduce stage。
如下图所⽰,在map stage中,每个map task⾸先执⾏map(K,V)函数,再读取每个record,并输出新的<K,V>record。
这些record⾸先被输出到⼀个固定⼤⼩的spill buffer⾥(⼀般为100MB),spill buffer如果被填满就将spill buffer中的record按照Key排序后输出到磁盘上。这个过程类似Spark将map task输出的record放到⼀个排序数组(PartitionedPairBuffer)中,不同的是Hadoop MapReduce是严格按照Key进⾏排序的,⽽PartitionedPairBuffer排序更灵活(可以按照partitionId进⾏排序,也可以按照partitionId+Key进⾏排序)。
另外,由于spill buffer中的record只进⾏排序,不能完成聚合(combine)功能,所以Hadoop MapReduce在完成map()、等待所有的record都spill到磁盘上后,启动⼀个专门的聚合阶段(下图中的merge phase),使⽤combine()将所有spill⽂件中的record进⾏全局聚合,得到最终聚合结果。注意,这⾥需要进⾏多次全局聚合,因为每次只针对某个分区的spill⽂件进⾏聚合。
在Shuffle Read阶段,Hadoop MapReduce先将每个map task输出的相应分区⽂件通过⽹络获取,然后放⼊内存,如果内存放不下,就先
对当前内存中的record进⾏聚合和排序,再spill到磁盘上,下图中的a,b,c,d,…代表从不同map task获取的分区⽂件,每个⽂件⾥⾯包含许多个record。由于每个分区⽂件中包含的record已经按Key进⾏了排序,聚合时只需要⼀个最⼩堆或者最⼤堆保存当前每个⽂件中的前⼏个record即可,聚合效率⽐较⾼,但需要占⽤⼤量内存空间来存储这些分区⽂件。等获取所有的分区⽂件时,此时可能存在多个spill⽂件及内存中剩余的分区⽂件,这时再启动⼀个专门的reduce阶段(下图中的reduce phase)来将这些内存和磁盘上的数据进⾏全局聚合,这个过程与Spark的全局聚合过程没有什么区别,最后得到聚合后的结果。
6.2. Hadoop MapReduce的Shuffle机制的优点
①Hadoop MapReduce的Shuffle流程固定,阶段分明,每个阶段读取什么数据、进⾏什么操作、输出什么数据都是确定性的。这种确定性使得实现起来⽐较容易。
②Hadoop MapReduce框架的内存消耗也是确定的,map阶段框架只需要⼀个⼤的spill buffer,reduce阶段框架只需要⼀个⼤的数组(MergeQueue)来存放获取的分区⽂件中的record。这样,什么时候将数据spill到磁盘上是确定的,也易于实现和内存管理。当然,⽤户定义的聚合函数,如combine()和reduce()的内存消耗是不确定的。
③Hadoop MapReduce对Key进⾏了严格排序,使得可以使⽤最⼩堆或最⼤堆进⾏聚合,⾮常⾼效。⽽且可以原⽣⽀持sortByKey()。
④Hadoop MapReduce按Key进⾏排序和spill到磁盘上的功能,可以在Shuffle⼤规模数据时仍然保证能够顺利进⾏。
6.3. Hadoop MapReduce的Shuffle机制的缺点
①Hadoop MapReduce强制按Key进⾏排序,⼤多数应⽤其实不需要严格地按照Key进⾏排序,如groupByKey(),排序增加计算量。
②Hadoop MapReduce不能在线聚合,不管是map()端还是reduce()端,都是先将数据存放到内存或者磁盘上后,再执⾏聚合操作的,存储这些数据需要消耗⼤量的内存和磁盘空间。如果能够⼀边获取record⼀边聚合,那么对于⼤多数聚合操作,可以有效地减少存储空间,并减少时延。
③Hadoop MapReduce产⽣的临时⽂件过多,如果map task个数为M,reduce task个数为N,那么map阶段集群会产⽣M×N个
分区⽂件,当M和N较⼤时,总的临时⽂件个数过多。
克服第1个缺点(强制排序)的⽅法是对操作类型进⾏分类,如Spark提供了按partitionId排序、按Key排序等多种⽅式来灵活应对不同操
作的排序需求。
克服第2个缺点(不能在线聚合)的⽅法是采⽤hash-based聚合,也就是利⽤HashMap的在线聚合特性,将record插⼊HashMap时⾃动完成聚合过程,即Spark为什么设计AppendOnlyMap等数据结构。
克服第3个缺点(临时⽂件问题)的⽅法是将多个分区⽂件合并为⼀个⽂件,按照partitionId的顺序存储,这也是Spark为什么要按照partitionId进⾏排序的原因。
总的来说,Spark采⽤的是hash+sort-based Shuffle的⽅法,融合了hash-based和sort-based Shuffle的优点,根据不同操作的需求,灵活选择最合适的Shuffle⽅法。
由于Hadoop MapReduce采⽤独⽴阶段聚合,⽽Spark采⽤在线聚合的⽅法,两者的聚合函数还有⼀个⼤的区别。MapReduce的聚合函数reduce()接收的是⼀个<K,list(V)>record,可以对每个record中的list(V)进⾏任意处理,⽽Spark中的聚合函数每当接收到⼀个<K,V>record时,就要⽴即进⾏处理,在流程上有⼀些受限。
在聚合过程中Spark需要对每个到来的record进⾏⽴即处理,⽽Hadoop MapReduce没有这个要求,所以更加灵活.
标签:AppendOnlyMap,排序,聚合,spill,record,Key,数据结构,SparkShuffle 来源: https://blog.csdn.net/zhanglong_4444/article/details/118409510