其他分享
首页 > 其他分享> > Flink-窗口函数(Window)

Flink-窗口函数(Window)

作者:互联网

1.Window概念

streaming 流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而 window 是一种切割无限数据为有限块进行处理的手段。 Window 是无限数据流处理的核心,Window 将一个无限的 stream 拆分成有限大小的”buckets”桶,我们可以在这些桶上做计算操作。  

2.Window类型

2.1 TimeWindow:按照时间生成Window

2.1.1 滚动时间窗口(Tumbling Windows)

将数据依据固定的窗口长度对数据进行切片。 特点:时间对齐,窗口长度固定,没有重叠。 滚动窗口分配器将每个元素分配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会出现重叠。例如:如果你指定了一个 5 分钟大小的滚动窗口,窗口的创建如下图所示: 适用场景:适合做 BI 统计等(做每个时间段的聚合计算)。  

2.1.2 滑动时间窗口(Sliding Windows)

滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动间隔组成。 特点:时间对齐,窗口长度固定,可以有重叠。 滑动窗口分配器将元素分配到固定长度的窗口中,与滚动窗口类似,窗口的大小由窗口大小参数来配置,另一个窗口滑动参数控制滑动窗口开始的频率。因此,滑动窗口如果滑动参数小于窗口大小的话,窗口是可以重叠的,在这种情况下元素会被分配到多个窗口中。 例如,你有 10 分钟的窗口和 5 分钟的滑动,那么每个窗口中 5 分钟的窗口里包含着上个 10 分钟产生的数据,如下图所示: 适用场景:对最近一个时间段内的统计(求某接口最近 5min 的失败率来决定是否要报警)。  

2.1.3会话窗口(Session Windows)

由一系列事件组合一个指定时间长度的 timeout 间隙组成,类似于 web 应用的session,也就是一段时间没有接收到新数据就会生成新的窗口。 特点:时间无对齐。 session 窗口分配器通过 session 活动来对元素进行分组,session 窗口跟滚动窗口和滑动窗口相比,不会有重叠和固定的开始时间和结束时间的情况,相反,当它在一个固定的时间周期内不再收到元素,即非活动间隔产生,那个这个窗口就会关闭。一个 session 窗口通过一个 session 间隔来配置,这个 session 间隔定义了非活跃周期的长度,当这个非活跃周期产生,那么当前的 session 将关闭并且后续的元素将被分配到新的 session 窗口中去。  

2.2 CountWindow:按照指定的数据条数生成一个Window,与时间无关

2.2.1滚动计数窗口

2.2.2滑动计数窗口

 

3.Window API

窗口分配器-window()方法 我们可以用window()来定义一个窗口,然后基于这个window去做一些聚合或者其它处理操作。 注意:window()方法必须在keyBy之后才能用,作用于keyedStream 窗口分配器(window assigner) window()方法接受的输入参数是一个WindowAssigner
  /**
   * Windows this data stream to a [[WindowedStream]], which evaluates windows
   * over a key grouped stream. Elements are put into windows by a [[WindowAssigner]]. The
   * grouping of elements is done both by key and by window.
   *
   * A [[org.apache.flink.streaming.api.windowing.triggers.Trigger]] can be defined to specify
   * when windows are evaluated. However, `WindowAssigner` have a default `Trigger`
   * that is used if a `Trigger` is not specified.
   *
   * @param assigner The `WindowAssigner` that assigns elements to windows.
   * @return The trigger windows data stream.
   */
  @PublicEvolving
  def window[W <: Window](assigner: WindowAssigner[_ >: T, W]): WindowedStream[T, K, W] = {
    new WindowedStream(new WindowedJavaStream[T, K, W](javaStream, assigner))
  }
WindowAssigner负责将每条输入的数据分发到正确的window中 Flink提供了通用的WindowAssigner 滚动窗口(tumbling window) 滑动窗口(sliding window) 会话窗口(session window) 全局窗口(global window)   Flink提供了更加简单的.timeWindow和.countWindow方法,用于定义时间窗口和计数窗口。

3.1 TimeWindow

TimeWindow 是将指定时间范围内的所有数据组成一个 window,一次对一个window 里面的所有数据进行计算。

3.1.1滚动窗口(TumblingEventTimeWindows)

Flink 默认的时间窗口根据 Processing Time 进行窗口的划分,将 Flink 获取到的数据根据进入 Flink 的时间划分到不同的窗口中。  

3.1.2滑动窗口(SlidingEventTimeWindows)

滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是 window_size,一个是 sliding_size。 下面代码中的 sliding_size 设置为了 5s,也就是说,每 5s 就计算输出结果一次,每一次计算的 window 范围是 15s 内的所有元素。  

3.1.3会话窗口(EventTimeSessionWindows)

指定一个时间间隔,当指定时间间隔内无新数据进入,则认为是一个session,进行计算  

3.2 CountWindow

