编程语言
首页 > 编程语言> > 103_MapReduce编程框架

103_MapReduce编程框架

作者:互联网

1 MapReduce思想

MapReduce思想在生活中处处可见。我们或多或少都曾接触过这种思想。MapReduce的思想核心是分而治之,充分利用了并行处理的优势。即使是发布过论文实现分布式计算的谷歌也只是实现了这种思想,而不是自己原创。

MapReduce任务过程是分为两个处理阶段:

image-20210708220500410

image-20210708220513772

2 官方WordCount案例源码解析

WordCount.java

package org.apache.hadoop.examples;

import java.io.IOException;
import java.io.PrintStream;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount
{
  public static class TokenizerMapper
    extends Mapper<Object, Text, Text, IntWritable>
  {
    private static final IntWritable one = new IntWritable(1);
    private Text word = new Text();
    
    public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context)
      throws IOException, InterruptedException
    {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens())
      {
        this.word.set(itr.nextToken());
        context.write(this.word, one);
      }
    }
  }
  
  public static class IntSumReducer
    extends Reducer<Text, IntWritable, Text, IntWritable>
  {
    private IntWritable result = new IntWritable();
    
    public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context)
      throws IOException, InterruptedException
    {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      this.result.set(sum);
      context.write(key, this.result);
    }
  }
  
  public static void main(String[] args)
    throws Exception
  {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2)
    {
      System.err.println("Usage: wordcount <in> [<in>...] <out>");
      System.exit(2);
    }
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    for (int i = 0; i < otherArgs.length - 1; i++) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[(otherArgs.length - 1)]));
    
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

image-20210708225445365

image-20210708230129174

经过查看分析官方WordCount案例源码,我们发现一个统计单词数量的MapReduce程序的代码由三个部分组成:

Mapper类继承了org.apache.hadoop.mapreduce.Mapper类并重写了其中的map方法,Reducer类继承了org.apache.hadoop.mapreduce.Reducer类并重写了其中的reduce方法。

重写的Map方法作用:map方法中的逻辑 是用户希望mr程序中 map阶段如何处理的逻辑。

重写的Reduce方法作用:reduce方法中的逻辑 是用户希望mr程序中 reduce阶段如何处理的逻辑。

2.1 Hadoop序列化

什么是Hadoop序列化?

Hadoop序列指:当我们通过网络通信传输数据时 或者 把对象持久化到文件时,需要把对象序列化成二进制的结构的过程。

观察源码时发现自定义Mapper类与自定义Reducer类都有泛型类型约束,比如自定义Mapper有四个形
参类型,但是形参类型并不是常见的java基本类型。

为什么Hadoop要选择建立自己的序列化格式而不使用java自带的serializable呢?

Java基本类型与Hadoop常用序列化类型:

Java基本类型 Hadoop Writable类型
boolean BooleanWritable
byte ByteWritable
int IntWritable
float FloatWritable
long LongWritable
double DoubleWritable
String Text
array ArrayWritable
map MapWritable

3 MapReduce编程规范及示例编写

3.1 Mapper类

注意:map()方法是对输入的一个key-value对调用一次!

3.2 Reducer类

3.3 Driver阶段

创建提交YARN集群运行的Job对象,其中封装了MapReduce程序运行所需要的相关参数入输入数据路径,输出数据路径等,也相当于是一个YARN集群的客户端,主要作用就是提交我们MapReduce程序运行。

3.4 WordCount代码实现

3.4.1 需求分析

给定一个文本文件,统计文件中每一个单词出现的总次数

输入数据:wc.txt

image-20210710130803801

输出:

apache 2
clickhouse 2
hadoop 1
mapreduce 1
spark 2
xiaoming 1

3.4.2 具体步骤

按照MapReduce编程规范,分别编写Mapper,Reducer,Driver。

3.4.2.1 新建maven工程

image-20210713191446058

  1. 导入hadoop依赖

pom.xml

    <dependencies>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.9.2</version>
        </dependency>
    </dependencies>

    <!--maven打包插件 -->
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin </artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>

                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

注意:以上依赖第一次需要联网下载!

  1. 添加log4j.properties
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

3.4.2.2 整体思路梳理(仿照源码)

Map阶段:

  1. map()方法中把传入的数据转为String类型
  2. 根据空格切分出单词
  3. 输出<单词,1>

Reduce阶段:

  1. 汇总各个key(单词)的个数,遍历value数据进行累加
  2. 输出key的总数

Driver

  1. 获取配置文件对象,获取job对象实例
  2. 指定程序jar的本地路径
  3. 指定Mapper/Reducer类
  4. 指定Mapper输出的kv数据类型
  5. 指定最终输出的kv数据类型
  6. 指定job处理的原始数据路径
  7. 指定job输出结果路径
  8. 提交作业

3.4.2.3 编写Mapper类

WordCountMapper.java

package com.lagou.mr.wc;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    Text k = new Text();
    IntWritable v = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //1 获取第一行
        String line = value.toString();

        //2 切割
        String[] words = line.split(" ");

        //3 输出
        for (String word : words) {
            k.set(word);
            context.write(k, v);
        }
    }
}

3.4.2.4 编写Reducer类

WordcountReducer.java

package com.lagou.mr.wc;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    int sum;
    IntWritable v = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        //1 累加求和
        sum = 0;
        for (IntWritable count : values) {
            sum += count.get();
        }

        //2 输出
        v.set(sum);
        context.write(key, v);
    }
}

3.4.2.5 编写Driver驱动类

WordcountDriver.java

package com.lagou.mr.wc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //1 获取配置信息以及封装任务
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        //2 设置jar加载路径
        job.setJarByClass(WordCountDriver.class);

        //3 设置map和reduce类
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        //4 设置map输出
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //5 设置最终输出kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //6 设置输入和输出路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        //7 提交
        boolean result = job.waitForCompletion(true);

        System.exit(result ? 0 : 1);
    }
}

3.4.2.6 运行任务

  1. 本地模式

image-20210712202207275

在program arguments设置参数

image-20210712202332614

运行结束,去输出结果路径查看结果

image-20210712202547556

注意本地idea运行mr任务与集群没有任何关系,没有提交任务到yarn集群,是在本地使用多线程
方式模拟的mr的运行。

  1. Yarn集群模式

找到Maven的默认配置文件路径

image-20210712212215916

在settings.xml文件中配置阿里云的镜像

