Hadoop 1.x: Counting Occurrences of a String in Files, or Collecting the Matching Lines (To Be Continued)


An entry-level project for hands-on practice: analyze server runtime logs, find the SQL statements that are invoked most often, and cache them.

Add the dependencies to pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>test.hadoop</groupId>
	<artifactId>WordCount</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>WordCount</name>
	<url>http://maven.apache.org</url>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-common</artifactId>
			<version>0.23.11</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-hdfs</artifactId>
			<version>0.23.11</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-client</artifactId>
			<version>0.23.11</version>
		</dependency>
	</dependencies>
</project>
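
With this pom in place, building is a standard Maven invocation; the runnable jar lands in target/, with a name that follows from the artifactId and version above (WordCount-0.0.1-SNAPSHOT.jar):

mvn clean package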

Three classes are involved in total: one program entry class, plus two classes implementing the different statistics features.

package test.hadoop.line;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class LineMatcherStarter {

	public static void main(String[] args) throws IOException {
		// Dispatch to a feature based on the first argument
		if (args.length < 1) {
			printUsage();
			return;
		}
		int key;
		try {
			key = Integer.parseInt(args[0]);
		} catch (NumberFormatException e) {
			printUsage();
			return;
		}
		switch (key) {
		case 1:
			countJob(key, args); // counting job
			break;
		case 2:
			collectJob(key, args); // collecting job
			break;
		default:
			printUsage();
		}
	}

	private static void printUsage() {
		System.out.println("Usage: java [-options] -jar jarfile class [args...]");
		System.out.println("  class a.b.c.Starter");
		System.out.println("  args[0] 1=count 2=line");
		System.out.println("  args[1] source");
		System.out.println("  args[2] destination");
	}

	private static void collectJob(int key, String[] args) throws IOException {
		if (args.length < 5) {
			printUsage();
			System.out.println("  args[3] expression");
			System.out.println("  args[4] rule=[starts|contains|ends]");
			System.out.println("  args[5] max line  default=9999");
			return;
		}
        // Job setup; note the API differs between Hadoop versions (this uses the old mapred API)
		JobConf conf = new JobConf(LineMatcherStarter.class);
		conf.setJobName("LineCollect");
		conf.setMapperClass(TextWithLineMapperReducer.class);
		conf.setCombinerClass(TextWithLineReducer.class);
		conf.setReducerClass(TextWithLineReducer.class);
		conf.setOutputKeyClass(Text.class);
		conf.setOutputValueClass(LineArrayWritable.class);
		conf.setOutputFormat(TextWithLineOutputFormat.class);
        // Set the file input path and the output path
		FileInputFormat.setInputPaths(conf, args[1]);
		FileOutputFormat.setOutputPath(conf, new Path(args[2]));	
        // Custom properties: the search string and the match rule (starts, contains, ends)
		conf.set("TEXTWITHLINE.search", args[3]);
		conf.set("TEXTWITHLINE.rule", args[4]);	
		if (args.length == 6) {
            // Maximum number of lines each task may collect on a worker node.
            // Estimate it from the available memory, or you risk an OOM error (don't ask how I know).
			conf.set("TEXTWITHLINE.maxLine", args[5]);	
		}
        // Run the job
		JobClient.runJob(conf);
	}

	private static void countJob(int key, String[] args) throws IOException {
		if (args.length < 5) {
			printUsage();
			System.out.println("  args[3] expression");
			System.out.println("  args[4] rule=[starts|contains|ends]");
			return;
		}
		JobConf conf = new JobConf(LineMatcherStarter.class);
		conf.setJobName("LineCount");
		conf.setMapperClass(LineCountMapperReducer.class);
		conf.setCombinerClass(LineCountReducer.class);
		conf.setReducerClass(LineCountReducer.class);
		conf.setOutputKeyClass(Text.class);
		conf.setOutputValueClass(IntWritable.class);
		FileInputFormat.setInputPaths(conf, args[1]);
		FileOutputFormat.setOutputPath(conf, new Path(args[2]));	
		conf.set("TEXTWITHLINE.search", args[3]);
		conf.set("TEXTWITHLINE.rule", args[4]);	
		JobClient.runJob(conf);
	}

}
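
To run the jobs, submit the jar to Hadoop. The HDFS paths and the search string below are placeholders for illustration; the argument order matches what printUsage describes:

hadoop jar target/WordCount-0.0.1-SNAPSHOT.jar test.hadoop.line.LineMatcherStarter 1 /input/logs /output/count "select" contains
hadoop jar target/WordCount-0.0.1-SNAPSHOT.jar test.hadoop.line.LineMatcherStarter 2 /input/logs /output/collect "select" contains 1000

Next comes the second source file, with everything for the collecting feature: the mapper, the combiner/reducer, a custom RecordWriter and OutputFormat, and the ArrayWritable subclass used as the value type.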
package test.hadoop.line;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;

public class TextWithLineMapperReducer extends MapReduceBase implements Mapper<LongWritable,Text,Text,LineArrayWritable>{
	private Text keyText;
	private String search;
	public void configure(JobConf job) {
		search = job.get("TEXTWITHLINE.search");
	}
	public void map(LongWritable k, Text v, OutputCollector<Text, LineArrayWritable> o, Reporter r) throws IOException {
		if (keyText == null) {
			// Build the output key lazily; fail fast if no search string was configured
			if (search == null || search.isEmpty()) {
				throw new RuntimeException("Search is empty!");
			}
			keyText = new Text(search);
		}
		// NOTE: unlike the count mapper, this one only does a "contains" match;
		// the TEXTWITHLINE.rule property is not applied here yet
		String line = v.toString();
		if (line.indexOf(search) >= 0) {
			o.collect(keyText, new LineArrayWritable(new Text[] { v }));
		}
	}
}

// Used as both combiner and reducer: merges the collected lines for a key,
// capping the lines kept per call at TEXTWITHLINE.maxLine
class TextWithLineReducer extends MapReduceBase implements Reducer<Text,LineArrayWritable,Text,LineArrayWritable>{
	private int max = Integer.MAX_VALUE;
	public void configure(JobConf job) {
		// Cap on collected lines, read from the job configuration (default 9999)
		max = Integer.parseInt(job.get("TEXTWITHLINE.maxLine", "9999"));
	}
	public void reduce(Text k,Iterator<LineArrayWritable> v,OutputCollector<Text,LineArrayWritable> o,Reporter r)throws IOException{
		List<Text> list = new ArrayList<>();
		int i = 0;
		while (v.hasNext()) {
			String[] ss = v.next().toStrings();
			for (String s : ss) {
				if (i++ < max)
					list.add(new Text(s));
			}
		}
		o.collect(k, new LineArrayWritable(list.toArray(new Text[0])));
	}
}

class TextWithLineWriter implements RecordWriter<Text, LineArrayWritable> {
	private static final byte[] newline = getBytes("\r\n");
    private static byte[] getBytes(String s) {
    	try {
            return s.getBytes("UTF-8");
    	} catch (UnsupportedEncodingException uee) {
            throw new IllegalArgumentException("can't find " + "UTF-8" + " encoding");
        }
    }
    protected DataOutputStream out;
    public TextWithLineWriter(DataOutputStream s) {
    	out = s;
    }
	public synchronized void write(Text key, LineArrayWritable value) throws IOException {
		out.write(getBytes("----->" + key.toString()));
        out.write(newline);
        writeArray(value);
        out.write(newline);
	}
    private void writeArray(LineArrayWritable aw) throws IOException {
    	int i = 0;
    	for (String s : aw.toStrings()) {
    		out.write(getBytes("-->" + (i++) + "->" + s));
            out.write(newline);
    	}
    }
	public void close(Reporter reporter) throws IOException {
		out.close();
	}
}
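
Given the write and writeArray methods above, each key in the output file becomes a "----->" header line followed by its numbered matching lines. The SQL text here is made up for illustration, but the markers come straight from the writer code:

----->select
-->0->select * from t_user where id = 1
-->1->select count(1) from t_order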

class TextWithLineOutputFormat extends TextOutputFormat<Text,LineArrayWritable>{
	public RecordWriter<Text,LineArrayWritable> getRecordWriter(FileSystem ignored,JobConf job,String name,Progressable p)throws IOException{
		boolean isCompressed = getCompressOutput(job);
	    if (!isCompressed) {
	      Path file = FileOutputFormat.getTaskOutputPath(job, name);
	      FileSystem fs = file.getFileSystem(job);
	      FSDataOutputStream fileOut = fs.create(file, p);
	      return new TextWithLineWriter(fileOut);
	    } else {
	      Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class);
	      // create the named codec
	      CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job);
	      // build the filename including the extension
	      Path file = FileOutputFormat.getTaskOutputPath(job, name + codec.getDefaultExtension());
	      FileSystem fs = file.getFileSystem(job);
	      FSDataOutputStream fileOut = fs.create(file, p);
	      // wrap the stream with the codec, otherwise the .gz file would contain uncompressed data
	      return new TextWithLineWriter(new DataOutputStream(codec.createOutputStream(fileOut)));
	    }
	}
}

class LineArrayWritable extends ArrayWritable {
	public LineArrayWritable() {
		super(Text.class); // no-arg constructor required by Hadoop for deserialization
	}
	public LineArrayWritable(Text[] array) {
		super(Text.class);
		// Deep-copy the values: Hadoop reuses the Text object it passes to map(),
		// so storing the reference directly would corrupt the collected lines
		Text[] texts = new Text[array.length];
		for (int i = 0; i < array.length; ++i) {
			texts[i] = new Text(array[i]);
		}
		set(texts);
	}
}
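
As a quick local sanity check of the Writable round trip (a standalone sketch, not part of the job; DataOutputBuffer and DataInputBuffer are Hadoop's in-memory I/O buffers, and the class sits in the same package because LineArrayWritable is package-private):

package test.hadoop.line;

import java.util.Arrays;

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;

public class LineArrayWritableDemo {
	public static void main(String[] args) throws Exception {
		LineArrayWritable w = new LineArrayWritable(new Text[] { new Text("a"), new Text("b") });
		DataOutputBuffer out = new DataOutputBuffer();
		w.write(out);                                  // serialize
		DataInputBuffer in = new DataInputBuffer();
		in.reset(out.getData(), out.getLength());
		LineArrayWritable r = new LineArrayWritable(); // no-arg constructor, as Hadoop itself would use
		r.readFields(in);                              // deserialize
		System.out.println(Arrays.toString(r.toStrings())); // prints [a, b]
	}
}

The third and last source file holds the counting mapper and reducer: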
package test.hadoop.line;

import java.io.IOException;
import java.util.Iterator;
import java.util.function.Predicate;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class LineCountMapperReducer extends MapReduceBase implements Mapper<LongWritable,Text,Text,IntWritable>{
	private static final IntWritable ONE = new IntWritable(1);
	private String search;
	private Text key;
	private Predicate<String> rule;
	private Predicate<String> starts = s -> s.startsWith(search);
	private Predicate<String> contains = s -> s.contains(search);
	private Predicate<String> ends = s -> s.endsWith(search);
	public void configure(JobConf job) {
		search = job.get("TEXTWITHLINE.search");
		key = new Text(search);
		// Choose the match rule; a missing or unknown value falls back to "contains"
		switch (job.get("TEXTWITHLINE.rule", "contains")) {
			case "starts":
				rule = starts;
				break;
			case "ends":
				rule = ends;
				break;
			case "contains":
			default:
				rule = contains;
		}
	}
	public void map(LongWritable k,Text v,OutputCollector<Text,IntWritable> o,Reporter r)throws IOException{
		String line = v.toString();
		if (rule.test(line)) {
			o.collect(key, ONE);
		}
	}
}

class LineCountReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
	public void reduce(Text k,Iterator<IntWritable> v,OutputCollector<Text,IntWritable> o,Reporter r)throws IOException{
		int sum = 0;
		while (v.hasNext()) {
			sum += v.next().get();
		}
		o.collect(k, new IntWritable(sum));
	}
}
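
Because countJob keeps the default TextOutputFormat, each output line is the search key, a tab, and the summed count. With the example invocation shown earlier, the part-file would contain something like this (the number is illustrative):

select	1382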

I ran this on virtual machines: install one, clone it, then change the hostname and user. For the preparation steps, see my earlier articles on changing the username, user group, domain name, and hostname on Ubuntu 14.04, and on setting up passwordless SSH access on Ubuntu 14.04.

(To be continued)

Source: https://blog.csdn.net/u011225581/article/details/120526199