其他分享
首页 > 其他分享> > 大数据学习教程SD版—第四篇【Hadoop MapReduce】

大数据学习教程SD版—第四篇【Hadoop MapReduce】

作者:互联网

文章目录

4. Hadoop MapReduce

分布式计算引擎框架,离线计算,不擅长DAG计算

4.1 MapReduce 优点

  1. 易于编程,实现框架接口即可
  2. 良好的扩展性,动态加节点
  3. 高容错性,任务可以转移
  4. 适合海量数据计算

4.2 MapReduce 核心思想

一个MapTask 默认处理128M数据

以WordCount 的MapReduce程序为例

  1. 读数据,按行处理
  2. 按空格(或其他切割符)切分单词
  3. 形成KV键值对(word,1)
  4. 将所有KV键值对,按照指定的分区,溢写到磁盘
  1. 根据MapTask的分区数,开启对应数量的ReduceTask
  2. 一个ReduceTask只处理对应分区号的多个MapTask产生的结果
  3. 最终完成单词统计,并输出到结果文件

4.3 MapReduce 序列化类型

除了String 在Hadoop 类型中是Text外,其余都是在原Java类型后加上Writable后缀

4.4 MapReduce 编程规范

  1. 继承Mapper父类,重写map()方法
  2. Mapper输入输出都是KV
  3. map()方法定义处理逻辑,对每个KV调用一次
  1. 继承Reduce父类,重写reduce()方法
  2. Reduce输入输出也是KV
  3. reduce()方法定义处理逻辑,对每组KV(按K分组)调用一次

启动MapReduce程序的客户端,提交任务到YARN集群

4.5 MapReduce WordCount

执行流程: Mapper : setup() -> run() -> for map() -> cleanup() ->Reducer: setup() ->run -> for reduce() ->cleanup()

package com.ipinyou.mapreduce.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private Text outK = new Text();
    private IntWritable outV = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] words = line.split(" ");
        for (String word : words) {
            outK.set(word);
            context.write(outK,outV);
        }
    }
}
package com.ipinyou.mapreduce.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable outV = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable value : values) {
            int times = value.get();
            count += times;
        }
        outV.set(count);
        context.write(key, outV);
    }
}
package com.ipinyou.mapreduce.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCountDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1.job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2.jar class
        job.setJarByClass(WordCountDriver.class);
        // 3.mapper class reducer class
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // 4.map kv
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 5.out kv
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 6.path: input output
        FileInputFormat.setInputPaths(job, new Path("E:\\HadoopCode\\InputText"));
        FileOutputFormat.setOutputPath(job, new Path("E:\\HadoopCode\\OutputText2"));
        // 7.submit
        boolean completion = job.waitForCompletion(true);
        System.exit(completion ? 0 : 1);
    }
}

打成jar包,在集群上运行

 hadoop jar wordcount.jar com.ipinyou.mapreduce.wordcount.WordCountDriver /input /output

4.6 MapReduce 序列化

轻量、紧凑、快速、互操作性

  1. 实现Writable接口
  2. 反序列化需要空参构造函数
  3. 实现序列化write()和反序列化readFields(),字段顺序必须一致

如果自定义的bean要作为Map的输出Key,则必须实现Compare接口,shuffle需要对key进行排序

此处也可以用字符串,绝大部分数据都可以用字符串拼接得到!

4.7 MapReduce InputFormat

数据块:物理上的Block

数据切片:逻辑上对数据进行切分,默认切片大小=Block 大小,切片的对象是单个文件

  1. 找到数据输入目录
  2. 遍历目录的每一个文件
  3. 遍历每一个文件,获取文件大小,计算切片大小,默认splitSize=blockSize 弹性1.1

获取切片大小的公式:Math.max(minSize,Math.min(maxSize,blockSize))

minsize =1 maxsize =Long.MaxValue

按文件处理,按文件进行逐行处理

针对大量小文件,通过设置虚拟文件大小,来进行小文件逻辑上的合并切片

例如 设置为4M 小文件 2M 5M 4M 产生的虚拟文件 2M 2.5M 2.5M 4M 切成两片(2+2.5)、(2.5+4)

代码中设置:

        // 改变默认InputFormat 设置为4M
        job.setInputFormatClass(CombineTextInputFormat.class);
        CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);

