Flink WaterMark原理简述
作者:互联网
1.Flink中Time概念
我们知道在分布式环境中 Time 是一个很重要的概念,在 Flink 中 Time 可以分为三种Event-Time,Processing-Time 以及 Ingestion-Time,三者的关系我们可以从下图中得知:
-
Event-Time 表示事件发生的时间
-
Processing-Time 则表示处理消息的时间
-
Ingestion-Time 表示进入到系统的时间
在 Flink 中我们可以通过下面的方式进行 Time 类型的设置
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // 设置使用 ProcessingTime
以上摘自:
Apache Flink 零基础入门(六):Flink Time & Window 解析
2. Flink中Watermark 概念
我们可以考虑一个这样的例子:某 App 会记录用户的所有点击行为,并回传日志(在网络不好的情况下,先保存在本地,延后回传)。A 用户在 11:02 对 App 进行操作,B 用户在 11:03 操作了 App,但是 A 用户的网络不太稳定,回传日志延迟了,导致我们在服务端先接受到 B 用户 11:03 的消息,然后再接受到 A 用户 11:02 的消息,消息乱序了。
那我们怎么保证基于 event-time 的窗口在销毁的时候,已经处理完了所有的数据呢?这就是 watermark 的功能所在。watermark 会携带一个单调递增的时间戳 t,watermark(t) 表示所有时间戳不大于 t 的数据都已经到来了,未来小于等于t的数据不会再来,因此可以放心地触发和销毁窗口了。下图中给了一个乱序数据流中的 watermark 例子
以上摘自:
Apache Flink 零基础入门(六):Flink Time & Window 解析
3. WaterMark的两种生成方式
如上图watermark的生成有两种方式:
-
实现AssignerWithPeriodicWatermarks接口(周期性水位线)
按照固定时间间隔生成新的水位线,不管是否有新的消息抵达,水位线提升的时间间隔是由用户设置的,在两次水位线提升间隔内会有一部分消息流入,用户可以根据这部分数据来计算出新的水位线。举个例子,最简单的水位线算法就是取目前为止最大的事件时间,然而这种方式比较暴力,对乱序事件的容忍程度比较低,容易出现大量迟到事件。public abstract class BoundedOutOfOrdernessTimestampExtractor<T> implements AssignerWithPeriodicWatermarks<T> { //... //获取当前的时间戳 @Override public final Watermark getCurrentWatermark() { // 这里保证了watermark是单调递增 long potentialWM = currentMaxTimestamp - maxOutOfOrderness; //当前最大时间戳 - 允许乱序延迟时间 相当于: watermark = eventTime -t if (potentialWM >= lastEmittedWatermark) { lastEmittedWatermark = potentialWM; } return new Watermark(lastEmittedWatermark); } //提取时间戳 @Override public final long extractTimestamp(T element, long previousElementTimestamp) { //从元素中提取时间戳 long timestamp = extractTimestamp(element); //如果元素中提取的时间戳大于当前最大的时间戳,则更新 if (timestamp > currentMaxTimestamp) { currentMaxTimestamp = timestamp; } return timestamp; } }
注:周期性的(一定时间间隔或者达到一定的记录条数)产生一个Watermark。在实际的生产中Periodic的方式必须结合时间和积累条数两个维度继续周期性产生Watermark,否则在极端情况下会有很大的延时。
-
实现AssignerWithPunctuatedWatermarks接口(标点性水位线)
标点水位线(Punctuated Watermark)通过数据流中某些特殊标记事件来触发新水位线的生成。这种方式下窗口的触发与时间无关,而是决定于何时收到标记事件。
public class WatermarkOnFlagAssigner implements AssignerWithPunctuatedWatermarks<MyElement> { //截取flink源码demo @Override public long extractTimestamp(MyElement element, long previousElementTimestamp) { //从元素中提取时间戳 return element.getSequenceTimestamp(); } //检查和获取下一个时间戳 @Override public Watermark checkAndGetNextWatermark(MyElement lastElement, long extractedTimestamp) { //元素完成处理就产生一个新的watermark return lastElement.isEndOfSequence() ? new Watermark(extractedTimestamp) : null; } } }
注:数据流中每一个递增的EventTime都会产生一个Watermark。在实际的生产中Punctuated方式在TPS很高的场景下会产生大量的Watermark在一定程度上对下游算子造成压力,所以只有在实时性要求非常高的场景才会选择Punctuated的方式进行Watermark的生成。
4. 迟到事件
虽说水位线表明着早于它的事件不应该再出现,但是上如上文所讲,接收到水位线以前的的消息是不可避免的,这就是所谓的迟到事件。实际上迟到事件是乱序事件的特例,和一般乱序事件不同的是它们的乱序程度超出了水位线的预计,导致窗口在它们到达之前已经关闭。
迟到事件出现时窗口已经关闭并产出了计算结果,因此处理的方法有3种:
- 重新激活已经关闭的窗口并重新计算以修正结果。
- 将迟到事件收集起来另外处理。
- 将迟到事件视为错误消息并丢弃。
Flink 默认的处理方式是第3种直接丢弃,其他两种方式分别使用Side Output和Allowed Lateness。
Allowed Lateness机制允许用户设置一个允许的最大迟到时长。Flink 会再窗口关闭后一直保存窗口的状态直至超过允许迟到时长,这期间的迟到事件不会被丢弃,而是默认会触发窗口重新计算。因为保存窗口状态需要额外内存,并且如果窗口计算使用了 ProcessWindowFunction API 还可能使得每个迟到事件触发一次窗口的全量计算,代价比较大,所以允许迟到时长不宜设得太长,迟到事件也不宜过多,否则应该考虑降低水位线提高的速度或者调整算法。
以上部分摘自:
标签:WaterMark,Watermark,Flink,迟到,简述,水位,事件,Time 来源: https://www.cnblogs.com/qiangsky/p/16444073.html