mapreduceShuffle过程解析
作者:互联网
mapreduceShuffle过程解析
Shuffle过程是Mapreduce的核心,要想理解Mapreduce,Shuffle过程是必须了解的。
Shuffle的本义是洗牌,混乱,把一组有规则的数据转换成一组无规则的数据,Mapreduce中的shuffle更像是洗牌的逆过程,把map端处理完的数据规约到reduce端,更像是一个整合的过程,把相对无规则的数据转换成有规则的数据。
为什么Mapreduce计算模型需要shuffle过程?
因为Mapreduce一般包括两个重要的阶段:Map是映射,负责数据的过滤分发,
Reduce是归约,负责数据的计算归并,Map的输出即是Reduce的输入。Reduce需要shuffle来获取数据,保证接收到数据是完整有序的,在分布式系统中,map和Reduce基本分布在不同的节点,需要shuffle来协调数据的传输,shuffle会合理合并数据以减少网络IO,提高计算效率。
Shuffle流程解析
Shuffle在Mapreduce中是一个贯穿map端和reduce端的一个过程,总体分为两个阶段:Map端和Reduce端,我画了一张容易理解的流程图。
Map:
Map端包括以下四个步骤:
1、split过程:
首先对输入文件进行切分,当前版本默认切分规则是一个split切片对应hdfs中的块大小,一个maptask处理一个切片,经过重写过的map方法之后进入下一个阶段,关于切分规则看我另外一篇博客。
2、partition过程:
分区partition是map方法结束后经历的阶段,partition的作用是分割map端的最后结果,分割后的每一部分会被不同的reducetask获取,具体过程是:partition会给每一个map端context写出的kv键值对打一个标记,代表这条数据应该由哪一个reduce处理(有关reduce和partition之间的数对应关系看另一篇博客),之后partition处理完的数据会写入环形缓冲区。partition是一个可以自定义的组件,可以继承partitioner类,重写getpartition方法,实现自定义partition。
3、(spill)溢写过程
partition之后数据会写入到一个环形缓冲区之内,实现就是一个字节数组,叫做Kvbuffer,你可以抽象的理解成是一个环形缓冲区,类似上图左下角那一部分,使用环形数据结构是为了更有效的使用内存空间,缓冲区中不光放了数据,还放了一些索引数据,数据区域(<k,v>)和索引区域(kvmeta)是两个相邻不重叠的两个区域,意思就是这个环形缓冲区就好比一个首尾相连的纸条,一开始这个纸条有一个分界线,当有数据写入的时候,数据会从这个分界线开始往左边写,而这条数据的索引数据从分界线开始往右边写,之后每次spill之后,分界点都会变化,调到未存储区域的中间继续往两边写数据。数据的存放指针(不是另一边索引数据)值bufindex一直向上增长,比如一开始临界点为0,写了一个Int型的key,bufindex就变为4,写了一个Int型的value,bufindex就变为8;临界点另一边的索引数据也有一个指针,一条kv的索引数据是一个四元组,包括value的起始位置,key的起始位置,partition值,value的长度,存一次指针kvindex就向下跳4格。
缓冲区当前版本默认100M大小,可以通过参数设置,满了怎么办,这里有一个机制,存在一个阈值,默认80%,通过参数也可以设置,当缓冲区数据到了80%,溢写线程就会锁定这80%的数据,开始spill(溢写),这样溢写线程可以在map写缓冲区的同时,把数据溢写到磁盘上,如果往进写的比溢写快,缓冲区就会满,这时写入会关闭,等待溢写执行完,才会继续往缓冲区写数据,溢写过程首先会对数据sort,会把数据先根据partition值聚集在一起,每个分区内根据key快速排序,排序只会修改kvmeta区域的值。
值得一提的是:如果定义了combiner,这时就会执行,combiner是MR中的一个组件,可以理解为非正式的reducer,定义的代码一般和reducer一毛一样,所以!combiner的输出就是reducer的输入,combiner和reducer既然一样,所以combiner适用于那中reducer的输入和输出类型一致,不影响最终结果的情况,比如累加,最大值,combiner要慎重使用,用的好会提高计算效率,反之会影响最终结果。
每次溢写spill线程会在本地创建一个溢写文件,格式如spill12.out,spill线程根据排过序的kvmeta挨个partition的数据吐到溢写文件中,虽然是顺序存放的,那怎么知道某个partition在文件中的起始位置呢,强大的索引又出场了,有一个三元组记录这个partition对应的数据在文件中的索引,包括:起始位置,原始数据长度,压缩后的数据长度,一个partition对应一个三元组,,然后把这些索引数据放在内存中,(存放区域默认大小1M),如果内存放不下,后续索引信息就会写到磁盘,格式如spill12.out.index,文件中不光存放了索引数据,还存了crc32校验数据 ,每次溢写之后,缓冲区的临界点就会跳到剩余空间的中间位置,继续往两边写。
maptask整体的数据流向就是map方法->partition->环形缓冲区->磁盘->合并最终文件。最后内存中的所有数据都会溢写到磁盘中,所以至少也会生成一个溢写文件,之后进入最后的合并阶段。
4、Merge过程
每次溢写都会生成一个溢写文件,maptask最后会生成一个最终结果文件,合并这些溢写文件生成最终文件的过程就是merge过程,merge过程会扫描磁盘所有溢写文件和索引文件,把每个溢写文件的地址放到一个数组列表,索引数据同样,merge过程会创建file.out和file.out.index文件来存储最终结果,
一个partition一个partition的进行合并输出,对于每个partition来说,从索引列表中查询这个partition对应的所有索引信息,每个对应一个段插入到段列表中,也就是这个partition对应一个段列表,,记录所有溢写文件中对应partition那段数据的文件名,起始位置,长度等等。
然后对这个partition对应的所有segment进行合并,目标是合并成一个segment,当这个partition对应多个segment时,会分批进行合并,先从segment列表把第一批取出来,以key为关键字放置成最小堆,然后从最小堆每次取出最小的输出到零时文件,这批段就合并成一个临时的段,放回到segment列表中,然后再取出一批,往复执行,剩下最后一批,输出到最终文件中,索引数据输出到索引文件中,(这里的排序是归并排序)然后合并其他partition数据。
当溢写文件达到3个的时候,combiner会再次运行。
Map端的Shuffle过程到此结束
reduce:
reduce端的过程没那么复杂,大致分为copy阶段和merge阶段。
每个map节点都会启动一个常驻的HTTPserver, 其中一项服务就是相应reduce拖取Map数据,,当有MapOutput的HTTP请求过来的时候,HTTP server就会读取相应的Map输出文件对应这个Reduce部分的数据通过网络输出给这个Reduce。
Reduce是怎么知道什么时候开始拖取数据?
map生成最终文件会汇报给applicationmaster,Reduce会从applicationmaster获取哪些map完成工作,然后去拉取数据。
reduce默认开启5个线程同时从各个map端拉取数据,merge的三种形式:
1,内存到内存,默认不开启
2,内存到磁盘,数据能放到内存就放进内存,内存不够直接写到磁盘,写内存时也是到达阈值开始内存到磁盘的merge,和map端的溢写类似,生成许多文件,直到没有从拉取到map端没有数据,开启第3种:磁盘到磁盘的合并,最后合并阶段也是归并排序。
reducetask的Shuffle阶段几乎就是大量的合并,合并各个map端属于自己的数据,归整到一起,生成最终文件输入到reduce。
Shuffle过程到此结束。
一些容错细节没写,有不懂的,一起讨论,共同学习。
标签:map,mapreduceShuffle,过程,partition,索引,文件,解析,数据,溢写 来源: https://blog.csdn.net/f_n_c_k/article/details/88016696