其他分享
首页 > 其他分享 > Flink - Time（处理时间、事件时间、水位线）

Flink -time(处理时间,事件时间,水位线)

作者:互联网

Flink -time(处理时间,事件时间,水位线)

1. flink基石

2. Time

3. 处理时间（Processing Time）

package com.wt.flink.core
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

/**
 * Processing-time demo: counts words read from a socket in a sliding window.
 *
 * Each input line is split on "," into words. Every 5 seconds (processing time, i.e. the
 * system clock of the machine running the window operator) the job emits, per word, how
 * many times that word appeared during the last 10 seconds.
 */
object Demo4ProcessionTime {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // One text line per socket message, words separated by commas.
    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)

    val wordsDS: DataStream[String] = linesDS.flatMap(_.split(","))

    // Pair every word with a count of 1 so the window can sum tuple field 1.
    val kvDS: DataStream[(String, Int)] = wordsDS.map((_, 1))

    val keyByDS: KeyedStream[(String, Int), String] = kvDS.keyBy(_._1)

    // Window size is 10 seconds and the slide is 5 seconds, so every element
    // belongs to two overlapping windows.
    val windowDS: WindowedStream[(String, Int), String, TimeWindow] = keyByDS
      .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))

    val countDS: DataStream[(String, Int)] = windowDS.sum(1)

    countDS.print()

    env.execute()
  }
}

4. 事件时间

package com.wt.flink.core
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

/**
 * Event-time demo: counts vehicle records per road using sliding event-time windows.
 *
 * Each socket line must be comma-separated, with the road id in field 1 and an
 * epoch-millisecond timestamp in field 2 (field 0 is not used by this job), e.g.
 * {{{
 * A9A7N2,340500,161471180000
 * A9A7N2,340500,161471181000
 * A9A7N2,340500,161471182000
 * A9A7N2,340500,161471183000
 * A9A7N2,340500,161471184000
 * A9A7N2,340500,161471185000
 * A9A7N2,340500,161471186000
 * A9A7N2,340500,161471187000
 * A9A7N2,340500,161471188000
 * A9A7N2,340500,161471189000
 * A9A7N2,340500,161471190000
 * A9A7N2,340500,161471191000
 * }}}
 * Windows are 10 seconds long and start every 5 seconds, measured on the records' own
 * timestamps rather than on the wall clock.
 */
object Demo5EventTime {
  def main(args: Array[String]): Unit = {
    // Event-time counterpart of SlidingProcessingTimeWindows.
    import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(1)

    // Read the vehicle-passing records from the socket.
    val dataDS: DataStream[String] = env.socketTextStream("master", 8888)

    // Extract (roadId, timestamp) from each line.
    val kcDS: DataStream[(String, Long)] = dataDS.map(line => {
      val split: Array[String] = line.split(",")
      // Road id
      val roadId: String = split(1)
      // Event timestamp
      val ts: Long = split(2).toLong
      (roadId, ts)
    })

    // To use event time, Flink must be told which field is the event timestamp; it must be
    // in milliseconds. This assigner expects ascending timestamps: the watermark follows the
    // latest timestamp, so an out-of-order record whose windows have already fired is
    // dropped as late data.
    val assDS: DataStream[(String, Long)] = kcDS.assignAscendingTimestamps(kv => kv._2)

    // Count vehicles per road: a 10-second window evaluated every 5 seconds.
    val roadKvDS: DataStream[(String, Int)] = assDS.map(kv => (kv._1, 1))

    // Group by road id.
    val keyByDS: KeyedStream[(String, Int), String] = roadKvDS.keyBy(_._1)

    // Event-time windows, so the timestamps assigned above drive both which window a record
    // belongs to and when each window fires.
    val windowDS: WindowedStream[(String, Int), String, TimeWindow] = keyByDS
      .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))

    val countDS: DataStream[(String, Int)] = windowDS.sum(1)

    countDS.print()

    env.execute()
  }
}

当出现乱序数据时，例如时间为 2022.01.01 10:10:10 的数据先到达、时间为 2022.01.01 10:10:04 的数据后到达，由于水位线已经随最新数据推进到约 10:10:10，10:10:04 所属的窗口已经触发计算，这条数据会被当作迟到数据而丢失，报错情况如下。可以采用水位线延迟（允许一定程度乱序）的方法解决，但缺点是会造成一定的延时。

5. 水位线

package com.wt.flink.core
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{SlidingEventTimeWindows, TumblingEventTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import java.time.Duration
/**
 * Watermark demo: counts vehicle records per road in 5-second tumbling event-time windows,
 * tolerating up to 5 seconds of out-of-order arrival.
 *
 * Each socket line must be comma-separated, with the road id in field 1 and an
 * epoch-millisecond timestamp in field 2 (field 0 is not used by this job).
 */
object Demo5ShuiWei {
  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(1)

    // Read the vehicle-passing records from the socket.
    val dataDS: DataStream[String] = env.socketTextStream("master", 8888)

    // Extract (roadId, timestamp) from each line.
    val kcDS: DataStream[(String, Long)] = dataDS.map(line => {
      val split: Array[String] = line.split(",")
      // Road id
      val roadId: String = split(1)
      // Event timestamp
      val ts: Long = split(2).toLong
      (roadId, ts)
    })

    /*
     * To use event time, Flink must be told which field is the event timestamp;
     * it must be in milliseconds.
     *
     * Alternative with no out-of-order tolerance: the watermark simply follows the latest
     * timestamp and can only increase, never decrease.
     * val assDS: DataStream[(String, Long)] = kcDS.assignAscendingTimestamps(kv => kv._2)
     */
    val assDS: DataStream[(String, Long)] = kcDS.assignTimestampsAndWatermarks(
      WatermarkStrategy
        // Explicit element type: without it Scala infers Nothing and the assigner below
        // does not type-check. The watermark trails the largest timestamp seen so far by
        // 5 seconds, so records up to 5 s out of order still reach their window.
        .forBoundedOutOfOrderness[(String, Long)](Duration.ofSeconds(5))
        // Which field carries the event timestamp.
        .withTimestampAssigner(new SerializableTimestampAssigner[(String, Long)] {
          override def extractTimestamp(element: (String, Long), recordTimestamp: Long): Long = {
            element._2
          }
        })
    )

    // Count vehicles per road: every 5 seconds, over the most recent 5 seconds.
    val roadKvDS: DataStream[(String, Int)] = assDS.map(kv => (kv._1, 1))

    // Group by road id.
    val keyByDS: KeyedStream[(String, Int), String] = roadKvDS.keyBy(_._1)

    val windowDS: WindowedStream[(String, Int), String, TimeWindow] = keyByDS
      // Tumbling (non-overlapping) 5-second event-time window; it fires once the
      // watermark passes the window end.
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))

    val countDS: DataStream[(String, Int)] = windowDS.sum(1)

    countDS.print()

    env.execute()
  }
}

标签:DataStream,Flink,String,val,flink,时间,time,apache,import
来源: https://www.cnblogs.com/atao-BigData/p/16515787.html