其他分享
首页 > 其他分享> > Day61

Day61

作者:互联网

分布式计算框架Map/Reduce

分布式计算框架MapReduce

1、产生背景

Google公司为了解决其搜索引擎中大规模网页数据的并行化处理,研究提出的一种面向大规模数据处理的并行计算模型和方法,称为MapReduce。

2003年和2004年,Google公司在国际会议上分别发表了两篇关于Google分布式文件系统GFS和MapReduce的论文,公布了Google的GFS和MapReduce的基本原理和主要设计思想。

2004年,Cutting和同为程序员出身的Mike Cafarella决定开发一款可以代替当时的主流搜索产品的开源搜索引擎,这个项目被命名为Nutch。2005年初,Nutch的开发人员在Nutch上实现了一个MapReduce系统,到年中,Nutch的所有主要算法均完成移植,用MapReduce和NDFS来运行。在2006年2月,开发人员将NDFS和MapReduce移出Nutch形成Lucene的一个子项目,称为Hadoop。Hadoop中MapReduce的实现正是基于Google的论文的MapReduce的开源实现。

2、MapReduce是什么

MapReduce是一种编程模型,是面向大数据并行处理的计算模型、框架和平台。

3、基本特点

4、企业应用

MapReduce运行流程

1、MapReduce的主要功能

1.1数据划分和计算任务调度

系统自动将一个作业(Job)待处理的大数据划分为很多个数据块,每个数据块对应于一个计算任务(Task),并自动调度计算节点来处理相应的数据块。作业和任务调度功能主要负责分配和调度计算节点(Map节点或Reduce节点),同时负责监控这些节点的执行状态,并负责Map节点执行的同步控制。

1.2数据/代码互相定位

为了减少数据通信,一个基本的原则是本地化数据处理,即一个计算节点尽可能处理其本地磁盘上所分布存储的数据,这实现了代码向数据的迁移;当无法进行这种本地化数据处理时,再寻找其他可用节点并将数据从网络上传送给该节点(数据向代码迁移),但尽可能从数据所在的本地机架上寻找可用节点以减少通信延迟。

1.3系统优化

为了减少数据通信开销,中间结果数据进入Reduce节点前会进行一定的合并处理;一个Reduce节点所处理的数据可能会来自多个Map节点,为了避免Reduce计算阶段发生数据处理不平衡,Map节点输出的中间结果需使用一定的策略进行适当的划分处理,保证相关性数据发送到同一个Reduce节点;此外,系统还进行一些性能优化处理,如对最慢的计算任务采用多备份执行、选最快完成者作为结果。

1.4出错检测和恢复

以低端的商用服务器构成的大规模MapReduce计算集群中,节点硬件(主机、磁盘、内存等)出错和软件出错是常态,因此MapReduce需要能检测并隔离出错节点,并调度分配新的节点接管出错节点的计算任务。同时系统还将维护数据存储的可靠性,用多备份冗余存储机制提高数据存储的可靠性,并能及时检测和恢复出错的数据。

2、MapReduce的运行流程

2.1运行流程

由上图可以看到MapReduce执行下来主要包含这样几个步骤:

1) 首先正式提交作业代码,并对输入数据源进行切片

2) master调度worker执行map任务

3) worker当中的map任务读取输入源切片

4) worker执行map任务,将任务输出保存在本地

5) master调度worker执行reduce任务,reduce worker读取map任务的输出文件

6) 执行reduce任务,将任务输出保存到HDFS

2.2 运行流程详解

给定任意的HDFS的输入目录,其内部数据为“f a c d e……”等用空格字符分隔的字符串,通过使用MapReduce计算框架来统计以空格分隔的每个单词出现的频率,输出结果如,,形式的结果到HDFS目录中。

MapReduce将作业的整个运行过程分为两个阶段:Map阶段Reduce阶段。

Map阶段由一定数量的Map Task组成,流程如下:

Reduce阶段由一定数量的Reduce Task组成,流程如下:

通常我们把从Mapper阶段输出数据到Reduce阶段的reduce计算之间的过程称之为shuffle。

MapReduce Java API应用

1、MapReduce开发流程

具体提交命令为:

yarn jar testhdfs-jar-with-dependencies.jar com.tianliangedu.driver.WordCount /tmp/tianliangedu/input /tmp/tianliangedu/output3

2、WordCount代码实现

2.1 Map类编写

package com.tianliangedu.mapper;

import java.io.IOException;

import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

public class MyTokenizerMapper extends

