
Serialization


Passing keys and values through Hadoop involves two important serialization interfaces, Writable and WritableComparable:

WritableComparable adds one method to Writable, compareTo, which the framework uses to sort keys during the shuffle and to decide whether two keys are equal (equal keys are grouped into a single reduce call).

Hadoop requires a key's data type to implement WritableComparable, while a value's data type only needs to implement Writable. Any type usable as a key can therefore also be used as a value, but a type usable only as a value cannot serve as a key.
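
To make the contrast concrete, here is a minimal sketch (not from the original post; FlowValue is a hypothetical name) of a value-only type that implements just Writable. It can be emitted as a map or reduce output value, but not as a key, because it gives the shuffle no compareTo to sort with:

package com.atguigu.writableComparable;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Hypothetical value-only type: Writable but not WritableComparable,
// so it can serve as a value but not as a key.
public class FlowValue implements Writable {
    private long bytes;

    public void set(long bytes) { this.bytes = bytes; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(bytes);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        bytes = in.readLong(); // must read in the same order as write()
    }
}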

 

package com.atguigu.writableComparable;

/*
Driver: sort the serialized records by total flow (sumFlow).
 */
import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class sortDriver {
    public static void main(String[] args) throws Exception {

        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(sortDriver.class);

        job.setMapperClass(sortMapper.class);
        job.setReducerClass(sortReducer.class);

        // Map output: FlowBean as the key so the shuffle sorts by sumFlow,
        // the phone number as the value
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        // Final output: the reducer swaps the pair back to phone -> FlowBean
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        FileInputFormat.addInputPath(job, new Path("E:\\phone2.txt"));
        FileOutputFormat.setOutputPath(job, new Path("E:\\out3"));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }

}


/*
Input:
13610009496,300,200,100,
13710009496,200,400,300,
13800094960,200,100,200,
13810009496,300,100,500,
13910009496,600,500,400,
15210009496,300,500,100,
Output (same records, reordered by sumFlow descending):
13810009496    upFlow=300, downFlow=100, sumFlow=500
13910009496    upFlow=600, downFlow=500, sumFlow=400
13710009496    upFlow=200, downFlow=400, sumFlow=300
13800094960    upFlow=200, downFlow=100, sumFlow=200
15210009496    upFlow=300, downFlow=500, sumFlow=100
13610009496    upFlow=300, downFlow=200, sumFlow=100
 */

 

package com.atguigu.writableComparable;


import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements WritableComparable<FlowBean> {
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    @Override
    public String toString(){
        return "upFlow=" + upFlow + ", downFlow=" + downFlow + ", sumFlow=" + sumFlow ;
    }

    public void set(long upFlow,long downFlow){
        this.upFlow=upFlow;
        this.downFlow=downFlow;
        this.sumFlow=upFlow+downFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    //Serialization: hand the fields to the framework
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    //Deserialization: read the fields back from the framework,
    //in exactly the same order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    //Compare the other bean first so keys sort by sumFlow in descending order
    @Override
    public int compareTo(FlowBean o) {
        return Long.compare(o.sumFlow, this.sumFlow);
    }
}
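
As an aside (not in the original post), the descending order does not have to live inside the key class. A hedged alternative sketch: the same ordering could be imposed at job level with job.setSortComparatorClass(FlowBeanDescComparator.class), which takes precedence over the key's own compareTo during the shuffle. FlowBeanDescComparator is a hypothetical name:

package com.atguigu.writableComparable;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Hypothetical job-level comparator that sorts FlowBean keys by sumFlow, descending.
public class FlowBeanDescComparator extends WritableComparator {

    public FlowBeanDescComparator() {
        super(FlowBean.class, true); // true: instantiate keys for deserialization
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        FlowBean x = (FlowBean) a;
        FlowBean y = (FlowBean) b;
        return Long.compare(y.getSumFlow(), x.getSumFlow()); // larger sumFlow first
    }
}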
package com.atguigu.writableComparable;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;


public class sortMapper extends Mapper<LongWritable,Text,FlowBean,Text> {

    // Reused across map() calls; the framework serializes the pair on each write
    private FlowBean flow = new FlowBean();
    private Text phone = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Input lines are comma-separated: phone,upFlow,downFlow,sumFlow,
        String[] fields = value.toString().split(",");

        phone.set(fields[0]);
        flow.setUpFlow(Long.parseLong(fields[1]));
        flow.setDownFlow(Long.parseLong(fields[2]));
        flow.setSumFlow(Long.parseLong(fields[3]));

        // Emit the bean as the key so the shuffle sorts records by total flow
        context.write(flow, phone);
    }
}
package com.atguigu.writableComparable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class sortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Phones with the same sumFlow compare as equal and arrive in one call,
        // so iterate over all of them, swapping the pair back to phone -> FlowBean
        for (Text value : values) {
            context.write(value, key);
        }
    }
}
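
The driver above hard-codes local Windows paths, so it is presumably run from the IDE in local mode. As a hedged sketch (not from the original post), the two path lines could instead read from the command line, making the same jar usable on a cluster:

// Hypothetical replacement for the two hard-coded path lines in sortDriver
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

The job could then be launched with something like hadoop jar flowsort.jar com.atguigu.writableComparable.sortDriver <input> <output>, where flowsort.jar is a hypothetical jar name.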

 

Source: https://www.cnblogs.com/hapyygril/p/13995137.html