<?xml version="1.0" encoding="UTF-8"?>
<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0 http://maven.apache.org/xsd/settings-1.0.0.xsd">

    <!-- 配置本地仓库地址 -->
    <localRepository>C:\Users\海潮明月\.m2\repository</localRepository>

    <!-- 配置下载Jar包的镜像仓库地址 -->
    <mirrors>

        <mirror>
            <id>alimaven</id>
            <name>aliyun maven</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            <mirrorOf>central</mirrorOf>
        </mirror>

        <mirror>
            <id>uk</id>
            <name>Human Readable Name for this Mirror.</name>
            <url>http://uk.maven.org/maven2/</url>
            <mirrorOf>central</mirrorOf>
        </mirror>

        <mirror>
            <id>nexus</id>
            <name>internal nexus repository</name>
            <url>http://repo.maven.apache.org/maven2</url>
            <mirrorOf>central</mirrorOf>
        </mirror>
    </mirrors>

    <profiles>
        <!-- 下载源代码和Javadoc -->
        <profile>
            <id>downloadSources</id>
            <properties>
                <downloadSources>true</downloadSources>
                <downloadJavadocs>true</downloadJavadocs>
            </properties>
        </profile>
    </profiles>

    <!-- 激活在profiles的配置项 -->
    <activeProfiles>
        <activeProfile>downloadSources</activeProfile>
    </activeProfiles>

</settings>

settings.xml配置好以后,如果发现Maven的Plugins缺少包,可以重新导一下包

image-20210712214644402

image-20210712214358905

随后先清理一下

image-20210712213841339

最后执行打包操作

image-20210712213922316

打包成功

image-20210712214141143

jar包的路径在这里

image-20210712214115999

选择合适的jar包上传到服务器本地(如:linux121)

image-20210712215241568

image-20210712220805107

将原始数据文件上传导hdfs

image-20210712221140756

Yarn集群任务运行成功展示图

image-20210712222551652

image-20210712222647495

4 序列化Writable接口

基本序列化类型往往不能满足所有需求,比如在Hadoop框架内部传递一个自定义bean对象,那么该对
象就需要实现Writable序列化接口。

4.1 实现Writable序列化步骤如下

  1. 必须实现Writable接口

  2. 反序列化时,需要反射调用空参构造函数,所以必须有空参构造

    public CustomBean() {
      super();
    }
    
  3. 重写序列化方法

    @Override
    public void write(DataOutput out) throws IOException {
        ....
    }
    
  4. 重写反序列化方法

    @Override
    public void readFields(DataInput in) throws IOException {
        ....
    }
    
  5. 反序列化的字段顺序和序列化字段的顺序必须完全一致

  6. 为了方便展示结果数据,需要重写bean对象的toString()方法,可以自定义分隔符

  7. 如果自定义Bean对象需要放在Mapper输出KV中的K,则该对象还需实现Comparable接口,因为MapReduce框中的Shuffle过程要求对key必须能排序!!

    @Override
    public int compareTo(CustomBean o) {
        // 自定义排序规则
        return this.num > o.getNum() ? -1 : 1;
    }
    

4.2 Writable接口案例需求

4.2.1 需求分析

统计每台智能音箱设备内容播放时长。

原始日志格式:

001   001577c3  kar_890809     120.196.100.99     1116             954             200
日志id  设备id  appkey(合作硬件厂商)  网络ip        自有内容时长(秒)  第三方内容时长(秒)   网络状态码

输出结果:

001577c3     11160            9540         20700
设备id    自有内容时长(秒)  第三方内容时长(秒)   总时长

4.2.2 编写MapReduce程序

  1. 创建SpeakBean对象

    SpeakBean.java

    package com.lagou.mr.speak;
    
    import org.apache.hadoop.io.Writable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    /**
     * SpeakBean是一个自定义类型,
     * 目的是封装map输出kv中value的类型,
     * 需要实现Writable序列化接口
     */
    public class SpeakBean implements Writable {
        //定义属性
        private Long selfDuration;      //自由内容时长
        private Long thirdPartDuration; //第三方内容时长
        private String deviceId;        //设备id
        private Long sumDuration;       //总时长(=自由内容时长+第三方内容时长)
    
        //准备一个空参构造方法
        public SpeakBean() {
        }
    
        //准备一个有参构造方法
        public SpeakBean(Long selfDuration, Long thirdPartDuration, String deviceId) {
            this.selfDuration = selfDuration;
            this.thirdPartDuration = thirdPartDuration;
            this.deviceId = deviceId;
            this.sumDuration = this.selfDuration + this.thirdPartDuration; //总时长=自由内容时长+第三方内容时长
        }
    
        /**
         * 序列化方法:将目标数据封装到dataOutput中
         *
         * @param dataOutput
         * @throws IOException
         */
        @Override
        public void write(DataOutput dataOutput) throws IOException {
            dataOutput.writeLong(selfDuration);      //long类型用writeLong方法封装
            dataOutput.writeLong(thirdPartDuration); //long类型用writeLong方法封装
            dataOutput.writeUTF(deviceId);           //String类型用writeUTF方法封装
            dataOutput.writeLong(sumDuration);       //long类型用writeLong方法封装
        }
    
        /**
         * 反序列化方法:通过dataInput读取序列化后的目标数据
         *
         * @param dataInput
         * @throws IOException
         */
        @Override
        public void readFields(DataInput dataInput) throws IOException {
            this.selfDuration = dataInput.readLong();      //long类型用readLong方法读取
            this.thirdPartDuration = dataInput.readLong(); //long类型用readLong方法读取
            this.deviceId = dataInput.readUTF();           //String类型用readUTF方法读取
            this.sumDuration = dataInput.readLong();       //long类型用readLong方法读取
        }
    
        public Long getSelfDuration() {
            return selfDuration;
        }
    
        public void setSelfDuration(Long selfDuration) {
            this.selfDuration = selfDuration;
        }
    
        public Long getThirdPartDuration() {
            return thirdPartDuration;
        }
    
        public void setThirdPartDuration(Long thirdPartDuration) {
            this.thirdPartDuration = thirdPartDuration;
        }
    
        public String getDeviceId() {
            return deviceId;
        }
    
        public void setDeviceId(String deviceId) {
            this.deviceId = deviceId;
        }
    
        public Long getSumDuration() {
            return sumDuration;
        }
    
        public void setSumDuration(Long sumDuration) {
            this.sumDuration = sumDuration;
        }
    
        /**
         * 为了方便查看,需重写toString方法:用制表符分隔每一列数据
         *
         * @return 返回SpeakBean对象的私有属性
         */
        @Override
        public String toString() {
            return selfDuration + "\t"
                    + thirdPartDuration + "\t"
                    + deviceId + "\t"
                    + sumDuration;
        }
    }
    
  2. 编写Mapper类

    SpeakMapper.java

    package com.lagou.mr.speak;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    /**
     * 一共四个参数:分为两对kv
     * 第一对kv是map输入参数的kv类型:k代指一行文本的偏移量,v代指一行文本内容
     * 第二对kv是map输出参数的kv类型:k代指map输出的key类型,v代指map输出的value类型
     */
    public class SpeakMapper extends Mapper<LongWritable, Text, Text, SpeakBean> {
        /*
        1 转换接收到的text数据为String类型
        2 按照制表符进行切分:自由内容时长 第三方内容时长 设备id  -->  将目标数据封装为自定义类型SpeakBean
        3 直接输出:k-->设备id  value-->SpeakBean对象
         */
        Text device_id = new Text();
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //1 转换接收到的text数据为String类型
            final String line = value.toString();
    
            //2 按照制表符进行切分,获取目标数据:自由内容时长 第三方内容时长 设备id
            final String[] fields = line.split("\t");      //切分输入的整行数据
            String selfDuration = fields[fields.length - 3];     //自由内容时长
            String thirdPartDuration = fields[fields.length - 2];//第三方内容时长
            String deviceId = fields[1];                         //设备id
            //2.1 将目标数据封装到自定义Bean对象中
            final SpeakBean speakBean = new SpeakBean(Long.parseLong(selfDuration), Long.parseLong(thirdPartDuration), deviceId);
    
            //3 直接输出:k-->设备id  value-->SpeakBean对象
            device_id.set(deviceId);
            context.write(device_id, speakBean);
        }
    }
    
  3. 编写Reducer

    SpeakReducer.java

    package com.lagou.mr.speak;
    
    import org.apache.hadoop.mapreduce.Reducer;
    
    import javax.xml.soap.Text;
    import java.io.IOException;
    
    public class SpeakReducer extends Reducer<Text, SpeakBean, Text, SpeakBean> {
        @Override
        protected void reduce(Text key, Iterable<SpeakBean> values, Context context) throws IOException, InterruptedException {
            //定义时长累加的初始值
            Long self_duration = 0L;
            Long third_part_duration = 0L;
    
            //reduce方法的key:map输出的某一个key
            //reduce方法的value:map输出的kv对中,key相同的value组成的一个集合
            //reduce逻辑:遍历迭代器累加时长,得到各自的总时长
            for (SpeakBean speakBean : values) {
                final Long selfDuration = speakBean.getSelfDuration();
                final Long thirdPartDuration = speakBean.getThirdPartDuration();
                self_duration += selfDuration;
                third_part_duration += third_part_duration;
            }
    
            //将目标数据封装成SpeakBean对象进行输出
            final SpeakBean bean = new SpeakBean(self_duration, third_part_duration, key.toString());
            context.write(key, bean);
        }
    }
    
  4. 编写驱动

    SpeakDriver.java

    package com.lagou.mr.speak;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    public class SpeakDriver {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            final Configuration conf = new Configuration();
            final Job job = Job.getInstance(conf, "SpeakDriver");
    
            //设置jar包本地路径
            job.setJarByClass(SpeakDriver.class);
    
            //使用的mapper和reducer
            job.setMapperClass(SpeakMapper.class);
            job.setReducerClass(SpeakReducer.class);
    
            //map的输出kv类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(SpeakBean.class);
    
            //设置reduce输出
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(SpeakBean.class);
    
            //读取的数据路径
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            //提交任务
            final boolean flag = job.waitForCompletion(true);
            System.exit(flag ? 0 : 1);
        }
    }
    

