Flink + Kafka Integration Exercise: Counting the Number of Vehicles Passing Through Checkpoints (and Problems When Submitting to the Cluster)
Count the traffic flow per checkpoint:
1. Compute the traffic volume of the last 15 minutes
2. Produce a result every 5 minutes
3. Use event time
4. Use Kafka as the data source
5. Save the results to MySQL
1. Create a Kafka producer and produce a few records
# Create a topic
kafka-topics.sh --create --zookeeper master:2181,node1:2181,node2:2181 --replication-factor 1 --partitions 1 --topic cars
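To confirm that the topic exists, it can be described with the same tool (using the same ZooKeeper list as above):
# check the topic
kafka-topics.sh --describe --zookeeper master:2181,node1:2181,node2:2181 --topic cars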
# Produce data
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic cars
Sample data:
{"car":"皖A9A7N2","city_code":"340100","county_code":"340122","card":116696031866010,"camera_id":"01123","orientation":"西","road_id":34044594,"time":1614718101,"speed":86.24}
{"car":"皖AR3HY0","city_code":"340100","county_code":"340102","card":117350032007010,"camera_id":"00002","orientation":"南","road_id":34044594,"time":1614718202,"speed":72.23}
{"car":"皖AR3HY0","city_code":"340100","county_code":"340102","card":117350032007010,"camera_id":"00002","orientation":"南","road_id":34044594,"time":1614718303,"speed":72.23}
{"car":"皖AR3HY0","city_code":"340100","county_code":"340102","card":117350032007010,"camera_id":"00002","orientation":"南","road_id":34044594,"time":1614718604,"speed":72.23}
{"car":"皖AR3HY0","city_code":"340100","county_code":"340102","card":117350032007010,"camera_id":"00002","orientation":"南","road_id":34044594,"time":1614718905,"speed":72.23}
The code is as follows:
package com.wt.flink.core

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.configuration.Configuration
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

import java.sql.{Connection, DriverManager, PreparedStatement}
import java.time.Duration

object Demo6Cars {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    /**
     * 1. Read the checkpoint traffic data from Kafka
     */
    val source: KafkaSource[String] = KafkaSource
      .builder[String]
      .setBootstrapServers("master:9092,node1:9092,node2:9092") // list of Kafka brokers
      .setTopics("cars") // topic to read from
      .setGroupId("asdasdasd") // consumer group; within one group a record is consumed only once
      .setStartingOffsets(OffsetsInitializer.latest()) // where to start reading; earliest: all data, latest: only new data
      .setValueOnlyDeserializer(new SimpleStringSchema()) // deserializer for the record value
      .build

    // use the Kafka source
    val carsDS: DataStream[String] = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")

    /**
     * Parse the JSON records with fastjson and extract the checkpoint id and the time field
     */
    val cardAndTimeDS: DataStream[(Long, Long)] = carsDS.map(line => {
      // turn the string into a JSON object
      val jsonObj: JSONObject = JSON.parseObject(line)
      // read the fields by name
      // checkpoint id
      val card: Long = jsonObj.getLong("card")
      // event time; Flink expects the event time in milliseconds
      val time: Long = jsonObj.getLong("time") * 1000
      (card, time)
    })

    /**
     * Assign the timestamp field and the watermark
     */
    val assDS: DataStream[(Long, Long)] = cardAndTimeDS.assignTimestampsAndWatermarks(
      WatermarkStrategy
        // watermark strategy: tolerate events arriving up to 5 seconds out of order
        .forBoundedOutOfOrderness(Duration.ofSeconds(5))
        // timestamp assigner
        .withTimestampAssigner(new SerializableTimestampAssigner[(Long, Long)] {
          override def extractTimestamp(element: (Long, Long), recordTimestamp: Long): Long = {
            // the time field
            element._2
          }
        })
    )

    /**
     * Count the traffic flow
     */
    val kvDS: DataStream[(Long, Int)] = assDS.map(kv => (kv._1, 1))

    // key by checkpoint id
    val keyBYDS: KeyedStream[(Long, Int), Long] = kvDS.keyBy(_._1)

    // open a sliding window: 15-minute window, sliding every 5 minutes
    val windowDS: WindowedStream[(Long, Int), Long, TimeWindow] = keyBYDS
      .window(SlidingEventTimeWindows.of(Time.minutes(15), Time.minutes(5)))

    // sum up the traffic flow per checkpoint
    val flowDS: DataStream[(Long, Int)] = windowDS.sum(1)

    /**
     * Save the results to MySQL
     */
    flowDS.addSink(new RichSinkFunction[(Long, Int)] {
      var con: Connection = _
      var stat: PreparedStatement = _

      // open the connection
      override def open(parameters: Configuration): Unit = {
        // 1. load the driver
        Class.forName("com.mysql.jdbc.Driver")
        // create the connection
        con = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata17", "root", "123456")
        // prepare the insert statement
        // replace: inserts if the key does not exist, otherwise overwrites; the table needs a primary key (see the DDL sketch after the code)
        stat = con.prepareStatement("replace into card_flow(card,flow) values(?,?)")
      }

      // write one result record
      override def invoke(value: (Long, Int), context: SinkFunction.Context): Unit = {
        println("writing record to MySQL")
        stat.setLong(1, value._1)
        stat.setInt(2, value._2)
        stat.execute()
      }

      // close the connection
      override def close(): Unit = {
        stat.close()
        con.close()
      }
    })

    env.execute()
  }
}
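The sink uses replace into card_flow(card,flow), which only works if the card_flow table has a primary key. The original post does not show the table definition; a minimal DDL sketch that matches the statement (the column names come from the code, the types are assumptions) could look like this:
-- sketch of the target table; column types are assumed
create table if not exists card_flow (
    card bigint not null,   -- checkpoint id
    flow int not null,      -- vehicle count in the window
    primary key (card)      -- required so that replace into can overwrite existing rows
);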
The results end up in the card_flow table in MySQL.
Submitting the code to run on the cluster
1. Start a yarn-session
yarn-session.sh -d
2. Submit the job to the session cluster
# -Dyarn.application.id changes every time the yarn-session is restarted
flink run -t yarn-session -Dyarn.application.id=application_1658841626681_0003 -c com.wt.flink.core.Demo6Cars flink-1.0-SNAPSHOT.jar
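If the application id of the running yarn-session is not at hand, it can be looked up with the standard YARN CLI:
# list the running YARN applications to find the yarn-session's application id
yarn application -list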
Error
Cause of the error: the job's dependencies (the Kafka connector, the MySQL driver, and fastjson) cannot be found on the cluster.
# missing the Kafka connector dependency
java.lang.ClassNotFoundException: org.apache.flink.connector.kafka.source.KafkaSource
# missing the MySQL driver
java.lang.ClassNotFoundException: com.mysql.jdbc.Driver
# missing the fastjson dependency
java.lang.ClassNotFoundException: com.alibaba.fastjson.JSON
The other errors look much the same, so they are not all shown here.
Solution
Upload the Kafka connector dependency to Flink's lib directory.
Upload the MySQL driver to Flink's lib directory.
The fastjson jar from the third error above has to be uploaded in the same way (see the copy sketch after the jar list).
# Flink Kafka connector dependency
flink-sql-connector-kafka-1.15.0.jar
# MySQL driver
mysql-connector-java-5.1.49.jar
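Assuming Flink is installed under $FLINK_HOME on the node where the yarn-session is started, uploading the jars is just a copy into its lib directory (the path is an assumption):
# copy the connector and driver jars into Flink's lib directory ($FLINK_HOME is assumed to point at the Flink installation)
cp flink-sql-connector-kafka-1.15.0.jar mysql-connector-java-5.1.49.jar $FLINK_HOME/lib/
# the fastjson jar used by the project has to go into lib as well (exact filename depends on the version used)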
The yarn-session then needs to be restarted:
# stop the current yarn-session
yarn application -kill application_1658546198162_0001
# start a new yarn-session
yarn-session.sh -d
Now check the results in MySQL again.
Source: https://www.cnblogs.com/atao-BigData/p/16523229.html