MapReduce底层原理和执行流程经典讲解【小二讲堂】
作者:互联网
本片博客是小二精心所得,评论席可与小二探讨!!!
小二讲堂:https://blog.csdn.net/Mirror_w
MapReduce的原语“相同的key为一组,调用一次reduce,方法内迭代这组数据并进行计算”
一、MapReduce
MapReduce可以集群中可靠地、容错地、并行处理、TB级别的数据。
mapreduce分布式计算框架,分为map阶段和reduce阶段,map端又分为数据的切片和shuffle两个阶段,reduce又分为shuffle和reduce计算两个方面。最后reduce又将数据输出到hdfs中进行保存。而在mapreduce的过程中map的输出则是renduce的输入。map端的输出类型和redcue端的输入类型一致。
二、MapReduce作业流程分析
-
工作流程简要分析:
1.MapReduce将从HDFS输入数据进行切片(逻辑切片,按一定量的偏移量offset读取),
2、Map任务以并行的方式处理切片数据。
3、框架对map的输出进行排序,然后将数据进行写入缓冲区,等待reduce任务来获取数据,4.MapReduce的输入输出存在于同一HDFS分布式文件系统中。
5、框架负责任务的调度任务的监控和失败任务的执行 .
当map端从hdfs中获取到数据后,首先对数据通过一定的偏移量进行切分(每个切片默认128M),每一个切片对应着一个maptask任务,然后每个map任务进行sort分区排序(此处的分区排序下面详述),将key值相同的切片为一组,进行排序并且将排序后的数据文件发送到一个环形缓冲区中,环形缓冲区中的数据超过80%时会溢写到磁盘上,其他节点上的数据执行流程也是如此,但map任务执行到80%时,reduce任务会启动进行执行,reduce会启动默认5个线程进行任务的执行,对于map中 key相同的会调用一个reduce去执行,reduce将map端执行完毕的map任务通过网络拉取时,如果map端的数据过大,会使用combiner合并的方式通过网络进行数据传输,combiner和并过大上的数据文件时会减少大量的IO操作,提高很高的效率。如果map端没有执行完毕,reduce端拉取到一部分数据,那么redcuce端会将拉取到的这一部分数据进行merge合并操作,当map端的task任务全部执行完毕的时候,reduce会将所有的数据进行全量合并,然后调用reduce计算方法进行逻辑计算,并将计算的结果发送到hdfs上进行保存。
分步执行详细解析:
-
a.在进行数据计算时,首先通过DistributeInputStream对象进行获取数据
-
b.然后对数据块进行通过一定的偏移量进行切片,切片大小为默认128KB,然后每个切片对应着一个map集合,对于对单词的计数来说,map集合中的key是每个偏移量的数据,而value是默认是1
-
c.然后对map进行快速排序,其他工作的节点上的工作流程也是这样的 ,这样通过排序将key值相同的数据放置到了一起
-
d.然后reduce判断如果map执行的任务完成了80%则reduce就开始执行任务,通过http的方式从已经排好序的数据集中去获取数据.获取数据的依据:MapReduce原语:“相同”key值的键值为一组,调用一次Reduce的方法,在这个方法内进行迭代计算这一组数据。进行获取数据集时,将相同key值的map结合中的数据进行获取(包括获取其他并行处理数据的节点上的数据),如果map任务找那个还有未执行完毕的任务,则reduce会先进行实时计算,进行归并排序,等待其他执行map节点上的任务完毕后,进行reduce的计算,reduce在进行计算时,将相同的key值相同的进行计算统计(数单词),然后将计算完毕的数据结果进行输出到HDFS进行储存。
-
Reducce是通过http按照分区号获取map输出文件的数据。map端有一个http服务处理该reducer的HTTP请求。该HTTP服务最大线程数有maperduce。shuffle.Max.threads属性指定。这个属性指定nodemanager的线程数,而不是对map任务的线程数,因为nodemanager上有可能运行着好多个任务,默认值是0,表示最大线程数是核心线程数的两倍。
-
map输出文件是位于本地磁盘的,一个reduce任务需要从集群中多个map任务获取指定该map任务的分区数据。多个map任务有可能是在不同时间完成的。每当map任务完,reduce就会从该map任务中获取指定分区的数据,为了高效率,reduce会以多线程的方式获取指定分区的数据。默认线程数是5,可以通过配置文件进行指定。
文件的溢写:当reduce从map任务完成去拷贝制定分区的文件时,如果内存缓冲区大大小达到了阈值(mapreduce.reduce.shuffle.merge.percent),或者map输出文件达到阈值,(mapreduce.reduce.merge.inmem.threshold)这时文件就会溢写在磁盘上。如果指定combiner合并,此处也会进行combiner -
当reduce从所有的map拷贝了分区数据之后,reduce进入了合并阶段,那么合并多少次就是个问题,如果有50个文件,合并因子为10(mapreduce.task.io.sort.factor,默认是10),则需要5轮得到5个中间文件就不再合并直接输出。
给reduce的数据一般是内存和磁盘混合输出的形式。
三、MapReduce的shuffle
1.partititon分区
当map端进行数据切分后每个切片对应着一个map。然后根据map的个数%reduce的个数,这样所有节点上的数据都会被分区,例如取模之后为0分区,1分区,2分区,reduce会在进行网络调用的时候会将对应分区上的数据调用一个reduce来进行执行,这样将大量的map任务平均到reducer任务上提高了很高的工作效率
2.combiner压缩合并
当map端的数据较大的时候,这里在reduce和map端会产生大量的网络IO操作,这样会影响mr的工作效率,为了避免这种情况,设置combiner对数据进行合并压缩这里的合并压缩不是随便的进行压缩,combiner是针对key的压缩,将map端key值相同数据进行压缩,即对分区的数据进行压缩,当map端数据过大时,经过分区后,分区内的数据过大,这时可以利用combiner对这些数据进行压缩,这样大大减少了网络IO的操作,比如有三个分区0分区,1分区,2分区,每个分区里有100个小文件,三个分区对应的reduce有reduce0,redcuce1,reduce2,reduce0调用0分区的数据就会经过网络传输100次,而经过combiner对0分区的数据合并若情况允许(合并压缩后文件不大即可),则将数据压缩成一份数据,这样reduce1对0分区的数据只进行一次拉取即可完成,这样这里的一次IO量针对前面的100次IO,减少了很多。------这里的combiner是由条件的当溢写的文件小于3个时不会进行combine。大于等于3是才会进行combiner
3.MapReduce的输入输出模型图示
图示讲解MR:
shuffle中的sort和combiner–源码分析
源码分析:https://blog.csdn.net/Mirror_w/article/details/89420501
当环形缓冲区中的数据达到80%阈值之后,会向磁盘进行溢写,但是在溢写之前会先进行sort排序,这里的排序是快速排序(面试题:快速排序),之后会进行归并排序。
快速排序:
对环形缓冲区中的数据进行创建一个对应的索引,排序的时候对这些索引进行排序–采用快速排序
合并:
在数据向磁盘溢写的时候,如果数据想磁盘溢写小文件个数小于3,则不会这两个文件进行合并。当溢写的小文件个数大于3是,会对这些小文件进行合并。减少reduce端拉取时的IO量。
小二建议读者在看本篇博文后阅读MapReduce源码分析
MapReduce源码分析:https://blog.csdn.net/Mirror_w/article/details/89420501
小二精讲:https://blog.csdn.net/Mirror_w/article/details/89301229
标签:map,分区,reduce,MapReduce,任务,讲堂,讲解,数据,进行 来源: https://blog.csdn.net/Mirror_w/article/details/89421705