MapReduce之天气案例(按月分区,并求出每月的最高温度)
作者:互联网
今天老师给我们布置了一个作业,就是上面这个文档,将上面的内容按月分区,并输出每月的最高温度,来吧宝贝们。
拿到这个文档,首先我们一眼就可以看出要分成3个字段吧,其中温度我们要进行数值比较,所以在设置属性的基本数据类型的时候就应该把它设置为int型,然后由于我们还要按月进行分区,因为我们需要把第一个字段中的月份取出来,那我们也应该设置一个月份的属性吧,这里我们既有5个属性了,接下来直接看每个部分的代码吧
package test;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class Tem implements Writable{
//因为后面要用month进行分区,所以这里单独把月份拿出来了
private String month;
private String date;
private String time;
//因为后面要用温度进行比较大小。因此在这设置为int型方便比较
private int cc;
@Override
//规定输出的格式
public String toString() {
return "日期:"+date+" 时间:"+time+" 摄氏度:"+cc+"c";
}
//序列化输入
public void readFields(DataInput input) throws IOException{
this.month=input.readUTF();
this.date=input.readUTF();
this.time=input.readUTF();
this.cc=input.readInt();
}
//序列化输出
public void write(DataOutput output) throws IOException{
output.writeUTF(this.month);
output.writeUTF(this.date);
output.writeUTF(this.time);
output.writeInt(this.cc);
}
public String getMonth() {
return month ;
}
public void setMonth(String month) {
this.month=month;
}
public String getDate() {
return date;
}
public void setDate(String date) {
this.date = date;
}
public String getTime() {
return time;
}
public void setTime(String time) {
this.time = time;
}
public int getCc() {
return cc;
}
public void setCc(int cc) {
this.cc = cc;
}
}
package test;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class TemMapper extends Mapper<LongWritable,Text,Text,Tem> {
@Override
protected void map(LongWritable key,Text value,Mapper<LongWritable,Text,Text,Tem>.Context context)
throws IOException,InterruptedException{
String data=value.toString();
String[] words =data.split(",");
Tem t=new Tem();
//为了避免有的列中部分属性为空值,或者文本输入最后有换行符,也会被当成是一行数据,造成后面的22-24行的程序报错,这里一定要确认words的长度是3
if(words.length==3) {
//前面也讲过了,这种文本数据在不同的电脑版本上会有一些格式上的混乱,为了避免一些不必要的错误,最好使用trim()函数去掉字符串前后的空格
t.setDate(words[0].trim());
t.setTime(words[1].trim());
//这里是为了取出温度中的数字,那我采取的办法是利用repalce()函数先将“c”替换成“”空字符,最后去掉前后的空字符即可
t.setCc(Integer.parseInt(words[2].replace("c", "").trim()));
//将取出的第一个字符串按照“-”再进行分隔,取出其中的月份
t.setMonth(words[0].split("-")[1].trim());
}
context.write(new Text(t.getMonth()),t);
}
}
package test;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class TemPartition extends Partitioner<Text, Tem> {
@Override
public int getPartition(Text k3, Tem t, int num) {
return Integer.parseInt(t.getMonth())-1;
}}
package test;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class TemReducer extends Reducer <Text,Tem,Text,Tem> {
@Override
protected void reduce(Text k3,Iterable<Tem> v3,Reducer<Text,Tem,Text,Tem>.Context context)
throws IOException,InterruptedException{
int max=0;
for(Tem t:v3) {
//我们知道,在mapreduce的时候,shuffle会将键相同的值进行合并,所以在这里当reduce对每一组值进行循环的时候,我们就比较方面的找到每个月的最高温度
if (max<t.getCc()){
max=t.getCc();
}
context.write(null,t);
}
//这里也要注意,write的两个参数也是不能随便放的,因为我们在第9行中的后两个输出值做了规定的,一个为text,一个为tem,既然第20行我们决定把t放在后面一位,那这里的25行必然要把输出的内容放在前面
context.write(new Text("当月最高气温:"+max), null);
}}
package test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class TemMian {
public static void main(String[] args) throws Exception{
// TODO Auto-generated method stub
Job job=Job.getInstance(new Configuration());
job.setJarByClass(TemMian.class);
job.setMapperClass(TemMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Tem.class);
job.setPartitionerClass(TemPartition.class);
job.setNumReduceTasks(12);
job.setReducerClass(TemReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Tem.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
}
标签:并求,分区,MapReduce,hadoop,io,org,apache,import,public 来源: https://blog.csdn.net/m0_48712142/article/details/117731093