其他分享
首页 > 其他分享> > Writable接口与序列化机制

Writable接口与序列化机制

作者:互联网

序列化概念

Hadoop序列化的特点

Hadoop序列化的作用

Writable接口

Writable接口, 是根据 DataInput 和 DataOutput 实现的简单、有效的序列化对象.MR的任意Key和Value必须实现Writable接口.

MR的任意key必须实现WritableComparable接口

常用的Writable实现类

Text一般认为它等价于java.lang.String的Writable。针对UTF-8序列。

例:

  Text test = new Text("test");
IntWritable one = new IntWritable(1);

自定义Writable类

MapReduce输入的处理类

InputFormat类的层次结构

其他输入类

自定义输入格式

Hadoop的输出

案例实现:

数据

        136315798506613726230503248124681200
      1363157995052138265441012640200
      1363157991076139264356561321512200
      1363154400022139262511062400200
      13631579930441821157596115272106200
      13631579950748413841341161432200
      1363157993055135604396581116954200
      13631579950331592013325731562936200
      1363157983019137191994192400200
      1363157984041136605779916960690200
      13631579730981501368585836593538200
      1363157986029159890021191938180200
      1363157992093135604396589184938200
      136315798604113480253104180180200
      13631579840401360284656519382910200

      13726230503248124681sum

DataBean类

     import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.Writable;

    public class DataBean implements Writable{

      //电话号码
      private String phone;
      //上行流量
      private Long upPayLoad;
      //下行流量
      private Long downPayLoad;
      //总流量
      private Long totalPayLoad;

      public DataBean(){}

      public DataBean(String phone,Long upPayLoad, Long downPayLoad) {
          super();
          this.phone=phone;
          this.upPayLoad = upPayLoad;
          this.downPayLoad = downPayLoad;
          this.totalPayLoad=upPayLoad+downPayLoad;
      }

      /**
        * 序列化
        * 注意:序列化和反序列化的顺序和类型必须一致
        */
      @Override
      public void write(DataOutput out) throws IOException {
          // TODO Auto-generated method stub
          out.writeUTF(phone);
          out.writeLong(upPayLoad);
          out.writeLong(downPayLoad);
          out.writeLong(totalPayLoad);
      }

      /**
        * 反序列化
        */
      @Override
      public void readFields(DataInput in) throws IOException {
          // TODO Auto-generated method stub
          this.phone=in.readUTF();
          this.upPayLoad=in.readLong();
          this.downPayLoad=in.readLong();
          this.totalPayLoad=in.readLong();
      }

      @Override
      public String toString() {
          return upPayLoad +"\t"+ downPayLoad +"\t"+ totalPayLoad;
      }

      public String getPhone() {
          return phone;
      }

      public void setPhone(String phone) {
          this.phone = phone;
      }

      public Long getUpPayLoad() {
          return upPayLoad;
      }

      public void setUpPayLoad(Long upPayLoad) {
          this.upPayLoad = upPayLoad;
      }

      public Long getDownPayLoad() {
          return downPayLoad;
      }

      public void setDownPayLoad(Long downPayLoad) {
          this.downPayLoad = downPayLoad;
      }

      public Long getTotalPayLoad() {
          return totalPayLoad;
      }

      public void setTotalPayLoad(Long totalPayLoad) {
          this.totalPayLoad = totalPayLoad;
      }
    }

DataCount类

     import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class DataCount {

      public static void main(String[] args) throws IOException, ClassNotFoundException,
      InterruptedException {
          // TODO Auto-generated method stub
          Job job=Job.getInstance(new Configuration());

          job.setJarByClass(DataCount.class);

          job.setMapperClass(DataCountMapper.class);
          job.setMapOutputKeyClass(Text.class);
          job.setMapOutputValueClass(DataBean.class);
          FileInputFormat.setInputPaths(job, args[0]);

          job.setReducerClass(DataCountReducer.class);
          job.setOutputKeyClass(Text.class);
          job.setOutputValueClass(DataBean.class);
          FileOutputFormat.setOutputPath(job, new Path(args[1]));

          job.waitForCompletion(true);
      }
        public static class DataCountMapper extends Mapper<LongWritable, Text, Text, DataBean>{

          @Override
          protected void map(LongWritable key, Text value,
          Mapper<LongWritable, Text, Text, DataBean>.Context context)
                throws IOException, InterruptedException {
            String hang=value.toString();
            String[] strings=hang.split("\t");
            String phone=strings[1];
            long up=Long.parseLong(strings[2]);
            long down=Long.parseLong(strings[3]);
            DataBean dataBean=new DataBean(phone,up, down);

            context.write(new Text(phone), dataBean);
          }

      }
        public static class DataCountReducer extends Reducer<Text, DataBean, Text, DataBean>{

          @Override
          protected void reduce(Text k2, Iterable<DataBean> v2,
          Reducer<Text, DataBean, Text, DataBean>.Context context)
                throws IOException, InterruptedException {
            long upSum=0;
            long downSum=0;

            for(DataBean dataBean:v2){
                upSum += dataBean.getUpPayLoad();
                downSum += dataBean.getDownPayLoad();
            }

            DataBean dataBean=new DataBean(k2.toString(),upSum,downSum);

            context.write(new Text(k2), dataBean);
          }

      }
    }

 



标签:downPayLoad,Long,Writable,phone,接口,import,序列化,public
来源: https://www.cnblogs.com/TiePiHeTao/p/d9ef64c911fae231a651f0653fb9635f.html