CountWindow 根据窗口中相同 key 元素的数量来触发执行,执行时只计算元素数量达到窗口大小的 key 对应的结果。 注意:CountWindow 的 window_size 指的是相同 Key 的元素的个数,不是输入的所有元素的总数。

3.2.1滚动窗口

默认的 CountWindow 是一个滚动窗口,只需要指定窗口大小即可,当元素数量 达到窗口大小时,就会触发窗口的执行。

3.2.2滑动窗口

滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是 window_size,一个是 sliding_size。 下面代码中的 sliding_size 设置为了 2,也就是说,每收到两个相同 key 的数据就计算一次,每一次计算的 window 范围是 10 个元素。  

3.3 WindowFunction

window function 定义了要对窗口中收集的数据做的计算操作,主要可以分为两类: 增量聚合函数(incremental aggregation functions) 每条数据到来就进行计算,保持一个简单的状态。典型的增量聚合函数有ReduceFunction,AggregateFunction。 全窗口函数(full window functions) 先把窗口所有数据收集起来,等到计算的时候会遍历所有数据。ProcessWindowFunction 就是一个全窗口函数。  

3.4 其它可选API

.trigger() —— 触发器 定义 window 什么时候关闭,触发计算并输出结果 .evitor() —— 移除器 定义移除某些数据的逻辑 .allowedLateness() —— 允许处理迟到的数据 .sideOutputLateData() —— 将迟到的数据放入侧输出流 .getSideOutput() —— 获取侧输出流   相关测试代码:
package com.zhen.flink.api

import java.time.Duration
import java.util.concurrent.TimeUnit

import com.zhen.flink.api.WindowTest.SerializableTimestampAssignerTest
import org.apache.flink.api.common.eventtime.{BoundedOutOfOrdernessWatermarks, SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{EventTimeSessionWindows, SlidingEventTimeWindows, TumblingEventTimeWindows, TumblingProcessingTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time

/**
  * @Author FengZhen
  * @Date 7/12/22 3:43 PM
  * @Description TODO
  */
object WindowTest {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    /**
      * 需要设置时间语义
      * 如果设置 env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime):
      *
      * 则后面 不能使用 .window(TumblingEventTimeWindows.of等  EventTime的窗口,不然会报这个错!注意:ProcessingTime本身就是单调递增的,不必设置水位线!
      */

//    import org.apache.flink.streaming.api.TimeCharacteristic
//    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    env.setParallelism(1)

    // 0.读取数据
    // nc -lk 7777
    val inputStream = env.socketTextStream("localhost", 7777)
    // 1.先转换成样例数据
    val dataStream: DataStream[SensorReading] = inputStream
      .map(
        data => {
          val arr = data.split(",")
          SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
        }
      )
//      .assignTimestampsAndWatermarks(
//        WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3))
//         .withTimestampAssigner(new SerializableTimestampAssignerTest)
//      )



    // 每15s统计一次窗口内各传感器所有温度的最小值, 以及最新的时间戳
    val resultStream = dataStream
      .map( data => (data.id, data.temperature, data.timestamp))
      .keyBy(_._1)  // 按照二元组的第一个元素(id)进行分组
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(3)))
//      .window( TumblingEventTimeWindows.of(Time.seconds(5), Time.seconds(3)) ) // 滚动时间窗口,第二个参数为偏移量,可解决时区问题,或者统计时具体到某分钟
//      .window( SlidingEventTimeWindows.of(Time.seconds(15), Time.seconds(5), Time.seconds(3))) // 滑动时间窗口
//      .window( EventTimeSessionWindows.withGap(Time.seconds(15))) // 会话窗口
//      .countWindow(10) // 滚动窗口
//      .countWindow(10, 2) //滑动窗口
//      .minBy(1)
      .reduce((curRes, newData) => (curRes._1, curRes._2.min(newData._2), newData._3))

    //另外一种方式,自定义reducer方法
//    val resultStream = dataStream
//      .keyBy(_.id)
//      .window( TumblingEventTimeWindows.of(Time.seconds(15), Time.seconds(3)) ) // 滚动时间窗口,第二个参数为偏移量,可解决时区问题,或者统计时具体到某分钟
//      .reduce(new MyReducer)

    resultStream.print()
    env.execute("window test")
  }

  class MyReducer extends ReduceFunction[SensorReading]{
    override def reduce(value1: SensorReading, value2: SensorReading): SensorReading = {
      SensorReading(value1.id, value2.timestamp, value1.temperature.min(value2.temperature))
    }
  }

  class SerializableTimestampAssignerTest extends SerializableTimestampAssigner[SensorReading] {

    override def extractTimestamp(element: SensorReading, recordTimestamp: Long): Long = {
      val eventTime = element.timestamp
      //recordTimestamp即element事件的时间
      eventTime
    }
  }

}

 

                 

标签:flink,窗口,Flink,window,Window,api,import,滑动
来源: https://www.cnblogs.com/EnzoDin/p/16496153.html