Flink 基石、Flink Time、事件时间、Watermark水位线
作者:互联网
Flink 基石、Flink Time、事件时间、Watermark水位线
目录Flink 基石
Flink Time
事件时间
代码示例
package com.shujia.flink.core
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
object Demo5EventTIme {
def main(args: Array[String]): Unit = {
/*
用户id,事件时间
001,1647676561000
001,1647676562000
001,1647676563000
001,1647676565000
001,1647676564000
001,1647676566000
001,1647676567000
001,1647676568000
001,1647676569000
001,1647676570000
001,1647676575000
*/
/**
* 使用事件时间划分窗口
* 1、设置事件模式为事件时间
* 2、指定时间字段
*/
/**
* 每隔5秒统计用户出现的次数
*
*/
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//这里需要将并行度设置为1
//因为这里存在一个时间戳对齐的问题,多并行度的时候会对不齐
//不会触发事件时间的计算
env.setParallelism(1)
//设置时间模式
//默认是处理时间
//TimeCharacteristic.EventTime -- 事件时间
//TimeCharacteristic.IngestionTime -- 接收时间
//TimeCharacteristic.ProcessingTime -- 处理时间
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val linesDS: DataStream[String] = env.socketTextStream("master", 8888)
val eventDS: DataStream[(String, Long)] = linesDS.map(line => {
val split: Array[String] = line.split(",")
(split(0), split(1).toLong)
})
//设置时间字段
val assDS: DataStream[(String, Long)] = eventDS.assignAscendingTimestamps(_._2)
/**
* 事件时间窗口触发条件
* 1、窗口内有数据
* 2、最新数据的事件时间大于等于窗口的结束数据的时间
* 但是这样会有一个问题,就是数据的事件时间是乱序的,这样怎么办呢?
*/
val countDS: DataStream[(String, Int)] = assDS
.map(kv => (kv._1, 1))
.keyBy(_._1)
.timeWindow(Time.seconds(5))
.sum(1)
countDS.print()
env.execute()
}
}
但是这样会有一个问题,就是数据的事件时间是乱序的,这样怎么办呢?
窗口如果被计算了,之后再来一条属于这个窗口的数据会丢数据
Watermark
水位线
package com.shujia.flink.core
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
object Demo5EventTIme {
def main(args: Array[String]): Unit = {
/*
001,1647676561000
001,1647676562000
001,1647676563000
001,1647676565000
001,1647676564000
001,1647676566000
001,1647676567000
001,1647676568000
001,1647676569000
001,1647676570000
001,1647676575000
*/
/**
* 使用事件事件划分窗口
* 1、设置事件模式为事件时间
* 2、指定时间字段
*/
/**
* 每隔5秒统计用户出现的次数
*
*/
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
//设置时间模式
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val linesDS: DataStream[String] = env.socketTextStream("master", 8888)
val eventDS: DataStream[(String, Long)] = linesDS.map(line => {
val split: Array[String] = line.split(",")
(split(0), split(1).toLong)
})
//设置时间字段,水位线默认等于最新数据的时间戳,水位线只增加不减少
// val assDS: DataStream[(String, Long)] = eventDS.assignAscendingTimestamps(_._2)
//设置水位线和时间字段
val assDS: DataStream[(String, Long)] = eventDS.assignTimestampsAndWatermarks(
//执行水位线前移的时间
new BoundedOutOfOrdernessTimestampExtractor[(String, Long)](Time.seconds(5)) {
//指定时间戳字段
override def extractTimestamp(element: (String, Long)): Long = element._2
}
)
/**
* 事件时间窗口触发条件
* 1、窗口内有数据
* 2、最新数据的时间大于等于窗口的结束数据
*
*/
val countDS: DataStream[(String, Int)] = assDS
.map(kv => (kv._1, 1))
.keyBy(_._1)
.timeWindow(Time.seconds(5))
.sum(1)
countDS.print()
env.execute()
}
}
学习一个新框架,会看官网很重要
标签:String,val,Watermark,Flink,001,时间,Time 来源: https://www.cnblogs.com/saowei/p/16032325.html