Hadoop-day01_(java代码模拟hadoop存储数据)
作者:互联网
hadoop文件切分思想
需求:统计文本文件中的各个班级的人数(一共多到数不清的人)
1500100129,容寄南,23,女,文科三班
1500100130,宁怀莲,21,女,理科四班
1500100131,胡昊明,22,男,文科六班
1500100132,曾安寒,22,女,文科五班
1500100133,钱向山,24,女,理科二班
1500100134,计宣朗,22,男,理科四班
1500100135,庾振海,21,男,理科四班
1500100136,黎昆鹏,22,男,文科六班
1500100137,宣向山,22,女,理科四班
1500100138,栾鸿信,22,男,文科二班
1500100139,左代萱,24,女,文科三班
1500100140,郁运发,24,男,文科六班
1500100141,谢昌勋,23,男,理科六班
...
...
用 java 简单模拟hadoop 进行统计
传统方法:利用集合统计每个班级的人数
package day01;
import java.io.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/**
* @author WangTao
* @date 2022/5/20 16:40
*/
public class Student_Count_Demo {
public static void main(String[] args) {
long start = System.currentTimeMillis();
//创建一个 map 集合,接受总的结果数据
HashMap<String, Integer> map = new HashMap<>();
BufferedReader br = null;
BufferedWriter bw = null;
try {
//读取文件进行分割,
//创建字符缓冲输入流对
br = new BufferedReader(new FileReader("src/day01/student"));
String len = null;
while((len = br.readLine()) != null){
//以逗号进行分割,得到班级和人数
String[] split = len.split("[,]");
String clazz = split[4];
//判断 map 集合是否存在对应的班级,不存在的话就添加
if(!map.containsKey(clazz)){
map.put(clazz,1);
}else{
//存在的话,就在原本的值上加 1
map.put(clazz,map.get(clazz)+1);
}
}
//将结果集写道最终的文件中
bw = new BufferedWriter(new FileWriter("src/day01/student_demo"));
//遍历集合
Set<Map.Entry<String, Integer>> entries = map.entrySet();
for (Map.Entry<String, Integer> entry : entries) {
Integer value = entry.getValue();
String key = entry.getKey();
bw.write(key+":"+value);
bw.newLine();
bw.flush();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
//释放资源
if (bw != null){
try {
bw.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if(br!=null){
try {
br.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
问题:
- 读取效率低,低效率和昂贵的服务器
- 存在数据安全问题
- 等等
Hadoop方法
分布式思想:
将数据分成多个block,(这里每一行数据等于 1 mb 大小的数据)
分布式存储(HDFS),在给每个模块进行分配任务(MAP即给每个模块一个线程),
每个模块分别进行计算,然后将各个模块计算的结果聚合(redus)
-
1. 步骤一:分块:每块大小为128兆,但是进行切分时,每快的大小为128*1.1时(约等于140多兆),才进行切分 128兆 的大小,最后一块,
如果和倒数第二块的大小没超过 128*1.1 的大小,就只会分配一个map资源,进行计算
package day01;
import java.io.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/**
* @author WangTao
* @date 2022/5/20 16:40
*/
public class Student_Count_Demo {
public static void main(String[] args) {
long start = System.currentTimeMillis();
//创建一个 map 集合,接受总的结果数据
HashMap<String, Integer> map = new HashMap<>();
BufferedReader br = null;
BufferedWriter bw = null;
try {
//读取文件进行分割,
//创建字符缓冲输入流对
br = new BufferedReader(new FileReader("src/day01/student"));
String len = null;
while((len = br.readLine()) != null){
//以逗号进行分割,得到班级和人数
String[] split = len.split("[,]");
String clazz = split[4];
//判断 map 集合是否存在对应的班级,不存在的话就添加
if(!map.containsKey(clazz)){
map.put(clazz,1);
}else{
//存在的话,就在原本的值上加 1
map.put(clazz,map.get(clazz)+1);
}
}
//将结果集写道最终的文件中
bw = new BufferedWriter(new FileWriter("src/day01/student_demo"));
//遍历集合
Set<Map.Entry<String, Integer>> entries = map.entrySet();
for (Map.Entry<String, Integer> entry : entries) {
Integer value = entry.getValue();
String key = entry.getKey();
bw.write(key+":"+value);
bw.newLine();
bw.flush();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
//释放资源
if (bw != null){
try {
bw.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if(br!=null){
try {
br.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
2 .应为分了 8个模块,这里创建8 个线程
package day02;
import java.io.File;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author WangTao
* @date 2022/5/20 20:58
*/
/*
Map (通过线程池的方式,简单来说模拟Hadoop中一个block块生成一个map任务,一个map相当于一个线程
在切分出来的的block块中,统计每个班级的人数)
*/
public class Map {
public static void main(String[] args) {
//创建一个线程池
ExecutorService executorService = Executors.newFixedThreadPool(8);
//定义文件编号,从 0 开始
int offset = 0;
File file = new File("src/day02/blocks");
File[] files = file.listFiles();
for (File file1 : files) {
MapTask mapTask = new MapTask(file1, offset);
executorService.submit(mapTask);
offset++;
}
System.out.println("分布式求取班级人数完成!!!");
//关闭线程池
executorService.shutdown();
}
}
3 . 线程池中跑如下 map 任务:计算每个模块的人数
package day02;
import java.io.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/**
* @author WangTao
* @date 2022/5/20 21:07
*/
public class MapTask implements Runnable{
private File file;
public int offset;
public MapTask(File file,int offset) {
this.file = file;
this.offset = offset;
}
@Override
public void run() {
BufferedReader br = null;
BufferedWriter bw = null;
try {
//字符缓冲输入流
br = new BufferedReader(new FileReader(file));
//创建一个HashMap集合,来存储学生对象
HashMap<String, Integer> map = new HashMap<>();
String lin = null;
while((lin = br.readLine()) != null) {
//用逗号进行分割
String clazz = lin.split("[,]")[4];
//如果在map中没有班级作为key,那我们把班级作为key,value 为 1
if (!map.containsKey(clazz)) {
map.put(clazz, 1);
} else {
map.put(clazz, map.get(clazz) + 1);
}
}
//将局部文件 ,集合中的数据写入到文本文件中
//创建字符缓冲输出流
bw = new BufferedWriter(new FileWriter("src/day02/block_counts/block---"+offset));
//遍历集合
Set<Map.Entry<String, Integer>> entries = map.entrySet();
for (Map.Entry<String, Integer> entry : entries) {
Integer value = entry.getValue();
String key = entry.getKey();
bw.write(key+":"+value);
bw.newLine();
bw.flush();
}
} catch (Exception e) {
e.printStackTrace();
}finally {
if (bw != null) {
try {
bw.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if(br!= null){
try {
br.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
4 . 最后将8个线程跑出来的任务进行聚合:统计出最终人数
package day02;
import java.io.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/**
* @author WangTao
* @date 2022/5/20 21:51
*/
/*
将每个map任务的结果,在做一次聚合,统计出最终的人数
*/
public class Reduce {
public static void main(String[] args)throws Exception {
//将 past 目录封装成 File 对象
File file = new File("src/day02/block_counts");
//获取下面所哟文件的对象数组
File[] files = file.listFiles();
//创建一个 map 集合,接受总的数据
HashMap<String, Integer> map = new HashMap<>();
//遍历每一个block_counts 对象
for (File file1 : files) {
//读取文件,进行分割
//创建字符缓冲输入流对象
BufferedReader br = new BufferedReader(new FileReader(file1));
String len = null;
while ((len = br.readLine()) != null) {
//读取的数据以 : 进行分割,得到键和值
String[] split = len.split("[:]");
String clazz = split[0];
//将字符串类型转换成 int 类型,方便进行计算
Integer sum = Integer.valueOf(split[1]);
//判断 map 中是否存在对应的 key
if(!map.containsKey(clazz)){
map.put(clazz,sum);
}else{
//如果存在,value 值相加
map.put(clazz, map.get(clazz)+sum);
}
}
//关闭读取文件通道
br.close();
}
//读取文件已经完成,现在开始写入到最终文件中
BufferedWriter bw = new BufferedWriter(new FileWriter("src/day02/finally_count/finally_sum"));
//遍历 map 集合,将数据写入到文件中去
Set<Map.Entry<String, Integer>> entries = map.entrySet();
for (Map.Entry<String, Integer> entry : entries) {
String key = entry.getKey();
Integer value = entry.getValue();
bw.write(key+":"+value);
bw.newLine();
bw.flush();
}
//关闭资源
bw.close();
}
}
分布式计算的好处:
1)高可靠性:因为Hadoop假设计算元素和存储会出现故障,因为它维护多个工作数据副本,在出现故障时可以对失败的节点重新分布处理。
2)高扩展性:在集群间分配任务数据,可方便的扩展数以千计的节点。
3)高效性:在MapReduce的思想下,Hadoop是并行工作的,以加快任务处理速度。
4)高容错性:自动保存多份副本数据,并且能够自动将失败的任务重新分配。
5)成本低(Economical):Hadoop通过普通廉价的机器组成服务器集群来分发以及处理数据,以至于成本很低。
标签:map,null,java,Hadoop,hadoop,bw,new,clazz 来源: https://www.cnblogs.com/atao-BigData/p/16294070.html