Counting the Current Day's PV with Flink
In big data, WordCount is the first demo everyone runs, and UV/PV counting is the slightly more advanced one.
Today we will use Flink to build a demo that counts PV (once PV works, UV is similar: it just needs one extra deduplication step).
Test environment:
flink 1.7.2
1. Data flow
a. A mock data generator produces events and sends them to Kafka.
b. Flink reads the data and counts it.
c. The result is written back to Kafka (and also printed to the console for easier inspection).
2. Mock data generator
The data format is: {"id" : 1, "createTime" : "2019-05-24 10:36:43.707"}
id is an incrementing sequence number, and createTime is the event time (by default the time the record was generated).
The generator code is as follows:
package com.venn.currentDayPv

import java.text.SimpleDateFormat
import java.util.{Calendar, Date}

import com.venn.common.Common
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

import scala.util.parsing.json.JSONObject

/**
  * test data maker
  */
object CurrentDayMaker {

  var minute: Int = 1
  val calendar: Calendar = Calendar.getInstance()

  /**
    * A full day is too long to observe conveniently, so start from the current
    * time and add 10 minutes on every call: one simulated day then only needs
    * 144 loops, i.e. about 144 seconds with the sleep enabled.
    * @return
    */
  def getCreateTime(): String = {
    // minute = minute + 1
    calendar.add(Calendar.MINUTE, 10)
    sdf.format(calendar.getTime)
  }

  val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")

  def main(args: Array[String]): Unit = {
    val producer = new KafkaProducer[String, String](Common.getProp)
    calendar.setTime(new Date())
    println(sdf.format(calendar.getTime))
    var i = 0
    while (true) {
      // val map = Map("id" -> i, "createTime" -> sdf.format(System.currentTimeMillis()))
      val map = Map("id" -> i, "createTime" -> getCreateTime())
      val jsonObject: JSONObject = new JSONObject(map)
      println(jsonObject.toString())
      // topic: current_day
      val msg = new ProducerRecord[String, String]("current_day", jsonObject.toString())
      producer.send(msg)
      producer.flush()
      // Thread.sleep(1000)
      i = i + 1
    }
  }
}
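The Common.getProp helper used above (and in the Flink job below) is not shown in the original post; presumably it just builds the Kafka client properties. A minimal sketch, assuming a local broker at localhost:9092 and string (de)serializers (all of these values are assumptions, not taken from the post):

package com.venn.common

import java.util.Properties

object Common {
  // Hypothetical stand-in for the author's helper: Kafka connection properties.
  // Broker address, group id and serializer choices are assumptions.
  def getProp: Properties = {
    val prop = new Properties()
    prop.put("bootstrap.servers", "localhost:9092")
    prop.put("group.id", "current_day_pv")
    prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    prop
  }
}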
The generated data looks like this:
{"id" : 0, "createTime" : "2019-05-24 18:02:26.292"}
{"id" : 1, "createTime" : "2019-05-24 18:12:26.292"}
{"id" : 2, "createTime" : "2019-05-24 18:22:26.292"}
{"id" : 3, "createTime" : "2019-05-24 18:32:26.292"}
{"id" : 4, "createTime" : "2019-05-24 18:42:26.292"}
3. The Flink job
package com.venn.currentDayPv

import java.io.File
import java.text.SimpleDateFormat

import com.venn.common.Common
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala._
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.formats.json.JsonNodeDeserializationSchema
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{ContinuousProcessingTimeTrigger, CountTrigger}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}

/**
  * Created by venn on 19-5-23.
  *
  * Use TumblingEventTimeWindows to count the current day's PV.
  * For testing, the day window can be switched to a minute window:
  *
  *   .windowAll(TumblingEventTimeWindows.of(Time.minutes(1), Time.seconds(0)))
  *
  * TumblingEventTimeWindows guarantees per-minute counts of event time,
  * with windows starting at second 0 (e.g. 00:00:00 to 00:00:59).
  */
object CurrentDayPvCount {

  def main(args: Array[String]): Unit = {
    // environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val topic = "current_day"
    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
    val kafkaSource = new FlinkKafkaConsumer[ObjectNode](topic, new JsonNodeDeserializationSchema(), Common.getProp)
    val sink = new FlinkKafkaProducer[String](topic + "_out", new SimpleStringSchema(), Common.getProp)
    sink.setWriteTimestampToKafka(true)

    val stream = env.addSource(kafkaSource)
      .map(node => {
        Event(node.get("id").asText(), node.get("createTime").asText())
      })
      .assignAscendingTimestamps(event => sdf.parse(event.createTime).getTime)
      // window is one minute, starting at second 0
      //.windowAll(TumblingEventTimeWindows.of(Time.minutes(1), Time.seconds(0)))
      // window is one hour, starting at second 0
      // .windowAll(TumblingEventTimeWindows.of(Time.hours(1), Time.seconds(0)))
      // window is one day, starting at local (UTC+8) midnight
      // todo: there is a bug (FLINK-11326), negative offsets are rejected; fixed in 1.8
      .windowAll(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
      // emit the current value once per minute of event time
      // .trigger(ContinuousEventTimeTrigger.of(Time.seconds(60)))
      // emit the current value once per minute of processing time
      // .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(60)))
      // emit the current value for every event
      .trigger(CountTrigger.of(1))
      .reduce(new ReduceFunction[Event] {
        override def reduce(event1: Event, event2: Event): Event = {
          // put the smallest and the largest id of the window into the Event
          // (the second id is carried in the createTime field) and sum the counts
          new Event(event1.id, event2.id, event1.count + event2.count)
        }
      })

    stream.print("result : ")
    // execute job
    env.execute("CurrentDayCount")
  }
}

case class Event(id: String, createTime: String, count: Int = 1) {}
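As mentioned at the top, UV only differs from PV by a deduplication step. One possible way to do that, sketched here rather than taken from the original post, is to replace the reduce with an AggregateFunction whose accumulator is a set of ids (here the Event.id field from above stands in for the user identifier):

import org.apache.flink.api.common.functions.AggregateFunction

import scala.collection.mutable

// Accumulates distinct ids per window; the window result is the UV count.
class UvAggregate extends AggregateFunction[Event, mutable.Set[String], Long] {

  override def createAccumulator(): mutable.Set[String] = mutable.Set[String]()

  override def add(event: Event, acc: mutable.Set[String]): mutable.Set[String] = {
    acc += event.id // dedup: a set keeps each id only once
    acc
  }

  override def getResult(acc: mutable.Set[String]): Long = acc.size.toLong

  override def merge(a: mutable.Set[String], b: mutable.Set[String]): mutable.Set[String] = a ++= b
}

It would plug in where the reduce is above, e.g. .windowAll(...).trigger(CountTrigger.of(1)).aggregate(new UvAggregate). For a real user stream with many distinct ids the set can grow large, so a heavier state backend (such as RocksDB) or an approximate structure like HyperLogLog is usually preferred.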
4. Results
The test data is as follows:
{"id" : 0, "createTime" : "2019-05-24 20:29:49.102"}
{"id" : 1, "createTime" : "2019-05-24 20:39:49.102"}
...
{"id" : 20, "createTime" : "2019-05-24 23:49:49.102"}
{"id" : 21, "createTime" : "2019-05-24 23:59:49.102"}
{"id" : 22, "createTime" : "2019-05-25 00:09:49.102"}
{"id" : 23, "createTime" : "2019-05-25 00:19:49.102"}
...
{"id" : 163, "createTime" : "2019-05-25 23:39:49.102"}
{"id" : 164, "createTime" : "2019-05-25 23:49:49.102"}
{"id" : 165, "createTime" : "2019-05-25 23:59:49.102"}
{"id" : 166, "createTime" : "2019-05-26 00:09:49.102"}
...
{"id" : 308, "createTime" : "2019-05-26 23:49:49.102"}
{"id" : 309, "createTime" : "2019-05-26 23:59:49.102"}
{"id" : 310, "createTime" : "2019-05-27 00:09:49.102"}
ids 0 - 21 fall on the 24th
ids 22 - 165 fall on the 25th
ids 166 - 309 fall on the 26th
The output (the reduce method in the job puts the ids of the first and the last record of each window into the Event) matches the test data above.
5. Notes
Many people mistakenly assume that a window's start time is the moment the program starts (is initialized). In fact, a window (taking the tumbling window as an example) is defined by two overloaded of() methods; the full form takes two parameters: the window size and the window offset (which defaults to 0).
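For instance (a minimal illustration, not from the original post; note that on Flink 1.7.2 the second call hits the bug described in section 6 and needs the workaround given there):

import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// size only, offset defaults to 0: day windows are aligned to UTC midnight,
// which is 08:00 Beijing time
val utcAlignedDay = TumblingEventTimeWindows.of(Time.days(1))

// size plus an offset of -8 hours: day windows are aligned to Beijing (UTC+8) midnight
val beijingAlignedDay = TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))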
Source: org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows (shown here with a few println calls added to print each element's window boundaries):
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.streaming.api.windowing.assigners;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Collections;

/**
 * A {@link WindowAssigner} that windows elements into windows based on the timestamp of the
 * elements. Windows cannot overlap.
 *
 * <p>For example, in order to window into windows of 1 minute:
 * <pre> {@code
 * DataStream<Tuple2<String, Integer>> in = ...;
 * KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...);
 * WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
 *   keyed.window(TumblingEventTimeWindows.of(Time.minutes(1)));
 * } </pre>
 */
@PublicEvolving
public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private final long size;

    private final long offset;

    protected TumblingEventTimeWindows(long size, long offset) {
        if (Math.abs(offset) >= size) {
            throw new IllegalArgumentException("TumblingEventTimeWindows parameters must satisfy abs(offset) < size");
        }

        this.size = size;
        this.offset = offset;
    }

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        if (timestamp > Long.MIN_VALUE) {
            // Long.MIN_VALUE is currently assigned when no timestamp is present
            long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
            System.out.println("start : " + start + ", end : " + (start + size));
            System.out.println("window start: " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(start));
            System.out.println("window end: " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(start + size));
            return Collections.singletonList(new TimeWindow(start, start + size));
        } else {
            throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
                "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
                "'DataStream.assignTimestampsAndWatermarks(...)'?");
        }
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }

    @Override
    public String toString() {
        return "TumblingEventTimeWindows(" + size + ")";
    }

    /**
     * Creates a new {@code TumblingEventTimeWindows} {@link WindowAssigner} that assigns
     * elements to time windows based on the element timestamp.
     *
     * @param size The size of the generated windows.
     * @return The time policy.
     */
    public static TumblingEventTimeWindows of(Time size) {
        return new TumblingEventTimeWindows(size.toMilliseconds(), 0);
    }

    /**
     * Creates a new {@code TumblingEventTimeWindows} {@link WindowAssigner} that assigns
     * elements to time windows based on the element timestamp and offset.
     *
     * <p>For example, if you want window a stream by hour, but window begins at the 15th minutes
     * of each hour, you can use {@code of(Time.hours(1), Time.minutes(15))}, then you will get
     * time windows start at 0:15:00, 1:15:00, 2:15:00, etc.
     *
     * <p>Rather than that, if you are living in somewhere which is not using UTC±00:00 time,
     * such as China which is using UTC+08:00, and you want a time window with size of one day,
     * and window begins at every 00:00:00 of local time, you may use {@code of(Time.days(1), Time.hours(-8))}.
     * The parameter of offset is {@code Time.hours(-8)} since UTC+08:00 is 8 hours earlier than UTC time.
     *
     * @param size The size of the generated windows.
     * @param offset The offset which window start would be shifted by.
     * @return The time policy.
     */
    public static TumblingEventTimeWindows of(Time size, Time offset) {
        return new TumblingEventTimeWindows(size.toMilliseconds(), offset.toMilliseconds());
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }
}
Every record triggers the assignWindows method.
The window-start computation is:

public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
    return timestamp - (timestamp - offset + windowSize) % windowSize;
}
With a window size of one day (86400000 ms) and a timestamp of 1558886400000 (2019-05-27 00:00:00.000 in UTC+8):

offset = 0: 1558886400000 - (1558886400000 - 0 + 86400000) % 86400000 = 1558828800000, which is 2019-05-26 08:00:00 UTC+8, so with the default offset the "day" window starts at 08:00 Beijing time.

offset = -8 hours (-28800000 ms): 1558886400000 - (1558886400000 + 28800000 + 86400000) % 86400000 = 1558886400000, which is 2019-05-27 00:00:00 UTC+8, i.e. local midnight.
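A quick standalone check of these numbers (plain Scala reimplementing the same formula, not Flink code):

object WindowStartCheck {
  // same formula as TimeWindow.getWindowStartWithOffset
  def windowStart(timestamp: Long, offset: Long, windowSize: Long): Long =
    timestamp - (timestamp - offset + windowSize) % windowSize

  def main(args: Array[String]): Unit = {
    val day = 24 * 60 * 60 * 1000L // 86400000 ms
    val ts = 1558886400000L        // 2019-05-27 00:00:00 UTC+8
    println(windowStart(ts, 0, day))                  // 1558828800000 -> 2019-05-26 08:00:00 UTC+8
    println(windowStart(ts, -8 * 3600 * 1000L, day))  // 1558886400000 -> 2019-05-27 00:00:00 UTC+8
  }
}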
The debug output from the println calls added above shows the computed window start and end for every element.
6. A special note
Flink 1.6.3 / 1.7.1 / 1.7.2 have a bug in the TumblingEventTimeWindows constructor: a negative offset is rejected, even though the Javadoc of the of() method explicitly suggests of(Time.days(1), Time.hours(-8)) for a one-day window starting at midnight China time. JIRA: FLINK-11326; the issue is marked as fixed in 1.8.0. (I was about to file the bug myself, but someone beat me to it.)
The bug can be worked around by creating a class with the same package and class name in your own project and modifying the corresponding check.
Flink 1.7.2 source:
protected TumblingEventTimeWindows(long size, long offset) {
    if (offset < 0 || offset >= size) {
        throw new IllegalArgumentException("TumblingEventTimeWindows parameters must satisfy 0 <= offset < size");
    }

    this.size = size;
    this.offset = offset;
}
/**
* Creates a new {@code TumblingEventTimeWindows} {@link WindowAssigner} that assigns
* elements to time windows based on the element timestamp and offset.
*
* <p>For example, if you want window a stream by hour,but window begins at the 15th minutes
* of each hour, you can use {@code of(Time.hours(1),Time.minutes(15))},then you will get
* time windows start at 0:15:00,1:15:00,2:15:00,etc.
*
* <p>Rather than that,if you are living in somewhere which is not using UTC±00:00 time,
* such as China which is using UTC+08:00,and you want a time window with size of one day,
* and window begins at every 00:00:00 of local time,you may use {@code of(Time.days(1),Time.hours(-8))}.
* The parameter of offset is {@code Time.hours(-8))} since UTC+08:00 is 8 hours earlier than UTC time.
*
* @param size The size of the generated windows.
* @param offset The offset which window start would be shifted by.
* @return The time policy.
*/
public static TumblingEventTimeWindows of(Time size, Time offset) {
return new TumblingEventTimeWindows(size.toMilliseconds(), offset.toMilliseconds());
}
Source in the latest version (with the fix):
protected TumblingEventTimeWindows(long size, long offset) {
    if (Math.abs(offset) >= size) {
        throw new IllegalArgumentException("TumblingEventTimeWindows parameters must satisfy abs(offset) < size");
    }

    this.size = size;
    this.offset = offset;
}
The modification for 1.7.2 is simply this: copy TumblingEventTimeWindows into your own project under the identical package, org.apache.flink.streaming.api.windowing.assigners, and replace the 0 <= offset < size check with the abs(offset) < size version shown above.
Source: https://www.cnblogs.com/Springmoon-venn/p/10919648.html