DataStream API: Watermarks and Windowing
Author: Internet
How to set up a watermark
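import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Snapshot> dataStreamSource = env.addSource(new SourceFormClickhouse2());
// Define the watermark strategy: tolerate events that arrive up to 10 seconds out of order
WatermarkStrategy<Snapshot> snapshotWatermarkStrategy =
    WatermarkStrategy.<Snapshot>forBoundedOutOfOrderness(Duration.ofSeconds(10))
        // The timestamp assigner must return the event time in epoch milliseconds
        .withTimestampAssigner((s, timestamp) -> DateUtil.dateToStamp(s.getDateTime()));
// Attach timestamps and watermarks to the stream
SingleOutputStreamOperator<Snapshot> streamOperator = dataStreamSource.assignTimestampsAndWatermarks(snapshotWatermarkStrategy);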
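To make the 10-second bound more concrete, here is a minimal plain-Java sketch of the arithmetic behind a bounded-out-of-orderness watermark: the watermark trails the largest timestamp seen so far by the bound, minus one millisecond. The class name BoundedOutOfOrdernessSketch and its methods are made up for illustration; this is a simplified model, not Flink's own generator class.

```java
import java.time.Duration;

// Simplified model of a bounded-out-of-orderness watermark generator.
// Hypothetical class for illustration only; it is not part of Flink.
class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrdernessMillis;
    private long maxTimestamp;

    BoundedOutOfOrdernessSketch(Duration maxOutOfOrderness) {
        this.maxOutOfOrdernessMillis = maxOutOfOrderness.toMillis();
        // Start low enough that the first watermark computation does not underflow.
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis + 1;
    }

    // Called once per event with its event-time timestamp in epoch milliseconds.
    void onEvent(long eventTimestampMillis) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestampMillis);
    }

    // The watermark lags the largest timestamp seen by the bound, minus 1 ms.
    long currentWatermark() {
        return maxTimestamp - maxOutOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch wm =
                new BoundedOutOfOrdernessSketch(Duration.ofSeconds(10));
        wm.onEvent(20_000L);
        wm.onEvent(15_000L); // an older event never moves the watermark backwards
        System.out.println(wm.currentWatermark()); // prints 9999
    }
}
```

In this model an event is considered late only if its timestamp is at or below the current watermark, which is why a larger bound tolerates more disorder at the cost of later window results.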
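Windowing (grouping by key)
How do you group by several fields at once?
Earlier versions offered keyBy("field1", "field2", "field3") with field names passed as strings; that method is now deprecated.
1. Approach one
Use a tuple as the key to keyBy on multiple fields.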
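// If Flink cannot infer the Tuple2 type from the lambda, pass the key type explicitly as a
// second keyBy argument, e.g. Types.TUPLE(Types.STRING, Types.STRING) when both IDs are Strings.
streamOperator.keyBy(snapshot -> Tuple2.of(snapshot.getHsSecurityId(), snapshot.getSecurityId()))
    .window(TumblingEventTimeWindows.of(Time.seconds(15)))
    .max("highPx")
    .print();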
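The 15-second tumbling window used above assigns every event to exactly one fixed, non-overlapping interval. The following plain-Java sketch shows how such an interval can be derived from a timestamp, and when an event-time window becomes ready to fire. The class TumblingWindowSketch and its method names are hypothetical, and the sketch assumes no window offset.

```java
// Simplified model of tumbling event-time window assignment (no offset).
// Hypothetical class for illustration only; it is not part of Flink.
class TumblingWindowSketch {

    // Start (inclusive) of the window that contains the timestamp.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    // End (exclusive) of the window that contains the timestamp.
    static long windowEnd(long timestampMillis, long sizeMillis) {
        return windowStart(timestampMillis, sizeMillis) + sizeMillis;
    }

    // A window may fire once the watermark reaches its last included millisecond.
    static boolean canFire(long windowEndMillis, long watermarkMillis) {
        return watermarkMillis >= windowEndMillis - 1;
    }

    public static void main(String[] args) {
        long size = 15_000L;      // 15-second window
        long eventTime = 37_000L; // event at second 37
        System.out.println(windowStart(eventTime, size)); // prints 30000
        System.out.println(windowEnd(eventTime, size));   // prints 45000
        System.out.println(canFire(45_000L, 44_999L));    // prints true
    }
}
```

With a 10-second out-of-orderness bound, where the watermark is the largest timestamp seen minus 10000 ms minus 1 ms, the window [30000, 45000) in this model would fire only after an event with a timestamp of at least 55000 arrives.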
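2. Approach two
Chaining two keyBy calls compiles, but it does not group by both fields. Each keyBy repartitions the stream by its own key, so only the last selector (getHsSecurityId here) defines the grouping, and the first keyBy just adds an extra shuffle. It is shown only for comparison; prefer approach one or three when you need a composite key.
streamOperator.keyBy(snapshot -> snapshot.getSecurityId())
    .keyBy(s -> s.getHsSecurityId())
    .window(TumblingEventTimeWindows.of(Time.seconds(15)))
    .max("highPx")
    .print();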
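3. Approach three
Use a custom key object. The key class (FieldSorting here) should override hashCode() and equals() so that records with equal field values always land in the same group.
streamOperator.keyBy(snapshot -> new FieldSorting(snapshot.getHsSecurityId(), snapshot.getSecurityId()))
    .window(TumblingEventTimeWindows.of(Time.seconds(15)))
    .max("highPx")
    .print();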
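The original does not show the key class itself, so here is a minimal plain-Java sketch of what a two-field key could look like. The class name CompositeKeySketch, the String field types, and the sample IDs and prices are all assumptions. It only demonstrates the equals/hashCode contract that hash-based grouping relies on; Flink places further requirements on key types (see its documentation on supported data types), which this sketch does not cover.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical two-field key; field names and String types are assumptions.
final class CompositeKeySketch {
    private final String hsSecurityId;
    private final String securityId;

    CompositeKeySketch(String hsSecurityId, String securityId) {
        this.hsSecurityId = hsSecurityId;
        this.securityId = securityId;
    }

    // Two keys are equal only when both fields match.
    @Override
    public boolean equals(Object other) {
        if (this == other) return true;
        if (!(other instanceof CompositeKeySketch)) return false;
        CompositeKeySketch that = (CompositeKeySketch) other;
        return Objects.equals(hsSecurityId, that.hsSecurityId)
                && Objects.equals(securityId, that.securityId);
    }

    // Equal keys must produce equal hash codes, or grouping breaks.
    @Override
    public int hashCode() {
        return Objects.hash(hsSecurityId, securityId);
    }

    public static void main(String[] args) {
        // Group sample prices by composite key and keep the maximum per key.
        Map<CompositeKeySketch, Double> maxHighPx = new HashMap<>();
        maxHighPx.merge(new CompositeKeySketch("HS1", "S1"), 10.5, Math::max);
        maxHighPx.merge(new CompositeKeySketch("HS1", "S1"), 12.0, Math::max);
        maxHighPx.merge(new CompositeKeySketch("HS1", "S2"), 9.0, Math::max);
        System.out.println(maxHighPx.size()); // prints 2
        System.out.println(maxHighPx.get(new CompositeKeySketch("HS1", "S1"))); // prints 12.0
    }
}
```

Without the two overrides, each new key object would count as distinct, and records with identical IDs would not be grouped together.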
Source: https://blog.csdn.net/weixin_43975771/article/details/120853178