FLink18--全窗口聚合方式2 ProcessWindowApp
作者:互联网
一、依赖
二、代码
package net.xdclass.class11;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.collections.IteratorUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import net.xdclass.model.VideoOrder;
import net.xdclass.source.VideoOrderSourceV2;
/**
 * Full-window aggregation, approach 2: {@code process(new ProcessWindowFunction(){})}.
 * This is the recommended approach.
 *
 * <p>Author's note: {@code WindowFunction} may be deprecated in the future;
 * {@code ProcessWindowFunction} is the better choice because it also offers
 * open/close lifecycle hooks.
 *
 * <p>A full-window function hands the complete content of a window to user code,
 * which makes it suitable for custom, more complex window computations.
 *
 * @desc Window computation with a full-window function that can see all data of the window
 * @menu
 */
public class FLink18ProcessWindowApp {

    /**
     * Builds and runs the job: orders from {@link VideoOrderSourceV2} are keyed by title,
     * grouped into 5-second tumbling processing-time windows, and each window is reduced
     * to a single {@link VideoOrder} carrying the summed money of that window.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        // To run with the local web UI instead:
        // final StreamExecutionEnvironment env =
        //         StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Let the runtime choose between batch and streaming execution
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // Parallelism
        env.setParallelism(1);

        // Use the custom source
        DataStream<VideoOrder> ds = env.addSource(new VideoOrderSourceV2());

        // Partition the stream by order title
        KeyedStream<VideoOrder, String> keyByDs = ds.keyBy(new KeySelector<VideoOrder, String>() {
            @Override
            public String getKey(VideoOrder videoOrder) throws Exception {
                return videoOrder.getTitle();
            }
        });

        // Full-window function: receives all elements of the window at once
        SingleOutputStreamOperator<VideoOrder> sumAllWindowDs = keyByDs
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .process(new ProcessWindowFunction<VideoOrder, VideoOrder, String, TimeWindow>() {
                    /**
                     * Emits one aggregated order per key and window.
                     *
                     * @param key      the window key, i.e. the order title chosen by the key selector
                     * @param context  window context, used here to read the window start/end
                     * @param iterable all orders assigned to this key and window
                     * @param output   collector receiving the aggregated order
                     */
                    @Override
                    public void process(String key, Context context, Iterable<VideoOrder> iterable,
                            Collector<VideoOrder> output) throws Exception {
                        // Single pass over the window: remember the first order and sum the
                        // money, without copying the window into an intermediate list.
                        VideoOrder first = null;
                        int total = 0;
                        for (VideoOrder order : iterable) {
                            if (first == null) {
                                first = order;
                            }
                            total += order.getMoney();
                        }
                        if (first == null) {
                            // Nothing in the window, nothing to emit
                            return;
                        }

                        // Build a new result object holding the aggregated data
                        VideoOrder videoOrder = new VideoOrder();
                        videoOrder.setMoney(total);
                        // Every element of this window has the key as its title
                        videoOrder.setTitle(key);
                        videoOrder.setCreateTime(first.getCreateTime());

                        // The context exposes the window start/end time (and more)
                        System.out.println("窗口开始时间"+context.window().getStart()+"窗口结束时间"+context.window().getEnd());
                        output.collect(videoOrder);
                    }
                });

        sumAllWindowDs.print();

        // A DataStream job must call execute(); a job name can be supplied
        env.execute("Sailing Window job");
    }
}
标签:ProcessWindowApp,--,flink,FLink18,streaming,api,org,apache,import 来源: https://www.cnblogs.com/robots2/p/16064375.html