Mapper {

// 暂存每个传过来的词频计数,均为1,省掉重复申请空间

private final static IntWritable one = new IntWritable(1);

// 暂存每个传过来的词的值,省掉重复申请空间

private Text word = new Text();

// 核心map方法的具体实现,逐个对去处理

public void map(Object key, Text value, Context context)

throws IOException, InterruptedException {

// 用每行的字符串值初始化StringTokenizer

StringTokenizer itr = new StringTokenizer(value.toString());

// 循环取得每个空白符分隔出来的每个元素

while (itr.hasMoreTokens()) {

// 将取得出的每个元素放到word Text对象中

word.set(itr.nextToken());

// 通过context对象,将map的输出逐个输出

context.write(word, one);

}

}

}

2.2 Reduce类编写

package com.tianliangedu.reducer;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

//reduce类,实现reduce函数

public class IntSumReducer extends

Reducer {

private IntWritable result = new IntWritable();

//核心reduce方法的具体实现,逐个去处理

public void reduce(Text key, Iterable values,

Context context) throws IOException, InterruptedException {

//暂存每个key组中计算总和

int sum = 0;

//加强型for,依次获取迭代器中的每个元素值,即为一个一个的词频数值

for (IntWritable val : values) {

//将key组中的每个词频数值sum到一起

sum += val.get();

}

//将该key组sum完成的值放到result IntWritable中,使可以序列化输出

result.set(sum);

//将计算结果逐条输出

context.write(key, result);

}

}

2.3 Driver类编写

package com.tianliangedu.driver;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.tianliangedu.mapper.MyTokenizerMapper;

import com.tianliangedu.reducer.IntSumReducer;

public class WordCountDriver {

// 启动mr的driver方法

public static void main(String[] args) throws Exception {

// 得到集群配置参数

Configuration conf = new Configuration();

// 设置到本次的job实例中

Job job = Job.getInstance(conf, "天亮WordCount");

// 指定本次执行的主类是WordCount

job.setJarByClass(WordCountDriver.class);

// 指定map类

job.setMapperClass(MyTokenizerMapper.class);

// 指定combiner类,要么不指定,如果指定,一般与reducer类相同

job.setCombinerClass(IntSumReducer.class);

// 指定reducer类

job.setReducerClass(IntSumReducer.class);

// 指定job输出的key和value的类型,如果map和reduce输出类型不完全相同,需要重新设置map的output的key和value的class类型

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

// 指定输入数据的路径

FileInputFormat.addInputPath(job, new Path(args[0]));

// 指定输出路径,并要求该输出路径一定是不存在的

FileOutputFormat.setOutputPath(job, new Path(args[1]));

// 指定job执行模式,等待任务执行完成后,提交任务的客户端才会退出!

System.exit(job.waitForCompletion(true) ? 0 : 1);

}

}

2.4本地模拟分布式计算环境运行mapreduce

鉴于远程运行进行代码测试的复杂性,以及其它新框架均开始支持本地local环境模拟分布式计算运行, 故mapreduce从2.x开始也已经支持本地环境

具体做法请参见辅助资料集” 06-本地local环境模拟mapreduce并行计算的操作步骤”。

2.5 Maven打包

使用Maven命令,基于配置的Maven插件实现代码打包。

2.6 上传到运行环境

使用rz命令将打好的运行包上传到集群环境中。

2.7 运行WordCount程序

具体提交命令为:

yarn jar testhdfs-jar-with-dependencies.jar com.tianliangedu.driver.WordCount /tmp/tianliangedu/input /tmp/tianliangedu/output3

2.8 查看执行过程

Web访问地址为:http://cluster1.hadoop:8088/ui2/#/yarn-apps/apps

2.9 查看执行结果

3、标准代码实现

将之前的三个类,合并成一个类来处理

import java.io.IOException;

import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

//启动mr的driver类

