其他分享
首页 > 其他分享> > Hadoop系列——详解MapReduce

Hadoop系列——详解MapReduce

作者:互联网

本文主要介绍MapReduce的基本概念以及详细介绍该框架的流程

文章目录

Mapreduce 简介

Mapreduce 是什么

Mapreduce 是面向大数据并行处理的计算模型、框架和平台,其处理过程分为 Map 阶段和 Reduce 阶段。

Mapreduce 的由来

Mapreduce 最早是由 Google 公司设计一种为了解决搜索引擎中大规模网页数据的并行化处理。正是这思想,Doug Cutting 模仿 Google 的 Mapreduce,基于 Java 设计了 Hadoop 开源的 Mapreduce 并行计算框架。

Mapreduce 设计目标

Mapreduce 是一种可用于数据处理的编程框架,其采用 分而治之 的思想,把大规模数据集分发给一个主节点管理的各个分节点完成,然后通过整合各个节点的中间结果,得到最终结果。

Mapreduce 适用场景

Mapreduce 适用于数据集是可拆分并行处理的,且不影响最后结果。

Mapreduce 特点

  1. 易于编程
  2. 良好的扩展性
  3. 高容错性
  4. 适合 PB 级上数据的离线处理
  1. 不擅长实时计算
  2. 不擅长流式计算
  3. 不擅长 DAG(有向无环图) 计算

Mapreduce 的基本概念

  1. JobClient
    用来提交作业,配置参数 Configuration,打包成 jar 包存储在 HDFS 上,将文件路径提交给 JobTracker 的 Master 服务,然后由 Master 创建每个 task 将他们分发到各个 TaskTracker 服务中去执行。

  2. JobTracker
    用来协调作业的运行,由其负责资源监控和作业调度。

  3. TaskTracker
    用来处理作业划分后的任务,其主动与 JobTrack 进行通信,负责执行每个任务。

Mapreduce执行框架


Task 分为 MapTask 和 ReduceTask,均由 TaskTracker 启动。Mapreduce 处理的最小单位为 split,split 可由用户自由设置,计算公式如下:

S p l i t S i z e = M a t h . m a x ( m i n S i z e , M a t h . m i n ( m a x S i z e , b l o c k S i z e ) ) \begin{aligned} SplitSize= Math.max(minSize,Math.min(maxSize,blockSize)) \end{aligned} SplitSize=Math.max(minSize,Math.min(maxSize,blockSize))​

mapreduce.input.fileinputformat.split.minsize 默认为 1
mapreduce.input.fileinputformat.split.maxsize 默认为 Long.MAXValue
blockSize 默认为 128M
maxsize :该参数如果比 blockSize 小,则导致切片变小,即会等于配置的整个参数。
sminsize :该参数如果修改的比 blockSize 大,则切片大小会比 blockSize 大。

split 和 block 的关系

InputFormat

Mapreduce 开启任务时,需要对文件的读取,Hadoop 定义了不同的类读取不同的数据。

接口实现类


TextInputFormat

KeyValueTExtInputFormat

NLineInputFormat

自定义InputFormat

CombineTextInputFormat

OutputFormat

对 reduce 拉取的结果需要指定的输出方式写到文件系统里,可根据需求选择实现类。

接口实现类


TextOutputFormat

SequenceFileOutputFormat

自定义OutputFormat

序列化

  因为 Hadoop 在集群之间进行通讯或者 RPC 调用时是需要序列化的,而且要求序列化要快、且体积要小、占用带宽要小。而 Java 自带的序列化是重量级框架,对象序列化后会附带额外信息,比如各种校验信息、header、继承体系等。所以 Hadoop 自研了序列化框架。

Hadoop 序列化相关接口:Writable 实现的序列化机制、Comparable 管理 Key 的排序。

常见的 Hadoop 序列化类型:

Java 类型Hadoop Writable类型
booleanBooleanWritable
byteByteWritable
intIntWritable
floatFloatWritable
longLongWritable
doubleDoubleWritable
StringText
mapMapWritable
arrayArrayWritable
nullNullWritable

Mapreduce 流程

整体流程

Mapreduce 整体流程