mr编程技巧总结

5 MapReduce原理分析

5.1 MapTask运行机制详解

MapTask流程

image-20210713192542031

详细步骤:

  1. 首先,读取数据组件InputFormat(默认TextInputFormat)会通过getSplits方法对输入目录中文件进行逻辑切片规划得到splits,有多少个split就对应启动多少个MapTask。split与block的对应关系默认是一对一。

  2. 将输入文件切分为splits之后,由RecordReader对象(默认LineRecordReader)进行读取,以\n作为分隔符,读取一行数据,返回<key,value>。Key表示每行首字符偏移值,value表示这一行文本内容。

  3. 读取split返回<key,value>,进入用户自己继承的Mapper类中,执行用户重写的map函数。RecordReader读取一行这里调用一次。

  4. map逻辑执行完之后,将map的每条结果通过context.write进行collect数据收集。在collect中,会先对其进行分区处理,默认使用HashPartitioner。

  1. 接下来,会将数据写入内存,内存中这片区域叫做环形缓冲区,缓冲区的作用是批量收集map结果,减少磁盘IO的影响。我们的key/value对以及Partition的结果都会被写入缓冲区。当然写入之前,key与value值都会被序列化成字节数组。
  1. 当溢写线程启动后,需要对这80MB空间内的key做排序(Sort)。排序是MapReduce模型默认的行为

    • 如果job设置过Combiner,那么现在就是使用Combiner的时候了。将有相同key的key/value对的value加起来,减少溢写到磁盘的数据量。Combiner会优化MapReduce的中间结果,所以它在整个模型中会多次使用。
    • 那哪些场景才能使用Combiner呢?从这里分析,Combiner的输出是Reducer的输入,Combiner绝不能改变最终的计算结果。Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。Combiner的使用一定得慎重,如果用好,它对job执行效率有帮助,反之会影响reduce的最终结果。
  2. 合并溢写文件:每次溢写会在磁盘上生成一个临时文件(写之前判断是否有combiner),如果map的输出结果真的很大,有多次这样的溢写发生,磁盘上相应的就会有多个临时文件存在。当整个数据处理结束之后开始对磁盘中的临时文件进行merge合并,因为最终的文件只有一个,写入磁盘,并且为这个文件提供了一个索引文件,以记录每个reduce对应数据的偏移量。

至此map整个阶段结束!

MapTask的一些配置:

image-20210714060036330

官方参考地址:https://hadoop.apache.org/docs/r2.9.2/hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml

5.2 MapTask的并行度

  1. MapTask并行度思考

    MapTask的并行度决定Map阶段的任务处理并发度,从而影响到整个Job的处理速度。

    思考:MapTask并行任务是否越多越好呢?哪些因素影响了MapTask并行度?

  2. MapTask并行度决定机制

    • 数据块:Block是HDFS物理上把数据分成一块一块。
    • 切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。

image-20210714060235560

5.2.1 切片机制源码阅读

image-20210714060456996

image-20210714060526010

切片机制默认就是128M

问:MapTask并行度是不是越多越好呢?

答:不是,如果一个文件仅仅比128M大一点点也被当成一个split来对待,而不是多个split。

MR框架在并行运算的同时也会消耗更多资源,并行度越高资源消耗也越高,假设129M文件分为两个分
片,一个是128M,一个是1M;

对于1M的切片的Map task来说,太浪费资源。

问:129M的文件在Hdfs存储的时候会不会切成两块?

答:在hdfs存储时会将129M的文件切分为两个block块128+1进行存储

5.3 ReduceTask 工作机制

image-20210714061127129

