
Hadoop03---MapReduce Basics


Basic Usage of MapReduce

I. MapReduce: Basic Definition

1. Introduction

MapReduce is a programming model for parallel computation over large data sets (typically larger than 1 TB). Its central ideas, "Map" and "Reduce", are borrowed from functional programming languages, with further features taken from vector programming languages. It makes it dramatically easier for programmers with no background in distributed or parallel programming to run their code on a distributed system. In current implementations, the user specifies a Map function, which transforms one set of key/value pairs into a new set of intermediate key/value pairs, and a Reduce function, which runs concurrently and merges all intermediate values that share the same key.

2. Core Workflow

At a high level a job passes through five stages: the input files are divided into splits, and a RecordReader turns each split into key/value records; a map task applies the map function to every record of its split; the map output is partitioned by key, sorted in memory, and spilled to disk; in the shuffle phase each reduce task fetches its partition from every map task and merge-sorts what it received; finally the reduce function is invoked once per key group and the results are written to the output directory.


3. A Basic Example

Worked example: find the three highest ratings for each movie.

    static class MovieGradeMap extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //split the line on whitespace (the input is assumed to carry the
            //movie name in the second column and the rating in the fourth)
            String s = value.toString();
            String[] split = s.split("\\s+");
            //extract the movie name and the rating
            String movie = split[1];
            int rate = Integer.parseInt(split[3]);
            context.write(new Text(movie), new IntWritable(rate));
        }
    }
    static class MovieGradeReduce extends Reducer<Text, IntWritable, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            //collect the ratings into a list; value.get() copies the int out of
            //the IntWritable instance that the framework reuses between iterations
            ArrayList<Integer> list = new ArrayList<>();
            for (IntWritable value : values) {
                list.add(value.get());
            }
            //sort in descending order
            list.sort((Integer i1, Integer i2) -> i2 - i1);
            //assemble the result: up to three of the highest ratings
            int n = Math.min(3, list.size());
            StringBuilder sb = new StringBuilder("the three highest ratings for this movie are: ");
            for (int i = 0; i < n; i++) {
                if (i > 0) sb.append("/");
                sb.append(list.get(i));
            }
            context.write(key, new Text(sb.toString()));
        }
    }
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.ArrayList;

//driver class; MovieGradeMap and MovieGradeReduce above are its static nested classes
public class MovieGrade {

    public static void main(String[] args) throws Exception {
        //create the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, MovieGrade.class.getSimpleName());

        job.setJarByClass(MovieGrade.class); //lets Hadoop locate the jar when run on a cluster
        job.setMapperClass(MovieGradeMap.class);
        job.setReducerClass(MovieGradeReduce.class);

        //set the map output types (may be omitted when identical to the final output types)
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        //set the final output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, new Path("D://duoyi/xxx/movieInfo.txt"));
        FileOutputFormat.setOutputPath(job, new Path("D://duoyi/xxx/moviesresult"));

        boolean b = job.waitForCompletion(true);
    }
}
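One caveat about MovieGradeReduce: it buffers every rating of a movie in memory before sorting, so memory grows with the number of ratings per movie. For top-N problems, a size-bounded min-heap is the more scalable pattern. A minimal alternative sketch, assuming java.util.PriorityQueue and the same job wiring as above:

    static class Top3HeapReducer extends Reducer<Text, IntWritable, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            //min-heap holding at most the three largest ratings seen so far
            PriorityQueue<Integer> top3 = new PriorityQueue<>();
            for (IntWritable value : values) {
                top3.offer(value.get());
                if (top3.size() > 3) {
                    top3.poll(); //evict the smallest of the four
                }
            }
            //the heap yields smallest-first, so prepend to get descending order
            StringBuilder sb = new StringBuilder();
            while (!top3.isEmpty()) {
                sb.insert(0, top3.poll() + (sb.length() == 0 ? "" : "/"));
            }
            context.write(key, new Text("the three highest ratings for this movie are: " + sb));
        }
    }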

II. MapReduce and Its Components

1. The Unusual Iterable

The following reflects my current personal understanding and may contain mistakes.
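What makes this Iterable unusual: it can be traversed only once (it streams values off the sorted, merged map output rather than wrapping an in-memory collection), and the framework reuses a single key object and a single value object, deserializing the next record into them on every iteration. Storing the value objects themselves therefore leaves you with N references to one reused instance. A minimal sketch of the pitfall, assuming Text values:

    static class CopyPitfallReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            ArrayList<Text> wrong = new ArrayList<>();
            ArrayList<String> right = new ArrayList<>();
            for (Text v : values) {
                wrong.add(v);            //BUG: every element is the same reused Text
                                         //instance, which ends up holding the last value
                right.add(v.toString()); //OK: copy the contents out before storing
            }
            context.write(key, new Text(String.join(" ", right)));
        }
    }

This is also why MovieGradeReduce above is safe: value.get() copies the primitive int out of the reused IntWritable.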


2. The setup Method
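setup(Context) is invoked exactly once per task, before the first call to map() or reduce(); its counterpart cleanup(Context) runs once after the last. Both are empty by default and exist to be overridden for one-time work such as reading job parameters or loading a small lookup table. A minimal sketch, where the parameter name demo.to.lower is made up for illustration:

    static class CaseNormalizingMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private boolean toLower;
        private final IntWritable one = new IntWritable(1);

        @Override
        protected void setup(Context context) {
            //runs once per map task, before any map() call
            toLower = context.getConfiguration().getBoolean("demo.to.lower", false);
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = toLower ? value.toString().toLowerCase() : value.toString();
            context.write(new Text(line), one);
        }
    }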


3. The run Method

//Reducer's run method (as decompiled from Hadoop) is a template method:
//setup() once, one reduce() call per key group, cleanup() in a finally block
public void run(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
    this.setup(context);

    try {
        while(context.nextKey()) {
            //one reduce() call per distinct key; getValues() is the one-pass Iterable from section 1
            this.reduce(context.getCurrentKey(), context.getValues(), context);
            //if the values were buffered to a backup store (mark/reset support), release it
            Iterator<VALUEIN> iter = context.getValues().iterator();
            if (iter instanceof ValueIterator) {
                ((ValueIterator)iter).resetBackupStore();
            }
        }
    } finally {
        this.cleanup(context);
    }
}
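Because run() is a template method, overriding it gives full control of the record loop; Mapper's run() has the same shape, looping with context.nextKeyValue(). A minimal sketch that keeps the stock behaviour but logs progress every 100,000 records (a made-up use case for illustration):

    static class ProgressLoggingMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
        @Override
        public void run(Context context) throws IOException, InterruptedException {
            setup(context);
            try {
                long n = 0;
                //same loop as the stock Mapper.run, plus a progress message
                while (context.nextKeyValue()) {
                    map(context.getCurrentKey(), context.getCurrentValue(), context);
                    if (++n % 100_000 == 0) {
                        System.err.println("processed " + n + " records");
                    }
                }
            } finally {
                cleanup(context);
            }
        }
    }

(The inherited identity map() is used here, which is why the input and output types match.)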

4. POJO Objects
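A custom POJO cannot travel between map and reduce as-is: anything used as a value must implement org.apache.hadoop.io.Writable, and anything used as a key must implement WritableComparable<T>, since keys get sorted. A minimal sketch for the movie/rating records of part one (imports assumed: java.io.DataInput, java.io.DataOutput, java.io.IOException, org.apache.hadoop.io.Writable):

    public static class MovieRating implements Writable {
        private String movie;
        private int rate;

        public MovieRating() {} //no-arg constructor required: the framework instantiates it reflectively

        public MovieRating(String movie, int rate) {
            this.movie = movie;
            this.rate = rate;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            //serialize the fields in a fixed order...
            out.writeUTF(movie);
            out.writeInt(rate);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            //...and deserialize them in exactly the same order
            movie = in.readUTF();
            rate = in.readInt();
        }

        @Override
        public String toString() {
            return movie + "\t" + rate; //what TextOutputFormat will print
        }
    }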


5. MapReduce Internals in Depth

(Figure: MapReduce internals)

5.1 File Reading

The InputFormat (TextInputFormat by default) splits the input files into InputSplits, one per map task, and supplies a RecordReader that turns each split into key/value records; with TextInputFormat the key is the line's byte offset (LongWritable) and the value is the line itself (Text), exactly the types the mappers above declare.

5.2 Mapper

A map task calls map() once per record. The output pairs go into an in-memory buffer, where they are partitioned by key (hash partitioning by default) and sorted; when the buffer fills they are spilled to disk, and the spill files are finally merged into a single partitioned, sorted file per map task. An optional combiner can pre-aggregate the map output at this stage, as sketched below.
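When the reduce logic is associative and commutative (summing counts, taking a maximum), the same class can also be registered as a combiner, letting partial aggregation run on the map side before the shuffle. For counting jobs the stock IntSumReducer is the usual choice; one driver line suffices:

    job.setCombinerClass(org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer.class);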

5.3 Reducer and Output

Each reduce task fetches its partition from every map task (the shuffle), merge-sorts the fetched runs, and then calls reduce() once per distinct key with an Iterable over that key's values. The results are written through the OutputFormat (TextOutputFormat by default), one part-r-NNNNN file per reduce task.

III. Practical Applications

1. Finding Common Friends

The input lines look like A:B,C,D, meaning that B, C, and D are A's friends. The solution chains two jobs. Job 1 inverts the lists: it emits one (friend, owner) pair per friendship, so its reducer sees, for each person, everyone who counts that person as a friend, and every pair drawn from that list shares the person as a common friend. Job 2 then groups those pairs and concatenates each pair's common friends.

package cn.doit.ab.day1118.demo02;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

import java.io.IOException;
import java.util.ArrayList;

public class Friends {

  
    public static void main(String[] args) throws Exception{
        runJob1();
        runJob2();
    }


  
  
    // Job 1 mapper: invert each friend list. For the line "A:B,C,D" it emits
    // (B,A), (C,A), (D,A) — key = the friend, value = the list's owner.
    static class Friends1Mapper extends Mapper<LongWritable, Text, Text, Text> {

        Text k = new Text();
        Text v = new Text();
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String s = value.toString();
            String[] split = s.split(":");
            String own = split[0];
            v.set(own);

            String[] fs = split[1].split(",");
            for (String f : fs) {
                k.set(f);
                context.write(k, v);
                System.out.println(k + "---" + v); // debug print; drop for real runs
            }
        }
    }

  

    // Job 1 reducer: for each person (the key), values hold everyone who lists
    // that person as a friend; every pair drawn from that list shares the key
    // as a common friend, so emit one (pair, key) record per pair.
    static class Friends1Reducer extends Reducer<Text, Text, Text, Text> {
        Text k = new Text();
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            ArrayList<String> list = new ArrayList<>();
            for (Text value : values) {
                String s = value.toString(); // copy out of the reused Text object
                list.add(s);
            }
            // sort so each pair is always emitted in the same order ("A and B",
            // never "B and A"); otherwise job 2 would see them as distinct keys
            list.sort(String::compareTo);
            if (list.size() > 1) {
                for (int i = 0; i < list.size() - 1; i++) {
                    for (int i1 = i + 1; i1 < list.size(); i1++) {
                        String s = list.get(i) + " and " + list.get(i1) + "'s common friends: ";
                        k.set(s);
                        context.write(k, key);
                    }
                }
            }
        }
    }



    private static void runJob1() throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf,"Friends1");

        job.setMapperClass(Friends1Mapper.class);
        job.setReducerClass(Friends1Reducer.class);

//        job.setMapOutputKeyClass(Text.class);
//        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        //write the output as a SequenceFile so job 2 can read back typed (Text, Text) records
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        job.setNumReduceTasks(1);

        FileInputFormat.setInputPaths(job,new Path("D:\\duoyi\\09_Hadoop\\MR案例\\friends\\input"));
        FileOutputFormat.setOutputPath(job,new Path("D:\\duoyi\\09_Hadoop\\MR案例\\friends\\out1"));

        boolean b = job.waitForCompletion(true);
    }


  
  
  
  
    // Job 2 mapper: the input is job 1's SequenceFile, so records arrive
    // already typed as (Text, Text); simply pass them through.
    static class Friends2Mapper extends Mapper<Text, Text, Text, Text> {

        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

  

    // Job 2 reducer: the key is a pair description ("A and B's common friends: "),
    // the values are all of that pair's common friends; concatenate them.
    static class Friends2Reducer extends Reducer<Text, Text, Text, Text> {
        Text v = new Text();
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            StringBuilder sb = new StringBuilder();
            for (Text value : values) {
                sb.append(value).append("  ");
            }
            v.set(sb.toString());
            context.write(key, v);
        }
    }



    private static void runJob2() throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf,"Friends2");

        job.setMapperClass(Friends2Mapper.class);
        job.setReducerClass(Friends2Reducer.class);

//        job.setMapOutputKeyClass(Text.class);
//        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        //read the SequenceFile produced by job 1
        job.setInputFormatClass(SequenceFileInputFormat.class);

        job.setNumReduceTasks(1);

        FileInputFormat.setInputPaths(job,new Path("D:\\duoyi\\09_Hadoop\\MR案例\\friends\\out1"));
        FileOutputFormat.setOutputPath(job,new Path("D:\\duoyi\\09_Hadoop\\MR案例\\friends\\outover"));

        boolean b = job.waitForCompletion(true);
    }
}

2. Custom Partitioners and Grouping Comparators
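A partitioner decides which reduce task each map output pair is sent to (HashPartitioner by default), while a grouping comparator decides which consecutive keys are folded into one reduce() call — the standard building block for secondary sort. A minimal sketch of both, grouping keys by their first character (the FirstLetter names are made up for illustration):

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    import org.apache.hadoop.mapreduce.Partitioner;

    //send all keys starting with the same character to the same reduce task
    public class FirstLetterPartitioner extends Partitioner<Text, IntWritable> {
        @Override
        public int getPartition(Text key, IntWritable value, int numPartitions) {
            String s = key.toString();
            char first = s.isEmpty() ? '\0' : s.charAt(0);
            return (first & Integer.MAX_VALUE) % numPartitions;
        }
    }

    //treat keys whose first characters match as a single group (one reduce() call)
    public class FirstLetterGrouping extends WritableComparator {
        protected FirstLetterGrouping() {
            super(Text.class, true); //true: instantiate key objects so compare() receives real instances
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            String s1 = a.toString(), s2 = b.toString();
            char c1 = s1.isEmpty() ? '\0' : s1.charAt(0);
            char c2 = s2.isEmpty() ? '\0' : s2.charAt(0);
            return Character.compare(c1, c2);
        }
    }

They are wired into the driver with:

    job.setPartitionerClass(FirstLetterPartitioner.class);
    job.setGroupingComparatorClass(FirstLetterGrouping.class);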
