MapReduce on YARN
作者:互联网
前言
本文隶属于专栏《1000个问题搞定大数据技术体系》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢!
本专栏目录结构和参考文献请见1000个问题搞定大数据技术体系
正文
简单版本
通过提交 jar 包,进行 MapReduce 处理,那么整个运行过程分为五个环节:
- 向 client 端提交 MapReduce job.
- 随后 yarn 的 ResourceManager 进行资源的分配.
- 由 NodeManager 进行加载与监控 containers.
- 通过 applicationMaster 与 ResourceManager 进行资源的申请及状态的交互,由 NodeManagers 进行 MapReduce 运行时 job 的管理.
- 通过 hdfs 进行 job 配置文件、jar 包的各节点分发。
详细版本
以下是在 YARN 集群中 MapReduce 应用程序启动时发生的事件序列。
- 客户端向 ResourceManager 提交 MapReduce v2 ( MRv2 )应用程序请求,如下所示:
hadoop jar wordcount.jar WordCount testdata output
- ResourceManager 的 ApplicationManager 组件指示 NodeManager (运行在每一个工作节点中的其中一个)为应用程序启动一个新的 ApplicationMaster 实例。
这是该应用程序的 0 号容器。 运行 mapper 和 reducer 的容器会在之后被创建,并被命名为 01 、 02 、 03 等。 - ApplicationMaster 通过向 ResourceManager 注册来初始化自身。
- ApplicationMaster 计算完成应用程序所需的资源。
ApplicationMaster 基于对输入数据的切分确定应该启动的 Map 任务的数量。
它通过请求应用程序所需的输入文件名称和数据块的位置来计算输入切分的数量。
利用这些信息, ApplicationMaster 计算处理输入数据所需的 Map 任务数量 - ApplicationMaster 请求 ResourceManager 为 Map 任务分配必要的容器。
其在应用程序的整个生命周期内与 ResourceManager 保持联系,确保其所需资源的列表被 ResourceManager 遵守,并且发送一些必要的 kill 请求杀死任务。 - ResourceManager 的 Scheduler 组件决定 Map 任务在哪个节点运行。
做出这个决定的关键因素包括数据位置和支持创建执行任务的新容器的节点的可用内存。
ResourceManager 将 ApplicationMaster 的资源请求放入队列中,并在节点中有可用资源时在特定节点为容器授予租赁权。 - ApplicationMaster 指示 NodeManager 在已分配容器的节点上创建容器。
- NodeManager 创建请求的容器,并启动它们。
容器发送 MapReduce 的运行状态给 ApplicationMaster (每个作业只有一个 ApplicationMaster ) - ApplicationMaster 为 Reduce 任务向 ResourceManager 申请资源(如果 MapReduce 应用程序包含 Reduce 的话,有些不需要)。
- ApplicationMaster 请求 NodeManager 在 ResourceManager 为 Reduce 任务分配资源的节点上启动 Reduce 任务
- Reduce 任务对 mapper 的中间数据执行 Shuffle 和排序操作,并将输出写入 HDFS ( out 目录)中。
- NodeManager 将状态和健康状况报告发送到 ResourceManager 。
一旦所有任务完成, ApplicationMaster 会将结果发送给客户端应用程序,并将作业信息和日志发送到 JobHistoryServer 。
任务容器清理其状态,并将中间输出从本地文件系统中删除 - 一旦应用程序完成运行, ApplicationMaster 会通知 ResourceManager 该作业已成功完成,并将自身从 ResourceManager 中注销并关闭。
- ResourceManager 释放应用程序持有的所有资源(容器)以供集群复用。
补充
Job 初始化过程
- 当 resourceManager 收到了 submitApplication() 方法的调用通知后,scheduler 开始分配 container,
随之 ResourceManager 发送 applicationMaster 进程,告知每个 nodeManager 管理器。 - 由 applicationMaster 决定如何运行 tasks,如果 job 数据量比较小,applicationMaster 便选择将 tasks 运行在一个JVM中。
判断的依据是什么?
- 当一个 job 的 mappers 数量小于10个(mapreduce.job.ubertask.maxmaps)
- 只有一个 reducer(mapreduce.job.ubertask.maxreduces)
- 读取的文件大小要小于一个 HDFS block(mapreduce.job.ubertask.maxbytes)
- 在运行tasks之前,applicationMaster 将会调用 setupJob() 方法,随之创建 output 的输出路径
这就可以解释,不管 mapreduce 一开始是否报错,输出路径都会创建
Task 任务分配
- 接下来 applicationMaster 向 ResourceManager 请求 containers 用于执行 map 与 reduce 的 tasks,这里 map task 的优先级要高于 reduce task,
当所有的 map tasks 结束后,随之进行 sort(这里是 shuffle 过程),最后进行 reduce task 的开始。
当 map tasks 执行了百分之 5% 的时候,将会请求reduce
- 运行 tasks 的是需要消耗内存与 CPU 资源的,默认情况下,map 和 reduce 的 task 资源分配为 1024MB 与一个核
对应的可修改运行的最小与最大参数配置:
mapreduce.map.memory.mb
mapreduce.reduce.memory.mb
mapreduce.map.cpu.vcores
mapreduce.reduce.reduce.cpu.vcores
Task 任务执行
- 这时一个 task 已经被 ResourceManager 分配到一个 container 中,由 applicationMaster 告知 nodemanager 启动 container,
这个 task 将会被一个主函数为 YarnChild 的 java application 运行,但在运行 task 之前,首先定位 task 需要的 jar 包、配置文件以及加载在缓存中的文件。 - YarnChild 运行于一个专属的 JVM 中,所以任何一个 map 或 reduce 任务出现问题,都不会影响整个 nodemanager 的 正常运行。
3、每个 task 都可以在相同的 JVM task 中完成,随之将完成的处理数据写入临时文件中。
运行进度与状态更新
MapReduce 是一个较长运行时间的批处理过程,可以是一小时、几小时甚至几天,那么 Job 的运行状态监控就非常重要。
每个 job 以及每个 task 都有一个包含 job(running,successfully completed,failed)的状态,以及 value 的计数器,状态信息及描述信息(描述信息一般都是在代码中加的打印信息)
这些信息是如何与客户端进行通信的呢?
当一个 task 开始执行,它将会保持运行记录,记录 task 完成的比例,对于 map 的任务,将会记录其运行的百分比,对于 reduce 来说可能复杂点,但系统依旧会估计 reduce 的完成比例。当一个 map 或 reduce 任务执行时,子进程会持续每三秒钟与 applicationMaster 进行交互。
标签:task,ResourceManager,reduce,MapReduce,YARN,ApplicationMaster,job,运行 来源: https://blog.csdn.net/Shockang/article/details/118532603