其他分享
首页 > 其他分享> > Flink StreamingFileSink 文件到hdfs 文件一直处于inprogress状态无法生成正式文件

Flink StreamingFileSink 文件到hdfs 文件一直处于inprogress状态无法生成正式文件

作者:互联网

一、问题描述:

 任务逻辑是通过实时读取Kafka数据,一分钟计算一次数据,并利用Flink StreamingFileSink将数据落地到HDFS文件中。为了应对大促剧增的数据量,对当前运行稳定的集群进行了扩容处理,任务重启后发现写入的hdfs文件一直处于inprogress状态无法滚动生成正式文件。

在这里插入图片描述
任务运行一段时间可能会出现如下错误:
在这里插入图片描述

二、解决过程:

  1. 开始是猜想可能是并行度过多,导致产生大量临时文件,文件句柄太多,关闭耗时过久导致文件一分钟内一直无法完成合并?将并行度调整到1,发现问题并没有解决。
  2. 又猜想是因为调整了checkpoint参数,禁用掉checkpoint失败后触发Flink任务重启策略导致checkpoint受影响无法完成临时文件合并?将代码回退,发现问题还没有解决。
  3. 通过观察文件目录,发现临时文件名前缀,跟已合并文件名的前缀一致,猜测可能是文件已存在,临时文件无法完成合并为同名文件?备份并清理掉历史数据,发现文件正常生成, 问题解决!!。

此解决方法为问题发生之后临时急救方案,主要适用于当前任务不依赖历史数据,数据可以清理的任务。剖析其深度原因之后可从根本上避免此类问题。详细请继续阅读下列原因深度剖析。

三、原因深度剖析:

3.1、StreamingFileSink原理简介

提示:使用 StreamingFileSink 时需要启用 Checkpoint ,每次做 Checkpoint 时写入完成后,桶中临时文件转成正式文件。如果 Checkpoint 被禁用,部分文件(part file)将永远处于 ‘in-progress’ 或 ‘pending’ 状态,下游系统无法安全地读取。
在这里插入图片描述

 本图为Flink 官网(官网地址:https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/streamfile_sink.html)对于StreamingFileSink的图示,可以很形象的描述其落地原理。
 为了在下游系统中使用 StreamingFileSink 的输出,我们需要了解输出文件的命名规则和生命周期。由上图可知,文件(part file)可以处于以下三种状态之一:

1).In-progress :

当前文件正在写入中

2).Pending :

当处于 In-progress 状态的文件关闭(closed)了,就变为 Pending 状态

3).Finished :

在成功的 Checkpoint 后,Pending 状态将变为 Finished 状态,处于 Finished 状态的文件不会再被修改,可以被下游系统安全地读取。

 Flink目前对于Hdfs-Sink 有两种实现方式,即BucketingSink以及StreamingFileSink。StreamingFileSink是在BucketingSink之后推出的。主要区别在于StreamingFileSink可以用于故障恢复,保证exactly-once,但是要求hadoop版本必须在2.7以上,因为用到了hdfs的truncate方法。BucketingSink相对用法比较简单,并且没有版本要求。StreamingFileSink的exactly-once主要基于Flink checkpoint提供的hook来实现的两阶段提交模式来保证的,这也是为什么官网提示使用时一定要打开checkpoint开关的原因。上述描述的桶物理上对应一个文件夹、subtask表示Flink同一任务的不同子任务,换言之,就是不同并行度。数据流中读到一个元素,根据项目的BucketAssigner可以计算出该元素属于哪个分区,通过状态管理器可以获取到该分区下目前最大的正在写的文件编号是多少?然后写到对应的文件中。官网图比较抽象,根据源码及对官网描述的理解,本人画了一张更加详细的StreamingFileSink示意图如下:
在这里插入图片描述

3.2、 回到问题,part-0-561.xxx,文件名中数字来自哪里?为什么每个文件生成之后其编号是累加的?为什么会重启之后数字又会重新开始编号? 文件中数字来自哪里?

org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink#initializeState→

org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper#StreamingFileSinkHelper→

org.apache.flink.streaming.api.functions.sink.filesystem.Buckets#initializeState→

org.apache.flink.streaming.api.functions.sink.filesystem.Buckets#initializeActiveBuckets→

org.apache.flink.streaming.api.functions.sink.filesystem.BucketFactory#restoreBucket→

org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl#restoreBucket→

org.apache.flink.streaming.api.functions.sink.filesystem.Bucket#restore→

org.apache.flink.streaming.api.functions.sink.filesystem.Bucket#Bucket(int, BucketID, org.apache.flink.core.fs.Path, long, org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter<IN,BucketID>, org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy<IN,BucketID>, org.apache.flink.streaming.api.functions.sink.filesystem.FileLifeCycleListener<BucketID>, org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig)

 this.partCounter = initialPartCounter(maxPartCounter);

 初始化做了什么?分析源码发现,初始化时,创建了一个桶并初始化了桶的一些属性,其中桶编号的获取用到属性maxPartCounter,见名知意,其作用是用来记录当前part的最大编号,其值初始化为0。而桶编号就是每个临时文件名的前缀。

在这里插入图片描述在这里插入图片描述
 数据流中每读到一个元素,通过用户设置的分桶策略找到该元素对应的桶,并将其写入文件,并且跟上次的partcount对比,获取当前的最大值并将最大值进行更新。
在这里插入图片描述
 拍摄快照时,先将maxPartCounter的状态清空,然后仅记录当前Checkpoint编号的每个桶的maxPartCounter值,当前checkpoint成功,那么每个桶的最新文件编号及被记录在当前的装填中供下次获取。
在这里插入图片描述 快照拍摄完成之后,会将临时文件合并为Finished状态的文件,其中bucketWriter就跟文件生成相关,其文件名就是涉及到上述描述的文件编号。

在这里插入图片描述

四、解决方案:

 在维护任务,手动停止任务时,一定要保存快照。扩容及代码维护之后,要指定快照重启任务就可以从根本上避免该问题的产生。

五、总结

 问题的根本原因:在手动停止任务时,StreamingFileSink依赖Checkpoint状态来记录当前checkpoint id对应最新生成文件的编号,下一个checkpoint id有新数据读取到时,会根据上一次状态记录的文件最大编号的值累加得到新元素对应文件的文件名,在停止任务时,没有保存快照,导致最后一次chekcpoint成功生成的文件编号没有被记录而丢失,下次任务重启时不指定快照重启,快照会重新进行初始化,文件名中编号又被初始化为0,临时文件在合并为Finished状态时,发现同一目录下已存在同样的文件,而无法进行覆盖导致文件一直处于正在写入状态。所以,当把本目录下历史数据清除掉之后,所有写入的文件重新从0开始编号,能正常完成文件的写入。
教训:在后续Flink任务中,如果涉及到有状态记录,chekcpoint等操作,在停止任务时一定不能暴力停止,一定要保存快照,平滑执行停止操作,让其状态能安全保存。否则,可能有些累计求值的数据会永久丢失,需要重置Kafka offset才能恢复。
 知其然知其所以然:本文章分析方法同样适用于Flink Kafka sink,Kafka sink保持Exactly-Once原理也是基于两阶段事务提交方式实现的,大家有兴趣可以利用同样的分析方法去阅读Flink FlinkKafkaProducer源码,甚至后续有其他sink操作,需要具备容错机制,也可以参考此处Flink源码去实现。

标签:文件,flink,快照,hdfs,Flink,sink,StreamingFileSink
来源: https://blog.csdn.net/haozhuxuan/article/details/122078203