其他分享
首页 > 其他分享> > Mapreduce最定义groupComparator实现分组求取topN和其他的参数以及调优

Mapreduce最定义groupComparator实现分组求取topN和其他的参数以及调优

作者:互联网

groupingComparator实现分组求取topN

求Top1:

GroupingComparator是mapreduce当中reduce端的一个功能组件,主要的作用是决定哪些数据作为一组,调用一次reduce的逻辑,默认是每个不同的key,作为多个不同的组,每个组调用一次reduce逻辑,我们可以自定义GroupingComparator实现不同的key作为同一个组,调用一次reduce逻辑

需求如下: 求下列每一个订单中 交易金额最大的一个
在这里插入图片描述
分析:

将订单号和金额合并为OrderBean作为key,在map阶段按照key分区
在reduce阶段利用groupingComparator来聚合成组,取第一个值即为最大值

定义OderBean

 import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class OrderBean implements WritableComparable<OrderBean> {
    private String orderId;     //定义订单号
    private Double price;    //定义价格必须使用Double包装类型


    /**
     * 按照价格进行排序
     * @param o
     * @return
     */
    @Override
    public int compareTo(OrderBean o) {
        //需要先比较我们的订单id,如果订单id相同的,我们再按照金额进行排序
        //如果订单id不相同,没有可比性
        int result = this.orderId.compareTo(o.orderId);
        if(result ==0){                                                                                          
            //如果订单id相同,继续比较价格,按照价格进行排序,
            //如果订单id不相同没有可比性
            //默认降序
            result = this.price.compareTo(o.price);
            return -result;
        }
        return result;

    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(orderId);
        out.writeDouble(price);

    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.orderId= in.readUTF();
        this.price = in.readDouble();

    }


    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public Double getPrice() {
        return price;
    }

    public void setPrice(Double price) {
        this.price = price;
    }


    @Override
    public String toString() {
        return orderId+"\t"+price;
    }
}

定义分区

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class GroupPartition extends Partitioner<OrderBean,NullWritable> {


    @Override
    public int getPartition(OrderBean orderBean, NullWritable nullWritable, int numPartitions) {
        return (orderBean.getOrderId().hashCode() & Integer.MAX_VALUE) % numPartitions;

       
    }
}

定义MyGroupComparator:

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class MyGroupComparator extends WritableComparator {


    /**
     * 注意调用compare方法的时候的参数,参数必须是WritableComparable类型,这个类型才可以进行比较
     * @param a
     * @param b
     * @return
     */


    public MyGroupComparator() {
        super(OrderBean.class,true);                //无参构造器
    }                                                                 
    //将我们自定义的OrderBean注册到我们自定义的MyGroupIngCompactor当中来
    //表示我们的分组器在分组的时候,对OrderBean这一种类型的数据进行分组
    //传入作为key的bean的class类型,以及制定需要让框架做反射获取实例对象
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean first = (OrderBean) a;             //类型强制转化
        OrderBean second = (OrderBean) b;
        return first.getOrderId().compareTo(second.getOrderId());        //根据订单号来进行对比
    }
}

定义Mapper

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class GroupMapper extends Mapper<LongWritable,Text,OrderBean,NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] split = value.toString().split("\t");         //数据切割
        OrderBean orderBean = new OrderBean();
        orderBean.setOrderId(split[0]);
        orderBean.setPrice(Double.valueOf(split[2]));
        context.write(orderBean,NullWritable.get());

    }
}

定义Reducer

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class GroupReducer extends Reducer<OrderBean,NullWritable,OrderBean,NullWritable> {
    @Override
    protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        context.write(key,NullWritable.get());    //利用context写出去
    }
}

定义main:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class GroupMain extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(super.getConf(), "group");

        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job,new Path("file:///F:\\自定义groupingComparator\\input"));

        job.setMapperClass(GroupMapper.class);
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setPartitionerClass(GroupPartition.class);

        //设置我们自定义的分组类
        job.setGroupingComparatorClass(MyGroupComparator.class);

        job.setReducerClass(GroupReducer.class);
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);

        job.setOutputFormatClass(TextOutputFormat.class);

        TextOutputFormat.setOutputPath(job,new Path("file:///F:\\自定义groupingComparator\\output_top1"));


        boolean b = job.waitForCompletion(true);


        return b?0:1;
    }

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new Configuration(), new GroupMain(), args);
        System.exit(run);
    }
}

