Serialization
Passing keys and values between the map and reduce stages in Hadoop requires serialization, which involves two important interfaces: Writable and WritableComparable.

WritableComparable adds one method, compareTo, on top of Writable; the framework uses it during the shuffle to sort keys and to decide whether two keys are equal.

In Hadoop, a key type must implement WritableComparable, while a value type only needs to implement Writable. Anything that can be used as a key can also be used as a value, but not everything that can be used as a value can be used as a key.
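As a minimal sketch of the value-only case (the PairWritable name below is made up for illustration; it is not part of Hadoop or of this post), a Writable implementation only needs write and readFields:

package com.atguigu.writableComparable;

import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Hypothetical value-only type: it implements Writable, so it can be used as a
// MapReduce value but not as a key (it has no compareTo, so it cannot be sorted).
public class PairWritable implements Writable {
    private long first;
    private long second;

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(first);
        out.writeLong(second);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Fields must be read back in exactly the order they were written
        first = in.readLong();
        second = in.readLong();
    }
}

To make such a type usable as a key, it would implement WritableComparable and add a compareTo method, exactly as the FlowBean class in the example below does.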
package com.atguigu.writableComparable;

/* Sort the records by total flow (sumFlow) after serialization */

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class sortDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(sortDriver.class);
        job.setMapperClass(sortMapper.class);
        job.setReducerClass(sortReducer.class);

        // The mapper emits (FlowBean, Text) so that the shuffle sorts by FlowBean;
        // the reducer swaps the pair back to (Text, FlowBean) for the final output.
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        FileInputFormat.addInputPath(job, new Path("E:\\phone2.txt"));
        FileOutputFormat.setOutputPath(job, new Path("E:\\out3"));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

/*
Input:
13610009496,300,200,100,
13710009496,200,400,300,
13800094960,200,100,200,
13810009496,300,100,500,
13910009496,600,500,400,
15210009496,300,500,100,

Output:
13810009496 upFlow=300, downFlow=100, sumFlow=500
13910009496 upFlow=600, downFlow=500, sumFlow=400
13710009496 upFlow=200, downFlow=400, sumFlow=300
13800094960 upFlow=200, downFlow=100, sumFlow=200
15210009496 upFlow=300, downFlow=500, sumFlow=100
13610009496 upFlow=300, downFlow=200, sumFlow=100
*/
package com.atguigu.writableComparable;

import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements WritableComparable<FlowBean> {
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    @Override
    public String toString() {
        return "upFlow=" + upFlow + ", downFlow=" + downFlow + ", sumFlow=" + sumFlow;
    }

    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public void setUpFlow(long upFlow) { this.upFlow = upFlow; }
    public void setDownFlow(long downFlow) { this.downFlow = downFlow; }
    public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; }
    public long getUpFlow() { return upFlow; }
    public long getDownFlow() { return downFlow; }
    public long getSumFlow() { return sumFlow; }

    // Serialization: hand the fields over to the framework
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    // Deserialization: read the fields back from the framework,
    // in the same order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    // Sort keys by sumFlow in descending order
    @Override
    public int compareTo(FlowBean o) {
        return Long.compare(o.sumFlow, this.sumFlow);
    }
}
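A quick way to see the write/readFields contract in action outside of a job is to round-trip a bean through plain java.io streams. This is a sketch that is not part of the original post; it assumes the class sits in the same package as FlowBean above:

package com.atguigu.writableComparable;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class FlowBeanRoundTrip {
    public static void main(String[] args) throws Exception {
        FlowBean original = new FlowBean();
        original.set(300, 200);   // upFlow=300, downFlow=200, so sumFlow=500

        // Serialize: FlowBean.write() pushes the three longs onto the stream
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));

        // Deserialize: FlowBean.readFields() reads them back in the same order
        FlowBean copy = new FlowBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

        System.out.println(copy);   // upFlow=300, downFlow=200, sumFlow=500
    }
}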
package com.atguigu.writableComparable;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class sortMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
    private FlowBean flow = new FlowBean();
    private Text phone = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // The input lines are comma-separated: phone,upFlow,downFlow,sumFlow
        String[] fields = value.toString().split(",");
        phone.set(fields[0]);
        flow.setUpFlow(Long.parseLong(fields[1]));
        flow.setDownFlow(Long.parseLong(fields[2]));
        flow.setSumFlow(Long.parseLong(fields[3]));
        // Emit the bean as the key so the shuffle sorts records by sumFlow
        context.write(flow, phone);
    }
}
package com.atguigu.writableComparable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class sortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Beans with the same sumFlow compare equal and are grouped into one reduce call,
        // so loop over every phone number and swap key and value for the output.
        for (Text value : values) {
            context.write(value, key);
        }
    }
}
Source: https://www.cnblogs.com/hapyygril/p/13995137.html