MapReduce
作者:互联网
MapReduce
一、MapReduce概述MapReduce是一个分布式运算程序的编程框架。
核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。
1.1优缺点
优点
- 易于编程
它简单的实现一些接口,就可以完成一个分布式程序。
2.良好的扩展性
3.高容错性
若其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行,不至于这个任务运行失败。
4.适合PB级以上海量数据的离线处理
缺点
- 不擅长实时计算
- 不擅长流式计算:MapReduce的输入数据是静态的,不能动态变化。
- 不擅长DAG(有向图)计算
1.2核心思想
1)分布式的运算程序往往需要分成至少2个阶段。
2)第一个阶段的MapTask并发实例,完全并行运行,互不相干。
3)第二个阶段的ReduceTask并发实例互不相干,但是他们的数据依赖于上一个阶段的所有MapTask并发实例的输出。
4)MapReduce编程模型只能包含一个Map阶段和一个Reduce阶段,如果用户的业务逻辑非常复杂,那就只能多个MapReduce程序,串行运行。
1.3MapReduce进程
一个完整的MapReduce程序在分布式运行时有三类进程实例:
- MrAppMaster:负责整改程序的过程调度及状态协调。
- MapTask:负责Map阶段的整个数据处理流程。
- ReduceTask:负责Reduce阶段的整个数据处理流程。
1.4常用数据序列化类型
Java类型 | Hadoop Writable类型 |
---|---|
boolean | BooleanWritable |
byte | ByteWritable |
int | IntWritable |
float | FloatWritable |
long | LongWritable |
double | DoubleWritable |
String | Text |
map | MapWritable |
array | ArrayWritable |
1.5MapReduce编程规范
1.5.1 Mapper阶段
- 用户自定义的Mapper要继承自己的父类
- Mapper的输入数据是KV对的形式(KV的类型可自定义)
- Mapper中的业务逻辑写在map()方法中
- Mapper的输出数据是KV对的形式
- map()方法(MapTask进程)对每一个<K,V>调用一次
1.5.2 Reducer阶段
- 用户自定义的Reducer要继承自己的父类
- Reducer的输入数据类型对应的Mapeer的数据类型,也是KV
- Reducer的业务逻辑写在reduce()方法中
- ReduceTask进程对每一组相同k的<k,v>组调用一次reduce()方法
1.5.3 Driver阶段
相当于Yarn集群的客户端,用于提交我们整个程序到Yarn集群,提交的是封装了MapReduce程序相关运行参数的job对象。
二、MapReduce框架原理2.1MapReduce工作流程
上面的流程是整个MapReduce最全工作流程,但是Shuffle过程只是从第7步开始到第16步结束,具体Shuffle过程详解,如下:
1)MapTask收集我们的map()方法输出的kv对,放到内存缓冲区中
2)从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件
3)多个溢出文件会被合并成大的溢出文件 4)在溢出过程及合并的过程中,都要调用Partitioner进行分区和针对key进行排序
5)ReduceTask根据自己的分区号,去各个MapTask机器上取相应的结果分区数据
6)ReduceTask会取到同一个分区的来自不同MapTask的结果文件,ReduceTask会将这些文件再进行合并(归并排序)
7)合并成大文件后,Shuffle的过程也就结束了,后面进入ReduceTask的逻辑运算过程(从文件中取出一个一个的键值对Group,调用用户自定义的reduce()方法)
2.2 Shuffle机制
Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle。
2.3 MapReduce开发总结
在编写MapReduce程序时,需要考虑如下几个方面:
1:输入数据接口:TextInputFormat
(1)默认使用的实现类是:TextInputFormat
(2)TextInputFormat的功能逻辑是:一次读一行文本,然后将该行的起始偏移量作为key,行内人作为value返回。
(3)KeyValueTextInputFormat每一行均为一条记录,被分隔符分割为key,value。默认分隔符是tab。
(4)NlineInputFormat按照指定的行数N来划分切片。
(5)CombineTextInputFormat可以把多个小文件合并成一个切片处理,提高处理效率。
(6)用户还可以自定义InputFormat。
2:逻辑处理接口:Mapper
用户根据业务需求实现其中三个方法:map() setup() cleanup()
3:Partitioner分区
(1)默认实现HashPartitioner,逻辑是根据key的哈希值和numReduces来返回一个分区号,key.hashCode()&Integer。MAXVALUE%numReduces。
4:Comparable排序
(1)当我们用自定义的对象作为key来输出时,就必须要实现WritableComparable接口,重写其中的compareTo()方法
(2)部分排序:对最终输出的每一个文件进行内部排序
(3)全排序:对所有数据进行排序,通常只有一个Reduce
(4)二次排序:排序的条件有两个。
5:Combiner合并
Combiner合并可以提高程序执行效率,减少IO传输。
6:Reduce端分组:GroupingComparator
在Reduce端对key进行分组,在接收的Key为bean对象时,想让一个或几个字段相同(全部字段比较不相同)的key进入到同一个reduce方法时,可以采用分组排序。
7:逻辑处理接口:Reducer
根据业务需求实现其中三个方法:reduce() setup() cleanup()
8:输出数据接口:OutputFormat
(1)默认实现类是TextOutputformat,功能逻辑是:将每一个KV对,向目标文本文件输出一行。
(2)将SequenceFileOutputFormat输出作为后续MapReduce任务的输入。因为它的格式紧凑,易压缩。
Yarn是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式的操作系统平台,而MapReduce等运算程序则相当于运行于操作系统之上的应用程序。
3.1 Yarn基本架构
YARN主要由ResourceManager、NodeManager、ApplicationMaster和Container等组件构成。
3.2 Yarn运行机制
(1)MR程序提交到客户端所在的节点。
(2)YarnRunner向ResourceManager申请一个Application。
(3)RM将该应用程序的资源路径返回给YarnRunner。
(4)该程序将运行所需资源提交到HDFS上。
(5)程序资源提交完毕后,申请运行mrAppMaster。
(6)RM将用户的请求初始化成一个Task。
(7)其中一个NodeManager领取到Task任务。
(8)该NodeManager创建容器Container,并产生MRAppmaster。
(9)Container从HDFS上拷贝资源到本地。
(10)MRAppmaster向RM 申请运行MapTask资源。
(11)RM将运行MapTask任务分配给另外两个NodeManager,另两个NodeManager分别领取任务并创建容器。
(12)MR向两个接收到任务的NodeManager发送程序启动脚本,这两个NodeManager分别启动MapTask,MapTask对数据分区排序。
(13)MrAppMaster等待所有MapTask运行完毕后,向RM申请容器,运行ReduceTask。
(14)ReduceTask向MapTask获取相应分区的数据。
(15)程序运行完毕后,MR会向RM申请注销自己。
3.3 作业提交过程之YARN
(1)作业提交
第1步:Client调用job.waitForCompletion方法,向整个集群提交MapReduce作业。
第2步:Client向RM申请一个作业id。
第3步:RM给Client返回该job资源的提交路径和作业id。
第4步:Client提交jar包、切片信息和配置文件到指定的资源提交路径。
第5步:Client提交完资源后,向RM申请运行MrAppMaster。
(2)作业初始化
第6步:当RM收到Client的请求后,将该job添加到容量调度器中。
第7步:某一个空闲的NM领取到该Job。
第8步:该NM创建Container,并产生MRAppmaster。
第9步:下载Client提交的资源到本地。
(3)任务分配
第10步:MrAppMaster向RM申请运行多个MapTask任务资源。
第11步:RM将运行MapTask任务分配给另外两个NodeManager,另两个NodeManager分别领取任务并创建容器。
(4)任务运行
第12步:MR向两个接收到任务的NodeManager发送程序启动脚本,这两个NodeManager分别启动MapTask,MapTask对数据分区排序。
第13步:MrAppMaster等待所有MapTask运行完毕后,向RM申请容器,运行ReduceTask。
第14步:ReduceTask向MapTask获取相应分区的数据。
第15步:程序运行完毕后,MR会向RM申请注销自己。
(5)进度和状态更新
YARN中的任务将其进度和状态(包括counter)返回给应用管理器, 客户端每秒(通过mapreduce.client.progressmonitor.pollinterval设置)向应用管理器请求进度更新, 展示给用户。
(6)作业完成
除了向应用管理器请求作业进度外, 客户端每5秒都会通过调用waitForCompletion()来检查作业是否完成。时间间隔可以通过mapreduce.client.completion.pollinterval来设置。作业完成之后, 应用管理器和Container会清理工作状态。作业的信息会被作业历史服务器存储以备之后用户核查。
3.4 作业提交过程之MapReduce
3.5 资源调度器
Hadoop作业调度器主要有三种:FIFO、Capacity Scheduler和Fair Scheduler。
3.5.1 FIFO调度器
3.5.2 容量调度器
3.5.3 平调度器
标签:ReduceTask,MapReduce,MapTask,提交,RM,NodeManager 来源: https://blog.51cto.com/u_15078339/2862984