其他分享
首页 > 其他分享> > MapReduce之天气案例(按月分区,并求出每月的最高温度)

MapReduce之天气案例(按月分区,并求出每月的最高温度)

作者:互联网

今天老师给我们布置了一个作业,就是上面这个文档,将上面的内容按月分区,并输出每月的最高温度,来吧宝贝们。

拿到这个文档,首先我们一眼就可以看出要分成3个字段吧,其中温度我们要进行数值比较,所以在设置属性的基本数据类型的时候就应该把它设置为int型,然后由于我们还要按月进行分区,因为我们需要把第一个字段中的月份取出来,那我们也应该设置一个月份的属性吧,这里我们既有5个属性了,接下来直接看每个部分的代码吧

package test;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class Tem implements Writable{
	//因为后面要用month进行分区,所以这里单独把月份拿出来了
	private String month;
	private String date;
	private String time;
	//因为后面要用温度进行比较大小。因此在这设置为int型方便比较
	private int cc;
	
	@Override
	//规定输出的格式
	public String toString() {
		return "日期:"+date+" 时间:"+time+" 摄氏度:"+cc+"c";
	}
	//序列化输入
	public void readFields(DataInput input) throws IOException{
		this.month=input.readUTF();
		this.date=input.readUTF();
		this.time=input.readUTF();
		this.cc=input.readInt();
	


	}
	//序列化输出
	public void write(DataOutput output) throws IOException{
		output.writeUTF(this.month);
		output.writeUTF(this.date);
		output.writeUTF(this.time);
		output.writeInt(this.cc);

	}
	public String getMonth() {
		return month ;
	}
	public void setMonth(String month) {
		this.month=month;
	}
	public String getDate() {
		return date;
	}
	public void setDate(String date) {
		this.date = date;
	}
	public String getTime() {
		return time;
	}
	public void setTime(String time) {
		this.time = time;
	}
	public int getCc() {
		return cc;
	}
	public void setCc(int cc) {
		this.cc = cc;
	}
	
	
}



package test;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TemMapper extends Mapper<LongWritable,Text,Text,Tem> {


	@Override
	protected void map(LongWritable key,Text value,Mapper<LongWritable,Text,Text,Tem>.Context context) 
	 		throws IOException,InterruptedException{

		String data=value.toString();
		String[] words =data.split(",");
		Tem t=new Tem();
		//为了避免有的列中部分属性为空值,或者文本输入最后有换行符,也会被当成是一行数据,造成后面的22-24行的程序报错,这里一定要确认words的长度是3
		if(words.length==3) {
		//前面也讲过了,这种文本数据在不同的电脑版本上会有一些格式上的混乱,为了避免一些不必要的错误,最好使用trim()函数去掉字符串前后的空格
		t.setDate(words[0].trim());
		t.setTime(words[1].trim());
		//这里是为了取出温度中的数字,那我采取的办法是利用repalce()函数先将“c”替换成“”空字符,最后去掉前后的空字符即可
		t.setCc(Integer.parseInt(words[2].replace("c", "").trim()));
		//将取出的第一个字符串按照“-”再进行分隔,取出其中的月份
		t.setMonth(words[0].split("-")[1].trim());
		}

		
		context.write(new Text(t.getMonth()),t);
	}

}



package test;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class TemPartition extends Partitioner<Text, Tem> {
	@Override
	public int getPartition(Text k3, Tem t, int num) {
		return Integer.parseInt(t.getMonth())-1;
	}}
package test;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TemReducer extends Reducer <Text,Tem,Text,Tem> {
	@Override
	protected void reduce(Text k3,Iterable<Tem> v3,Reducer<Text,Tem,Text,Tem>.Context context)
		throws IOException,InterruptedException{
		int max=0;
		for(Tem t:v3) {
		//我们知道,在mapreduce的时候,shuffle会将键相同的值进行合并,所以在这里当reduce对每一组值进行循环的时候,我们就比较方面的找到每个月的最高温度
			if (max<t.getCc()){
				max=t.getCc();
				
			}
			context.write(null,t);
			
		}
		//这里也要注意,write的两个参数也是不能随便放的,因为我们在第9行中的后两个输出值做了规定的,一个为text,一个为tem,既然第20行我们决定把t放在后面一位,那这里的25行必然要把输出的内容放在前面
		
		context.write(new Text("当月最高气温:"+max), null);
}}
package test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TemMian {

	public static void main(String[] args) throws Exception{
		// TODO Auto-generated method stub
		Job job=Job.getInstance(new Configuration());
		job.setJarByClass(TemMian.class);
		
		job.setMapperClass(TemMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Tem.class);
		
		job.setPartitionerClass(TemPartition.class);
		job.setNumReduceTasks(12);
		
		job.setReducerClass(TemReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Tem.class);
		
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		job.waitForCompletion(true);
		
	}

}

 

标签:并求,分区,MapReduce,hadoop,io,org,apache,import,public
来源: https://blog.csdn.net/m0_48712142/article/details/117731093