MapperReduce中两个不同文件夹下读取数据并连接
作者:互联网
MapperReduce中两个不同文件夹下读取数据并连接
前言
今天是20200202,我觉得是疫情的拐点,是一个编程的好日子!过去不学技术的一年,可能要白给,不管怎么说,好歹我也认识到马克思哲学理论的确实是科学的,矫情的话能写一大堆,好想聊聊天啊。不管了干正事,第一次写这个笔记,因为好多东西忘记的太快了,也是受广学楼某人的耳闻目染,当个笔记本记录一下,不知道CSDN能不能写个人编程日记,写错了大家指出来哈,好多不会,太多要学!
问题抛出
- 在MapperReduce中很常见的需求就是对表做链接操作,当然hive很方便,那怎么使用MR实现
- 首先两个表,在这里是来自HDFS上两个不同文件夹下的文件,所以要解决如何区分是来自两个文件夹中那个文件夹下的数据
- 最后怎么在Reduce端如何做链接操作
问题解决
- 在Reduce端都是通过打的不同标签来区分数据是来自那个文件夹的
- 在Mapper中打标签,有两种方案第一种是,写两个Mapper类,第二个是通过切片判断文件路径,我觉得第一个方便一点,注意如果是两个Mapper类,如果输出的K,V数据格式不统一可能会报错,建议都是用Text,反正可以格式转换
- 连接操作,首先将数据通过标签放在两个集合里,然后在通过两次遍历做连接
使用两个Mapper类
案例来自2019安徽省大数据与人工智能竞赛
将输出结果文件中的城市编号用cityid.txt文件中城市名称替换
结果文件test2中的数据,需要将cityid字段中的编号替换为下图中的汉字
cityid.txt中的文件内容部分数据格式如下
1701|桐城市|桐城市|安徽|中国|安庆市|华东地区|四线城市|31.05228|116.93861
1702|宿松县|宿松县|安徽|中国|安庆市|华东地区|四线城市|30.151213|116.1142
1703|枞阳县|枞阳县|安徽|中国|安庆市|华东地区|四线城市|30.69371|117.21059
1704|太湖县|太湖县|安徽|中国|安庆市|华东地区|四线城市|30.420059|116.26508
1705|怀宁县|怀宁县|安徽|中国|安庆市|华东地区|四线城市|30.409006|116.64709
1706|岳西县|岳西县|安徽|中国|安庆市|华东地区|四线城市|30.857161|116.35818
1707|望江县|望江县|安徽|中国|安庆市|华东地区|四线城市|30.123537|116.67433
1708|潜山县|潜山县|安徽|中国|安庆市|华东地区|四线城市|30.630346|116.5672
5317|迎江区|迎江区|安徽|中国|安庆市|华东地区|四线城市|30.511548|117.09115
5318|大观区|大观区|安徽|中国|安庆市|华东地区|四线城市|30.553957|117.02167
1691|怀远县|怀远县|安徽|中国|蚌埠市|华东地区|四线城市|32.95665|117.19356
1692|固镇县|固镇县|安徽|中国|蚌埠市|华东地区|四线城市|33.314575|117.31171
1693|五河县|五河县|安徽|中国|蚌埠市|华东地区|四线城市|33.139736|117.88253
1738|和县|和县|安徽|中国|巢湖市|华东地区|null|31.714224|118.36112
1739|含山县|含山县|安徽|中国|巢湖市|华东地区|null|31.720116|118.103
1740|庐江县|庐江县|安徽|中国|巢湖市|华东地区|null|31.253363|117.28835
1741|无为县|无为县|安徽|中国|巢湖市|华东地区|null|31.298515|117.91132
直接上代码了,建议用一个文件写,不要分三个,用静态类写
1. Mapper类
//第一个mapper类,数据打上标签“a_”
public static class doMapper1 extends Mapper<LongWritable, Text, Text, Text> {
private final static String a_label = "a_";
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
String line = value.toString().trim();
String[] splits = line.split(" ");
String[] v = null;
// 字段为5个处理,否则是脏数据过滤掉
if (splits.length == 5) {
// 取出cityid
v = splits[4].split(":");
// 城市id作为key
context.write(new Text(v[1]), new Text(a_label + line));
}
}
}
//第二个mapper类给数据打上标签b_“”
public static class doMapper2 extends Mapper<LongWritable, Text, Text, Text> {
private final static String b_label = "b_";
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] splits = line.split("\\|");
String newValue = "";
// 过滤不符合数据
if (splits.length == 10) {
// 构造城市名称,国家,省份,县市,地区以逗号隔开
if (splits[4] != null)
newValue += splits[4];
if (splits[3] != null)
newValue += "," + splits[3];
if (splits[5] != null)
newValue += "," + splits[5];
if (splits[2] != null)
newValue += "," + splits[2];
context.write(new Text(splits[0]), new Text(b_label + newValue));
}
}
}
- 在Reduce中区分,并连接,用startsWith(“a_”)方法判断,如果是以“a_”开头加入集合vecA,如果是以“b_”开头加入vecB,对了我写的字符串拼接有点孬了,最好使用stringbuild来写,不然会产生垃圾,反正是要序列化的,如果是放在集合等其他容器里,要注意会有坑。
public static class doReducer extends Reducer<Text, Text, Text, Text> {
@Override
protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
String s1 = null;
String s2 = null;
// 创建两个集合
Vector<String> vecA = new Vector<String>();
Vector<String> vecB = new Vector<String>();
// 判断两个输入数据的value,分别加入上面两个集合
for (Text value : values) {
String line = value.toString().trim();
// 判断开头标记
if (line.startsWith("a_")) {
String[] v = line.substring(2).split(" ");
// 去掉cityid字段
s1 = v[0] + " " + v[1] + " " + v[2] + " " + v[3];
vecA.add(s1);
}
if (line.startsWith("b_")) {
s2 = line.substring(2);
vecB.add(s2);
}
}
// 将两个集合 拼接,笛卡尔积
for (String v1 : vecA) {
for (String v2 : vecB) {
context.write(new Text(v1 + " " + "cityid:" + v2), new Text(""));
}
}
}
}
}
- 最后注意主类的写法MultipleInputs.addInputPath()方法
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "test3");
job.setJarByClass(test3.class);
FileSystem fs = FileSystem.get(conf);
Path outputDir = new Path("/output/bigdata/test3");
if (fs.exists(outputDir)) {
fs.delete(outputDir, true);// true表示递归删除
}
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileOutputFormat.setOutputPath(job, outputDir);
// 第一个文件来自test2的输出结果
MultipleInputs.addInputPath(job, new Path("/output/bigdata/test2/"), TextInputFormat.class, doMapper1.class);
// 第二个文件来自 cityid.txt
MultipleInputs.addInputPath(job, new Path("/input/bigdata/cityid"), TextInputFormat.class, doMapper2.class);
job.setReducerClass(doReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
结果如下
分片的方法Mapper端通过一下两行代码:
拿到路径的字符串,通过该字符串不同打上不同标签
// 获取切片信息
FileSplit inputSplit = (FileSplit) context.getInputSplit();
// 获取路径信息
String path = inputSplit.getPath().toString();
相应主类写法变动加入以下两行代码:
// 输入两个路径
FileInputFormat.addInputPath(job, new Path("/input/dianxin/tl_hefei/dianxin_data"));
FileInputFormat.addInputPath(job, new Path("/input/dianxin/city_id/city_id.txt"));
麋鹿的调包霞
发布了1 篇原创文章 · 获赞 0 · 访问量 39
私信
关注
标签:MapperReduce,splits,华东地区,Text,String,文件夹,new,class,读取数据 来源: https://blog.csdn.net/c_1234qwer/article/details/104146281