public class WordCountDriver {

//map类,实现map函数

public static class MyTokenizerMapper extends

Mapper {

//暂存每个传过来的词频计数,均为1,省掉重复申请空间

private final static IntWritable one = new IntWritable(1);

//暂存每个传过来的词的值,省掉重复申请空间

private Text word = new Text();

//核心map方法的具体实现,逐个对去处理

public void map(Object key, Text value, Context context)

throws IOException, InterruptedException {

//用每行的字符串值初始化StringTokenizer

StringTokenizer itr = new StringTokenizer(value.toString());

//循环取得每个空白符分隔出来的每个元素

while (itr.hasMoreTokens()) {

//将取得出的每个元素放到word Text对象中

word.set(itr.nextToken());

//通过context对象,将map的输出逐个输出

context.write(word, one);

}

}

}

//reduce类,实现reduce函数

public static class IntSumReducer extends

Reducer {

private IntWritable result = new IntWritable();

//核心reduce方法的具体实现,逐个去处理

public void reduce(Text key, Iterable values,

Context context) throws IOException, InterruptedException {

//暂存每个key组中计算总和

int sum = 0;

//加强型for,依次获取迭代器中的每个元素值,即为一个一个的词频数值

for (IntWritable val : values) {

//将key组中的每个词频数值sum到一起

sum += val.get();

}

//将该key组sum完成的值放到result IntWritable中,使可以序列化输出

result.set(sum);

//将计算结果逐条输出

context.write(key, result);

}

}

//启动mr的driver方法

public static void main(String[] args) throws Exception {

//得到集群配置参数

Configuration conf = new Configuration();

//设置到本次的job实例中

Job job = Job.getInstance(conf, "天亮WordCount");

//通过指定相关字节码对象,找到所属的主jar包

job.setJarByClass(WordCountDriver.class);

//指定map类

job.setMapperClass(MyTokenizerMapper.class);

//指定combiner类,要么不指定,如果指定,一般与reducer类相同

job.setCombinerClass(IntSumReducer.class);

//指定reducer类

job.setReducerClass(IntSumReducer.class);

//指定job输出的key和value的类型,如果map和reduce输出类型不完全相同,需要重新设置map的output的key和value的class类型

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

//指定输入数据的路径

FileInputFormat.addInputPath(job, new Path(args[0]));

//指定输出路径,并要求该输出路径一定是不存在的

FileOutputFormat.setOutputPath(job, new Path(args[1]));

//指定job执行模式,等待任务执行完成后,提交任务的客户端才会退出!

System.exit(job.waitForCompletion(true) ? 0 : 1);

}

}

4、mapreduce作业运行的依赖项

MapReduce Shell应用

1、MapReduce的二级命令

mapred称为一级命令,直接输入mapred回车,即可查看二级命令:

2、MapReduce的三级命令

输入一级命令mapred后,再任意输入一个二级命令,即可查看三级命令:

3、MapReduce shell应用

先提交一个WordCount任务,然后使用mapred job -list查看任务列表

由于某种原因,要立即终止某任务的执行,则使用mapred job -kill job-id。

构造场景:先提交一个WordCount job,然后通过kill job-id来终止任务:

使用mapred shell命令,通过job-id可以查看job的工作日志。

命令格式为:mapred job -logs job-id:

MapReduce技术特征

1、向“外”横向扩展,而非向“上”纵向扩展

2、失效被认为是常态

3、移动计算,把处理向数据迁移(数据本地性)

4、顺序处理数据、避免随机访问数据

5、推测执行

6、平滑无缝的可扩展性

7、为应用开发隐藏系统底层细节

MapReduce项目练习

1、在hdfs目录/tmp/tianliangedu/input/wordcount中有一系列文件,内容均为","号分隔,

求按","号分隔的各个元素的出现频率,输出到目录/tmp/tianliangedu/output/个人用户名的hdfs目录中。(必做)

2、在hdfs目录/tmp/tianliangedu/input/wordcount目录中有一系列文件,内容为","号分隔,分隔后的元素均为数值类型、字母、中文,求所有出现的数值的和。(必做)

3、在hdfs目录/tmp/tianliangedu/input/wordcount目录中有一系列文件,内容为","号分隔,分隔后的元素均为数值类型、字母、中文,求所有出现的数值的平均值。(必做)

4、在hdfs目录/tmp/tianliangedu/input/wordcount目录中有一系列文件,内容为","号分隔,分隔后的元素均为数值类型、字母、中文,求数值类型、字母类型、中文类型各自的次数。(必做)

5、在hdfs目录/tmp/tianliangedu/input/wordcount目录中有一系列文件,内容为","号分隔,同时在hdfs路径/tmp/tianliangedu/black.txt黑名单文件,一行一个单词用于存放不记入统计的单词列表。求按","号分隔的各个元素去除掉黑名单后的出现频率,输出到目录/tmp/tianliangedu/output/个人用户名的hdfs目录中。 (必做)

6、在hdfs目录/tmp/tianliangedu/input/wordcount目录中有一系列文件,求这些文件一共有多少行?(类似于mysql数据库中的select count(*) from table)。(必做)