4.8 MapReduce 工作流程

  1. 准备待处理文件
  2. client submit 前,获取待处理数据的信息
  3. 提交信息(Job.split、xxx.jar、Job.xml)
  4. 计算MapTask数量,开启对应数量的MapTask
  5. 在MapperTask内部进行逻辑处理
  6. 写出数据到环形缓冲区(默认大小 100M 80%)
  7. 达到溢写比之后,根据设置的分区数进行分区,并且分区内部数据进行快速排序(meta排序)
  8. 反向溢写到磁盘文件,文件各分区进行Merge归并排序 ,此处对于如果相同key进行value的累加时,可以Combine 合并相同K,对V进行累加
  9. 默认等待所有的MapTask任务结束后,根据数据分区数,开启对应ReduceTask数量
  10. 对应的ReduceTask拉取到对应分区的数据,下载到本地的磁盘
  11. 合并文件,进行归并排序(原因:分区有序,但整体无序)
  12. 在ReduceTask内部进行逻辑处理之后,输出到磁盘文件

4.9 MapReduce Shuffle机制

Mapper任务之后,Reducer任务之前的洗牌过程

  1. mapper任务把数据写入环形缓冲区collector
  2. 达到溢写比0.8之后,反向写出数据到指定分区partitioner.getPartition(),并进行分区快排
  3. 溢写到磁盘后,各分区进行归并排序
  4. reducer任务把分区数据拉取到内存缓冲,不够溢出到磁盘
  5. 对数据进行归并排序,按相同K进行分组,执行reduce方法

4.10 MapReduce Partitioner

默认Partitioner分区是对K的hashcode值对ReduceTask个数取模得到,若没有设置ReduceTask数量,则使用Partition-1,即 1-1 =0 一个分区

  1. 定义类继承Partitioner类,实现getPartition()方法
  2. 在驱动类中设置自定义的Partitioner和对应数量的ReduceTask数量

如果ReduceTask > Partitioner 数量 ,则会产生空的分区文件

如果ReduceTask = Partitioner 数量,正好

如果ReduceTask < Partiioner && ReduceTask >1 ,则会抛出IO异常

如果ReduceTask = 1,则设置的分区方法不生效,产生一个分区文件

4.11 MapReduce Sort

Hadoop 程序默认都会按照Key进行排序,为了Reduce阶段提高效率,默认字典序,快排

  1. 实现WritableComparable接口,重写CompareTo()方法

缺点:只能对Key进行排序,需要把待排序的对象放在Mapper的Key的位置,在Reducer再把对象放回到Value的位置,正序 >1 倒序> -1

4.12 MapReduce Combiner

Reducer的一个子类,用于每一个MapTask的输出进行局部汇总,且使用有限制,不能影响最终的业务逻辑,继而言之,只能用于求和,预聚合

  1. 继承Reducer类,重写reduce方法
  2. 在驱动类中设置自定义的预聚合类

实际在驱动类中,直接设置使用Reducer类即可,不用再写一遍

4.13 MapReduce OutputFormat

默认使用TextOutputFormat

  1. 创建一个类继承FileOutputFormat,重写其方法
  2. 需要一个RecordWriter实现类,在此类方法中处理业务逻辑,默认不换行
  3. 在驱动类中指定自定义的OutputFormat

4.14 MapReduce MapTask工作机制

  1. Read阶段
  2. Map阶段
  3. Collect阶段
  4. 溢写阶段
  5. Merge阶段

MapTask 并行度:由切片数量决定

4.15 MapReduce ReduceTask工作机制

  1. Copy阶段
  2. Sort阶段
  3. Reduce阶段

ReduceTask 并行度 :

= 0 没有

默认= 1 一个

根据集群性能而定

MapTask和Reduce的工作流程 ,具体更细节的部分可以看源码

4.16 MapReduce ReduceJoin

针对两个不同类型数据的join操作,在reduce端进行join,导致reduce端处理的数据过多,容易导致数据倾斜

4.17 MapReduce Mapjoin

在map端进行join,缓存一张小表,即在mapper的setup方法中将小表数据缓存到集合,并且需要在驱动类中加载缓存,可以避免ReduceJoin的数据倾斜!

  1. Driver类加载缓存数据
  2. Mapper端在setup()读取缓存数据,加载到内存集合
  3. Mapper端在map() 处理业务逻辑,即join操作
  4. 由于不需要Reducer端,设置reduceTask数目为0

4.18 MapReduce ETL

ETL 数据清洗,只需要Mapper端,根据业务,梳理清洗规则,来清洗数据

4.19 MapReduce 开发总结

  1. InputFormat
  1. Mapper
  1. Reducer
  1. OutputFormat

4.20 MapReduce 压缩

减小磁盘IO,但对应也会带来CPU开销,对于IO型job

标签:D版,分区,ReduceTask,Hadoop,MapReduce,job,import,hadoop
来源: https://blog.csdn.net/qq_41200768/article/details/121783228