其他分享
首页 > 其他分享> > Flink 基石、Flink Time、事件时间、Watermark水位线

Flink 基石、Flink Time、事件时间、Watermark水位线

作者:互联网

Flink 基石、Flink Time、事件时间、Watermark水位线

目录

img

img

事件时间

代码示例

package com.shujia.flink.core

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object Demo5EventTIme {
  def main(args: Array[String]): Unit = {

    /*
    用户id,事件时间
    001,1647676561000
    001,1647676562000
    001,1647676563000
    001,1647676565000
    001,1647676564000
    001,1647676566000
    001,1647676567000
    001,1647676568000
    001,1647676569000
    001,1647676570000
    001,1647676575000
     */

    /**
      * 使用事件时间划分窗口
      * 1、设置事件模式为事件时间
      * 2、指定时间字段
      */

    /**
      * 每隔5秒统计用户出现的次数
      *
      */

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //这里需要将并行度设置为1
    //因为这里存在一个时间戳对齐的问题,多并行度的时候会对不齐
    //不会触发事件时间的计算
    env.setParallelism(1)

    //设置时间模式
    //默认是处理时间
    //TimeCharacteristic.EventTime -- 事件时间
    //TimeCharacteristic.IngestionTime -- 接收时间
    //TimeCharacteristic.ProcessingTime -- 处理时间
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)

    val eventDS: DataStream[(String, Long)] = linesDS.map(line => {
      val split: Array[String] = line.split(",")
      (split(0), split(1).toLong)
    })

    //设置时间字段
    val assDS: DataStream[(String, Long)] = eventDS.assignAscendingTimestamps(_._2)

    /**
      * 事件时间窗口触发条件
      * 1、窗口内有数据
      * 2、最新数据的事件时间大于等于窗口的结束数据的时间
      * 但是这样会有一个问题,就是数据的事件时间是乱序的,这样怎么办呢?
      */

    val countDS: DataStream[(String, Int)] = assDS
      .map(kv => (kv._1, 1))
      .keyBy(_._1)
      .timeWindow(Time.seconds(5))
      .sum(1)

    countDS.print()

    env.execute()

  }
}

但是这样会有一个问题,就是数据的事件时间是乱序的,这样怎么办呢?

窗口如果被计算了,之后再来一条属于这个窗口的数据会丢数据

Watermark

水位线

img

package com.shujia.flink.core

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object Demo5EventTIme {
  def main(args: Array[String]): Unit = {

    /*
    001,1647676561000
    001,1647676562000
    001,1647676563000
    001,1647676565000
    001,1647676564000
    001,1647676566000
    001,1647676567000
    001,1647676568000
    001,1647676569000
    001,1647676570000
    001,1647676575000
     */

    /**
      * 使用事件事件划分窗口
      * 1、设置事件模式为事件时间
      * 2、指定时间字段
      */

    /**
      * 每隔5秒统计用户出现的次数
      *
      */

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(1)

    //设置时间模式
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)

    val eventDS: DataStream[(String, Long)] = linesDS.map(line => {
      val split: Array[String] = line.split(",")
      (split(0), split(1).toLong)
    })

    //设置时间字段,水位线默认等于最新数据的时间戳,水位线只增加不减少
    //    val assDS: DataStream[(String, Long)] = eventDS.assignAscendingTimestamps(_._2)

    //设置水位线和时间字段
    val assDS: DataStream[(String, Long)] = eventDS.assignTimestampsAndWatermarks(
      //执行水位线前移的时间
      new BoundedOutOfOrdernessTimestampExtractor[(String, Long)](Time.seconds(5)) {
        //指定时间戳字段
        override def extractTimestamp(element: (String, Long)): Long = element._2
      }
    )

    /**
      * 事件时间窗口触发条件
      * 1、窗口内有数据
      * 2、最新数据的时间大于等于窗口的结束数据
      *
      */

    val countDS: DataStream[(String, Int)] = assDS
      .map(kv => (kv._1, 1))
      .keyBy(_._1)
      .timeWindow(Time.seconds(5))
      .sum(1)

    countDS.print()

    env.execute()

  }
}

学习一个新框架,会看官网很重要

标签:String,val,Watermark,Flink,001,时间,Time
来源: https://www.cnblogs.com/saowei/p/16032325.html