0009.Mapreduce的高级功能
作者:互联网
目录
- 05-12-复习SQL的排序
- 05-13-Java对象的排序
- 05-14-MR对象的排序
- 05-15-分区的基本概念
- 05-16-什么是Hash分区
- 05-17-分区的编程案例
- 05-18-什么是Combiner
- 05-19-不能使用Combiner的情况
05-12-复习SQL的排序
05-13-Java对象的排序
Java对象的排序
//学生对象:按照学生的age年龄进行排序
/**
 * Student value object, ordered by ascending age.
 *
 * Fix over the original: compareTo previously returned 1 even when the two
 * ages were equal, which violates the Comparable contract
 * (sgn(a.compareTo(b)) must equal -sgn(b.compareTo(a))) and can produce
 * unstable or incorrect results in sorted collections. Integer.compare
 * returns 0 for ties and also avoids subtraction overflow.
 */
public class Student implements Comparable<Student> {
    private int stuID;      // student id
    private String stuName; // student name
    private int age;        // age — the sort key

    @Override
    public String toString() {
        return "Student [stuID=" + stuID + ", stuName=" + stuName + ", age=" + age + "]";
    }

    /**
     * Orders students by ascending age; equal ages compare as 0.
     */
    @Override
    public int compareTo(Student o) {
        return Integer.compare(this.age, o.getAge());
    }

    public int getStuID() {
        return stuID;
    }

    public void setStuID(int stuID) {
        this.stuID = stuID;
    }

    public String getStuName() {
        return stuName;
    }

    public void setStuName(String stuName) {
        this.stuName = stuName;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }
}
import java.util.Arrays;
/**
 * Demo driver: builds three students, sorts them via
 * Student.compareTo (ascending age), and prints each one.
 */
public class StudentMain {
    public static void main(String[] args) {
        // Build the sample students.
        Student tom = new Student();
        tom.setStuID(1);
        tom.setStuName("Tom");
        tom.setAge(24);

        Student mary = new Student();
        mary.setStuID(2);
        mary.setStuName("Mary");
        mary.setAge(26);

        Student mike = new Student();
        mike.setStuID(3);
        mike.setStuName("Mike");
        mike.setAge(25);

        // Collect into an array and sort using the natural (age) order.
        Student[] students = {tom, mary, mike};
        Arrays.sort(students);

        // Print the sorted result, one student per line.
        for (int i = 0; i < students.length; i++) {
            System.out.println(students[i]);
        }
    }
}
05-14-MR对象的排序
多个列排序
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
//代表员工
//数据:7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
public class Emp implements WritableComparable<Emp>{
private int empno;//员工号
private String ename; //员工姓名
private String job; //职位
private int mgr; //经理的员工号
private String hiredate;//入职日期
private int sal; //月薪
private int comm; //奖金
private int deptno; //部门号
// @Override
// public int compareTo(Emp o) {
// // 定义自己的排序规则:一个列的排序
// // 按照薪水进行排序
// if(this.sal >= o.getSal()){
// return 1;
// }else{
// return -1;
// }
// }
@Override
public int compareTo(Emp o) {
// 定义自己的排序规则:多个列的排序
// 先按照部门号进行排序,再按照薪水进行排序
if(this.deptno > o.getDeptno()){
return 1;
}else if(this.deptno < o.getDeptno()){
return -1;
}
//再按照薪水进行排序
if(this.sal >= o.getSal()){
return 1;
}else{
return -1;
}
}
@Override
public String toString() {
return "Emp [empno=" + empno + ", ename=" + ename + ", sal=" + sal + ", deptno=" + deptno + "]";
}
@Override
public void readFields(DataInput input) throws IOException {
//实现反序列化,从输入流中读取对象
this.empno = input.readInt();
this.ename = input.readUTF();
this.job = input.readUTF();
this.mgr = input.readInt();
this.hiredate = input.readUTF();
this.sal = input.readInt();
this.comm = input.readInt();
this.deptno = input.readInt();
}
@Override
public void write(DataOutput output) throws IOException {
// 实现序列化,把对象输出到输出流
output.writeInt(this.empno);
output.writeUTF(this.ename);
output.writeUTF(this.job);
output.writeInt(this.mgr);
output.writeUTF(this.hiredate);
output.writeInt(this.sal);
output.writeInt(this.comm);
output.writeInt(this.deptno);
}
public int getEmpno() {
return empno;
}
public void setEmpno(int empno) {
this.empno = empno;
}
public String getEname() {
return ename;
}
public void setEname(String ename) {
this.ename = ename;
}
public String getJob() {
return job;
}
public void setJob(String job) {
this.job = job;
}
public int getMgr() {
return mgr;
}
public void setMgr(int mgr) {
this.mgr = mgr;
}
public String getHiredate() {
return hiredate;
}
public void setHiredate(String hiredate) {
this.hiredate = hiredate;
}
public int getSal() {
return sal;
}
public void setSal(int sal) {
this.sal = sal;
}
public int getComm() {
return comm;
}
public void setComm(int comm) {
this.comm = comm;
}
public int getDeptno() {
return deptno;
}
public void setDeptno(int deptno) {
this.deptno = deptno;
}
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/*
* 一定要把Emp作为key2
* 没有value2,返回null值
*/
/**
 * Mapper for the sort job.
 * The Emp object itself is emitted as k2 so Hadoop sorts records by
 * Emp.compareTo during the shuffle; there is no useful v2, so the
 * value is NullWritable.
 */
public class EmpSortMapper extends Mapper<LongWritable, Text, Emp, NullWritable> {
    @Override
    protected void map(LongWritable key1, Text value1, Context context)
            throws IOException, InterruptedException {
        // Input record: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
        String[] fields = value1.toString().split(",");

        // Populate an employee object from the CSV fields.
        Emp employee = new Emp();
        employee.setEmpno(Integer.parseInt(fields[0]));
        employee.setEname(fields[1]);
        employee.setJob(fields[2]);
        employee.setMgr(Integer.parseInt(fields[3]));
        employee.setHiredate(fields[4]);
        employee.setSal(Integer.parseInt(fields[5]));
        employee.setComm(Integer.parseInt(fields[6]));
        employee.setDeptno(Integer.parseInt(fields[7]));

        // k2: the employee object (sort key), v2: empty placeholder.
        context.write(employee, NullWritable.get());
    }
}
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Driver for the employee sort job.
 * Usage: args[0] = input path, args[1] = output path.
 */
public class EmpSortMain {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(EmpSortMain.class);

