MapReduce Code Questions (1)
1. Hadoop MultipleInputs.addInputPath: reading multiple input paths
https://blog.csdn.net/t1dmzks/article/details/76473905
MultipleInputs.addInputPath
Purpose
Lets you specify multiple input paths, with each path bound to its own map implementation.
Usage
MultipleInputs.addInputPath(Job job, Path path, Class<? extends InputFormat> inputFormatClass, Class<? extends Mapper> mapperClass)
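For instance, two directories can each be routed to a different mapper while sharing one reduce phase (a minimal sketch; the paths and class names are placeholders, and a full worked example follows below):

// Each call binds one input path to an InputFormat and a Mapper.
MultipleInputs.addInputPath(job, new Path("/data/input1"), TextInputFormat.class, MapA.class);
MultipleInputs.addInputPath(job, new Path("/data/input2"), TextInputFormat.class, MapB.class);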
Example
We use word count as the example.
F:\hadooptest\wordcount\input1 contains a word.txt in which words are separated by spaces:
aa bb cc dd ee ff aa bb ff
F:\hadooptest\wordcount\input2 contains a word.txt in which words are separated by ##:
aa##bb##cc
ee##gg##kk
Code
package com.myhadoop.multiple;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Created by kaishun on 2017/7/31.
 */
public class TestMultipleInputs {

    // Mapper for input1: words are separated by whitespace.
    public static class MapA extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] strs = value.toString().split("\\s+");
            for (String s : strs) {
                word.set(s);
                context.write(word, one);
            }
        }
    }

    // Mapper for input2: words are separated by "##".
    public static class MapB extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] strs = value.toString().split("##");
            for (String s : strs) {
                word.set(s);
                context.write(word, one);
            }
        }
    }

    // A single reducer sums the counts coming from both mappers.
    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJobName("MultipleWordCount");
        job.setJarByClass(TestMultipleInputs.class);
        // Multiple inputs, each bound to its own mapper.
        MultipleInputs.addInputPath(job, new Path("F:\\hadooptest\\wordcount\\input1"),
                TextInputFormat.class, MapA.class);
        MultipleInputs.addInputPath(job, new Path("F:\\hadooptest\\wordcount\\input2"),
                TextInputFormat.class, MapB.class);
        // Send everything to a single reducer.
        job.setNumReduceTasks(1);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setReducerClass(Reduce.class);
        FileOutputFormat.setOutputPath(job, new Path(args[0]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Output
aa 3
bb 3
cc 2
dd 1
ee 2
ff 2
gg 1
kk 1
2. job.setOutputKeyClass vs. job.setMapOutputKeyClass in Hadoop
MapReduce programs almost always call job.setOutputKeyClass(theClass) and job.setOutputValueClass(theClass), but some programs additionally call job.setMapOutputKeyClass(theClass) and job.setMapOutputValueClass(theClass). The difference is this: setOutputKeyClass and setOutputValueClass declare the types of the job's final (reducer) output, and by default Hadoop assumes the map output uses those same types. So when the mapper and the reducer emit the same key/value types, the two setOutput* calls are enough; when the mapper's output types differ from the reducer's, the map output types must be declared separately with setMapOutputKeyClass and setMapOutputValueClass.
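For example, consider a job that computes the average word length grouped by first letter: the mapper emits <Text, IntWritable> pairs, while the reducer emits <Text, DoubleWritable>, so the map output types have to be declared on their own. A minimal sketch (the class names and job logic below are made up for illustration; the four setter calls in main are the point):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class AverageWordLength {

    // Emits <first letter, word length>: the map output value is IntWritable.
    public static class LengthMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final Text letter = new Text();
        private final IntWritable length = new IntWritable();

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            for (String word : value.toString().split("\\s+")) {
                if (word.isEmpty()) continue;
                letter.set(word.substring(0, 1));
                length.set(word.length());
                context.write(letter, length);
            }
        }
    }

    // Emits <first letter, average length>: the reduce output value is
    // DoubleWritable, which differs from the map output value type.
    public static class AverageReducer extends Reducer<Text, IntWritable, Text, DoubleWritable> {
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            long sum = 0, count = 0;
            for (IntWritable val : values) {
                sum += val.get();
                count++;
            }
            context.write(key, new DoubleWritable((double) sum / count));
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setJobName("AverageWordLength");
        job.setJarByClass(AverageWordLength.class);
        job.setMapperClass(LengthMapper.class);
        job.setReducerClass(AverageReducer.class);
        // The map output value type (IntWritable) differs from the final
        // output value type (DoubleWritable), so both pairs must be declared.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}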
Source: https://www.cnblogs.com/Lee-yl/p/10996374.html