Reduce大致分为copy、sort、reduce三个阶段,重点在前两个阶段。copy阶段包含一个eventFetcher来获取已完成的map列表,由Fetcher线程去copy数据,在此过程中会启动两个merge线程,分别为inMemoryMergeronDiskMerger,分别将内存中的数据merge到磁盘和将磁盘中的数据进行merge。待数据copy完成之后,copy阶段就完成了,开始进行sort阶段,sort阶段主要是执行finalMerge操作,是纯粹的sort阶段,sort完成之后就是reduce阶段,调用用户定义的reduce函数进行处理。

详细步骤:

5.4 ReduceTask并行度

ReduceTask的并行度同样影响整个Job的执行并发度和执行效率,但与MapTask的并发数由切片数决定不同,ReduceTask数量的决定是可以直接手动设置:

// 默认值是1,手动设置为4
job.setNumReduceTasks(4);

注意事项:

  1. ReduceTask=0,表示没有Reduce阶段,输出文件数和MapTask数量保持一致;
  2. ReduceTask数量不设置默认就是一个,输出文件数量为1个;
  3. 如果数据分布不均匀,可能在Reduce阶段产生倾斜;

5.5 Shuffle机制

map阶段处理的数据如何传递给reduce阶段,是MapReduce框架中最关键的一个流程,这个流程就叫
shuffle。

shuffle:洗牌、发牌——(核心机制:数据分区,排序,分组,combine,合并等过程)

image-20210714062203956

image-20210714063427071

5.5.1 MapReduce的分区与reduceTask的数量

在MapReduce中,通过我们指定分区,会将同一个分区的数据发送到同一个reduce当中进行处理(默认是key相同去往同个分区),例如我们为了数据的统计,我们可以把一批类似的数据发送到同一个reduce当中去,在同一个reduce当中统计相同类型的数据,

如何才能保证相同key的数据去往同个reduce呢?只需要保证相同key的数据分发到同个分区即可。结合以上原理分析我们知道MR程序shuffle机制默认就是这种规则!!

1 分区源码

翻阅源码验证以上规则,MR程序默认使用的HashPartitioner,保证了相同的key去往同个分区!!

image-20210714063839855

2 自定义分区

实际生产中需求变化多端,默认分区规则往往不能满足需求,需要结合业务逻辑来灵活控制分区规则以
及分区数量!!

如何制定自己需要的分区规则?

具体步骤:

  1. 自定义类继承Partitioner,重写getPartition()方法
  2. 在Driver驱动中,指定使用自定义Partitioner
  3. 在Driver驱动中,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask数量

实例:按照不同的appkey把记录输出到不同的分区中

001   001577c3  kar_890809     120.196.100.99     1116             954             200
日志id  设备id  appkey(合作硬件厂商)  网络ip        自有内容时长(秒)  第三方内容时长(秒)   网络状态码
根据appkey把不同厂商的日志数据分别输出到不同的文件中

PartitionBean.java

package com.lagou.mr.partition;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class PartitionBean implements Writable {
    private String id;              //日志id
    private String deviceId;        //设备id
    private String appkey;          //appkey厂商id
    private String ip;              //ip地址
    private Long selfDuration;      //自有内容播放时长
    private Long thirdPartDuration; //第三方内容时长
    private String status;          //状态码

    public PartitionBean() {
    }

    public PartitionBean(String id, String deviceId, String appkey, String ip, Long selfDuration, Long thirdPartDuration, String status) {
        this.id = id;
        this.deviceId = deviceId;
        this.appkey = appkey;
        this.ip = ip;
        this.selfDuration = selfDuration;
        this.thirdPartDuration = thirdPartDuration;
        this.status = status;
    }

    //序列化方法
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(id);
        dataOutput.writeUTF(deviceId);
        dataOutput.writeUTF(appkey);
        dataOutput.writeUTF(ip);
        dataOutput.writeLong(selfDuration);
        dataOutput.writeLong(thirdPartDuration);
        dataOutput.writeUTF(status);
    }

    //反序列化方法
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.id = dataInput.readUTF();
        this.deviceId = dataInput.readUTF();
        this.appkey = dataInput.readUTF();
        this.ip = dataInput.readUTF();
        this.selfDuration = dataInput.readLong();
        this.thirdPartDuration = dataInput.readLong();
        this.status = dataInput.readUTF();
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getDeviceId() {
        return deviceId;
    }

    public void setDeviceId(String deviceId) {
        this.deviceId = deviceId;
    }

    public String getAppkey() {
        return appkey;
    }

    public void setAppkey(String appkey) {
        this.appkey = appkey;
    }

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public Long getSelfDuration() {
        return selfDuration;
    }

    public void setSelfDuration(Long selfDuration) {
        this.selfDuration = selfDuration;
    }

    public Long getThirdPartDuration() {
        return thirdPartDuration;
    }

    public void setThirdPartDuration(Long thirdPartDuration) {
        this.thirdPartDuration = thirdPartDuration;
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    @Override
    public String toString() {
        return id + '\t'
                + deviceId + '\t'
                + appkey + '\t'
                + ip + '\t'
                + selfDuration + '\t'
                + thirdPartDuration + '\t'
                + status;
    }

}

PartitionMapper.java

package com.lagou.mr.partition;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class PartitionMapper extends Mapper<LongWritable, Text, Text, PartitionBean> {
    final PartitionBean bean = new PartitionBean();
    final Text k = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        final String[] fields = value.toString().split("\t");
        String appkey = fields[2];

        bean.setId(fields[0]);
        bean.setDeviceId(fields[1]);
        bean.setAppkey(fields[2]);
        bean.setIp(fields[3]);
        bean.setSelfDuration(Long.parseLong(fields[4]));
        bean.setThirdPartDuration(Long.parseLong(fields[5]));
        bean.setStatus(fields[6]);

        k.set(appkey);
        context.write(k, bean);
    }
}

CustomPartitioner.java

package com.lagou.mr.partition;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class CustomPartitioner extends Partitioner<Text, PartitionBean> {
    @Override
    public int getPartition(Text text, PartitionBean partitionBean, int countOfPartitions) {
        int partition = 0;

        if (text.toString().equals("kar")) {
            partition = 0;
        } else if (text.toString().equals("pandora")) {
            partition = 1;
        } else {
            partition = 2;
        }
        return partition;
    }
}

PartitionReducer.java

package com.lagou.mr.partition;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class PartitionReducer extends Reducer<Text, PartitionBean, Text, PartitionBean> {
    @Override
    protected void reduce(Text key, Iterable<PartitionBean> values, Context context) throws IOException, InterruptedException {
        //无需进行聚合运算,直接输出即可
        for (PartitionBean bean : values) {
            context.write(key, bean);
        }
    }
}

PartitionDriver.java

