MapReduce综合实验---中国大学排名统计
作者:互联网
基于MapReduce的中国大学排名统计
整体思路
① FileInpuFormat读取数据
② Mapper阶段对数据简单处理
③ 序列化实现自定义排序
④ partition分区处理
⑤ Reducer写出数据
⑥ 主类设置
具体实现如下
Driver主类,包括加载jar包路径,设置Mapper、Reducer类,输出类型,partition分区设置,文件输入输出路径等,注意Partition分区时设置的Reduce个数要与分区个数一致,多于或者少于均会报错,导致MapReduce程序停止。
public class RankDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
// 获取job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 加载主类
job.setJarByClass(RankDriver.class);
// 设置Mapper和Reducer类
job.setMapperClass(RankMapper.class);
job.setReducerClass(RankReducer.class);
// 设置Mapper数据的数据类型
job.setMapOutputKeyClass(RankBean.class);
job.setMapOutputValueClass(Text.class);
// 设置最终数据的数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(RankBean.class);
// 设置Partition分区和分区个数
job.setPartitionerClass(RankPartitioner.class);
job.setNumReduceTasks(6);
// 文件输入输出路径
FileInputFormat.setInputPaths(job, new Path("E:\\test\\data\\*"));
FileOutputFormat.setOutputPath(job, new Path("E:\\test\\RankTopKOut"));
// 提交job
boolean result = job.waitForCompletion(true);
// 判断结束
System.exit(result ? 0 : 1);
}
}
Bean对象序列化类,注意以下几点
① 实现WritableComparable接口,并传入比较对象,一般来说比较对象为自身。
② 设置空参构造器
③ 重写序列化方法(write and readFields)
④重写compareTo方法,方法体用于实现自定义排序
⑤ 重写toString方法,用于最终的数据写出。
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class RankBean implements WritableComparable<RankBean> {
private String module; // 学校类型
private double score; // 学校评分
private String position; // 学校位置
public RankBean() {
}
public String getModule() {
return module;
}
public void setModule(String module) {
this.module = module;
}
public double getScore() {
return score;
}
public void setScore(double score) {
this.score = score;
}
public String getPosition() {
return position;
}
public void setPosition(String position) {
this.position = position;
}
@Override
public int compareTo(RankBean o) {
if (this.score > o.score) {
return -1;
}else if (this.score < o.score) {
return 1;
}else {
return 0;
}
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(module);
out.writeDouble(score);
out.writeUTF(position);
}
@Override
public void readFields(DataInput in) throws IOException {
this.module = in.readUTF();
this.score = in.readDouble();
this.position = in.readUTF();
}
@Override
public String toString() {
return module + "\t" + position + "\t" + score ;
}
}
Mapper类实现数据读取,处理,写出操作。在写出操作时,要想实现自定义排序,outKey即写出的key要是一个对象,并且实现序列化,才能实现自定义排序,否则MapReduce底层逻辑会使用快排的方式自动将其输出的key实现排序,例如wordCount程序。
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class RankMapper extends Mapper<LongWritable, Text, RankBean, Text> {
private RankBean outK = new RankBean();
private Text outV = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] split = line.split("\t");
// 分割获取对应数据
String name = split[0];
String position = split[1];
String mold = split[2];
String score = split[3];
// 存入数据
outV.set(name);
outK.setModule(mold);
outK.setPosition(position);
outK.setScore(Double.parseDouble(score));
// 写出数据
context.write(outK,outV);
}
}
Partition分区类,实现对不同字段的分区合并,最终将数据存储至不同的文件中。具体的实现步骤如下:
① 继承Partitioner类,泛型为Mapper的数据数据类型
② 重写getPartition方法,实现分区
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class RankPartitioner extends Partitioner<RankBean, Text> {
@Override
public int getPartition(RankBean rankBean, Text text, int numPartitions) {
int partition;
if ("北京".equals(rankBean.getPosition())) {
partition = 0;
}else if ("上海".equals(rankBean.getPosition())) {
partition = 1;
}else if ("天津".equals(rankBean.getPosition())) {
partition = 2;
}else if ("江苏".equals(rankBean.getPosition())) {
partition = 3;
}else if ("河南".equals(rankBean.getPosition())) {
partition = 4;
}else {
partition = 5;
}
return partition;
}
}
Reducer类,实现数据的写出操作。
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class RankReducer extends Reducer<RankBean, Text, Text, RankBean> {
@Override
protected void reduce(RankBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
for (Text value : values) {
context.write(value,key);
}
}
}
至此,整个中国大学的排名,按照关键省份的分区,写出至不同的文件中。最终的输出结果部分如下所示。
中国大学排名数据附后。
数据及源码下载地址:https://pan.baidu.com/s/1rd5_7MwPtDptGm1u3QJnJw
提取码:9q88
希望能够帮助到大家。
标签:String,MapReduce,class,---,job,score,import,中国大学,public 来源: https://blog.csdn.net/weixin_55796089/article/details/121584242