编程语言
首页 > 编程语言> > Hadoop-day01_(java代码模拟hadoop存储数据)

Hadoop-day01_(java代码模拟hadoop存储数据)

作者:互联网

hadoop文件切分思想

需求:统计文本文件中的各个班级的人数(一共多到数不清的人)

1500100129,容寄南,23,女,文科三班
1500100130,宁怀莲,21,女,理科四班
1500100131,胡昊明,22,男,文科六班
1500100132,曾安寒,22,女,文科五班
1500100133,钱向山,24,女,理科二班
1500100134,计宣朗,22,男,理科四班
1500100135,庾振海,21,男,理科四班
1500100136,黎昆鹏,22,男,文科六班
1500100137,宣向山,22,女,理科四班
1500100138,栾鸿信,22,男,文科二班
1500100139,左代萱,24,女,文科三班
1500100140,郁运发,24,男,文科六班
1500100141,谢昌勋,23,男,理科六班
...
...

用 java 简单模拟hadoop 进行统计

传统方法:利用集合统计每个班级的人数

package day01;
import java.io.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/**
 * @author WangTao
 * @date 2022/5/20 16:40
 */
public class Student_Count_Demo {
    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        //创建一个 map 集合,接受总的结果数据
        HashMap<String, Integer> map = new HashMap<>();
        BufferedReader br = null;
        BufferedWriter bw = null;
        try {
            //读取文件进行分割,
            //创建字符缓冲输入流对
            br = new BufferedReader(new FileReader("src/day01/student"));
            String len = null;
            while((len = br.readLine()) != null){
                //以逗号进行分割,得到班级和人数
                String[] split = len.split("[,]");
                String clazz = split[4];
                //判断 map 集合是否存在对应的班级,不存在的话就添加
                if(!map.containsKey(clazz)){
                    map.put(clazz,1);
                }else{
                    //存在的话,就在原本的值上加 1
                    map.put(clazz,map.get(clazz)+1);
                }
            }
            //将结果集写道最终的文件中
            bw = new BufferedWriter(new FileWriter("src/day01/student_demo"));
            //遍历集合
            Set<Map.Entry<String, Integer>> entries = map.entrySet();
            for (Map.Entry<String,  Integer> entry : entries) {
                Integer value = entry.getValue();
                String key = entry.getKey();
                bw.write(key+":"+value);
                bw.newLine();
                bw.flush();
            }

        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            //释放资源
            if (bw != null){
                try {
                    bw.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if(br!=null){
                try {
                    br.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }


    }
}

问题:

  • 读取效率低,低效率和昂贵的服务器
  • 存在数据安全问题
  • 等等

Hadoop方法

分布式思想:

将数据分成多个block,(这里每一行数据等于 1 mb 大小的数据)

分布式存储(HDFS),在给每个模块进行分配任务(MAP即给每个模块一个线程),

每个模块分别进行计算,然后将各个模块计算的结果聚合(redus

  1. 1. 步骤一:分块:每块大小为128兆,但是进行切分时,每快的大小为128*1.1时(约等于140多兆),才进行切分 128兆 的大小,最后一块,

    如果和倒数第二块的大小没超过 128*1.1 的大小,就只会分配一个map资源,进行计算

package day01;
import java.io.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/**
 * @author WangTao
 * @date 2022/5/20 16:40
 */
public class Student_Count_Demo {
    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        //创建一个 map 集合,接受总的结果数据
        HashMap<String, Integer> map = new HashMap<>();
        BufferedReader br = null;
        BufferedWriter bw = null;
        try {
            //读取文件进行分割,
            //创建字符缓冲输入流对
            br = new BufferedReader(new FileReader("src/day01/student"));
            String len = null;
            while((len = br.readLine()) != null){
                //以逗号进行分割,得到班级和人数
                String[] split = len.split("[,]");
                String clazz = split[4];
                //判断 map 集合是否存在对应的班级,不存在的话就添加
                if(!map.containsKey(clazz)){
                    map.put(clazz,1);
                }else{
                    //存在的话,就在原本的值上加 1
                    map.put(clazz,map.get(clazz)+1);
                }
            }
            //将结果集写道最终的文件中
            bw = new BufferedWriter(new FileWriter("src/day01/student_demo"));
            //遍历集合
            Set<Map.Entry<String, Integer>> entries = map.entrySet();
            for (Map.Entry<String,  Integer> entry : entries) {
                Integer value = entry.getValue();
                String key = entry.getKey();
                bw.write(key+":"+value);
                bw.newLine();
                bw.flush();
            }

        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            //释放资源
            if (bw != null){
                try {
                    bw.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if(br!=null){
                try {
                    br.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }


    }
}

2 .应为分了 8个模块,这里创建8 个线程

package day02;
import java.io.File;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * @author WangTao
 * @date 2022/5/20 20:58
 */
/*
    Map (通过线程池的方式,简单来说模拟Hadoop中一个block块生成一个map任务,一个map相当于一个线程
    在切分出来的的block块中,统计每个班级的人数)
 */
public class Map {
    public static void main(String[] args) {
        //创建一个线程池
        ExecutorService executorService = Executors.newFixedThreadPool(8);
        //定义文件编号,从 0 开始
        int offset = 0;
        File file = new File("src/day02/blocks");
        File[] files = file.listFiles();
        for (File file1 : files) {
            MapTask mapTask = new MapTask(file1, offset);
            executorService.submit(mapTask);
            offset++;
        }
        System.out.println("分布式求取班级人数完成!!!");
        //关闭线程池
        executorService.shutdown();
        


    }
}

3 . 线程池中跑如下 map 任务:计算每个模块的人数

package day02;
import java.io.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/**
 * @author WangTao
 * @date 2022/5/20 21:07
 */
public class MapTask implements Runnable{
    private File file;
    public int offset;

    public MapTask(File file,int offset) {
        this.file = file;
        this.offset = offset;
    }

    @Override
    public void run() {
        BufferedReader br = null;
        BufferedWriter bw = null;
        try {
            //字符缓冲输入流
            br = new BufferedReader(new FileReader(file));
            //创建一个HashMap集合,来存储学生对象
            HashMap<String, Integer> map = new HashMap<>();
            String lin = null;
            while((lin = br.readLine()) != null) {
                //用逗号进行分割
                String clazz = lin.split("[,]")[4];
                //如果在map中没有班级作为key,那我们把班级作为key,value 为 1
                if (!map.containsKey(clazz)) {
                    map.put(clazz, 1);
                } else {
                    map.put(clazz, map.get(clazz) + 1);
                }

            }
            //将局部文件 ,集合中的数据写入到文本文件中
            //创建字符缓冲输出流
            bw = new BufferedWriter(new FileWriter("src/day02/block_counts/block---"+offset));
            //遍历集合
            Set<Map.Entry<String, Integer>> entries = map.entrySet();
            for (Map.Entry<String, Integer> entry : entries) {
                Integer value = entry.getValue();
                String key = entry.getKey();
                bw.write(key+":"+value);
                bw.newLine();
                bw.flush();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            if (bw != null) {
                try {
                    bw.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if(br!= null){
                try {
                    br.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

    }
}

4 . 最后将8个线程跑出来的任务进行聚合统计出最终人数

package day02;
import java.io.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/**
 * @author WangTao
 * @date 2022/5/20 21:51
 */
/*
        将每个map任务的结果,在做一次聚合,统计出最终的人数
 */
public class Reduce {
    public static void main(String[] args)throws Exception {
        //将 past 目录封装成 File 对象
        File file = new File("src/day02/block_counts");
        //获取下面所哟文件的对象数组
        File[] files = file.listFiles();
        //创建一个 map 集合,接受总的数据
        HashMap<String, Integer> map = new HashMap<>();
        //遍历每一个block_counts 对象
        for (File file1 : files) {
            //读取文件,进行分割
            //创建字符缓冲输入流对象
            BufferedReader br = new BufferedReader(new FileReader(file1));
            String len = null;
            while ((len = br.readLine()) != null) {
                //读取的数据以 : 进行分割,得到键和值
                String[] split = len.split("[:]");
                String clazz = split[0];
                //将字符串类型转换成 int 类型,方便进行计算
                Integer sum = Integer.valueOf(split[1]);
                //判断 map 中是否存在对应的 key
                if(!map.containsKey(clazz)){
                    map.put(clazz,sum);
                }else{
                    //如果存在,value 值相加
                    map.put(clazz, map.get(clazz)+sum);
                }
            }
            //关闭读取文件通道
            br.close();
        }
        //读取文件已经完成,现在开始写入到最终文件中
        BufferedWriter bw = new BufferedWriter(new FileWriter("src/day02/finally_count/finally_sum"));
        //遍历 map 集合,将数据写入到文件中去
        Set<Map.Entry<String, Integer>> entries = map.entrySet();
        for (Map.Entry<String, Integer> entry : entries) {
            String key = entry.getKey();
            Integer value = entry.getValue();
            bw.write(key+":"+value);
            bw.newLine();
            bw.flush();
        }
        //关闭资源
        bw.close();
    }
}

分布式计算的好处:

1)高可靠性:因为Hadoop假设计算元素和存储会出现故障,因为它维护多个工作数据副本,在出现故障时可以对失败的节点重新分布处理。

2)高扩展性:在集群间分配任务数据,可方便的扩展数以千计的节点。

3)高效性:在MapReduce的思想下,Hadoop是并行工作的,以加快任务处理速度。

4)高容错性:自动保存多份副本数据,并且能够自动将失败的任务重新分配。

5)成本低(Economical):Hadoop通过普通廉价的机器组成服务器集群来分发以及处理数据,以至于成本很低。

标签:map,null,java,Hadoop,hadoop,bw,new,clazz
来源: https://www.cnblogs.com/atao-BigData/p/16294070.html