MapReduce工作流程
作者:互联网
1.MapReduce流程图
2.MapReduce的详解
1 .
执行Driver的main方法,里面有个job.waitForCompletion(),在方法里面完成
任务的准备,主要包括数据的切片,并将切片规划写到job.split文件里面,生成运行任务
时的配置文件job.xml,将我们写的mapreducexhen程序打成jar包,准备好之后,
将任务提交给yarn的resourceManager。
Resourcemanager收到客户端提交的任务后,会选择一个nodemanager,
在其上边生成一个mapreduce application master进程,
由该进程负责指挥调度刚刚提交的任务
mapreduce application master 会分析我们的任务,查看切片数量,然后
启动相应数量的Maptask ,如果有两个切片就会启动两个Maptask
mapreduce application master 会分析我们的任务,查看切片数量,然后
启动相应数量的Maptask ,如果有两个切片就会启动两个Maptask
在map方法中对输入的两个参数k,v进行完自己的逻辑处理后,要把我们的结果也要
封装好k,v的形式,写出到outputCollector即缓冲区。
key-value键值对从map中写出后,在进入到环形缓冲区之前,会经过分区器,
得到一个所属分区的区号标记。意味着在进入到环形缓冲区之前,他们就
注定分到哪个区了。
7.1环形缓冲区是个位于内存的数据结构,默认大小是100M,(在hadoop的配置文件中通过io.sort.mb参数修改)
7.2当来到环形缓冲区的数据越来越多,达到容量80%的时候, 这些环形缓冲区的kv对
会被溢写到磁盘上,但是并不是把所有的kv溢写到一个文件,而是根据每个kv的所属分区, 溢写到多个分区文件.
7.3如果在溢写后,环形缓冲区的数据又要满了,还会溢写多次.
7.4另外每一次溢写的时候,会读所有的kv根据mapper的keyout排序后溢出,
这种溢写我们称之为区内有序,也就是溢写后的kv数据既要分区也要排序。
等所有的kv都溢写完毕后,会形成多个分区小文件,还要对这些分区小文件进行
归并排序,所谓归并排序就是把多个有序的小文件变为一个有序的大文件.
这个MapTask执行完成后,会形成多个分区文件,每个文件都是排好序的
MapTask执行完毕后,接下来开始执行ReduceTask,因为MapTask形成了两个分区文件,
所以需要两个ReduceTask,ReduceTask 1先把两个MapTask机器上的分区1数据
下载下来,如果内存放不下的话,把数据放到磁盘上.
然后对下载下来的多个分区1的数据进行归并排序,排好序之后,调用分组比较器
将所有kv按照k进行分组,将分好组的kv去调用一次reduce方法。
3.MapTask工作机制
1.maptask并行度决定机制
maptask的并行度,决定map阶段的任务处理并发度,进而影响到整个job的处理速度
那么,mapTask并行任务是否越多越好呢?
一个job的map阶段MapTask并行度(个数),由客户端提交job时的切片个数决定
也不是切的片的越多,形成的maptask越多,并行度越高就越好。首先hdfs上数据
存储切块是按照128M每块的,如果这里的数据切片不按照切块标准,会造成数据跨网
络传输,进而可能会导致资源更大的耗费
下图很好的进行了展示跨网络传输的弊端:
2.MapTask工作机制
1.
Read阶段:
MapTask通过编写的RecordReader,从输入InputSplit中解析出一个个key/value
Map阶段:
该节点主要是将解析出的key/value交给编写map()方法处理
并产生一系列新的key/value
Collect收集阶段:
在编写map()方法中 ,一般会调用context.write(底层OutputCollector.collect()输出结果)
在该方法内部,它会将生成的key/value分区(调用Partitioner)并写入一个环形内存缓冲区中。
Spill阶段:
4.1 即“溢写”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,
生成一个临时文件.
4.2 需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序
并在必要时对数据进行合并等操作
Combine阶段:
当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。
5.1 在进行文件合并过程中,MapTask以分区为单位进行合并,对于某个分区,
它将采用多轮递归合并的方式,每轮合并io.sort.factor(默认100)个文件
5.2 并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,
直到最终得到一个大文件
4、ReduceTask工作机制
1.ReduceTask并行度决定机制
ReduceTask的并行度,也就是同时开启几个ReduceTask.
我们知道切片的数量决定了MapTask的数量,那么什么决定了ReduceTask的数量呢?
第一种情况:
如果我们自己定义了分区器,我们能够确定自己的分区器能够形成几个物理分区
假如要生成5个分区
那么要在driver中显式的设置启动与分区数量相等的reduce 的数量
//默认值是1,手动设置为5
job.setNumReduceTasks(5);
第二种情况:
如果我们采用默认的分区器,也就是HashPartitioner,只需要在Driver中
根据实际情况设置若干个ReduceTask的数量即可.
默认的分区器非常的智能,它能够根据我们设置的ReduceTask的数量产生相应数量的分区数量
注意!!!
! ReduceTask=0 ,表示没有reduce阶段,输出文件个数和map个数一致
! ReduceTask默认值就是1,所以输出文件个数为一个
! 如果数据分布不均匀,就有可能在reduce阶段产生数据倾斜
! ReduceTask数量并不是任意设置,还要考虑业务逻辑需求
! 具体多少个ReduceTask,需要根据集群性能而定
如果分区数不是1,但是ReduceTask为1,是否执行分区过程?
答案:不执行分区过程。因为在MapTask的源码中,执行分区的前提是先判断
reduceNum个数是否大于1。不大于1肯定不执行。
那么ReduceTask的数量越多越好吗?
ReduceTask的数量并非越多越好,如果设置大量的ReduceTask,
有可能会导致启动这些ReduceTask的时间过长
2.ReduceTask工作机制
1.
Copy阶段:
ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,
如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中
Merge阶段:
在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件
进行合并.以防止内存使用过多或磁盘上文件过多.
Sort阶段:
编写reduce()方法输入数据是按key进行聚集的一组数据,为了将key相同的数据
聚在一起 ,由于各个MapTask已经实现对自己的处理结果进行了局部排序。
因此,ReduceTask只需对所有数据进行一次归并排序即可。
Reduce阶段:
reduce()方法将计算结果写到HDFS上。
标签:文件,流程,ReduceTask,MapReduce,MapTask,工作,缓冲区,数据,分区 来源: https://blog.csdn.net/weixin_46609492/article/details/119031956