其他分享
首页 > 其他分享 > FLink18--全窗口聚合方式2 ProcessWindowApp

FLink18--全窗口聚合方式2 ProcessWindowApp

作者:互联网

一、依赖

 

二、代码

package net.xdclass.class11;

import java.util.List;
import java.util.stream.Collectors;

import org.apache.commons.collections.IteratorUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import net.xdclass.model.VideoOrder;
import net.xdclass.source.VideoOrderSourceV2;

/**
 * Full-window aggregation, approach 2: {@code process(new ProcessWindowFunction(){})}.
 *
 * <p>This is the approach the original author recommends. Their note: {@code WindowFunction}
 * may be deprecated in the future, and {@code ProcessWindowFunction} is the better choice
 * because it also offers open/close lifecycle hooks. A full-window function runs a custom
 * computation over the complete contents of a window, which suits more complex scenarios.
 *
 * <p>The job keys {@code VideoOrder} records by title, groups them into 5-second tumbling
 * processing-time windows, and emits one aggregated {@code VideoOrder} per key and window
 * carrying the summed money.
 *
 * @desc Window computation with a full-window function that sees all data of the window.
 * @menu
 */
public class FLink18ProcessWindowApp {

    /**
     * Builds and runs the windowed aggregation job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if building or executing the Flink job fails
     */
    public static void main(String[] args) throws Exception{
        // To run with the local web UI instead, create the environment with:
        //   StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Unified stream/batch runtime mode: AUTOMATIC leaves the choice to Flink.
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // Parallelism of 1 so the printed output is produced by a single task.
        env.setParallelism(1);
        // Use the custom source.
        DataStream<VideoOrder> ds = env.addSource(new VideoOrderSourceV2());

        // Partition the stream by order title.
        KeyedStream<VideoOrder, String> keyByDs = ds.keyBy(new KeySelector<VideoOrder, String>() {
            @Override
            public String getKey(VideoOrder videoOrder) throws Exception {
                return videoOrder.getTitle();
            }
        });

        // Full-window function: receives every element of the window at once.
        SingleOutputStreamOperator<VideoOrder> sumAllWindowDs = keyByDs
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .process(new ProcessWindowFunction<VideoOrder, VideoOrder, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context context, Iterable<VideoOrder> iterable,
                            Collector<VideoOrder> output) throws Exception {
                        // Single typed pass over the window: remember the first element and
                        // sum the money, instead of copying everything into a raw List first.
                        VideoOrder first = null;
                        int total = 0;
                        for (VideoOrder order : iterable) {
                            if (first == null) {
                                first = order;
                            }
                            total += order.getMoney();
                        }
                        // Nothing to emit for an empty window.
                        if (first == null) {
                            return;
                        }
                        // Build a fresh result object; title and create time come from the
                        // first element of the window.
                        VideoOrder videoOrder = new VideoOrder();
                        videoOrder.setMoney(total);
                        videoOrder.setTitle(first.getTitle());
                        videoOrder.setCreateTime(first.getCreateTime());
                        // The context exposes window metadata such as start and end time.
                        System.out.println("窗口开始时间"+context.window().getStart()+"窗口结束时间"+context.window().getEnd());
                        output.collect(videoOrder);
                    }
                });
        sumAllWindowDs.print();

        // A DataStream job only runs once execute() is called; the argument is the job name.
        env.execute("Sailing Window job");
    }
}

 

标签:ProcessWindowApp,--,flink,FLink18,streaming,api,org,apache,import
来源: https://www.cnblogs.com/robots2/p/16064375.html