An Analysis of OutputFormat in Hadoop MapReduce
OutputFormat is the base class for MapReduce output: every class that implements MapReduce output extends OutputFormat.
The most commonly used implementations are TextOutputFormat and SequenceFileOutputFormat.
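For reference, the new-API base class (org.apache.hadoop.mapreduce.OutputFormat) declares three abstract methods; the sketch below paraphrases the Hadoop source with bodies omitted:

public abstract class OutputFormat<K, V> {
    // Returns the RecordWriter that writes the task's key/value pairs
    public abstract RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException;
    // Checks the output specification, e.g. that the output directory does not already exist
    public abstract void checkOutputSpecs(JobContext context)
            throws IOException, InterruptedException;
    // Returns the OutputCommitter that finalizes the job's output
    public abstract OutputCommitter getOutputCommitter(TaskAttemptContext context)
            throws IOException, InterruptedException;
}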
1. TextOutputFormat (text output)
The default output format is TextOutputFormat, which writes each record as a line of text. Its keys and values may be of any type, because TextOutputFormat calls toString() on them to convert them to strings.
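By default the key and value on each line are separated by a tab. If a different separator is wanted, the new-API property mapreduce.output.textoutputformat.separator controls it; a minimal sketch, assuming a Job instance named job already exists:

// Use a comma instead of the default tab between key and value
job.getConfiguration().set("mapreduce.output.textoutputformat.separator", ",");
job.setOutputFormatClass(TextOutputFormat.class); // the default; shown explicitly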
2. SequenceFileOutputFormat
SequenceFileOutputFormat writes binary SequenceFiles. The format is compact and compresses easily, which makes it a good choice when the output of one MapReduce job will be consumed as the input of another.
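A sketch of the driver-side settings for block-compressed SequenceFile output, assuming a Job instance named job and the standard org.apache.hadoop.mapreduce.lib.output helpers:

// Write the job's output as block-compressed SequenceFiles
job.setOutputFormatClass(SequenceFileOutputFormat.class);
FileOutputFormat.setCompressOutput(job, true);
SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);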
3. Custom OutputFormat
(1) Use cases
To control the final output path and output format of a job, you can define a custom OutputFormat.
For example, when different kinds of records need to be written to different destinations depending on their content, a custom OutputFormat is the right tool, as in the walkthrough below.
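For concreteness, suppose the input is a log of visited URLs, one per line (these sample lines are made up for illustration); lines containing www.123.com should go to one file and everything else to another:

http://www.123.com/index
http://www.abc.com/home
http://www.123.com/list
http://www.xyz.com/shop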
(2) Steps to implement a custom OutputFormat
1) Define a class that extends FileOutputFormat:
import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FilterOutputFormat extends FileOutputFormat<Text, NullWritable> {

    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        // Hand the framework our custom writer, which routes records to different files
        return new FilterRecordWriter(context);
    }
}
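The framework calls getRecordWriter() once per task, so a single FilterRecordWriter instance receives every key/value pair that the task writes.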
2) Subclass RecordWriter and override write(), the method that actually outputs the data:
import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class FilterRecordWriter extends RecordWriter<Text, NullWritable> {

    private FSDataOutputStream hadoopOutputStream = null;
    private FSDataOutputStream otherOutputStream = null;

    public FilterRecordWriter(TaskAttemptContext context) {
        FileSystem fileSystem = null;
        try {
            // Get the file system from the job configuration
            fileSystem = FileSystem.get(context.getConfiguration());
            // Create the two output files
            Path hadoopPath = new Path("/mapreduce/outputFormat/output/123.log");
            Path otherPath = new Path("/mapreduce/outputFormat/output/other.log");
            hadoopOutputStream = fileSystem.create(hadoopPath);
            otherOutputStream = fileSystem.create(otherPath);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void write(Text text, NullWritable writable) throws IOException, InterruptedException {
        // Route each record to a file based on its content
        if (text.toString().contains("www.123.com")) {
            hadoopOutputStream.write(text.toString().getBytes());
        } else {
            otherOutputStream.write(text.toString().getBytes());
        }
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        IOUtils.closeStream(hadoopOutputStream);
        IOUtils.closeStream(otherOutputStream);
    }
}
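Two caveats about this writer: the output paths are hardcoded, so if the job ran with more than one reduce task, each task would open and overwrite the same two files; the example therefore relies on there being a single reducer (the default is one). It also writes directly to the final output directory rather than the task's temporary work directory, bypassing the normal commit protocol — acceptable for a demo, but worth knowing.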
3) Write the Mapper and Reducer. The Mapper emits each input line as the key with a NullWritable value:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FilterMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Pass each input line through unchanged; the RecordWriter does the filtering
        String line = value.toString();
        context.write(new Text(line), NullWritable.get());
    }
}
The Reducer iterates over the values so that duplicate lines are not collapsed into one:

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FilterReduce extends Reducer<Text, NullWritable, Text, NullWritable> {

    @Override
    protected void reduce(Text text, Iterable<NullWritable> iterable, Context context)
            throws IOException, InterruptedException {
        // Emit the key once per value so duplicate input lines survive
        for (NullWritable nullWritable : iterable) {
            context.write(new Text(text.toString() + "\r\n"), NullWritable.get());
        }
    }
}
4) Write the driver and register the custom OutputFormat with job.setOutputFormatClass(). (The enclosing FilterDriver class name is added here so that main() compiles as a complete program.)

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FilterDriver {

    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "root");
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        // Lets the cluster locate the jar containing these classes
        job.setJarByClass(FilterDriver.class);

        // Plug in the custom OutputFormat
        job.setOutputFormatClass(FilterOutputFormat.class);

        job.setMapperClass(FilterMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setReducerClass(FilterReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path("/mapreduce/outputFormat/log"));
        // Although FilterRecordWriter writes its own files, FileOutputFormat still
        // requires an output directory (used for the _SUCCESS marker)
        FileOutputFormat.setOutputPath(job, new Path("/mapreduce/outputFormat/output"));

        boolean waitForCompletion = job.waitForCompletion(true);
        System.exit(waitForCompletion ? 0 : 1);
    }
}
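After a successful run, /mapreduce/outputFormat/output should contain the 123.log and other.log files written by FilterRecordWriter plus the _SUCCESS marker from the output committer; no part-r-00000 file appears, because every record is routed through the custom writer. The job can be launched in the usual way (jar name hypothetical):

hadoop jar filter-demo.jar FilterDriver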