输出结果:
在这里插入图片描述
求Top 2…N:

修改Mapper:

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class GroupMapper extends Mapper<LongWritable,Text,OrderBean,DoubleWritable> {

   @Override
   protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
       String[] split = value.toString().split("\t");
       OrderBean orderBean = new OrderBean();
       orderBean.setOrderId(split[0]);
       orderBean.setPrice(Double.valueOf(split[2]));
       DoubleWritable doubleWritable = new DoubleWritable(Double.valueOf(split[2]));   //
       context.write(orderBean,doubleWritable);

   }
}

修改reducer:

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class GroupReducer extends Reducer<OrderBean,DoubleWritable,OrderBean,DoubleWritable> {
    @Override
    protected void reduce(OrderBean key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
        //context.write(key,NullWritable.get());
        int i = 0;
        for (DoubleWritable value : values) {
            i++;
            if(i <= 2){
                context.write(key,value);
            }else{
                break;
            }

        }


    }
}

输出结果:在这里插入图片描述

其他参数以及调优

分区和分组的区别:在这里插入图片描述

mapreduce当中的推测执行:一般直接关掉推测执行
如果推测执行打开了,那么如果一个maptask执行任务时间比较长,那么久再启动一个另外相同的maptask去执行同样的任务,
谁先执行完,就采用谁的执行结果
一个任务为什么会长时间的没有执行完成——有可能是因为数据的倾斜

只会造成集群资源的更加的紧张

一般直接关闭推测执行

yarn的资源调度管理:
yarn是我们hadoop2.x当中引进的一个新的模块,主要用于管理我们集群当中的资源
比如说:内存,cpu
yarn不光管理硬件资源,还管理运行的一些任务信息等等
yarn的调度可以分为两个层级来说
一级管理调度:
管理计算机的资源
运行的job任务的生命周
二级管理调度:
任务的计算模型
多样化的计算模型 storm spark

yarn集群当中各个组件的作用:
resourceManager:主节点,主要用于接收用户的请求,分配资源
nodeManager:从节点,主要用于处理任务的计算
ApplicationMaster:每提交一个任务,启动一个appmaster,
这个appmaster全权负责管理我们的任务的执行
主要职责:申请资源
分配资源(分配container)
监控任务执行的进度状况
回收资源
与resourceManager通信,报告任务的执行状况
自杀,
Container:资源分配的单位,所有的资源都是以container的形式来进行划分的,便于资源的分配和回收
jobHistory:历史完成的任务的日志信息
TimeLineServer: 2.4版本以后出来的新特性,查看正在执行的任务的信息

yarn当中的调度器:
调度器主要解决的是任务先后提交,如何保证任务最快执行的一种策略
研究的是任务之间如何一起执行的问题

队列 栈

队列是两端都开口 就跟我们火车头进隧道一样
栈 一端开口,一端封闭 先进去的后出去 弹夹类似

hadoop当中的调度器主要有三种
1:fifo 队列调度器,first in first out 没人用
第一个任务来了,先执行,第二个任务来了,等着
如果一个很大的计算任务先来,需要执行两个小时,再来一个小任务,需要执行两分钟

第二种:capacity scheduler 容量调度器 apache的hadoop版本默认使用的
容量调度器:将我们集群的资源,划分成好几个队列
30% 40% 30%
3 4 3
任务提交的时候,可以选择不同的队列来进行提交
根据提交的任务需要的资源大小不同,可以将我们的任务,划分到不同的队列下面去

第三种:fairScheduler 公平调度器 CDH版本的hadoop默认的调度规则
如果没有任务提交,第一个任务过来,将集群当中的所有的资源全部给第一个任务
第二个任务来了,将第一个任务的资源划分一点出来给第二个任务,保证第二个任务也可以窒息感
保证每一个任务都可以公平的一起执行

一般调度器不会去更改

标签:hadoop,Mapreduce,topN,调优,io,org,apache,import,public
来源: https://blog.csdn.net/KujyouRuri/article/details/115096913