其他分享
首页 > 其他分享> > 增量聚合和全窗口函数的结合使用

增量聚合和全窗口函数的结合使用

作者:互联网

增量聚合和全窗口函数的对比

已经了解了Window API中两类窗口函数的用法,下面先来做个简单的总结。增量聚合函数处理计算会更高效。举一个最简单的例子,对一组数据求和。大量的数据连续不断到来,全窗口函数只是把它们收集缓存起来,并没有处理;到了窗口要关闭、输出结果的时候,再遍历所有数据依次叠加,得到最终结果。而如果采用增量聚合的方式,那么只需要保存一个当前和的状态,每个数据到来时就会做一次加法,更新状态;到了要输出结果的时候,只要将当前状态直接拿出来就可以了。增量聚合相当于把计算量“均摊”到了窗口收集数据的过程中,自然就会比全窗口聚合更加高效、输出更加实时。而全窗口函数的优势在于提供了更多的信息,可以认为是更加“通用”的窗口操作。它只负责收集数据、提供上下文相关信息,把所有的原材料都准备好,至于拿来做什么我们完全可以任意发挥。这就使得窗口计算更加灵活,功能更加强大。所以在实际应用中,我们往往希望兼具这两者的优点,把它们结合在一起使用。Flink的Window API就给我们实现了这样的用法。我们之前在调用WindowedStream的.reduce()和.aggregate()方法时,只是简单地直接传入了一个ReduceFunction或AggregateFunction进行增量聚合。除此之外,其实还可以传入第二个参数:一个全窗口函数,可以是WindowFunction或者ProcessWindowFunction。

 // ReduceFunction与WindowFunction结合
 public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function) 
 // ReduceFunction与ProcessWindowFunction结合
 public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T>  reduceFunction,  ProcessWindowFunction<T,  R,  K,  W> function)
 // AggregateFunction与WindowFunction结合
 public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T,  ACC,  V>  aggFunction,  WindowFunction<V,  R,  K,  W> windowFunction)
 // AggregateFunction与ProcessWindowFunction结合
 public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, V> aggFunction,ProcessWindowFunction<V, R, K, W> windowFunction)

这样调用的处理机制是:基于第一个参数(增量聚合函数)来处理窗口数据,每来一个数据就做一次聚合;等到窗口需要触发计算时,则调用第二个参数(全窗口函数)的处理逻辑输出结果。需要注意的是,这里的全窗口函数就不再缓存所有数据了,而是直接将增量聚合函数的结果拿来当作了Iterable类型的输入。一般情况下,这时的可迭代集合中就只有一个元素了。下面举一个具体的实例来说明。在网站的各种统计指标中,一个很重要的统计指标就是热门的链接;想要得到热门的url,前提是得到每个链接的“热门度”。一般情况下,可以用url的浏览量(点击量)表示热门度。我们这里统计10秒钟的url浏览量,每5秒钟更新一次;另外为了更加清晰地展示,还应该把窗口的起始结束时间一起输出。我们可以定义滑动窗口,并结合增量聚合函数和全窗口函数来得到统计结果。

准备包装类

public class UrlViewCount {
    public String url;
    public Long cnt;
    public Long start;
    public Long end;

    public UrlViewCount() {
    }

    public UrlViewCount(String url, Long cnt, Long start, Long end) {
        this.url = url;
        this.cnt = cnt;
        this.start = start;
        this.end = end;
    }

    @Override
    public String toString() {
        return "UrlViewCount{" +
                "url='" + url + '\'' +
                ", cnt=" + cnt +
                ", TimestampStart=" + new Timestamp(start) +
                ", TimestampEnd=" + new Timestamp(end) +
                '}';
    }
}

实现逻辑

/**
 * 使用增量聚合函数计算 PV
 * 使用全窗口函数包装数据
 */
public class UVCountExample0627 {
    public static void main(String[] args) throws Exception {
        //获取执行环境&设置并行度
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //获取数据&提取时间戳
        SingleOutputStreamOperator<Event> eventSingleOutputStreamOperator = env.addSource(new ClickSource()).assignTimestampsAndWatermarks(
                WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        })
        );
        //按 url key
        eventSingleOutputStreamOperator.keyBy(data -> data.url)
                .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                //AggregateFunction 进行增量聚聚合
                .aggregate(new AggregateFunction<Event, Long, Long>() {
                               //增量聚合逻辑
                               @Override
                               public Long createAccumulator() {
                                   //初始值
                                   return 0L;
                               }

                               @Override
                               public Long add(Event value, Long accumulator) {
                                   //累加规则
                                   return accumulator + 1L;
                               }

                               @Override
                               public Long getResult(Long accumulator) {
                                   //获取结果
                                   return accumulator;
                               }

                               @Override
                               public Long merge(Long a, Long b) {
                                   //合并
                                   return a + b;
                               }
                           }, //使用全窗口函数包装数据
                        new ProcessWindowFunction<Long, UrlViewCount, String, TimeWindow>() {
                            /**
                             * new ProcessWindowFunction<Long, UrlViewCount, String, TimeWindow>
                             *     Long 输入数据类型
                             *     UrlViewCount 输出黄数据类型
                             *     String key 类型:data -> data.url ;url String
                             *     TimeWindow 需要一个这样的变量
                             */

                            @Override
                            public void process(String url, Context context, Iterable<Long> elements, Collector<UrlViewCount> out) throws Exception {
                                /**
                                 * String url key
                                 * Context context 上下文
                                 *  Iterable<Long> elements 数据
                                 *  Collector<UrlViewCount> out 搜集器
                                 */
                                long end = context.window().getEnd();
                                long start = context.window().getStart();
                                Long next = elements.iterator().next();
                                out.collect(new UrlViewCount(url, next, start, end));
                            }
                        }).print();

        //执行
        env.execute();
    }
}

代码中用一个AggregateFunction来实现增量聚合,每来一个数据就计数加一;得到的结果交给ProcessWindowFunction,结合窗口信息包装成我们想要的UrlViewCount,最终输出统计结果。注:ProcessWindowFunction是处理函数中的一种,这里只用它来将增量聚合函数的输出结果包裹一层窗口信息。窗口处理的主体还是增量聚合,而引入全窗口函数又可以获取到更多的信息包装输出,这样的结合兼具了两种窗口函数的优势,在保证处理性能和实时性的同时支持了更加丰富的应用场景。

标签:窗口,函数,url,Long,增量,聚合,public
来源: https://www.cnblogs.com/wdh01/p/16435000.html