7、在hdfs目录/tmp/table/student中存在student.txt文件,hdfs目录/tmp/table/student_location中存在student_location.txt文件,求两个hdfs目录中共有多少行?

8、在hdfs目录/tmp/table/student中存在student.txt文件,按tab分隔,字段名为(学号,姓名,课程号,班级名称),hdfs目录/tmp/table/student_location中存在student_location.txt文件,按tab分隔,字段名为(学号,省份,城市,区名),在Map任务中用student_location.txt文件中的学号过滤student.txt中的学号字段,输出student.txt中的存在交集的记录,输出结果结构按tab分隔后的四个字段为(学号,姓名,课程号,班级名称)。

9、(列筛选不去重) 在hdfs目录/tmp/table/student中存在student.txt文件,按tab分隔,字段名为(学号,姓名,课程号,班级名称),选择学号和班级名称列不去重输出,输出结果结构按tab分隔后的两个字段为(学号,班级名称)。

10、 (列筛选去重)在hdfs目录/tmp/table/student中存在student.txt文件,按tab分隔,字段名为(学号,姓名,课程号,班级名称),选择学号和班级名称列去掉重复存在行输出,输出结果结构按tab分隔后的两个字段为(学号,班级名称)。

11、 (列值replace操作)在hdfs目录/tmp/table/student中存在student.txt文件,按tab分隔,字段名为(学号,姓名,课程号,班级名称),将班级名称为"计算机*班"的更换成"计算机科学与技术*班",不做去重,输出结果结构按tab分隔后的两个字段为(学号,班级名称)。

12、 (列分组统计操作)在hdfs目录/tmp/table/student中存在student.txt文件,按tab分隔,字段名为(学号,姓名,课程号,班级名称),将课程号为分组,输出结果结构按tab分隔后的两个字段为(课程号,分组人数)。

13、 (列分组统计操作)在hdfs目录/tmp/table/student中存在student.txt文件,按tab分隔,字段名为(学号,姓名,课程号,班级名称),将班级名称为分组,输出结果结构按tab分隔后的两个字段为(班级名称,分组人数)。

14、 (多列分组统计操作)在hdfs目录/tmp/table/student中存在student.txt文件,按tab分隔,字段名为(学号,姓名,课程号,班级名称),将课程号和班级名称为各自分组,输出结果结构按tab分隔后的两个字段为(分组名称,分组人数),即如(课程号,分组人数)、(班级名称,分组人数)。

15、 (两表inner join) 在hdfs目录/tmp/table/student中存在student.txt文件,按tab分隔,字段名为(学号,姓名,课程号,班级名称),hdfs目录/tmp/table/student_location中存在student_location.txt文件,按tab分隔,字段名为(学号,省份,城市,区名),对两个hdfs目录的按学号求交集,输出结果结构按tab分隔后的四个字段为(学号,姓名,课程号,班级名称)。

16、 (两表left join) 在hdfs目录/tmp/table/student中存在student.txt文件,按tab分隔,字段名为(学号,姓名,课程号,班级名称),hdfs目录/tmp/table/student_location中存在student_location.txt文件,按tab分隔,字段名为(学号,省份,城市,区名),对两个hdfs目录的按学号以student为主表,求左外链接操作,输出结果结构按tab分隔后的七个字段为(学号,姓名,课程号,班级名称,省份,城市,区名)。

17、 (两表right join) 在hdfs目录/tmp/table/student中存在student.txt文件,按tab分隔,字段名为(学号,姓名,课程号,班级名称),hdfs目录/tmp/table/student_location中存在student_location.txt文件,按tab分隔,字段名为(学号,省份,城市,区名),对两个hdfs目录的按学号以student为主表,求右外链接操作,输出结果结构按tab分隔后的七个字段为(学号,姓名,课程号,班级名称,省份,城市,区名)。

18、 (两表full join) 在hdfs目录/tmp/table/student中存在student.txt文件,按tab分隔,字段名为(学号,姓名,课程号,班级名称),hdfs目录/tmp/table/student_location中存在student_location.txt文件,按tab分隔,字段名为(学号,省份,城市,区名),对两个hdfs目录的按学号以student为主表,求全外链接操作,输出结果结构按tab分隔后的七个字段为(学号,姓名,课程号,班级名称,省份,城市,区名)。

标签:tmp,hdfs,分隔,MapReduce,Day61,job,student
来源: https://blog.csdn.net/hhdxxy/article/details/122851335