其他分享
首页 > 其他分享> > MapperReduce中两个不同文件夹下读取数据并连接

MapperReduce中两个不同文件夹下读取数据并连接

作者:互联网

MapperReduce中两个不同文件夹下读取数据并连接

前言

今天是20200202,我觉得是疫情的拐点,是一个编程的好日子!过去不学技术的一年,可能要白给,不管怎么说,好歹我也认识到马克思哲学理论的确实是科学的,矫情的话能写一大堆,好想聊聊天啊。不管了干正事,第一次写这个笔记,因为好多东西忘记的太快了,也是受广学楼某人的耳闻目染,当个笔记本记录一下,不知道CSDN能不能写个人编程日记,写错了大家指出来哈,好多不会,太多要学!

问题抛出

  1. 在MapperReduce中很常见的需求就是对表做链接操作,当然hive很方便,那怎么使用MR实现
  2. 首先两个表,在这里是来自HDFS上两个不同文件夹下的文件,所以要解决如何区分是来自两个文件夹中那个文件夹下的数据
  3. 最后怎么在Reduce端如何做链接操作

问题解决

  1. 在Reduce端都是通过打的不同标签来区分数据是来自那个文件夹的
  2. 在Mapper中打标签,有两种方案第一种是,写两个Mapper类,第二个是通过切片判断文件路径,我觉得第一个方便一点,注意如果是两个Mapper类,如果输出的K,V数据格式不统一可能会报错,建议都是用Text,反正可以格式转换
  3. 连接操作,首先将数据通过标签放在两个集合里,然后在通过两次遍历做连接

使用两个Mapper类

案例来自2019安徽省大数据与人工智能竞赛
将输出结果文件中的城市编号用cityid.txt文件中城市名称替换

结果文件test2中的数据,需要将cityid字段中的编号替换为下图中的汉字
需要将cityid字段中的编号替换为下图中的汉字
cityid.txt中的文件内容部分数据格式如下

1701|桐城市|桐城市|安徽|中国|安庆市|华东地区|四线城市|31.05228|116.93861
1702|宿松县|宿松县|安徽|中国|安庆市|华东地区|四线城市|30.151213|116.1142
1703|枞阳县|枞阳县|安徽|中国|安庆市|华东地区|四线城市|30.69371|117.21059
1704|太湖县|太湖县|安徽|中国|安庆市|华东地区|四线城市|30.420059|116.26508
1705|怀宁县|怀宁县|安徽|中国|安庆市|华东地区|四线城市|30.409006|116.64709
1706|岳西县|岳西县|安徽|中国|安庆市|华东地区|四线城市|30.857161|116.35818
1707|望江县|望江县|安徽|中国|安庆市|华东地区|四线城市|30.123537|116.67433
1708|潜山县|潜山县|安徽|中国|安庆市|华东地区|四线城市|30.630346|116.5672
5317|迎江区|迎江区|安徽|中国|安庆市|华东地区|四线城市|30.511548|117.09115
5318|大观区|大观区|安徽|中国|安庆市|华东地区|四线城市|30.553957|117.02167
1691|怀远县|怀远县|安徽|中国|蚌埠市|华东地区|四线城市|32.95665|117.19356
1692|固镇县|固镇县|安徽|中国|蚌埠市|华东地区|四线城市|33.314575|117.31171
1693|五河县|五河县|安徽|中国|蚌埠市|华东地区|四线城市|33.139736|117.88253
1738|和县|和县|安徽|中国|巢湖市|华东地区|null|31.714224|118.36112
1739|含山县|含山县|安徽|中国|巢湖市|华东地区|null|31.720116|118.103
1740|庐江县|庐江县|安徽|中国|巢湖市|华东地区|null|31.253363|117.28835
1741|无为县|无为县|安徽|中国|巢湖市|华东地区|null|31.298515|117.91132

直接上代码了,建议用一个文件写,不要分三个,用静态类写
1. Mapper类