        // Mapper emits the Emp object as k2 (so the shuffle sorts by
        // Emp.compareTo) and NullWritable as v2.
        job.setMapperClass(EmpSortMapper.class);
        job.setMapOutputKeyClass(Emp.class);
        job.setMapOutputValueClass(NullWritable.class);

        // Final output types mirror the map output.
        job.setOutputKeyClass(Emp.class);
        job.setOutputValueClass(NullWritable.class);

        // Input/output locations come from the command line.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}
05-15-分区的基本概念
什么是分区.png
05-16-什么是Hash分区
05-17-分区的编程案例
自定义的分区规则:按照部门号进行分区
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
//代表员工
//数据:7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
/**
 * Employee record used as the MapReduce value (v2) in the partitioning job.
 *
 * Implements Hadoop's Writable so it can be serialized between the map and
 * reduce phases. The field order in write() and readFields() must match
 * exactly — a mismatch would corrupt every subsequent field on the wire.
 *
 * Sample input line: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
 */
public class Emp implements Writable{
private int empno;//employee number
private String ename; //employee name
private String job; //job title
private int mgr; //manager's employee number
private String hiredate;//hire date
private int sal; //monthly salary
private int comm; //commission / bonus
private int deptno; //department number
@Override
public String toString() {
return "Emp [empno=" + empno + ", ename=" + ename + ", sal=" + sal + ", deptno=" + deptno + "]";
}
@Override
public void readFields(DataInput input) throws IOException {
// Deserialization: read fields in exactly the order write() emits them.
this.empno = input.readInt();
this.ename = input.readUTF();
this.job = input.readUTF();
this.mgr = input.readInt();
this.hiredate = input.readUTF();
this.sal = input.readInt();
this.comm = input.readInt();
this.deptno = input.readInt();
}
@Override
public void write(DataOutput output) throws IOException {
// Serialization: the field order here defines the wire format.
output.writeInt(this.empno);
output.writeUTF(this.ename);
output.writeUTF(this.job);
output.writeInt(this.mgr);
output.writeUTF(this.hiredate);
output.writeInt(this.sal);
output.writeInt(this.comm);
output.writeInt(this.deptno);
}
public int getEmpno() {
return empno;
}
public void setEmpno(int empno) {
this.empno = empno;
}
public String getEname() {
return ename;
}
public void setEname(String ename) {
this.ename = ename;
}
public String getJob() {
return job;
}
public void setJob(String job) {
this.job = job;
}
public int getMgr() {
return mgr;
}
public void setMgr(int mgr) {
this.mgr = mgr;
}
public String getHiredate() {
return hiredate;
}
public void setHiredate(String hiredate) {
this.hiredate = hiredate;
}
public int getSal() {
return sal;
}
public void setSal(int sal) {
this.sal = sal;
}
public int getComm() {
return comm;
}
public void setComm(int comm) {
this.comm = comm;
}
public int getDeptno() {
return deptno;
}
public void setDeptno(int deptno) {
this.deptno = deptno;
}
}
//自定义的分区规则:按照部门号进行分区 k2 部门号 v2 员工对象
/**
 * Custom partition rule: route records by the employee's department number.
 * k2 = department number, v2 = employee object.
 */
public class MyPartitioner extends Partitioner<IntWritable, Emp> {
    /**
     * numTask is the total number of partitions (reduce tasks); the
     * modulo keeps every returned index within [0, numTask).
     */
    @Override
    public int getPartition(IntWritable k2, Emp v2, int numTask) {
        // Dispatch on the employee's department number.
        switch (v2.getDeptno()) {
            case 10:
                return 1 % numTask; // dept 10 -> partition 1
            case 20:
                return 2 % numTask; // dept 20 -> partition 2
            default:
                return 3 % numTask; // dept 30 -> partition 0 (when numTask == 3)
        }
    }
}
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
//就是同一个部门的员工
/**
 * Reducer for the partitioning job: every value in v3 belongs to the
 * same department (k3), so each employee is written through unchanged.
 */
public class MyPartitionerReducer extends Reducer<IntWritable, Emp, IntWritable, Emp> {
    @Override
    protected void reduce(IntWritable k3, Iterable<Emp> v3, Context context)
            throws IOException, InterruptedException {
        // Pass each employee straight to the output with its department key.
        for (Emp employee : v3) {
            context.write(k3, employee);
        }
    }
}
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
// k2 部门号 v2 员工
/**
 * Mapper for the partitioning job.
 * Emits k2 = department number, v2 = the employee object, so that
 * MyPartitioner can route records by department.
 */
public class MyPartitionerMapper extends Mapper<LongWritable, Text, IntWritable, Emp> {
    @Override
    protected void map(LongWritable key1, Text value1, Context context)
            throws IOException, InterruptedException {
        // Input record: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
        String[] fields = value1.toString().split(",");

        // Populate an employee object from the CSV fields.
        Emp employee = new Emp();
        employee.setEmpno(Integer.parseInt(fields[0]));
        employee.setEname(fields[1]);
        employee.setJob(fields[2]);
        employee.setMgr(Integer.parseInt(fields[3]));
        employee.setHiredate(fields[4]);
        employee.setSal(Integer.parseInt(fields[5]));
        employee.setComm(Integer.parseInt(fields[6]));
        employee.setDeptno(Integer.parseInt(fields[7]));

        // k2: department number, v2: the employee itself.
        context.write(new IntWritable(employee.getDeptno()), employee);
    }
}
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Driver for the partitioning job: three reduce tasks, one per
 * department (10 / 20 / 30), routed by MyPartitioner.
 * Usage: args[0] = input path, args[1] = output path.
 */
public class MyPartitionerMain {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(MyPartitionerMain.class);

        // Mapper emits k2 = department number, v2 = employee object.
        job.setMapperClass(MyPartitionerMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Emp.class);

        // Install the custom partition rule and match the reducer count
        // to the number of departments.
        job.setPartitionerClass(MyPartitioner.class);
        job.setNumReduceTasks(3);

        job.setReducerClass(MyPartitionerReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Emp.class);

        // Input/output locations come from the command line.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}
按照部门号进行分区.png
05-18-什么是Combiner
Combiner的作用.png
分析WordCount数据处理的过程.png
05-19-不能使用Combiner的情况
不能使用Combiner.png
标签:return,String,int,高级,Mapreduce,0009,job,import,public 来源: https://www.cnblogs.com/RoyalGuardsTomCat/p/13861663.html