package com.lagou.mr.partition;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class PartitionDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //1 获取配置文件
        final Configuration configuration = new Configuration();
        //2 获取job实例
        final Job job = Job.getInstance(configuration);

        //3 设置任务相关参数
        job.setJarByClass(PartitionDriver.class);
        job.setMapperClass(PartitionMapper.class);
        job.setReducerClass(PartitionReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(PartitionBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(PartitionBean.class);

        //4 设置使用自定义分区器
        job.setPartitionerClass(CustomPartitioner.class);

        //5 指定reducetask的数量与分区数量保持一致,分区数量是3
        job.setNumReduceTasks(3);//reducetask不设置默认是1个

        //6 指定输入和输出数据路径
        FileInputFormat.setInputPaths(job, new Path("H:\\hadoop\\learningCode\\mapreduce\\wordcount\\input\\speak.data"));
        FileOutputFormat.setOutputPath(job, new Path("H:\\hadoop\\learningCode\\mapreduce\\wordcount\\output\\partition\\out"));

        //7 提交任务
        final boolean flag = job.waitForCompletion(true);

        System.exit(flag ? 0 : 1);
    }
}

5.5.2 MapReduce中的Combiner

combiner运行机制:

image-20210715055904833

  1. Combiner是MR程序中Mapper和Reducer之外的一种组件
  2. Combiner组件的父类就是Reducer
  3. Combiner和reducer的区别在于运行的位置
  4. Combiner是在每一个maptask所在的节点运行;
  5. Combiner的意义就是对每一个maptask的输出进行局部汇总,以减小网络传输量。
  6. Combiner能够应用的前提是不能影响最终的业务逻辑,此外,Combiner的输出kv应该跟reducer的输入kv类型要对应起来。
#### 举例说明:
假设一个计算平均值的MR任务
Map阶段:2个MapTask
	MapTask1输出数据:10,5,15 如果使用Combiner:(10+5+15)/3=10
	MapTask2输出数据:2,6     如果使用Combiner:(2+6)/2=4
Reduce阶段汇总:(10+4/2=7
而正确结果应该是:(10+5+15+2+6/5=7.6

## 所以Combiner不能用于计算平均值

1 改造WordCount程序

WordCountCombiner.java

package com.lagou.mr.wc;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountCombiner extends Reducer<Text, IntWritable, NullWritable, IntWritable> {
    final IntWritable total = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int num = 0;
        //进行局部汇总,逻辑与reduce方法保持一致
        for (IntWritable value : values) {
            final int i = value.get();
            num += 1;
        }

        total.set(num);
        context.write(NullWritable.get(), total);
    }
}

WordCountDriver.java

在驱动(Driver)设置使用Combiner:

job.setCombinerClass(WordcountCombiner.class);

验证结果:

观察程序运行日志

image-20210715064401272

如果直接使用WordCountReducer作为Combiner使用是否可以?

直接使用Reducer作为Combiner组件来使用是可以的!

5.6 MapReduce中的排序

排序是MapReduce框架中最重要的操作之一。

MapTask和ReduceTask均会对数据按照key进行排序。该操作属于Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。

  1. 部分排序

    MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序。

  2. 全排序

    最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。但该方法在处理大型文件时效率极低,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构。

  3. 辅助排序:( GroupingComparator分组)
    在Reduce端对key进行分组。应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部字段比较不相同)的key进入到同一个reduce方法时,可以采用分组排序。

  4. 二次排序.
    在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。

5.6.1 WritableComparable

Bean对象如果作为Map输出的key时,需要实现WritableComparable接口并重写compareTo方法指定
排序规则

1 全排序

基于统计的播放时长案例的输出结果对总时长进行排序

实现全局排序只能设置一个ReduceTask!!

播放时长案例输出结果:

00fdaf3 33180 33420 00fdaf3 66600
00wersa4 30689 35191 00wersa4 65880
0a0fe2 43085 44254 0a0fe2 87339
0ad0s7 31702 29183 0ad0s7 60885
0sfs01 31883 29101 0sfs01 60984
a00df6s 33239 36882 a00df6s 70121
adfd00fd5 30727 31491 adfd00fd5 62218

需求分析

如何设计map()方法输出的key,value

MR框架中shuffle阶段的排序是默认行为,不管你是否需要都会进行排序。

key:把所有字段封装成为一个bean对象,并且指定bean对象作为key输出,如果作为key输出,需要实现排序接口,指定自己的排序规则;

具体步骤:

SpeakBeanSort.java

package com.lagou.mr.sort;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;

public class SpeakBeanSort implements WritableComparable<SpeakBeanSort> {
    //定义属性
    private Long selfDrutation;     //自有内容播放时长
    private Long thirdPartDuration; //第三方内容播放时长
    private String deviceId;        //设备id
    private Long sumDuration;       //总时长

    public SpeakBeanSort() {
    }

    public SpeakBeanSort(Long selfDrutation, Long thirdPartDuration, String deviceId, Long sumDuration) {
        this.selfDrutation = selfDrutation;
        this.thirdPartDuration = thirdPartDuration;
        this.deviceId = deviceId;
        this.sumDuration = sumDuration;
    }

    public Long getSelfDrutation() {
        return selfDrutation;
    }

    public void setSelfDrutation(Long selfDrutation) {
        this.selfDrutation = selfDrutation;
    }

    public Long getThirdPartDuration() {
        return thirdPartDuration;
    }

    public void setThirdPartDuration(Long thirdPartDuration) {
        this.thirdPartDuration = thirdPartDuration;
    }

    public String getDeviceId() {
        return deviceId;
    }

    public void setDeviceId(String deviceId) {
        this.deviceId = deviceId;
    }

    public Long getSumDuration() {
        return sumDuration;
    }

    public void setSumDuration(Long sumDuration) {
        this.sumDuration = sumDuration;
    }

    //序列化方法
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(selfDrutation);
        dataOutput.writeLong(thirdPartDuration);
        dataOutput.writeUTF(deviceId);
        dataOutput.writeLong(sumDuration);
    }

    //反序列化方法
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.selfDrutation = dataInput.readLong();
        this.thirdPartDuration = dataInput.readLong();
        this.deviceId = dataInput.readUTF();
        this.sumDuration = dataInput.readLong();
    }

    /**
     * 指定排序规则,我们希望按照总时长进行排序
     *
     * @param o
     * @return 返回值有三种:0相等 1小于 -1大于
     */
    @Override
    public int compareTo(SpeakBeanSort o) {
        System.out.println("compareTo 方法执行了...");
        //指定按照bean对象的总时长字段的值进行比较
        if (this.sumDuration > o.sumDuration) {
            return -1;
        } else if (this.sumDuration < o.sumDuration) {
            return 1;
        } else {
            return 0; //如果相等,可以在这里加入第二个判断条件,进行二次排序
        }
    }

    @Override
    public boolean equals(Object o) {
        System.out.println("equals方法执行了...");
        return super.equals(o);
    }

    @Override
    public int hashCode() {
        return Objects.hash(selfDrutation, thirdPartDuration, deviceId, sumDuration);
    }

    @Override
    public String toString() {
        return selfDrutation + "\t"
                + thirdPartDuration + "\t"
                + deviceId + '\'' + "\t"
                + sumDuration;
    }
}