//第一个mapper类,数据打上标签“a_”
public static class doMapper1 extends Mapper<LongWritable, Text, Text, Text> {
		private final static String a_label = "a_";

		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			String line = value.toString().trim();
			String[] splits = line.split(" ");
			String[] v = null;
			// 字段为5个处理,否则是脏数据过滤掉
			if (splits.length == 5) {
				// 取出cityid
				v = splits[4].split(":");
				// 城市id作为key
				context.write(new Text(v[1]), new Text(a_label + line));
			}
		}
	}
//第二个mapper类给数据打上标签b_“”	
public static class doMapper2 extends Mapper<LongWritable, Text, Text, Text> {
		private final static String b_label = "b_";

		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			String[] splits = line.split("\\|");
			String newValue = "";
			// 过滤不符合数据
			if (splits.length == 10) {
				// 构造城市名称,国家,省份,县市,地区以逗号隔开
				if (splits[4] != null)
					newValue += splits[4];
				if (splits[3] != null)
					newValue += "," + splits[3];
				if (splits[5] != null)
					newValue += "," + splits[5];
				if (splits[2] != null)
					newValue += "," + splits[2];
				context.write(new Text(splits[0]), new Text(b_label + newValue));
			}

		}
	}

  1. 在Reduce中区分,并连接,用startsWith(“a_”)方法判断,如果是以“a_”开头加入集合vecA,如果是以“b_”开头加入vecB,对了我写的字符串拼接有点孬了,最好使用stringbuild来写,不然会产生垃圾,反正是要序列化的,如果是放在集合等其他容器里,要注意会有坑。
public static class doReducer extends Reducer<Text, Text, Text, Text> {
		@Override
		protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			String s1 = null;
			String s2 = null;
			// 创建两个集合
			Vector<String> vecA = new Vector<String>();
			Vector<String> vecB = new Vector<String>();
			// 判断两个输入数据的value,分别加入上面两个集合

			for (Text value : values) {
				String line = value.toString().trim();
				// 判断开头标记
				if (line.startsWith("a_")) {
					String[] v = line.substring(2).split(" ");
					// 去掉cityid字段
					s1 = v[0] + " " + v[1] + " " + v[2] + " " + v[3];
					vecA.add(s1);
				}
				if (line.startsWith("b_")) {
					s2 = line.substring(2);
					vecB.add(s2);
				}
			}
			// 将两个集合 拼接,笛卡尔积
			for (String v1 : vecA) {
				for (String v2 : vecB) {
					context.write(new Text(v1 + " " + "cityid:" + v2), new Text(""));
				}

			}
		}
	}
}

  1. 最后注意主类的写法MultipleInputs.addInputPath()方法
public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf, "test3");
		job.setJarByClass(test3.class);
		FileSystem fs = FileSystem.get(conf);
		Path outputDir = new Path("/output/bigdata/test3");
		if (fs.exists(outputDir)) {
			fs.delete(outputDir, true);// true表示递归删除
		}
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);

		FileOutputFormat.setOutputPath(job, outputDir);

		// 第一个文件来自test2的输出结果
		MultipleInputs.addInputPath(job, new Path("/output/bigdata/test2/"), TextInputFormat.class, doMapper1.class);

		// 第二个文件来自 cityid.txt
		MultipleInputs.addInputPath(job, new Path("/input/bigdata/cityid"), TextInputFormat.class, doMapper2.class);

		job.setReducerClass(doReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}

结果如下

在这里插入图片描述
分片的方法Mapper端通过一下两行代码:
拿到路径的字符串,通过该字符串不同打上不同标签

// 获取切片信息
FileSplit inputSplit = (FileSplit) context.getInputSplit();
// 获取路径信息
String path = inputSplit.getPath().toString();

相应主类写法变动加入以下两行代码:

// 输入两个路径
FileInputFormat.addInputPath(job, new Path("/input/dianxin/tl_hefei/dianxin_data"));
FileInputFormat.addInputPath(job, new Path("/input/dianxin/city_id/city_id.txt"));

麋鹿的调包霞 发布了1 篇原创文章 · 获赞 0 · 访问量 39 私信 关注

标签:MapperReduce,splits,华东地区,Text,String,文件夹,new,class,读取数据
来源: https://blog.csdn.net/c_1234qwer/article/details/104146281