小文件转存SequenceFile
作者:互联网
1 import org.apache.hadoop.conf.Configuration; 2 import org.apache.hadoop.fs.FileUtil; 3 import org.apache.hadoop.fs.Path; 4 import org.apache.hadoop.io.BytesWritable; 5 import org.apache.hadoop.io.SequenceFile; 6 import org.apache.hadoop.io.SequenceFile.Reader; 7 import org.apache.hadoop.io.SequenceFile.Writer; 8 import org.apache.hadoop.io.Text; 9 import org.slf4j.Logger; 10 import org.slf4j.LoggerFactory; 11 12 import javax.imageio.stream.FileImageOutputStream; 13 import java.io.File; 14 import java.io.FileInputStream; 15 import java.nio.charset.StandardCharsets; 16 import java.util.ArrayList; 17 import java.util.List; 18 19 public class MergeSmallFilesToSequenceFile { 20 private static Logger logger = LoggerFactory.getLogger(MergeSmallFilesToSequenceFile.class); 21 private static Configuration configuration = new Configuration(); 22 private static List<String> smallFilePaths = new ArrayList<String>(); 23 24 /** 25 *添加路径,读取文件夹下所有的文件绝对路径 26 * @param inputPath 27 * @throws Exception 28 */ 29 public void addInputPath(String inputPath) throws Exception{ 30 31 File file = new File(inputPath); 32 33 if(file.isDirectory()){ 34 File[] files = FileUtil.listFiles(file); 35 for(File sFile:files){ 36 smallFilePaths.add(sFile.getPath()); 37 logger.info("添加小文件路径:" + sFile.getPath()); 38 } 39 }else{ 40 smallFilePaths.add(file.getPath()); 41 logger.info("添加小文件路径:" + file.getPath()); 42 } 43 } 44 45 /** 46 *合并小文件序列化存储到hdfs 47 * @throws Exception 48 */ 49 public void mergeFile() throws Exception{ 50 51 Writer.Option bigFile = Writer.file(new Path("/SequenceFile_Test/test/wangxin.test")); 52 53 Writer.Option keyClass = Writer.keyClass(Text.class); 54 Writer.Option valueClass = Writer.valueClass(BytesWritable.class); 55 56 Writer writer = SequenceFile.createWriter(configuration, bigFile, keyClass, valueClass); 57 58 Text key = new Text(); 59 for(String path:smallFilePaths){ 60 File file = new File(path); 61 long fileSize = file.length(); 62 byte[] fileContent = new byte[(int)fileSize]; 63 FileInputStream inputStream = new FileInputStream(file); 64 inputStream.read(fileContent, 0, (int)fileSize); 65 66 //String md5Str = DigestUtils.md5Hex(fileContent); 67 68 //logger.info("merge小文件:"+path+",md5:"+md5Str); 69 key.set(path); 70 71 writer.append(key, new BytesWritable(fileContent)); 72 } 73 writer.hflush(); 74 75 /*for (String path : smallFilePaths) { 76 File file = new File(path); 77 file.deleteOnExit(); 78 }*/ 79 writer.close(); 80 } 81 82 /** 83 * 在hdfs指定位置读取文件,读取k,v 84 * @throws Exception 85 */ 86 public void readMergedFile() throws Exception{ 87 MergeSmallFilesToSequenceFile smf = new MergeSmallFilesToSequenceFile(); 88 configuration.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem"); 89 Reader.Option file = Reader.file(new Path("hdfs://集群名/hdfs路径")); 90 Reader reader = new Reader(configuration, file); 91 Text key = new Text(); 92 BytesWritable value = new BytesWritable(); 93 while(reader.next(key, value)){ 94 byte[] bytes = value.copyBytes(); 95 //String md5 = DigestUtils.md5Hex(bytes); 96 String content = new String(bytes, StandardCharsets.UTF_8); 97 //logger.info("读取到文件:"+key+",md5:"+md5+",content:"+content); 98 byte[] keyBytes = key.copyBytes(); 99 String keyStr = new String(keyBytes, StandardCharsets.UTF_8); 100 //判断是不是图片,是图片转存到本地路径 101 if (keyStr.contains(".png")) { 102 String[] split = keyStr.split("\\\\"); 103 String fileName = split[split.length - 1]; 104 //拼接转存位置 105 smf.byte2image(bytes,"/linux路径" + fileName); 106 }else { 107 System.out.println("读取到文件:" + keyStr + ",content:" + content); 108 } 109 110 } 111 } 112 113 /** 114 *将byte数组转成图片 115 * @param data 116 * @param path 117 */ 118 public void byte2image(byte[] data,String path){ 119 if(data.length<3||path.equals("")) return; 120 try{ 121 FileImageOutputStream imageOutput = new FileImageOutputStream(new File(path)); 122 imageOutput.write(data, 0, data.length); 123 imageOutput.close(); 124 System.out.println("转换图片成功 " + path); 125 } catch(Exception ex) { 126 System.out.println("Exception: " + ex); 127 ex.printStackTrace(); 128 } 129 } 130 131 132 //测试 133 public static void main(String[] args) throws Exception { 134 MergeSmallFilesToSequenceFile msf = new MergeSmallFilesToSequenceFile(); 135 136 /*List<String> smallFilePaths = new ArrayList<String>(); 137 smallFilePaths = msf.getFiles("/tmp/logs",smallFilePaths);*/ 138 139 /* msf.addInputPath("C:\\Users\\HR\\Desktop\\bigfile"); 140 141 for (String smallFilePath : smallFilePaths) { 142 System.out.println(smallFilePath); 143 } 144 145 msf.mergeFile();*/ 146 147 msf.readMergedFile(); 148 149 /*msf.mergeFile(); 150 151 msf.readMergedFile();*/ 152 } 153 }
标签:文件,String,org,SequenceFile,file,new,apache,import,转存 来源: https://www.cnblogs.com/China2008512/p/15428461.html