
Hadoop in Action: Joining Data from Different Sources



Reduce-side joins

A reduce-side join is also known as a repartitioned join, or a repartitioned sort-merge join.

The main drawback of a reduce-side join is the cost of the shuffle phase: every record from every source is repartitioned, sorted, and moved across the network, including records that will never find a match.

First, a few terms and concepts.

How it works

  1. Map: load records from each data source and wrap every record with a group key and a tag identifying its source.
  2. Shuffle: the group key is the join key; records are partitioned, grouped, and sorted by it.
  3. Reduce: unpack the records in each group back to their original form and take the full cross product (Cartesian product) of the records from the different sources.
  4. Combine: depending on the join type (inner join, outer join, and so on), merge each combination from the cross product into a single output record.
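
For example, using the customers and orders data shown later in this post: after the shuffle, every record (tagged "customers" or "orders") whose join key is customer ID 3 lands in the same reduce group. If that group held one customers record and two orders records, the cross product would yield 1 × 2 = 2 pairs, and an inner-join combine() would turn each pair into one output record containing the customer's fields plus one order's fields.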

Usage (this API has since been deprecated)

Hadoop's datajoin package implements this data flow and provides three abstract classes to extend and concretize:

TaggedMapOutput

The datajoin package fixes the group key type to Text and the value type to TaggedMapOutput. A TaggedMapOutput wraps a record together with a Text tag identifying which source it came from.

Example: a custom TaggedWritable

// This subclass of TaggedMapOutput can wrap records of any Writable type
public static class TaggedWritable extends TaggedMapOutput {

	private Writable data;
	
	public TaggedWritable(Writable data) {
		this.tag = new Text("");
		this.data = data;
	}
	
	public Writable getData() {
		return data;
	}
	...
}
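
The elided part of TaggedWritable is the Writable plumbing. A minimal sketch, assuming for simplicity that data is always a Text (the book's full version also serializes the class name so that any Writable type can be restored; java.io.DataInput / java.io.DataOutput imports assumed):

public TaggedWritable() {  // no-arg constructor, required so Hadoop can instantiate and deserialize the value
	this.tag = new Text("");
	this.data = new Text("");
}

public void write(DataOutput out) throws IOException {
	this.tag.write(out);   // write the tag first, then the wrapped record
	this.data.write(out);
}

public void readFields(DataInput in) throws IOException {
	this.tag.readFields(in);
	this.data.readFields(in);
}
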
DataJoinMapperBase

DataJoinMapperBase exposes three wrapping hooks, which its map() method calls:

protected abstract Text generateInputTag(String inputFile);  // called once at task start; sets the global tag for this input source
protected abstract TaggedMapOutput generateTaggedMapOutput(Object value);  // wraps one input record in a TaggedMapOutput
protected abstract Text generateGroupKey(TaggedMapOutput aRecord);  // extracts the group (join) key from a tagged record

public void map(Object key, Object value, OutputCollector output, Reporter reporter) throws IOException
{
    TaggedMapOutput aRecord = generateTaggedMapOutput(value);
    Text groupKey = generateGroupKey(aRecord);
    output.collect(groupKey, aRecord);
}

Example: implementing the hooks

// Tag with the full file name
protected Text generateInputTag(String inputFile) {
	return new Text(inputFile);	
}

// Alternative: tag with the file-name prefix (the part before the first '-')
protected Text generateInputTag(String inputFile) {
	return new Text(inputFile.split("-")[0]);	
}
// Wrap the raw line in a TaggedWritable and tag it with this input's tag
protected TaggedMapOutput generateTaggedMapOutput(Object value) {
	TaggedWritable retv = new TaggedWritable((Text) value);
	retv.setTag(this.inputTag);
	return retv;
}
// The join key is the first comma-separated field of the record
protected Text generateGroupKey(TaggedMapOutput aRecord) {
    String line = ((Text) aRecord.getData()).toString();
    String[] tokens = line.split(",");
    String groupKey = tokens[0];
    return new Text(groupKey);
}
DataJoinReducerBase

DataJoinReducerBase's reduce() builds the cross product for each group and calls combine() once per combination; combine() filters out unwanted combinations (for example, to implement an inner join) and formats the merged output record:

protected abstract TaggedMapOutput combine(Object[] tags, Object[] values);
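
As a hedged sketch, an inner-join combine() for the TaggedWritable above could look like this: it discards any combination that is missing one of the two sources and splices the remaining fields together, stripping the join key from each record since reduce() already emits it as the output key.

protected TaggedMapOutput combine(Object[] tags, Object[] values) {
    // Inner join: require a record from both sources, otherwise drop this combination.
    if (tags.length < 2) {
        return null;
    }
    StringBuilder joined = new StringBuilder();
    for (int i = 0; i < values.length; i++) {
        if (i > 0) {
            joined.append(",");
        }
        TaggedWritable tw = (TaggedWritable) values[i];
        String line = ((Text) tw.getData()).toString();
        // Keep everything after the join key, which is the first comma-separated field.
        joined.append(line.split(",", 2)[1]);
    }
    TaggedWritable retv = new TaggedWritable(new Text(joined.toString()));
    retv.setTag((Text) tags[0]);
    return retv;
}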

Replicated join with DistributedCache (map-side join)

If every mapper can be guaranteed access to all the data it needs at the moment it joins a record, the join can be done entirely on the map side: replicate the smaller table to every node (here via DistributedCache), load it into memory, and join each input record against it with a simple lookup.

Example: joining customers and orders by customer id

customers.csv:

1,Stephanie Leung,555-555-555
2,Edward Kim,123-456-7890
3,Jose Madriz,281-330-8004
4,David Stork,408-555-0000

orders.csv (customer id, order, price, date):

1,A,12.95,2-Jun-08
2,B,88.25,…
3,C,32,…
4,D,25.02,…
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.Hashtable;


public class MapJoinDemo extends Configured implements Tool {

    public static class MapClass extends MapReduceBase implements Mapper<Text, Text, Text, Text> {
        private Hashtable<String, String> joinData = new Hashtable<>();

        @Override
        public void configure(JobConf job) {
            try {
                Path[] cacheFiles = DistributedCache.getLocalCacheFiles(job);
                if (cacheFiles != null) {
                    String line;
                    String[] tokens;
                    String path = "E" + cacheFiles[0].toString().substring(4);  // 适应Windows单机任务的下下策
                    try (BufferedReader joinReader = new BufferedReader(new FileReader(path))) {
                        while ((line = joinReader.readLine()) != null) {
                            tokens = line.split(",", 2);
                            joinData.put(tokens[0], tokens[1]);
                        }
                    }
                }
            } catch (IOException e) {
                System.err.println("Exception reading DistributedCache: " + e);
            }
        }

        @Override
        public void map(Text key, Text value, OutputCollector<Text, Text> outputCollector, Reporter reporter) throws IOException {
            String joinValue = joinData.get(key.toString());
            if (joinValue != null) {
                outputCollector.collect(key, new Text(value.toString() + "," + joinValue));
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        JobConf job = new JobConf(getConf(), getClass());
        job.setJobName("MapJoinTest");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setInputFormat(KeyValueTextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.set("key.value.separator.in.input.line", ",");
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setMapperClass(MapClass.class);
        // last-resort hack for a standalone job on Windows: add the file straight from the local file system
        DistributedCache.addCacheFile(new URI("file:///my_code/bigdata/learn/joinDemo/src/main/resources/customers.csv"), job);

        JobClient.runJob(job);

        return 0;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), new MapJoinDemo(), args);
        System.exit(exitCode);
    }
}
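
With the sample data above (assuming orders.csv is passed as args[0] and customers.csv sits at the hard-coded cache path), the job does all the joining in the mappers and only runs the default identity reduce, so each output line should look roughly like:

1	A,12.95,2-Jun-08,Stephanie Leung,555-555-555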

Semijoin: filter on the map side, then join on the reduce side

If both tables are too large to replicate, but the join only touches a small set of keys, a semijoin helps: first extract the distinct join keys from one table (for example, the customer IDs that actually appear in orders), push that small key set to every mapper via DistributedCache, use it on the map side to drop records that cannot possibly join, and then run an ordinary reduce-side join on what remains.
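
A minimal sketch of the map-side filter, reusing the old mapred API and the DistributedCache pattern from MapJoinDemo above (imports as in that class, plus java.util.Set and java.util.HashSet); the cached file of distinct join keys, one per line, would be produced by a preliminary job and its name here is hypothetical:

public static class SemiJoinFilterMapper extends MapReduceBase implements Mapper<Text, Text, Text, Text> {
    // Join keys that actually occur in the other table; assumed small enough to keep in memory.
    private final Set<String> joinKeys = new HashSet<>();

    @Override
    public void configure(JobConf job) {
        try {
            Path[] cacheFiles = DistributedCache.getLocalCacheFiles(job);
            if (cacheFiles != null && cacheFiles.length > 0) {
                try (BufferedReader reader = new BufferedReader(new FileReader(cacheFiles[0].toString()))) {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        joinKeys.add(line.trim());   // one distinct join key per line
                    }
                }
            }
        } catch (IOException e) {
            System.err.println("Exception reading DistributedCache: " + e);
        }
    }

    @Override
    public void map(Text key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
        // Only records whose key can possibly join survive; the reduce-side join then runs on far less data.
        if (joinKeys.contains(key.toString())) {
            output.collect(key, value);
        }
    }
}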

Source: https://www.cnblogs.com/vvlj/p/14105466.html