Flink 四大基石之时间和水位线原理介绍!
作者:互联网
在Flink中,涉及到时间和水位线这一概念,时间是Flink中的四大基石(Checkpoint、State、Time、Window)之一,是实现流批统一的一个重要特性。本文讲解内容包含以下六部分:
- Flink 时间分类
- WaterMark引入
- 水印运行原理
- 水印API调用
- 侧道输出保障超过WaterMark数据不丢失
- WaterMark+EventTimeWindow+Allowed Lateness案例
在Flink的流式处理中,会涉及到时间的不同概念,如下图所示:
- EventTime[事件时间]
事件发生的时间,例如:点击网站上的某个链接的时间,每一条日志都会记录自己的生成时间
如果以EventTime为基准来定义时间窗口那将形成EventTimeWindow,要求消息本身就应该携带EventTime
- IngestionTime[摄入时间]
数据进入Flink的时间,如某个Flink节点的sourceoperator接收到数据的时间,例如:某个source消费到kafka中的数据
如果以IngesingtTime为基准来定义时间窗口那将形成IngestingTimeWindow,以source的systemTime为准
- ProcessingTime[处理时间]
某个Flink节点执行某个operation的时间,例如:timeWindow处理数据时的系统时间,默认的时间属性就是Processing Time
如果以ProcessingTime基准来定义时间窗口那将形成ProcessingTimeWindow,以operator的systemTime为准
在Flink的流式处理中,绝大部分的业务都会使用EventTime,一般只在EventTime无法使用时,才会被迫使用ProcessingTime或者IngestionTime。
如果要使用EventTime,那么需要引入EventTime的时间属性,引入方式如下所示:
三种时间设置代码如下:
final StreamExecutionEnvironment env
= StreamExecutionEnvironment.getExecutionEnvironrnent();
// 使用处理时间
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) ;
// 使用摄入时间
env.setStrearnTimeCharacteristic(TimeCharacteristic.IngestionTime);
// 使用事件时间
env.setStrearnTimeCharacteristic(TimeCharacteri stic Eve~tTime);
通过两个案例来了解什么是水印,主要解决什么问题?
案例1: 假你正在去往地下停车场的路上,并且打算用手机点一份外卖。
选好了外卖后,你就用在线支付功能付款了,这个时候是11点50分。恰好这时,你走进了地下停车库,而这里并没有手机信号。因此外卖的在线支付并没有立刻成功,而支付系统一直在Retry重试“支付”这个操作。
当你找到自己的车并且开出地下停车场的时候,已经是12点05分了。这个时候手机重新有了信号,手机上的支付数据成功发到了外卖在线支付系统,支付完成。
在上面这个场景中你可以看到,支付数据的事件时间是11点50分,而支付数据的处理时间是12点05分
案例2: 如上图所示,某App 会记录用户的所有点击行为,并回传日志(在网络不好的情况下,先保存在本地,延后回传)。
A 用户在11:02 对 App 进行操作,B用户在11:03 操作了 App,
但是A 用户的网络不太稳定,回传日志延迟了,导致我们在服务端先接受到B 用户11:03 的消息,然后再接受到A 用户11:02 的消息,消息乱序了。
通过上面的例子,我们知道,在进行数据处理的时候应该按照事件时间进行处理,也就是窗口应该要考虑到事件时间,但是窗口不能无限的一直等到延迟数据的到来,需要有一个触发窗口计算的机制,就是watermaker水位线/水印机制。
所以:水印是用来解决数据延迟、数据乱序等问题,总结如下图所示:
水印就是一个时间戳(timestamp),Flink可以给数据流添加水印
- 水印并不会影响原有Eventtime事件时间
- 当数据流添加水印后,会按照水印时间来触发窗口计算,也就是说watermark水印是用来触发窗口计算的
- 设置水印时间,会比事件时间小几秒钟,表示最大允许数据延迟达到多久
- 水印时间 = 事件时间 - 允许延迟时间 (例如:10:09:57 = 10:10:00 - 3s )
如下图所示:
窗口是10分钟触发一次,现在在12:00 - 12:10 有一个窗口,本来有一条数据是在12:00 - 12:10这个窗口被计算,但因为延迟,12:12到达,这时12:00 - 12:10 这个窗口就会被关闭,只能将数据下发到下一个窗口进行计算,这样就产生了数据延迟,造成计算不准确。
现在添加一个水位线:数据时间戳为2分钟。这时用数据产生的事件时间 12:12 -允许延迟的水印 2分钟 = 12:10 >= 窗口结束时间 。窗口触发计算,该数据就会被计算到这个窗口里
在DataStream API 中使用 TimestampAssigner 接口定义时间戳的提取行为,包含两个子接口 AssignerWithPeriodicWatermarks 接口和 AssignerWithPunctuatedWaterMarks接口
使用 WaterMark+ EventTimeWindow 机制可以在一定程度上解决数据乱序的问题,但是,WaterMark 水位线也不是万能的,在某些情况下,数据延迟会非常严重,即使通过Watermark + EventTimeWindow也无法等到数据全部进入窗口再进行处理,因为窗口触发计算后,对于延迟到达的本属于该窗口的数据,Flink默认会将这些延迟严重的数据进行丢弃
那么如果想要让一定时间范围的延迟数据不会被丢弃,可以使用Allowed Lateness(允许迟到机制/侧道输出机制)设定一个允许延迟的时间和侧道输出对象来解决
即使用WaterMark + EventTimeWindow + Allowed Lateness方案(包含侧道输出),可以做到数据不丢失。
API调用
l allowedLateness(lateness:Time)---设置允许延迟的时间
该方法传入一个Time值,设置允许数据迟到的时间,这个时间和watermark中的时间概念不同。再来回顾一下,
watermark=数据的事件时间-允许乱序时间值
随着新数据的到来,watermark的值会更新为最新数据事件时间-允许乱序时间值,但是如果这时候来了一条历史数据,watermark值则不会更新。
总的来说,watermark永远不会倒退它是为了能接收到尽可能多的乱序数据。
那这里的Time值呢?主要是为了等待迟到的数据,如果属于该窗口的数据到来,仍会进行计算,后面会对计算方式仔细说明
注意:该方法只针对于基于event-time的窗口
l sideOutputLateData(outputTag:OutputTag[T])--保存延迟数据
该方法是将迟来的数据保存至给定的outputTag参数,而OutputTag则是用来标记延迟数据的一个对象。
l DataStream.getSideOutput(tag:OutputTag[X])--获取延迟数据
通过window等操作返回的DataStream调用该方法,传入标记延迟数据的对象来获取延迟的数据
package cn.itcast.watermark
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.scala.{DataStream, OutputTag, StreamExecutionEnvironment, WindowedStream}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
/*
演示水印如何解决数据乱序问题
*/
object SlideOutputDemo {
//样例类 CarWc(信号等id,数量)
case class CarWc(id: String, num: Int, ts: Long)
def main(args: Array[String]): Unit = {
// 1 获取运行环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//方便观察数据设置并行度为1
env.setParallelism(1)
//设置基于事件时间进行计算
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//2 接收数据
val socketDs = env.socketTextStream("hlink161", 9999)
// 3 transformation 接收到数据之后按照逗号切分,拿到红绿灯id,数量(通过汽车数量)
val carWcDs: DataStream[CarWc] = socketDs.map(
line => {
val arr: Array[String] = line.split(",")
// println("===="+line)
//3.1 封装样例类
CarWc(arr(0), arr(1).toInt, arr(2).toLong)
}
)
// 3.1 设置水印机制,我们自己动手实现
val waterMarkDs: DataStream[CarWc] = carWcDs.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[CarWc]{
/** The current maximum timestamp seen so far. */
private var currentMaxTimestamp = 0L
//定义最大允许的延迟时间
private var maxOutOfOrderness=0
/** The timestamp of the last emitted watermark. */ //巨大的负数
private var lastEmittedWatermark:Long= Long.MinValue
//3.1.1计算并生成水印
override def getCurrentWatermark: Watermark = {
// this guarantees that the watermark never goes backwards.
//计算出的watermark时间
val potentialWM = currentMaxTimestamp - maxOutOfOrderness
//保证水印时间不会回退!!
if (potentialWM >= lastEmittedWatermark)
lastEmittedWatermark = potentialWM
return new Watermark(lastEmittedWatermark)
}
//3.1.2 获取到eventtime字段
override def extractTimestamp(element: CarWc, previousElementTimestamp: Long): Long = {
//获取日志的eventtime
val timestamp = element.ts
//判断比较 eventtime是不是大于最大的eventtime时间
if (timestamp > currentMaxTimestamp)
currentMaxTimestamp = timestamp
//返回日志的eventtime时间
timestamp
}
} )
// pre-4 定义一个侧输出流
val outputTag: OutputTag[CarWc] = new OutputTag[CarWc]("delayCarWc")
// 4 设置窗口
val windowStream: WindowedStream[CarWc, String, TimeWindow] = waterMarkDs.keyBy(_.id).timeWindow(Time.seconds(5))
.allowedLateness(Time.seconds(5))
.sideOutputLateData(outputTag)
//5 使用apply方法执行聚合计算
val windowRes: DataStream[CarWc] = windowStream.apply(new WindowFunction[CarWc, CarWc, String, TimeWindow] {
override def apply(key: String, window: TimeWindow, input: Iterable[CarWc], out: Collector[CarWc]): Unit = {
val iter: Iterator[CarWc] = input.iterator
println("窗口开始时间 》》"+window.getStart+"== 窗口结束时间》》"+window.getEnd+"数据 【"+input.iterator.mkString(";")+"】")
val wc: CarWc = iter.reduce((t1, t2) =>
CarWc(t1.id, t1.num + t2.num, t1.ts)
)
out.collect(wc)
}
})
windowRes.print("窗口计算结果>>")
//获取到侧输出流结果
val outputDs: DataStream[CarWc] = windowRes.getSideOutput(outputTag)
outputDs.print("侧输出流数据")
env.execute()
}
}
以上就是时间及水位线原理讲解的所有内容!觉得好的,点赞,在看,分享三连击,谢谢!!!
找各类大数据技术文章和面经,就来
<3分钟秒懂大数据>
随时更新互联网大数据组件内容
专为学习者提供技术博文
快和身边的小伙伴一起关注我们吧!
作者简介:逆流而上Mr李,秋招7offer,CSDN博客:https://blog.csdn.net/weixin_38201936
标签:窗口,Flink,水印,水位,时间,延迟,数据,CarWc,基石 来源: https://blog.csdn.net/weixin_38201936/article/details/118421532