其他分享
首页 > 其他分享 > 6.Partitioner案例

6.Partitioner案例

作者:互联网

6.Partitioner案例

文件:https://files-cdn.cnblogs.com/files/handsomeplus/phonedata.zip

程序源码

自定义Bean类

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Hadoop {@link Writable} value holding one phone record's traffic counters:
 * upstream bytes, downstream bytes, and their sum.
 *
 * <p>Serialization order in {@link #write} and {@link #readFields} must stay
 * identical (upflow, downflow, sumflow) or deserialization breaks.
 */
public class Bean implements Writable {
    private long upflow;
    private long downflow;
    private long sumflow;

    /** No-arg constructor required by Hadoop's reflection-based deserialization. */
    public Bean() {
    }

    /**
     * Builds a record from the two measured counters; the sum is derived.
     *
     * @param upflow   upstream traffic in bytes
     * @param downflow downstream traffic in bytes
     */
    public Bean(long upflow, long downflow) {
        this.upflow = upflow;
        this.downflow = downflow;
        this.sumflow = upflow + downflow;
    }

    public long getUpflow() {
        return upflow;
    }

    public void setUpflow(long upflow) {
        this.upflow = upflow;
    }

    public long getDownflow() {
        return downflow;
    }

    public void setDownflow(long downflow) {
        this.downflow = downflow;
    }

    public long getSumflow() {
        return sumflow;
    }

    // NOTE(review): setting sumflow directly can break the upflow+downflow
    // invariant; kept for interface compatibility.
    public void setSumflow(long sumflow) {
        this.sumflow = sumflow;
    }

    /** Tab-separated form used as the job's textual output value. */
    @Override
    public String toString() {
        return upflow + "\t" + downflow + "\t" + sumflow;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upflow);
        dataOutput.writeLong(downflow);
        dataOutput.writeLong(sumflow);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        upflow = dataInput.readLong();
        downflow = dataInput.readLong();
        sumflow = dataInput.readLong();
    }
}

Map类

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

/**
 * Mapper: parses one tab-separated phone-traffic line and emits
 * (phone number, Bean(upflow, downflow)).
 *
 * <p>Input layout (from the phonedata sample): the phone number is column 1;
 * upflow and downflow are the 3rd- and 2nd-from-last columns, so trailing
 * optional columns do not shift them.
 */
public class Map extends Mapper<Object, Text, Text, Bean> {
    // Reused output key — avoids allocating a new Text per record (standard
    // Hadoop idiom); the framework copies the key during serialization.
    private final Text phone = new Text();

    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().trim().split("\t");
        phone.set(fields[1]);
        long upflow = Long.parseLong(fields[fields.length - 3]);
        long downflow = Long.parseLong(fields[fields.length - 2]);
        context.write(phone, new Bean(upflow, downflow));
    }
}

Reduce类

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Reducer: aggregates all traffic records for one phone number and emits a
 * single Bean carrying the summed upstream and downstream counters.
 */
public class Reduce extends Reducer<Text, Bean, Text, Bean> {
    @Override
    protected void reduce(Text key, Iterable<Bean> values, Context context) throws IOException, InterruptedException {
        long totalUp = 0L;
        long totalDown = 0L;
        for (Bean record : values) {
            totalUp += record.getUpflow();
            totalDown += record.getDownflow();
        }
        // Bean's constructor derives sumflow from the two totals.
        context.write(key, new Bean(totalUp, totalDown));
    }
}

Driver类

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Job driver for the phone-traffic aggregation with custom partitioning.
 *
 * <p>Usage: {@code Driver <input path> <output path>}. Five reduce tasks are
 * configured to match the five partitions produced by {@link Part}.
 */
public class Driver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Fail fast with a usage message instead of an ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            System.err.println("Usage: Driver <input path> <output path>");
            System.exit(2);
        }

        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration, "phonedata");
        job.setJarByClass(Driver.class);

        job.setMapperClass(Map.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Bean.class);

        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Bean.class);

        // Custom partitioner routes records by phone prefix; the reduce-task
        // count must equal the number of partitions Part can return (5).
        job.setPartitionerClass(Part.class);
        job.setNumReduceTasks(5);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Partitioner分区

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Routes records to reducers by the 3-digit phone-number prefix:
 * 136→0, 137→1, 138→2, 139→3, anything else→4.
 *
 * <p>Fix: the original kept the result in a mutable boxed {@code Integer}
 * instance field — unnecessary shared state (and boxing) for a partitioner
 * that the framework may reuse; the result is now returned directly.
 */
public class Part extends Partitioner<Text, Bean> {
    @Override
    public int getPartition(Text text, Bean bean, int numPartitions) {
        // NOTE(review): assumes every key is at least 3 characters long —
        // shorter keys would throw StringIndexOutOfBoundsException.
        String prefix = text.toString().substring(0, 3);
        switch (prefix) {
            case "136":
                return 0;
            case "137":
                return 1;
            case "138":
                return 2;
            case "139":
                return 3;
            default:
                return 4;
        }
    }
}

标签:downflow,upflow,long,Partitioner,案例,job,import,public
来源: https://www.cnblogs.com/handsomeplus/p/13758223.html