SortMapper.java

package com.lagou.mr.sort;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SortMapper extends Mapper<LongWritable, Text, SpeakBeanSort, NullWritable> {
    final SpeakBeanSort beanSort = new SpeakBeanSort();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //读取一行文本,转为字符串,切分
        final String[] fields = value.toString().split("\t");

        //解析出各个字段封装成SpeakBean对象
        beanSort.setDeviceId(fields[0]);
        beanSort.setSelfDrutation(Long.parseLong(fields[1]));
        beanSort.setThirdPartDuration(Long.parseLong(fields[2]));
        beanSort.setSumDuration(Long.parseLong(fields[4]));

        //SpeakBeanSort作为key输出
        context.write(beanSort, NullWritable.get());
    }
}

SortReducer.java

package com.lagou.mr.sort;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SortReducer extends Reducer<SpeakBeanSort, NullWritable, SpeakBeanSort, NullWritable> {
    @Override
    protected void reduce(SpeakBeanSort key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        //为了避免前面compareTo方法导致总流量相等被当成对象相等,而合并了key,所以遍历values获取每个key(bean对象)
        for (NullWritable value : values) {
            context.write(key, value);
        }
    }
}

SortDriver.java

package com.lagou.mr.sort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class SortDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
         /*
        1. 获取配置文件对象,获取job对象实例
        2. 指定程序jar的本地路径
        3. 指定Mapper/Reducer类
        4. 指定Mapper输出的kv数据类型
        5. 指定最终输出的kv数据类型
        6. 指定job处理的原始数据路径
        7. 指定job输出结果路径
        8. 提交作业
         */

        final Configuration configuration = new Configuration();
        final Job job = Job.getInstance(configuration, "SortDriver");

        job.setJarByClass(SortDriver.class);
        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReducer.class);

        job.setMapOutputKeyClass(SpeakBeanSort.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(SpeakBeanSort.class);
        job.setOutputValueClass(NullWritable.class);

        job.setNumReduceTasks(1);

        FileInputFormat.setInputPaths(job, new Path(""));
        FileOutputFormat.setOutputPath(job, new Path(""));

        final boolean flag = job.waitForCompletion(true);
        System.exit(flag ? 0 : 1);
    }
}

总结

  1. 自定义对象作为Map的key输出时,需要实现WritableComparable接口,排序:重写compareTo()方法,序列化以及反序列化方法
  2. 再次理解reduce()方法的参数;reduce()方法是map输出的kv中key相同的kv中的v组成一个集合调用一次reduce()方法,选择遍历values得到所有的key
  3. 默认reduceTask数量是1个
  4. 对于全局排序需要保证只有一个reduceTask

2 分区排序(默认的分区规则,区内有序)

5.6.2 GroupingComparator

GroupingComparator是mapreduce当中reduce端的一个功能组件,主要的作用是决定哪些数据作为一组,调用一次reduce的逻辑,默认是每个不同的key,作为多个不同的组,每个组调用一次reduce逻辑,我们可以自定义GroupingComparator实现不同的key作为同一个组,调用一次reduce逻辑。

  1. 需求

    原始数据

    订单id 商品id 成交金额
    Order_0000001 Pdt_01 222.8
    Order_0000001 Pdt_05 25.8
    Order_0000002 Pdt_03 522.8
    Order_0000002 Pdt_04 122.4
    Order_0000002 Pdt_05 722.4
    Order_0000003 Pdt_01 232.8

    需要求出每一个订单中成交金额最大的一笔交易。

  2. 实现思路

    Mapper

    • 读取一行文本数据,切分出每个字段;
    • 订单id和金额封装为一个Bean对象,Bean对象的排序规则指定为先按照订单Id排序,订单id相等再按照金额降序排;
    • map()方法输出kv;key-->bean对象,value-->NullWritable.get();

    Shuffle

    • 指定分区器,保证相同订单id的数据去往同个分区(自定义分区器)

    • 指定GroupingComparator,分组规则指定只要订单Id相等则认为属于同一组;

    Reduce

    • 每个reduce()方法写出一组key的第一个

参考代码:

image-20210726201105391

OrderBean

OrderBean.java

