增量聚合和全窗口函数的结合使用
作者:互联网
增量聚合和全窗口函数的对比
已经了解了Window API中两类窗口函数的用法,下面先来做个简单的总结。增量聚合函数处理计算会更高效。举一个最简单的例子,对一组数据求和。大量的数据连续不断到来,全窗口函数只是把它们收集缓存起来,并没有处理;到了窗口要关闭、输出结果的时候,再遍历所有数据依次叠加,得到最终结果。而如果采用增量聚合的方式,那么只需要保存一个当前和的状态,每个数据到来时就会做一次加法,更新状态;到了要输出结果的时候,只要将当前状态直接拿出来就可以了。增量聚合相当于把计算量“均摊”到了窗口收集数据的过程中,自然就会比全窗口聚合更加高效、输出更加实时。而全窗口函数的优势在于提供了更多的信息,可以认为是更加“通用”的窗口操作。它只负责收集数据、提供上下文相关信息,把所有的原材料都准备好,至于拿来做什么我们完全可以任意发挥。这就使得窗口计算更加灵活,功能更加强大。所以在实际应用中,我们往往希望兼具这两者的优点,把它们结合在一起使用。Flink的Window API就给我们实现了这样的用法。我们之前在调用WindowedStream的.reduce()和.aggregate()方法时,只是简单地直接传入了一个ReduceFunction或AggregateFunction进行增量聚合。除此之外,其实还可以传入第二个参数:一个全窗口函数,可以是WindowFunction或者ProcessWindowFunction。
// ReduceFunction与WindowFunction结合 public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function) // ReduceFunction与ProcessWindowFunction结合 public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> function) // AggregateFunction与WindowFunction结合 public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, V> aggFunction, WindowFunction<V, R, K, W> windowFunction) // AggregateFunction与ProcessWindowFunction结合 public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, V> aggFunction,ProcessWindowFunction<V, R, K, W> windowFunction)
这样调用的处理机制是:基于第一个参数(增量聚合函数)来处理窗口数据,每来一个数据就做一次聚合;等到窗口需要触发计算时,则调用第二个参数(全窗口函数)的处理逻辑输出结果。需要注意的是,这里的全窗口函数就不再缓存所有数据了,而是直接将增量聚合函数的结果拿来当作了Iterable类型的输入。一般情况下,这时的可迭代集合中就只有一个元素了。下面举一个具体的实例来说明。在网站的各种统计指标中,一个很重要的统计指标就是热门的链接;想要得到热门的url,前提是得到每个链接的“热门度”。一般情况下,可以用url的浏览量(点击量)表示热门度。我们这里统计10秒钟的url浏览量,每5秒钟更新一次;另外为了更加清晰地展示,还应该把窗口的起始结束时间一起输出。我们可以定义滑动窗口,并结合增量聚合函数和全窗口函数来得到统计结果。
准备包装类
public class UrlViewCount { public String url; public Long cnt; public Long start; public Long end; public UrlViewCount() { } public UrlViewCount(String url, Long cnt, Long start, Long end) { this.url = url; this.cnt = cnt; this.start = start; this.end = end; } @Override public String toString() { return "UrlViewCount{" + "url='" + url + '\'' + ", cnt=" + cnt + ", TimestampStart=" + new Timestamp(start) + ", TimestampEnd=" + new Timestamp(end) + '}'; } }
实现逻辑
/** * 使用增量聚合函数计算 PV * 使用全窗口函数包装数据 */ public class UVCountExample0627 { public static void main(String[] args) throws Exception { //获取执行环境&设置并行度 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); //获取数据&提取时间戳 SingleOutputStreamOperator<Event> eventSingleOutputStreamOperator = env.addSource(new ClickSource()).assignTimestampsAndWatermarks( WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner(new SerializableTimestampAssigner<Event>() { @Override public long extractTimestamp(Event element, long recordTimestamp) { return element.timestamp; } }) ); //按 url key eventSingleOutputStreamOperator.keyBy(data -> data.url) .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) //AggregateFunction 进行增量聚聚合 .aggregate(new AggregateFunction<Event, Long, Long>() { //增量聚合逻辑 @Override public Long createAccumulator() { //初始值 return 0L; } @Override public Long add(Event value, Long accumulator) { //累加规则 return accumulator + 1L; } @Override public Long getResult(Long accumulator) { //获取结果 return accumulator; } @Override public Long merge(Long a, Long b) { //合并 return a + b; } }, //使用全窗口函数包装数据 new ProcessWindowFunction<Long, UrlViewCount, String, TimeWindow>() { /** * new ProcessWindowFunction<Long, UrlViewCount, String, TimeWindow> * Long 输入数据类型 * UrlViewCount 输出黄数据类型 * String key 类型:data -> data.url ;url String * TimeWindow 需要一个这样的变量 */ @Override public void process(String url, Context context, Iterable<Long> elements, Collector<UrlViewCount> out) throws Exception { /** * String url key * Context context 上下文 * Iterable<Long> elements 数据 * Collector<UrlViewCount> out 搜集器 */ long end = context.window().getEnd(); long start = context.window().getStart(); Long next = elements.iterator().next(); out.collect(new UrlViewCount(url, next, start, end)); } }).print(); //执行 env.execute(); } }
代码中用一个AggregateFunction来实现增量聚合,每来一个数据就计数加一;得到的结果交给ProcessWindowFunction,结合窗口信息包装成我们想要的UrlViewCount,最终输出统计结果。注:ProcessWindowFunction是处理函数中的一种,这里只用它来将增量聚合函数的输出结果包裹一层窗口信息。窗口处理的主体还是增量聚合,而引入全窗口函数又可以获取到更多的信息包装输出,这样的结合兼具了两种窗口函数的优势,在保证处理性能和实时性的同时支持了更加丰富的应用场景。
标签:窗口,函数,url,Long,增量,聚合,public 来源: https://www.cnblogs.com/wdh01/p/16435000.html