其他分享
首页 > 其他分享 > 小文件转存SequenceFile

小文件转存SequenceFile

作者:互联网

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import javax.imageio.stream.FileImageOutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Reader;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 18 
 19 public class MergeSmallFilesToSequenceFile {
 20     private static Logger logger = LoggerFactory.getLogger(MergeSmallFilesToSequenceFile.class);
 21     private static Configuration configuration = new Configuration();
 22     private static List<String> smallFilePaths = new ArrayList<String>();
 23 
 24     /**
 25      *添加路径,读取文件夹下所有的文件绝对路径
 26      * @param inputPath
 27      * @throws Exception
 28      */
 29     public void addInputPath(String inputPath) throws Exception{
 30 
 31         File file = new File(inputPath);
 32 
 33         if(file.isDirectory()){
 34             File[] files = FileUtil.listFiles(file);
 35             for(File sFile:files){
 36                 smallFilePaths.add(sFile.getPath());
 37                 logger.info("添加小文件路径:" + sFile.getPath());
 38             }
 39         }else{
 40             smallFilePaths.add(file.getPath());
 41             logger.info("添加小文件路径:" + file.getPath());
 42         }
 43     }
 44 
 45     /**
 46      *合并小文件序列化存储到hdfs
 47      * @throws Exception
 48      */
 49     public void mergeFile() throws Exception{
 50 
 51         Writer.Option bigFile = Writer.file(new Path("/SequenceFile_Test/test/wangxin.test"));
 52 
 53         Writer.Option keyClass = Writer.keyClass(Text.class);
 54         Writer.Option valueClass = Writer.valueClass(BytesWritable.class);
 55 
 56         Writer writer = SequenceFile.createWriter(configuration, bigFile, keyClass, valueClass);
 57 
 58         Text key = new Text();
 59         for(String path:smallFilePaths){
 60             File file = new File(path);
 61             long fileSize = file.length();
 62             byte[] fileContent = new byte[(int)fileSize];
 63             FileInputStream inputStream = new FileInputStream(file);
 64             inputStream.read(fileContent, 0, (int)fileSize);
 65 
 66             //String md5Str = DigestUtils.md5Hex(fileContent);
 67 
 68             //logger.info("merge小文件:"+path+",md5:"+md5Str);
 69             key.set(path);
 70 
 71             writer.append(key, new BytesWritable(fileContent));
 72         }
 73         writer.hflush();
 74 
 75         /*for (String path : smallFilePaths) {
 76             File file = new File(path);
 77             file.deleteOnExit();
 78         }*/
 79         writer.close();
 80     }
 81 
 82     /**
 83      * 在hdfs指定位置读取文件,读取k,v
 84      * @throws Exception
 85      */
 86     public void readMergedFile() throws Exception{
 87         MergeSmallFilesToSequenceFile smf = new MergeSmallFilesToSequenceFile();
 88         configuration.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
 89         Reader.Option file = Reader.file(new Path("hdfs://集群名/hdfs路径"));
 90         Reader reader = new Reader(configuration, file);
 91         Text key = new Text();
 92         BytesWritable value = new BytesWritable();
 93         while(reader.next(key, value)){
 94             byte[] bytes = value.copyBytes();
 95             //String md5 = DigestUtils.md5Hex(bytes);
 96             String content = new String(bytes, StandardCharsets.UTF_8);
 97             //logger.info("读取到文件:"+key+",md5:"+md5+",content:"+content);
 98             byte[] keyBytes = key.copyBytes();
 99             String keyStr = new String(keyBytes, StandardCharsets.UTF_8);
100             //判断是不是图片,是图片转存到本地路径
101             if (keyStr.contains(".png")) {
102                 String[] split = keyStr.split("\\\\");
103                 String fileName = split[split.length - 1];
104                 //拼接转存位置
105                 smf.byte2image(bytes,"/linux路径" + fileName);
106             }else {
107                 System.out.println("读取到文件:" + keyStr + ",content:" + content);
108             }
109 
110         }
111     }
112 
113     /**
114      *将byte数组转成图片
115      * @param data
116      * @param path
117      */
118     public  void byte2image(byte[] data,String path){
119         if(data.length<3||path.equals("")) return;
120         try{
121             FileImageOutputStream imageOutput = new FileImageOutputStream(new File(path));
122             imageOutput.write(data, 0, data.length);
123             imageOutput.close();
124             System.out.println("转换图片成功 " + path);
125         } catch(Exception ex) {
126             System.out.println("Exception: " + ex);
127             ex.printStackTrace();
128         }
129     }
130 
131 
132     //测试
133     public static void main(String[] args) throws Exception {
134         MergeSmallFilesToSequenceFile msf = new MergeSmallFilesToSequenceFile();
135 
136         /*List<String> smallFilePaths = new ArrayList<String>();
137         smallFilePaths = msf.getFiles("/tmp/logs",smallFilePaths);*/
138 
139 /*        msf.addInputPath("C:\\Users\\HR\\Desktop\\bigfile");
140 
141         for (String smallFilePath : smallFilePaths) {
142             System.out.println(smallFilePath);
143         }
144 
145         msf.mergeFile();*/
146 
147         msf.readMergedFile();
148 
149         /*msf.mergeFile();
150 
151         msf.readMergedFile();*/
152     }
153 }

 

标签:文件,String,org,SequenceFile,file,new,apache,import,转存
来源: https://www.cnblogs.com/China2008512/p/15428461.html