编程语言
首页 > 编程语言> > java-Apache Beam TextIO glob获取原始文件名

java-Apache Beam TextIO glob获取原始文件名

作者:互联网

我已经建立了管道.我必须解析数百个* .gz文件.因此,glob的效果很好.

但是我需要当前处理文件的原始名称,因为我想将结果文件命名为原始文件.

有人能帮我一下吗?

这是我的代码.

@Default.String(LOGS_PATH + "*.gz")
String getInputFile();
void setInputFile(String value);


    TextIO.Read read = TextIO.read().withCompressionType(TextIO.CompressionType.GZIP).from(options.getInputFile());
        read.getName();

        p.apply("ReadLines", read).apply(new CountWords())
         .apply(MapElements.via(new FormatAsTextFn()))
         .apply("WriteCounts", TextIO.write().to(WordCountOptions.LOGS_PATH + "_" + options.getOutput()));

    p.run().waitUntilFinish();

解决方法:

从Beam 2.2开始,可以结合使用FileIO.match(),FileIO.read()和自定义代码来读取文本行.您已经可以在HEAD上使用它,或者可以等到2.2版完成(当前正在进行中).

PCollection<KV<String, String>> filesAndLines = 
  p.apply(FileIO.match().filepattern(...))
   .apply(FileIO.read())
   .apply(ParDo.of(new DoFn<ReadableFile, KV<String, String>>() {
     @ProcessElement
     public void process(ProcessContext c) {
       ReadableFile f = c.element();
       String filename = f.getMetadata().resourceId().toString();
       String line;
       try (BufferedReader r = new BufferedReader(Channels.newInputStream(f.open()))) {
         while ((line = r.readLine()) != null) {
           c.output(KV.of(filename, line));
         }
       }
     }
   }));

标签:clob,apache-beam,java,google-cloud-dataflow
来源: https://codeday.me/bug/20191025/1929755.html