package com.lagou.mr.group;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class OrderBean implements WritableComparable<OrderBean> {
    private String orderId; //订单id
    private Double price;   //金额

    public OrderBean() {
    }

    public OrderBean(String orderId, Double price) {
        this.orderId = orderId;
        this.price = price;
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public Double getPrice() {
        return price;
    }

    public void setPrice(Double price) {
        this.price = price;
    }


    /**
     * 序列化
     *
     * @param dataOutput
     * @throws IOException
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(orderId);
        dataOutput.writeDouble(price);
    }

    /**
     * 反序列化
     *
     * @param dataInput
     * @throws IOException
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.orderId = dataInput.readUTF();
        this.price = dataInput.readDouble();
    }


    /**
     * 指定排序规则,先按照订单id比较在按照金额比较,按照金额降序排序
     *
     * @param o
     * @return
     */
    @Override
    public int compareTo(OrderBean o) {
        int res = -this.price.compareTo(o.getPrice());
        System.out.println(res);
        return res;
    }

    /**
     * 重写toString方法
     *
     * @return
     */
    @Override
    public String toString() {
        return orderId + '\t' + price;
    }
}

自定义分区器

CustomPartitioner.java

package com.lagou.mr.group;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class CustomPartitioner extends Partitioner<OrderBean, NullWritable> {
    @Override
    public int getPartition(OrderBean orderBean, NullWritable nullWritable, int numPartitions) {
        //订单id相同的数据进入同个分区
        return (orderBean.getOrderId().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

自定义GroupingComparator

CustomGroupingComparator.java

package com.lagou.mr.group;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class CustomGroupingComparator extends WritableComparator {
    public CustomGroupingComparator() {
        super(OrderBean.class, true);
    }

    //重写其中的compare方法,通过这个方法来让mr接受 orderid相同则两个对象相等的规则,key相等


    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        //a和b是orderbean对象
        //比较两个对象的orderid
        final OrderBean o1 = (OrderBean) a;
        final OrderBean o2 = (OrderBean) b;
        final int i = o1.getOrderId().compareTo(o2.getOrderId());
        return i; // 0 1 -1
    }
}

Mapper

GroupMapper.java

package com.lagou.mr.group;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class GroupMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {
    OrderBean bean = new OrderBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        final String[] fields = value.toString().split("\t");
        //订单id与金额封装为一个orderBean对象
        bean.setOrderId(fields[0]);
        bean.setPrice(Double.parseDouble(fields[2]));
        context.write(bean, NullWritable.get());
    }
}

Reducer

GroupReducer.java

package com.lagou.mr.group;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class GroupReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {
    //key:reduce方法的key注意是一组相同的kv的第一个key作为传入reduce方法的key,因为我们已经指定了排序的规则
    //按照金额降序排列,则第一个key就是金额最大的交易数据
    //value:一组相同的key的kv对中v的集合

    //对于如何判断key是否相同,自定义对象是需要我们指定一个规则,这个规则通过GroupingComparator来指定

    @Override
    protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        //直接输出key就是金额最大的交易
        context.write(key, NullWritable.get());
    }
}

Driver

GroupDriver.java

package com.lagou.mr.group;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class GroupDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
       /*
        1. 获取配置文件对象,获取job对象实例
        2. 指定程序jar的本地路径
        3. 指定Mapper/Reducer类
        4. 指定Mapper输出的kv数据类型
        5. 指定最终输出的kv数据类型
        6. 指定job处理的原始数据路径
        7. 指定job输出结果路径
        8. 提交作业
        */

        //1. 获取配置文件对象,获取job对象实例
        final Configuration configuration = new Configuration();
        final Job job = Job.getInstance(configuration, "GroupDriver");

        //2. 指定程序jar的本地路径
        job.setJarByClass(GroupDriver.class);

        //3. 指定Mapper/Reducer类
        job.setMapperClass(GroupMapper.class);
        job.setReducerClass(GroupReducer.class);

        //4. 指定Mapper输出的kv数据类型
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        //5. 指定最终输出的kv数据类型
        job.setOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        //指定分区器
        job.setPartitionerClass(CustomPartitioner.class);
        //指定使用groupingcomparator
        job.setGroupingComparatorClass(CustomGroupingComparator.class);

        //6. 指定job处理的原始数据路径
        FileInputFormat.setInputPaths(job, new Path("H:\\hadoop\\learningCode\\mapreduce\\wordcount\\input\\GroupingComparator"));

        //7. 指定job输出结果路径
        FileOutputFormat.setOutputPath(job, new Path("H:\\hadoop\\learningCode\\mapreduce\\wordcount\\output\\group\\out"));

        //指定reducetask的数量,不要使用默认的一个,分区效果不明显
        job.setNumReduceTasks(2);

        //8. 提交作业
        final boolean flag = job.waitForCompletion(true);
        System.exit(flag ? 0 : 1);
    }
}

image-20210726201011999

image-20210726200448074

image-20210726201028296

5.7 MapReduce Join实战

5.7.1 MR reduce端join

1.1 需求分析

需求:

投递行为数据表deliver_info:

userId positionId date
1001 177725422 2020-01-03
1002 177725422 2020-01-04
1003 177725433 2020-01-03

职位表position:

id positionName
177725422 产品经理
177725433 大数据开发工程师

假如数据量巨大,两表的数据是以文件的形式存储在HDFS中,需要用mapreduce程序来实现一下SQL查询运算

1.2 代码实现

通过将关联的条件作为map输出的key,将两表满足join条件的数据并携带数据所来源的文件信息,发往同一个reduce task,在reduce中进行数据的串联

Bean

DeliverBean.java