MapTask 工作机制

  1. Read阶段:MapTask 通过用户编写的 RecordReader,从输入 InputSplit 中解析出一个个 key/value 对。
  2. Map阶段:将解析出的 key/value 交给用户编写 map() 函数处理,并产生一系列新的key/value。
  3. Collect收集阶段:它会将生成的 key/value 分区(调用Partitioner),并写入一个环形内存缓冲区中。
  4. Spill阶段:先按照分区进行排序,然后区内按照字典对 key 进行快排,并在必要时对数据进行合并、压缩等操作。
  5. Combine阶段:选择性可进行 MapTask 内的优化提速。

ReduceTask 工作机制

  1. Copy阶段:从所有的 MapTask 中收集结果然后决定将数据放入缓存还是磁盘。
  2. Merge阶段:copy 数据时后天会对磁盘还有内存数据进行 Merge。
  3. Sort阶段:ReduceTask 需对所有数据进行一次归并排序,方便执行 reduce 函数。
  4. Reduce阶段:调用用户 reduce() 函数将计算结果写到 HDFS 上。

Shuffle

MapReduce 的核心就是 Shuffle 过程,Shuffle 过程是贯穿于 map 和 reduce 两个过程的。在 Map 端包括 spill 过程,在 Reduce 端包括 copy 和 sort 过程。 具体 Shuffle 过程如下。

Shuffle 机制
  1. MapTask 收集我们的 map() 方法输出的键值对,放到内存缓冲区中。
  2. 从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件,溢出前会按照分区针对 key 进行区内快排。
  3. 多个溢出文件会被合并成大的溢出文件。
  4. 在溢出过程及合并的过程中,都要调用 Partitioner 进行分区和针对 key 进行排序。
  5. ReduceTask 根据自己的分区号,去各个 MapTask 机器上取相应的结果分区数据。
  6. ReduceTask 对收集后的数据进行合并和归并排序
  7. 进入 ReduceTask 的逻辑运算过程,调用用户自定义的 reduce() 方法。
  8. Shuffle 中的缓冲区大小会影响到 MapReduce 程序的执行效率,原则上说,缓冲区越大,磁盘 io 的次数越少,执行速度就越快。

环形缓冲区

Map 的输出结果由 Collector 处理,每个 Map 任务不断地将键值对输出到在内存中构造的一个环形数据结构中。使用环形数据结构是为了更有效地使用内存空间,在内存中放置尽可能多的数据。

环形数据结构其实就是个字节数组 byte[],叫 kvbuffer,默认为 100M。里面主要存储数据元数据。中间有个分界点,并且分界点是变化的。当环形缓冲区写入的 buffer 的大小达到 80% 满足溢写条件的时候,开始溢写 spill。系统有两个线程一个负责写入数据,一个负责 spill 数据。

分区

MapReduce 默认的分区方式是 hashPartition,在这种分区方式下,KV 对根据 key 的 hashcode 值与 reduceTask 个数进行取模,决定该键值对该要访问哪个 ReduceTask。

public class HashPartitioner<K, V> extends Partitioner<K, V> {
    public HashPartitioner() {
    }
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & 2147483647) % numReduceTasks;
		//numReduceTasks默认为1
    }
}

需要自定义分区的话可以重写 getPartition() 方法,设置 reduceTask 的数量。

排序

MapReduce 框架最重要的操作就是排序,MapTask 跟 ReduceTask 都会根据 key 进行按照字典顺序进行快排。

规约

分组

即相同的 key 的 value 会放到一个集合里。

压缩

我们可以把数据文件压缩后再存入 HDFS,以节省存储空间。但是,在使用 MapReduce 处理压缩文件时,必须考虑压缩文件的可分割性。目前,Hadoop 支持以下几种压缩格式。

压缩的基本原则:

压缩格式自带算法扩展名是否可切分压缩后,代码修改
DEFLATEDEFLATE.deflate不需要修改
gzipDEFLATE.gz不需要修改
bzip2bzip2.bz2不需要修改
SnappySnappy.snappy不需要修改
LZOLZO.lzo需要建索引,还需要指定输入格式

标签:MapReduce,ReduceTask,Hadoop,MapTask,Mapreduce,详解,key,序列化,数据
来源: https://blog.csdn.net/lhrfighting/article/details/118069030