Flink-窗口函数(Window)
作者:互联网
1.Window概念
streaming 流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而 window 是一种切割无限数据为有限块进行处理的手段。 Window 是无限数据流处理的核心,Window 将一个无限的 stream 拆分成有限大小的”buckets”桶,我们可以在这些桶上做计算操作。2.Window类型
2.1 TimeWindow:按照时间生成Window
2.1.1 滚动时间窗口(Tumbling Windows)
将数据依据固定的窗口长度对数据进行切片。 特点:时间对齐,窗口长度固定,没有重叠。 滚动窗口分配器将每个元素分配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会出现重叠。例如:如果你指定了一个 5 分钟大小的滚动窗口,窗口的创建如下图所示: 适用场景:适合做 BI 统计等(做每个时间段的聚合计算)。2.1.2 滑动时间窗口(Sliding Windows)
滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动间隔组成。 特点:时间对齐,窗口长度固定,可以有重叠。 滑动窗口分配器将元素分配到固定长度的窗口中,与滚动窗口类似,窗口的大小由窗口大小参数来配置,另一个窗口滑动参数控制滑动窗口开始的频率。因此,滑动窗口如果滑动参数小于窗口大小的话,窗口是可以重叠的,在这种情况下元素会被分配到多个窗口中。 例如,你有 10 分钟的窗口和 5 分钟的滑动,那么每个窗口中 5 分钟的窗口里包含着上个 10 分钟产生的数据,如下图所示: 适用场景:对最近一个时间段内的统计(求某接口最近 5min 的失败率来决定是否要报警)。2.1.3会话窗口(Session Windows)
由一系列事件组合一个指定时间长度的 timeout 间隙组成,类似于 web 应用的session,也就是一段时间没有接收到新数据就会生成新的窗口。 特点:时间无对齐。 session 窗口分配器通过 session 活动来对元素进行分组,session 窗口跟滚动窗口和滑动窗口相比,不会有重叠和固定的开始时间和结束时间的情况,相反,当它在一个固定的时间周期内不再收到元素,即非活动间隔产生,那个这个窗口就会关闭。一个 session 窗口通过一个 session 间隔来配置,这个 session 间隔定义了非活跃周期的长度,当这个非活跃周期产生,那么当前的 session 将关闭并且后续的元素将被分配到新的 session 窗口中去。2.2 CountWindow:按照指定的数据条数生成一个Window,与时间无关
2.2.1滚动计数窗口
2.2.2滑动计数窗口
3.Window API
窗口分配器-window()方法 我们可以用window()来定义一个窗口,然后基于这个window去做一些聚合或者其它处理操作。 注意:window()方法必须在keyBy之后才能用,作用于keyedStream 窗口分配器(window assigner) window()方法接受的输入参数是一个WindowAssigner/** * Windows this data stream to a [[WindowedStream]], which evaluates windows * over a key grouped stream. Elements are put into windows by a [[WindowAssigner]]. The * grouping of elements is done both by key and by window. * * A [[org.apache.flink.streaming.api.windowing.triggers.Trigger]] can be defined to specify * when windows are evaluated. However, `WindowAssigner` have a default `Trigger` * that is used if a `Trigger` is not specified. * * @param assigner The `WindowAssigner` that assigns elements to windows. * @return The trigger windows data stream. */ @PublicEvolving def window[W <: Window](assigner: WindowAssigner[_ >: T, W]): WindowedStream[T, K, W] = { new WindowedStream(new WindowedJavaStream[T, K, W](javaStream, assigner)) }WindowAssigner负责将每条输入的数据分发到正确的window中 Flink提供了通用的WindowAssigner 滚动窗口(tumbling window) 滑动窗口(sliding window) 会话窗口(session window) 全局窗口(global window) Flink提供了更加简单的.timeWindow和.countWindow方法,用于定义时间窗口和计数窗口。
3.1 TimeWindow
TimeWindow 是将指定时间范围内的所有数据组成一个 window,一次对一个window 里面的所有数据进行计算。3.1.1滚动窗口(TumblingEventTimeWindows)
Flink 默认的时间窗口根据 Processing Time 进行窗口的划分,将 Flink 获取到的数据根据进入 Flink 的时间划分到不同的窗口中。3.1.2滑动窗口(SlidingEventTimeWindows)
滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是 window_size,一个是 sliding_size。 下面代码中的 sliding_size 设置为了 5s,也就是说,每 5s 就计算输出结果一次,每一次计算的 window 范围是 15s 内的所有元素。3.1.3会话窗口(EventTimeSessionWindows)
指定一个时间间隔,当指定时间间隔内无新数据进入,则认为是一个session,进行计算3.2 CountWindow
CountWindow 根据窗口中相同 key 元素的数量来触发执行,执行时只计算元素数量达到窗口大小的 key 对应的结果。 注意:CountWindow 的 window_size 指的是相同 Key 的元素的个数,不是输入的所有元素的总数。3.2.1滚动窗口
默认的 CountWindow 是一个滚动窗口,只需要指定窗口大小即可,当元素数量 达到窗口大小时,就会触发窗口的执行。3.2.2滑动窗口
滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是 window_size,一个是 sliding_size。 下面代码中的 sliding_size 设置为了 2,也就是说,每收到两个相同 key 的数据就计算一次,每一次计算的 window 范围是 10 个元素。3.3 WindowFunction
window function 定义了要对窗口中收集的数据做的计算操作,主要可以分为两类: 增量聚合函数(incremental aggregation functions) 每条数据到来就进行计算,保持一个简单的状态。典型的增量聚合函数有ReduceFunction,AggregateFunction。 全窗口函数(full window functions) 先把窗口所有数据收集起来,等到计算的时候会遍历所有数据。ProcessWindowFunction 就是一个全窗口函数。3.4 其它可选API
.trigger() —— 触发器 定义 window 什么时候关闭,触发计算并输出结果 .evitor() —— 移除器 定义移除某些数据的逻辑 .allowedLateness() —— 允许处理迟到的数据 .sideOutputLateData() —— 将迟到的数据放入侧输出流 .getSideOutput() —— 获取侧输出流 相关测试代码:package com.zhen.flink.api import java.time.Duration import java.util.concurrent.TimeUnit import com.zhen.flink.api.WindowTest.SerializableTimestampAssignerTest import org.apache.flink.api.common.eventtime.{BoundedOutOfOrdernessWatermarks, SerializableTimestampAssigner, WatermarkStrategy} import org.apache.flink.api.common.functions.ReduceFunction import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.timestamps import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.assigners.{EventTimeSessionWindows, SlidingEventTimeWindows, TumblingEventTimeWindows, TumblingProcessingTimeWindows} import org.apache.flink.streaming.api.windowing.time.Time /** * @Author FengZhen * @Date 7/12/22 3:43 PM * @Description TODO */ object WindowTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment /** * 需要设置时间语义 * 如果设置 env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime): * * 则后面 不能使用 .window(TumblingEventTimeWindows.of等 EventTime的窗口,不然会报这个错!注意:ProcessingTime本身就是单调递增的,不必设置水位线! */ // import org.apache.flink.streaming.api.TimeCharacteristic // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) // 0.读取数据 // nc -lk 7777 val inputStream = env.socketTextStream("localhost", 7777) // 1.先转换成样例数据 val dataStream: DataStream[SensorReading] = inputStream .map( data => { val arr = data.split(",") SensorReading(arr(0), arr(1).toLong, arr(2).toDouble) } ) // .assignTimestampsAndWatermarks( // WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)) // .withTimestampAssigner(new SerializableTimestampAssignerTest) // ) // 每15s统计一次窗口内各传感器所有温度的最小值, 以及最新的时间戳 val resultStream = dataStream .map( data => (data.id, data.temperature, data.timestamp)) .keyBy(_._1) // 按照二元组的第一个元素(id)进行分组 .window(TumblingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(3))) // .window( TumblingEventTimeWindows.of(Time.seconds(5), Time.seconds(3)) ) // 滚动时间窗口,第二个参数为偏移量,可解决时区问题,或者统计时具体到某分钟 // .window( SlidingEventTimeWindows.of(Time.seconds(15), Time.seconds(5), Time.seconds(3))) // 滑动时间窗口 // .window( EventTimeSessionWindows.withGap(Time.seconds(15))) // 会话窗口 // .countWindow(10) // 滚动窗口 // .countWindow(10, 2) //滑动窗口 // .minBy(1) .reduce((curRes, newData) => (curRes._1, curRes._2.min(newData._2), newData._3)) //另外一种方式,自定义reducer方法 // val resultStream = dataStream // .keyBy(_.id) // .window( TumblingEventTimeWindows.of(Time.seconds(15), Time.seconds(3)) ) // 滚动时间窗口,第二个参数为偏移量,可解决时区问题,或者统计时具体到某分钟 // .reduce(new MyReducer) resultStream.print() env.execute("window test") } class MyReducer extends ReduceFunction[SensorReading]{ override def reduce(value1: SensorReading, value2: SensorReading): SensorReading = { SensorReading(value1.id, value2.timestamp, value1.temperature.min(value2.temperature)) } } class SerializableTimestampAssignerTest extends SerializableTimestampAssigner[SensorReading] { override def extractTimestamp(element: SensorReading, recordTimestamp: Long): Long = { val eventTime = element.timestamp //recordTimestamp即element事件的时间 eventTime } } }
标签:flink,窗口,Flink,window,Window,api,import,滑动 来源: https://www.cnblogs.com/EnzoDin/p/16496153.html