package com.lagou.mr.reduce_join;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class DeliverBean implements Writable {
    private String userId;
    private String positionId;
    private String date;
    private String positionName;
    //判断是投递数据还是职位数据标识
    private String flag;

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public String getPositionId() {
        return positionId;
    }

    public void setPositionId(String positionId) {
        this.positionId = positionId;
    }

    public String getDate() {
        return date;
    }

    public void setDate(String date) {
        this.date = date;
    }

    public String getPositionName() {
        return positionName;
    }

    public void setPositionName(String positionName) {
        this.positionName = positionName;
    }

    public String getFlag() {
        return flag;
    }

    public void setFlag(String flag) {
        this.flag = flag;
    }

    /**
     * 序列化
     *
     * @param dataOutput
     * @throws IOException
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(userId);
        dataOutput.writeUTF(positionId);
        dataOutput.writeUTF(date);
        dataOutput.writeUTF(positionName);
        dataOutput.writeUTF(flag);
    }

    /**
     * 反序列化
     *
     * @param dataInput
     * @throws IOException
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.userId = dataInput.readUTF();
        this.positionId = dataInput.readUTF();
        this.date = dataInput.readUTF();
        this.positionName = dataInput.readUTF();
        this.flag = dataInput.readUTF();
    }

    @Override
    public String toString() {
        return userId + '\t' +
                positionId + '\t' +
                date + '\t' +
                positionName + '\t' +
                flag;
    }
}
Mapper

ReduceJoinMapper.java

package com.lagou.mr.reduce_join;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

/*
输出kv类型:
    k: positionId
    v: deliverBean
 */
public class ReduceJoinMapper extends Mapper<LongWritable, Text, Text, DeliverBean> {
    String name = "";
    Text k = new Text();
    //读取的是投递行为数据
    DeliverBean bean = new DeliverBean();

    /**
     * map任务启动时初始化执行一次
     *
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        InputSplit inputSplit = context.getInputSplit();
        FileSplit split = (FileSplit) inputSplit;
        name = split.getPath().getName();
        System.out.println("context name: " + name);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] arr = line.split("\t");

        if (name.startsWith("deliver_info")) {
            //读取的是投递行为数据表
            bean.setUserId(arr[0]);
            bean.setPositionId(arr[1]);
            bean.setDate(arr[2]);
            bean.setPositionName("");
            bean.setFlag("deliver");
        } else {
            //读取的是职位表
            bean.setUserId("");
            bean.setPositionId(arr[0]);
            bean.setDate("");
            bean.setPositionName(arr[1]);
            bean.setFlag("position");
        }
        k.set(bean.getPositionId());
        context.write(k, bean);
    }
}
Reducer

ReduceJoinReducer.java

package com.lagou.mr.reduce_join;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;

public class ReduceJoinReducer extends Reducer<Text, DeliverBean, DeliverBean, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<DeliverBean> values, Context context) throws IOException, InterruptedException {
        ArrayList<DeliverBean> deBeans = new ArrayList<>();//相同position的bean对象放在一起(1个职位数据,n个投递行为数据)
        DeliverBean positionBean = new DeliverBean();
        for (DeliverBean bean : values) {
            String flag = bean.getFlag();
            if (flag.equalsIgnoreCase("deliver")) { //投递行为数据
                //此处不能直接把bean对象添加到debeans中,需要将其深度拷贝到newBean中才行
                DeliverBean newBean = new DeliverBean();
                try {
                    BeanUtils.copyProperties(newBean, bean);
                    deBeans.add(newBean);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }
            } else {
                try {
                    BeanUtils.copyProperties(positionBean, bean);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }
            }
        }
        for (DeliverBean bean : deBeans) {
            bean.setPositionName(positionBean.getPositionName());
            context.write(bean, NullWritable.get());
        }
    }
}
Driver

ReduceJoinDriver.java

package com.lagou.mr.reduce_join;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class ReduceJoinDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        final Configuration configuration = new Configuration();
        final Job job = Job.getInstance(configuration, "ReduceJoinDriver");

        job.setJarByClass(ReduceJoinDriver.class);
        job.setMapperClass(ReduceJoinMapper.class);
        job.setReducerClass(ReduceJoinReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DeliverBean.class);

        job.setOutputKeyClass(DeliverBean.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path("H:\\hadoop\\learningCode\\mapreduce\\wordcount\\input\\ReduceJoin"));
        FileOutputFormat.setOutputPath(job, new Path("H:\\hadoop\\learningCode\\mapreduce\\wordcount\\output\\ReduceJoin\\out"));

        final boolean flag = job.waitForCompletion(true);
        System.exit(flag ? 0 : 1);
    }
}

image-20210727221405175

image-20210727221428042

image-20210727221507250

缺点:这种方式中,join的操作是在reduce阶段完成,reduce端的处理压力太大,map节点的运算负载则很低,资源利用率不高,且在reduce阶段极易产生数据倾斜
解决方案: map端join实现方式

5.7.2 MR map端join

2.1 需求分析

适用于关联表中有小表的情形;
可以将小表分发到所有的map节点,这样,map节点就可以在本地对自己所读到的大表数据进行join并输出最终结果,可以大大提高join操作的并发度,加快处理速度

2.2 代码实现

DeliverBean

DeliverBean.java

package com.lagou.mr.map_join;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class DeliverBean implements Writable {
    private String userId;
    private String positionId;
    private String date;
    private String positionName;
    //判断是投递数据还是职位数据标识
    private String flag;

    public DeliverBean() {
    }

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public String getPositionId() {
        return positionId;
    }

    public void setPositionId(String positionId) {
        this.positionId = positionId;
    }

    public String getDate() {
        return date;
    }

    public void setDate(String date) {
        this.date = date;
    }

    public String getPositionName() {
        return positionName;
    }

    public void setPositionName(String positionName) {
        this.positionName = positionName;
    }

    public String getFlag() {
        return flag;
    }

    public void setFlag(String flag) {
        this.flag = flag;
    }

    /**
     * 序列化
     *
     * @param dataOutput
     * @throws IOException
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(userId);
        dataOutput.writeUTF(positionId);
        dataOutput.writeUTF(date);
        dataOutput.writeUTF(positionName);
        dataOutput.writeUTF(flag);
    }

    /**
     * 反序列化
     *
     * @param dataInput
     * @throws IOException
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.userId = dataInput.readUTF();
        this.positionId = dataInput.readUTF();
        this.date = dataInput.readUTF();
        this.positionName = dataInput.readUTF();
        this.flag = dataInput.readUTF();
    }

    @Override
    public String toString() {
        return userId + '\t' +
                positionId + '\t' +
                date + '\t' +
                positionName + '\t' +
                flag;
    }
}
Mapper

MapJoinMapper.java

package com.lagou.mr.map_join;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;

/*
使用map端join完成投递行为与与职位数据的关联
 map端缓存所有的 职位数据(关联表)
 map方法读取的文件数据是 投递行为数据(主表)
 基于投递行为数据的positionId去缓存中查询出positionName,输出即可
 这个job中无需reducetask,setnumreducetask为0
*/
public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    HashMap<String, String> hashMap = new HashMap<>();
    Text k = new Text();

    /**
     * 加载职位数据
     *
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        //读取缓存文件
        InputStreamReader inputStreamReader = new InputStreamReader(new FileInputStream("position.txt"), "UTF-8");
        BufferedReader reader = new BufferedReader(inputStreamReader);

        //读取职位数据解析为kv类型(hashmap): k -- positionId, value -- positionName
        String line;
        while (StringUtils.isNotEmpty(line = reader.readLine())) {
            String[] fields = line.split("\t");
            hashMap.put(fields[0], fields[1]);
        }
        reader.lines();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] arr = line.split("\t");
        String positionName = hashMap.get(arr[1]);
        k.set(line + "\t" + positionName);
        context.write(k, NullWritable.get());
    }
}
Driver

MapJoinDriver.java

package com.lagou.mr.map_join;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class MapJoinDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {

        final Configuration conf = new Configuration();
        final Job job = Job.getInstance(conf, "MapJoinDriver");

        job.setJarByClass(MapJoinDriver.class);
        job.setMapperClass(MapJoinMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path("H:\\hadoop\\learningCode\\mapreduce\\wordcount\\input\\ReduceJoin\\deliver_info.txt")); //指定读取数据的原始路径
        FileOutputFormat.setOutputPath(job, new Path("H:\\hadoop\\learningCode\\mapreduce\\wordcount\\output\\MapJoin\\out")); //指定结果数据输出路径

        //设置加载缓存文件
        job.addCacheFile(new URI("file:///H:/hadoop/learningCode/mapreduce/wordcount/input/ReduceJoin/position.txt"));

        //设置reducetask数量为0
        job.setNumReduceTasks(0);

        final boolean flag = job.waitForCompletion(true);
        System.exit(flag ? 0 : 1);//jvm退出:正常退出0,非0值则是错误退出
    }
}

5.7.3 数据倾斜解决方案

5.8 MapReduce读取和输出数据

5.8.1 InputFormat

运行MapReduce程序时,输入的文件格式包括:基于行的日志文件、二进制格式文件、数据库表等。那么,针对不同的数据类型,MapReduce是如何读取这些数据的呢?
InputFormat是MapReduce框架用来读取数据的类。

InputFormat常见子类包括:

  1. CombineTextInputFormat案例

MR框架默认的TextInputFormat切片机制按文件划分切片,文件无论大小,都是单独一个切片,
然后由一个MapTask处理,如果有大量小文件,就对应的会生成并启动大量的 MapTask,而每个
MapTask处理的数据量很小,大量时间浪费在初始化资源启动收回等阶段,这种方式导致资源利用
率不高。

CombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上划分成一个切
片,这样多个小文件就可以交给一个MapTask处理,提高资源利用率。

需求:将输入数据中的多个小文件合并为一个切片处理。
准备:运行WordCount案例,准备多个小文件
具体使用方式:

//如果不设置InputFormat,它默认用的是TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);
//虚拟存储切片最大值设置4MB
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);

随后验证切片数量的变化。

CombineTextInputFormat切片原理:

标签:org,编程,hadoop,MapReduce,job,import,apache,103,public
来源: https://www.cnblogs.com/haitaoli/p/15114487.html