Hadoop系列——详解MapReduce
作者:互联网
本文主要介绍MapReduce的基本概念以及详细介绍该框架的流程
文章目录
Mapreduce 简介
Mapreduce 是什么
Mapreduce 是面向大数据并行处理的计算模型、框架和平台,其处理过程分为 Map 阶段和 Reduce 阶段。
Mapreduce 的由来
Mapreduce 最早是由 Google 公司设计一种为了解决搜索引擎中大规模网页数据的并行化处理。正是这思想,Doug Cutting 模仿 Google 的 Mapreduce,基于 Java 设计了 Hadoop 开源的 Mapreduce 并行计算框架。
Mapreduce 设计目标
Mapreduce 是一种可用于数据处理的编程框架,其采用 分而治之 的思想,把大规模数据集分发给一个主节点管理的各个分节点完成,然后通过整合各个节点的中间结果,得到最终结果。
Mapreduce 适用场景
Mapreduce 适用于数据集是可拆分并行处理的,且不影响最后结果。
Mapreduce 特点
- 优点:
- 易于编程
- 良好的扩展性
- 高容错性
- 适合 PB 级上数据的离线处理
- 缺点:
- 不擅长实时计算
- 不擅长流式计算
- 不擅长 DAG(有向无环图) 计算
Mapreduce 的基本概念
-
JobClient
用来提交作业,配置参数 Configuration,打包成 jar 包存储在 HDFS 上,将文件路径提交给 JobTracker 的 Master 服务,然后由 Master 创建每个 task 将他们分发到各个 TaskTracker 服务中去执行。 -
JobTracker
用来协调作业的运行,由其负责资源监控和作业调度。 -
TaskTracker
用来处理作业划分后的任务,其主动与 JobTrack 进行通信,负责执行每个任务。
Task 分为 MapTask 和 ReduceTask,均由 TaskTracker 启动。Mapreduce 处理的最小单位为 split,split 可由用户自由设置,计算公式如下:
S p l i t S i z e = M a t h . m a x ( m i n S i z e , M a t h . m i n ( m a x S i z e , b l o c k S i z e ) ) \begin{aligned} SplitSize= Math.max(minSize,Math.min(maxSize,blockSize)) \end{aligned} SplitSize=Math.max(minSize,Math.min(maxSize,blockSize))
mapreduce.input.fileinputformat.split.minsize 默认为 1
mapreduce.input.fileinputformat.split.maxsize 默认为 Long.MAXValue
blockSize 默认为 128M
maxsize :该参数如果比 blockSize 小,则导致切片变小,即会等于配置的整个参数。
sminsize :该参数如果修改的比 blockSize 大,则切片大小会比 blockSize 大。
InputFormat
Mapreduce 开启任务时,需要对文件的读取,Hadoop 定义了不同的类读取不同的数据。
TextInputFormat
- 默认使用类,按行读取每条数据,Key是该行数据的 offset,Value = 行内容。
KeyValueTExtInputFormat
- 每行都是一条记录,被指定分隔符分割为Key跟Value,默认是 \t 。
NLineInputFormat
- 该模型下每个 map 处理 InputSplit 时不再按照 Block 块去划分,而是按照指定的行数N来划分文件。
自定义InputFormat
- 基础接口,改写 RecordReader,实现一次读取一个完整文件封装为 KV,使用 SequenceFileOutPutFormat 输出合并文件。
CombineTextInputFormat
- 用于小文件过多场景,逻辑上合并多个小文件个一个切片任务。
OutputFormat
对 reduce 拉取的结果需要指定的输出方式写到文件系统里,可根据需求选择实现类。
TextOutputFormat
- 系统默认输出格式,把每条记录写为文本行,他的 K 和 V 是任意类型,系统在写入时候会统一转化为字符串。
SequenceFileOutputFormat
- 此模式下的输出结果作为后续MapReduce任务的输入,该模式下数据格式紧凑,很容易被压缩。
自定义OutputFormat
- 自定义类继承 FileOutputFormat。
- 重写 RecordWriter,改写具体输出数据的方法 write()。
序列化
- 序列化:将内存中对象转换为二进制的字节序列,可以通过 输出流持久化存储 或者 网络传输。
- 反序列化:将收到字节序列或者是硬盘的持久化数据,转换成内存中的对象。
因为 Hadoop 在集群之间进行通讯或者 RPC 调用时是需要序列化的,而且要求序列化要快、且体积要小、占用带宽要小。而 Java 自带的序列化是重量级框架,对象序列化后会附带额外信息,比如各种校验信息、header、继承体系等。所以 Hadoop 自研了序列化框架。
Hadoop 序列化相关接口:Writable 实现的序列化机制、Comparable 管理 Key 的排序。
常见的 Hadoop 序列化类型:
Java 类型 | Hadoop Writable类型 |
---|---|
boolean | BooleanWritable |
byte | ByteWritable |
int | IntWritable |
float | FloatWritable |
long | LongWritable |
double | DoubleWritable |
String | Text |
map | MapWritable |
array | ArrayWritable |
null | NullWritable |
Mapreduce 流程
整体流程
MapTask 工作机制
- Read阶段:MapTask 通过用户编写的 RecordReader,从输入 InputSplit 中解析出一个个 key/value 对。
- Map阶段:将解析出的 key/value 交给用户编写 map() 函数处理,并产生一系列新的key/value。
- Collect收集阶段:它会将生成的 key/value 分区(调用Partitioner),并写入一个环形内存缓冲区中。
- Spill阶段:先按照分区进行排序,然后区内按照字典对 key 进行快排,并在必要时对数据进行合并、压缩等操作。
- Combine阶段:选择性可进行 MapTask 内的优化提速。
ReduceTask 工作机制
- Copy阶段:从所有的 MapTask 中收集结果然后决定将数据放入缓存还是磁盘。
- Merge阶段:copy 数据时后天会对磁盘还有内存数据进行 Merge。
- Sort阶段:ReduceTask 需对所有数据进行一次归并排序,方便执行 reduce 函数。
- Reduce阶段:调用用户 reduce() 函数将计算结果写到 HDFS 上。
Shuffle
MapReduce 的核心就是 Shuffle 过程,Shuffle 过程是贯穿于 map 和 reduce 两个过程的。在 Map 端包括 spill 过程,在 Reduce 端包括 copy 和 sort 过程。 具体 Shuffle 过程如下。
- MapTask 收集我们的 map() 方法输出的键值对,放到内存缓冲区中。
- 从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件,溢出前会按照分区针对 key 进行区内快排。
- 多个溢出文件会被合并成大的溢出文件。
- 在溢出过程及合并的过程中,都要调用 Partitioner 进行分区和针对 key 进行排序。
- ReduceTask 根据自己的分区号,去各个 MapTask 机器上取相应的结果分区数据。
- ReduceTask 对收集后的数据进行合并和归并排序。
- 进入 ReduceTask 的逻辑运算过程,调用用户自定义的 reduce() 方法。
- Shuffle 中的缓冲区大小会影响到 MapReduce 程序的执行效率,原则上说,缓冲区越大,磁盘 io 的次数越少,执行速度就越快。
环形缓冲区
Map 的输出结果由 Collector 处理,每个 Map 任务不断地将键值对输出到在内存中构造的一个环形数据结构中。使用环形数据结构是为了更有效地使用内存空间,在内存中放置尽可能多的数据。
环形数据结构其实就是个字节数组 byte[],叫 kvbuffer,默认为 100M。里面主要存储数据 和 元数据。中间有个分界点,并且分界点是变化的。当环形缓冲区写入的 buffer 的大小达到 80% 满足溢写条件的时候,开始溢写 spill。系统有两个线程一个负责写入数据,一个负责 spill 数据。
分区
MapReduce 默认的分区方式是 hashPartition,在这种分区方式下,KV 对根据 key 的 hashcode 值与 reduceTask 个数进行取模,决定该键值对该要访问哪个 ReduceTask。
public class HashPartitioner<K, V> extends Partitioner<K, V> {
public HashPartitioner() {
}
public int getPartition(K key, V value, int numReduceTasks) {
return (key.hashCode() & 2147483647) % numReduceTasks;
//numReduceTasks默认为1
}
}
需要自定义分区的话可以重写 getPartition() 方法,设置 reduceTask 的数量。
排序
MapReduce 框架最重要的操作就是排序,MapTask 跟 ReduceTask 都会根据 key 进行按照字典顺序进行快排。
- MapTask 将缓冲区数据快排后写入到磁盘,然后磁盘文件会进行归并排序。
- ReduceTask 统一对内存跟磁盘所有数据进行归并排序。
规约
- Combiner 是 MR 程序中 Mapper 跟 Reducer 之外的组件。
- Combiner 是在每一个 MapTask 所在节点运行,Reducer 是接受全部 Mapper 输出结果。
- Combiner 属于局部汇总的意思,来减少网络传输。
- Combiner 用的时候要注意不能影响最终结果就行。比如求平均值就不行,中间会改变结果逻辑。
分组
即相同的 key 的 value 会放到一个集合里。
压缩
我们可以把数据文件压缩后再存入 HDFS,以节省存储空间。但是,在使用 MapReduce 处理压缩文件时,必须考虑压缩文件的可分割性。目前,Hadoop 支持以下几种压缩格式。
压缩的基本原则:
- 运算密集型任务 ,少压缩。
- IO密集型任务,多压缩。
压缩格式 | 自带 | 算法 | 扩展名 | 是否可切分 | 压缩后,代码修改 |
---|---|---|---|---|---|
DEFLATE | 是 | DEFLATE | .deflate | 否 | 不需要修改 |
gzip | 是 | DEFLATE | .gz | 否 | 不需要修改 |
bzip2 | 是 | bzip2 | .bz2 | 是 | 不需要修改 |
Snappy | 否 | Snappy | .snappy | 否 | 不需要修改 |
LZO | 否 | LZO | .lzo | 是 | 需要建索引,还需要指定输入格式 |
标签:MapReduce,ReduceTask,Hadoop,MapTask,Mapreduce,详解,key,序列化,数据 来源: https://blog.csdn.net/lhrfighting/article/details/118069030