Flink之watermark 详解
作者:互联网
watermark介绍
在Flink中,Watermark 是 Apache Flink 为了处理 EventTime 窗口计算提出的一种机制, 本质上是一种时间戳。 用来处理实时数据中的乱序问题的,通常是水位线和窗口结合使用来实现。
从设备生成实时流事件,到Flink的source,再到多个oparator处理数据,过程中会受到网络延迟、背压等多种因素影响造成数据乱序。在进行窗口处理时,不可能无限期的等待延迟数据到达,当到达特定watermark时,认为在watermark之前的数据已经全部达到(即使后面还有延迟的数据), 可以触发窗口计算,这个机制就是 Watermark(水位线)。
如上图:
● w(11): 表示11之前的数据到已经到达,11之前的数据可以进行计算了。
● w(20): 表示20之前的数据到已经到达,20之前的数据可以进行计算了。
watermark的使用
生成时机
watermark可以在接收到DataSource的数据后,立刻生成Watermark。也可以在DataSource后,使用map或者filter操作后再生成watermark。
水位线生产的最佳位置是在尽可能靠近数据源的地方,因为水位线生成时会做出一些有关元素顺序相对时间戳的假设。由于数据源读取过程是并行的,一切引起Flink跨行数据流分区进行重新分发的操作(比如:改变并行度,keyby等)都会导致元素时间戳乱序。但是如果是某些初始化的filter、map等不会引起元素重新分发的操作,所以是可以考虑在生成水位线之前使用。
watermark的计算
watermark = 进入 Flink 窗口的最大的事件时间(maxEventTime) — 指定的延迟时间(t)
生成方式
第一种:With Periodic Watermarks
这个是周期性触发Waterrmark的生成和发送。
周期性分配水位线在程序中会比较常用,是我们会指示系统以固定的时间间隔发出的水位线。
在设置时间为事件时间时,会默认设置这个时间间隔为200ms, 如果需要调整可以自行设置。
设置任务时间类型和
val env = StreamExecutionEnvironment.getExecutionEnvironment
//设置时间使用事件时间
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//设置并行度为1
env.setParallelism(1)
//设置自动周期性的产生watermark,默认值为200毫秒
env.getConfig.setAutoWatermarkInterval(1000)
设置水位线watermark的值
//通过本地socket端口获取数据
val dataStream = env.socketTextStream("127.0.0.1",10010)
//对数据的数据进行转换为tuple2的格式
val tupStream = dataStream.map(line => {
val arr = line.split(" ")
(arr(0),arr(1).toLong)
})
//设置水位线
val waterDataStream = tupStream.assignTimestampsAndWatermarks(
//设置时间最低延迟
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2))
//设置时间戳
.withTimestampAssigner(new SerializableTimestampAssigner[Tuple2[String,Long]] {
//当前最大的值
var currentMaxNum = 0l
val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
override def extractTimestamp(t: Tuple2[String,Long], recordTimesstamp: Long): Long = {
val eTime = t._2
currentMaxNum = Math.max(eTime,currentMaxNum)
//当前最大的值减去 允许乱序的数据,即为现在的水位线值。
//注意:这些代码只是为了本地观察方便,正常开发中是不需这样写的。
val waterMark = currentMaxNum - 2000;
println("数据:"+t.toString()+" ,"+sdf.format(eTime)+" , 当前watermark: "+sdf.format(waterMark))
eTime
}
})
)
//对数据进行计算和输出
waterDataStream.keyBy(_._1).timeWindow(Time.seconds(3)).reduce((e1, e2)=>{
(e1._1,e1._2+e2._2)
}).print()
输入和输出:
--------------------输入
s3 1639100010955
s2 1639100009955
s1 1639100008955
s0 1639100007955
s4 1639100011955
s5 1639100012955
s6 1639100013955
s7 1639100016955
--------------------输出
数据:(s3,1639100010955) ,2021-12-10 09:33:30 , 当前watermark: 2021-12-10 09:33:28
数据:(s2,1639100009955) ,2021-12-10 09:33:29 , 当前watermark: 2021-12-10 09:33:28
数据:(s1,1639100008955) ,2021-12-10 09:33:28 , 当前watermark: 2021-12-10 09:33:28
数据:(s0,1639100007955) ,2021-12-10 09:33:27 , 当前watermark: 2021-12-10 09:33:28
数据:(s4,1639100011955) ,2021-12-10 09:33:31 , 当前watermark: 2021-12-10 09:33:29
数据:(s5,1639100012955) ,2021-12-10 09:33:32 , 当前watermark: 2021-12-10 09:33:30
(s2,1639100009955)
(s0,1639100007955)
(s1,1639100008955)
数据:(s6,1639100013955) ,2021-12-10 09:33:33 , 当前watermark: 2021-12-10 09:33:31
数据:(s7,1639100016955) ,2021-12-10 09:33:36 , 当前watermark: 2021-12-10 09:33:34
(s3,1639100010955)
(s5,1639100012955)
(s4,1639100011955)
说明:
- 在使用timeWindow的时候,会根据设置的窗口大小 3,将一分钟内的窗口划分为:
0-2,3-5,6-8,9-11,12-14,15-17,18-20,21-23,24-26,27-29,30-32,33-35… - watermark的值是当前输入数据中最大时间戳-去乱序时间。 在watermark前的数据才会被认定是正常的,可供window进行计算的数据。
- 上面程序中输入s3-s4时,watermark为的秒数是28和29,是在 timewindow划分的时间窗口 27-29 中,所以没有触发计算。直到输入s5,此时watermark秒数是30,在另一个窗口 30-32的窗口中,才会触发 27-29窗口的计算,所以才输出 s2,s0,s1的值。
- 同理到s7的时候,又是另一个窗口33-35,所以触发上一个窗口的计算。
第二种: With Punctuated Watermarks
定点水位线(标记水位线)不是太常用,主要为输入流中包含一些用于指示系统进度的特殊元组和标记,方便根据输入元素生成水位线的场景使用的。
由于数据流中每一个递增的EventTime都会产生一个Watermark。
在实际的生产中Punctuated方式在TPS很高的场景下会产生大量的Watermark在一定程度上对下游算子造成压力,所以只有在实时性要求非常高的场景才会选择Punctuated的方式进行Watermark的生成。
public class PunctuatedAssigner implements WatermarkGenerator<MyEvent> {
@Override
public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
if (event.hasWatermarkMarker()) {
output.emitWatermark(new Watermark(event.getWatermarkTimestamp()));
}
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// onEvent 中已经实现
}
}
延迟数据的处理方式
针对延迟太久的数据有3中处理方案:
- 丢弃(默认)
- allowedLateness: 指定允许数据延迟的时间
- sideOutputLateData: 收集迟到的数据
-
对于迟到太久的数据默认是丢弃的。 不会触发window。因为输入的数据所在的窗口已经执行过了。Flink对这些迟到数据执行的方案就是丢弃。
-
如果迟到不久,输入的数据所在的窗口还未执行,是不会丢弃的。 这个要看窗口大小和最大允许的数据乱序时间。
附上 Flink官方文档地址:
https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/datastream/event-time/generating_watermarks/
标签:10,12,watermark,33,Flink,详解,2021,09 来源: https://blog.csdn.net/pengff1234/article/details/121869647