Hadoop 1.x: Counting or Collecting Occurrences of a String in Files (to be continued)
An entry-level project to practice on: analyze server runtime logs, count which SQL statements are invoked most often, and cache those statements.
pom.xml: declare the dependencies
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>test.hadoop</groupId>
    <artifactId>WordCount</artifactId>
    <version>0.0.1-acute</version>
    <packaging>jar</packaging>
    <name>WordCount</name>
    <url>http://maven.apache.org</url>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>0.23.11</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>0.23.11</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>0.23.11</version>
        </dependency>
    </dependencies>
</project>
Three classes are involved in total: one driver class that launches the program, and two functional classes that perform the two kinds of statistics.
package test.hadoop.line;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class LineMatcherStarter {

    public static void main(String[] args) throws IOException {
        // Dispatch to the requested function based on the first argument
        if (args.length < 1) {
            printUsage();
            return;
        }
        int key;
        try {
            key = Integer.valueOf(args[0]);
        } catch (NumberFormatException e) {
            e.printStackTrace();
            printUsage();
            return;
        }
        switch (key) {
        case 1:
            countJob(key, args);   // counting job
            break;
        case 2:
            collectJob(key, args); // collecting job
            break;
        default:
            printUsage();
        }
    }

    private static void printUsage() {
        System.out.println("Usage: java [-options] -jar jarfile class [args...]");
        System.out.println(" class a.b.c.Starter");
        System.out.println(" args[0] 1=count 2=collect");
        System.out.println(" args[1] source");
        System.out.println(" args[2] destination");
    }

    private static void collectJob(int key, String[] args) throws IOException {
        if (args.length < 5) {
            printUsage();
            System.out.println(" args[3] expression");
            System.out.println(" args[4] rule=[starts|contains|ends]");
            System.out.println(" args[5] max line default=9999");
            return;
        }
        // Hadoop job initialization; the way this is written differs between versions
        // (a sketch of the same kind of driver on the newer mapreduce API follows this class)
        JobConf conf = new JobConf(LineMatcherStarter.class);
        conf.setJobName("LineCollect");
        conf.setMapperClass(TextWithLineMapperReducer.class);
        conf.setCombinerClass(TextWithLineReducer.class);
        conf.setReducerClass(TextWithLineReducer.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(LineArrayWritable.class);
        conf.setOutputFormat(TextWithLineOutputFormat.class);
        // Input and output paths
        FileInputFormat.setInputPaths(conf, args[1]);
        FileOutputFormat.setOutputPath(conf, new Path(args[2]));
        // Custom properties: the string to search for and the matching rule (starts / contains / ends)
        conf.set("TEXTWITHLINE.search", args[3]);
        conf.set("TEXTWITHLINE.rule", args[4]);
        if (args.length >= 6) {
            // Maximum number of lines collected per task on each node.
            // Estimate it from available memory, otherwise you may hit an OOM error
            // (don't ask how I know).
            conf.set("TEXTWITHLINE.maxLine", args[5]);
        }
        // Run the job
        JobClient.runJob(conf);
    }

    private static void countJob(int key, String[] args) throws IOException {
        if (args.length < 5) {
            printUsage();
            System.out.println(" args[3] expression");
            System.out.println(" args[4] rule=[starts|contains|ends]");
            return;
        }
        JobConf conf = new JobConf(LineMatcherStarter.class);
        conf.setJobName("LineCount");
        conf.setMapperClass(LineCountMapperReducer.class);
        conf.setCombinerClass(LineCountReducer.class);
        conf.setReducerClass(LineCountReducer.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(conf, args[1]);
        FileOutputFormat.setOutputPath(conf, new Path(args[2]));
        conf.set("TEXTWITHLINE.search", args[3]);
        conf.set("TEXTWITHLINE.rule", args[4]);
        JobClient.runJob(conf);
    }
}
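The comment in collectJob notes that job setup differs between Hadoop versions. For comparison, here is a minimal sketch of the counting job written against the newer org.apache.hadoop.mapreduce API (the 0.23/2.x style). This is not part of the article's code: the class name NewApiLineCountStarter and the nested CountMapper/CountReducer are illustrative, and for brevity only the "contains" rule is kept; the argument layout and the TEXTWITHLINE.search property follow the driver above.

package test.hadoop.line;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class NewApiLineCountStarter {

    // New-API port of LineCountMapperReducer, reduced to a plain "contains" match
    public static class CountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private Text keyText;
        private String search;

        @Override
        protected void setup(Context context) {
            search = context.getConfiguration().get("TEXTWITHLINE.search");
            keyText = new Text(search);
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            if (value.toString().contains(search)) {
                context.write(keyText, ONE);
            }
        }
    }

    // New-API port of LineCountReducer
    public static class CountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable v : values) {
                sum += v.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Same custom property as in countJob above
        conf.set("TEXTWITHLINE.search", args[3]);

        Job job = Job.getInstance(conf, "LineCount");
        job.setJarByClass(NewApiLineCountStarter.class);
        job.setMapperClass(CountMapper.class);
        job.setCombinerClass(CountReducer.class);
        job.setReducerClass(CountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[1]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}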
package test.hadoop.line;
import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
public class TextWithLineMapperReducer extends MapReduceBase implements Mapper<LongWritable, Text, Text, LineArrayWritable> {

    private Text keyText;
    private String search;

    public TextWithLineMapperReducer() throws FileNotFoundException, IOException {
    }

    public void configure(JobConf job) {
        search = job.get("TEXTWITHLINE.search");
    }

    public void map(LongWritable k, Text v, OutputCollector<Text, LineArrayWritable> o, Reporter r) throws IOException {
        if (keyText == null) {
            // Build the output key lazily; fail fast if no search string was configured
            if (search == null || search.isEmpty()) {
                throw new RuntimeException("Search is empty!");
            }
            keyText = new Text(search);
        }
        // Note: this mapper matches by substring only; the TEXTWITHLINE.rule property is not used here
        String line = v.toString();
        if (line.indexOf(search) >= 0) {
            o.collect(keyText, new LineArrayWritable(new Text[] { v }));
        }
    }
}
class TextWithLineReducer extends MapReduceBase implements Reducer<Text, LineArrayWritable, Text, LineArrayWritable> {

    private int max = Integer.MAX_VALUE;

    public void configure(JobConf job) {
        max = Integer.valueOf(job.get("TEXTWITHLINE.maxLine", "9999"));
    }

    public void reduce(Text k, Iterator<LineArrayWritable> v, OutputCollector<Text, LineArrayWritable> o, Reporter r) throws IOException {
        List<Text> list = new ArrayList<>();
        int i = 0;
        while (v.hasNext()) {
            String[] ss = v.next().toStrings();
            for (String s : ss) {
                if (i++ < max)
                    list.add(new Text(s));
            }
        }
        o.collect(k, new LineArrayWritable(list.toArray(new Text[0])));
    }
}
class TextWithLineWriter implements RecordWriter<Text, LineArrayWritable> {

    private static final byte[] newline = getBytes("\r\n");

    private static byte[] getBytes(String s) {
        try {
            return s.getBytes("UTF-8");
        } catch (UnsupportedEncodingException uee) {
            throw new IllegalArgumentException("can't find UTF-8 encoding");
        }
    }

    protected DataOutputStream out;

    public TextWithLineWriter(DataOutputStream s) {
        out = s;
    }

    public synchronized void write(Text key, LineArrayWritable value) throws IOException {
        out.write(getBytes("----->" + key.toString()));
        out.write(newline);
        writeArray(value);
        out.write(newline);
    }

    private void writeArray(LineArrayWritable aw) throws IOException {
        int i = 0;
        for (String s : aw.toStrings()) {
            out.write(getBytes("-->" + (i++) + "->" + s));
            out.write(newline);
        }
    }

    public void close(Reporter reporter) throws IOException {
        out.close();
    }
}
class TextWithLineOutputFormat extends TextOutputFormat<Text, LineArrayWritable> {

    public RecordWriter<Text, LineArrayWritable> getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable p) throws IOException {
        boolean isCompressed = getCompressOutput(job);
        if (!isCompressed) {
            Path file = FileOutputFormat.getTaskOutputPath(job, name);
            FileSystem fs = file.getFileSystem(job);
            FSDataOutputStream fileOut = fs.create(file, p);
            return new TextWithLineWriter(fileOut);
        } else {
            Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class);
            // create the named codec
            CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job);
            // build the filename including the extension
            Path file = FileOutputFormat.getTaskOutputPath(job, name + codec.getDefaultExtension());
            FileSystem fs = file.getFileSystem(job);
            FSDataOutputStream fileOut = fs.create(file, p);
            // wrap the raw stream with the codec so the output is actually compressed
            return new TextWithLineWriter(new DataOutputStream(codec.createOutputStream(fileOut)));
        }
    }
}
class LineArrayWritable extends ArrayWritable {

    // The no-arg constructor is required so the framework can create instances
    // reflectively during deserialization; it must declare the element type.
    public LineArrayWritable() {
        super(Text.class);
    }

    public LineArrayWritable(Text[] array) {
        super(Text.class);
        Text[] texts = new Text[array.length];
        for (int i = 0; i < array.length; ++i) {
            texts[i] = new Text(array[i]);
        }
        set(texts);
    }
}
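A side note on LineArrayWritable: ArrayWritable does not know its element type when it is read back, so a subclass whose no-arg constructor passes the element class to super() is needed. The snippet below is a small local check of that round-trip, not part of the job; the class name LineArrayWritableRoundTrip and the sample strings are made up, while DataOutputBuffer and DataInputBuffer are the standard Hadoop in-memory buffers.

package test.hadoop.line;

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;

public class LineArrayWritableRoundTrip {

    public static void main(String[] args) throws Exception {
        LineArrayWritable original = new LineArrayWritable(
                new Text[] { new Text("select * from user"), new Text("select 1") });

        // Serialize the value the same way the MapReduce framework would
        DataOutputBuffer out = new DataOutputBuffer();
        original.write(out);

        // Deserialize into a fresh instance created via the no-arg constructor;
        // without super(Text.class) there, readFields could not instantiate the elements
        LineArrayWritable copy = new LineArrayWritable();
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        copy.readFields(in);

        for (String s : copy.toStrings()) {
            System.out.println(s);
        }
    }
}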
package test.hadoop.line;
import java.io.IOException;
import java.util.Iterator;
import java.util.function.Predicate;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
public class LineCountMapperReducer extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {

    private static final IntWritable ONE = new IntWritable(1);
    private String search;
    private Text key;
    private Predicate<String> rule;
    // The lambdas read the search field, which configure() sets before map() is called
    private Predicate<String> starts = s -> s.startsWith(search);
    private Predicate<String> contains = s -> s.contains(search);
    private Predicate<String> ends = s -> s.endsWith(search);

    public void configure(JobConf job) {
        search = job.get("TEXTWITHLINE.search");
        key = new Text(search);
        switch (job.get("TEXTWITHLINE.rule", "contains")) {
        case "starts":
            rule = starts;
            break;
        case "ends":
            rule = ends;
            break;
        case "contains":
        default:
            rule = contains;
        }
    }

    public void map(LongWritable k, Text v, OutputCollector<Text, IntWritable> o, Reporter r) throws IOException {
        String line = v.toString();
        if (rule.test(line)) {
            o.collect(key, ONE);
        }
    }
}
class LineCountReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text k, Iterator<IntWritable> v, OutputCollector<Text, IntWritable> o, Reporter r) throws IOException {
        int sum = 0;
        while (v.hasNext()) {
            sum += v.next().get();
        }
        o.collect(k, new IntWritable(sum));
    }
}
I ran this on virtual machines: install, clone, then change the hostname and user. For the preparation steps, see the earlier articles on changing the username, user group, domain name, and hostname on Ubuntu 14.04, and on setting up passwordless SSH access on Ubuntu 14.04.
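Once the cluster is up and the project is packaged (with Maven's default naming the jar comes out as WordCount-0.0.1-acute.jar), the two jobs can be submitted roughly as follows; the HDFS paths and the search term "select" are only examples:

hadoop jar WordCount-0.0.1-acute.jar test.hadoop.line.LineMatcherStarter 1 /logs/input /logs/count-out "select" contains
hadoop jar WordCount-0.0.1-acute.jar test.hadoop.line.LineMatcherStarter 2 /logs/input /logs/collect-out "select" contains 5000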
(To be continued)