spark streaming 小案例
作者:互联网
spark streaming
实时计算的案例
数据
{"car":"皖A9A7N2","city_code":"340500","county_code":"340522","card":117988031603010,"camera_id":"00001","orientation":"西南","road_id":34052055,"time":1614711895,"speed":36.38}
{"car":"皖A9A7N2","city_code":"340500","county_code":"340522","card":117988031603010,"camera_id":"01001","orientation":"西南","road_id":34052056,"time":1614711904,"speed":35.38}
{"car":"皖A9A7N2","city_code":"340500","county_code":"340522","card":117985031601010,"camera_id":"01214","orientation":"西南","road_id":34052057,"time":1614711914,"speed":45.38}
{"car":"皖A9A7N2","city_code":"340500","county_code":"340522","card":117984031601010,"camera_id":"01024","orientation":"西北","road_id":34052058,"time":1614711924,"speed":45.29}
{"car":"皖A9A7N2","city_code":"340500","county_code":"340522","card":117970031606010,"camera_id":"01022","orientation":"西北","road_id":34052059,"time":1614712022,"speed":75.29}
{"car":"皖A9A7N2","city_code":"340500","county_code":"340522","card":117956031625010,"camera_id":"01132","orientation":"西北","road_id":34052060,"time":1614712120,"speed":46.29}
{"car":"皖A9A7N2","city_code":"340500","county_code":"340522","card":117925031638010,"camera_id":"00202","orientation":"西北","road_id":34052061,"time":1614712218,"speed":82.29}
{"car":"皖A9A7N2","city_code":"340500","county_code":"340522","card":117902031651010,"camera_id":"01102","orientation":"西北","road_id":34052062,"time":1614712316,"speed":82.29}
{"car":"皖A9A7N2","city_code":"340100","county_code":"340181","card":117885031666010,"camera_id":"01221","orientation":"西北","road_id":34308114,"time":1614712414,"speed":48.5}
{"car":"皖A9A7N2","city_code":"340100","county_code":"340181","card":117855031704010,"camera_id":"00231","orientation":"西北","road_id":34308115,"time":1614712619,"speed":59.5}
{"car":"皖A9A7N2","city_code":"340100","county_code":"340181","card":117817031742010,"camera_id":"01130","orientation":"西北","road_id":34308116,"time":1614712824,"speed":52.5}
{"car":"皖A9A7N2","city_code":"340100","county_code":"340181","card":117784031777010,"camera_id":"00123","orientation":"西北","road_id":34308117,"time":1614713030,"speed":71.5}
{"car":"皖A9A7N2","city_code":"340100","county_code":"340181","card":117720031793010,"camera_id":"00132","orientation":"西北","road_id":34308118,"time":1614713235,"speed":65.5}
...
...
...
注意点:
* 将数据保存到数据库存在的问题
* 1、如果直接使用foreach,会为每一条数据创建一个数据库连接,效率低,而且会导致数据库压力过大
* 2、如果将数据库连接放在foreach算子的外面创建,会报错:连接对象不能被序列化,无法通过网络从Driver端发送到Executor端
*
* 正确写法
* 使用foreachPartition,只会为每一个分区创建一个数据库连接
*
* rdd的foreach和foreachPartition
* foreach一次处理一条数据
* foreachPartition: 一次处理一个分区的数据
处理方法:
package com.shujia.spark.streaming
import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}
import java.sql.{Connection, DriverManager, PreparedStatement}
import java.text.SimpleDateFormat
import java.util.Date
/**
 * Spark Streaming demo: reads checkpoint ("card") vehicle-passing records as JSON lines
 * from a socket, computes the number of vehicles and the average speed per checkpoint
 * over a sliding window, and stores every window result in MySQL.
 */
object Demo8Card {

  /**
   * Parses one JSON line into `(card, (speed, 1))`.
   *
   * Returns `None` when the line is not valid JSON, is blank, or lacks the `card` /
   * `speed` fields, so that a single bad record does not fail the whole streaming job.
   *
   * @param line one raw line received from the socket
   * @return the checkpoint id paired with (speed, count = 1), or `None` if unparsable
   */
  private def parseRecord(line: String): Option[(Long, (Double, Int))] =
    scala.util.Try {
      // Parse the JSON record with fastjson.
      val carJson: JSONObject = JSON.parseObject(line)
      // Extract the checkpoint id and the vehicle speed; a null result (blank line or
      // missing field) raises an NPE on unboxing, which Try turns into None.
      val card: Long = carJson.getLong("card")
      val speed: Double = carJson.getDouble("speed")
      (card, (speed, 1))
    }.toOption

  /**
   * Writes all rows of one partition to MySQL over a single connection.
   *
   * Rows are sent as one JDBC batch; the statement and the connection are always
   * closed, even when an insert fails. Empty partitions open no connection at all.
   *
   * @param comDate formatted computation time stored in the `com_date` column
   * @param iter    rows of the partition as (card, num, avgSpeed)
   */
  private def savePartition(comDate: String, iter: Iterator[(Long, Int, Double)]): Unit =
    if (iter.nonEmpty) {
      // 1. Load the JDBC driver.
      Class.forName("com.mysql.jdbc.Driver")
      // 2. Open the connection (one per partition, not one per record).
      val con: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata", "root", "123456")
      try {
        // 3. Prepare the insert statement.
        val stat: PreparedStatement = con.prepareStatement("insert into card_avg_speed_and_num(card,com_date,num,avg_speed) values(?,?,?,?)")
        try {
          // This foreach is a plain Iterator method running inside the task, not an RDD operator.
          iter.foreach {
            case (card, num, avgSpeed) =>
              stat.setLong(1, card)
              stat.setString(2, comDate)
              stat.setInt(3, num)
              stat.setDouble(4, avgSpeed)
              stat.addBatch()
          }
          // Send all rows of the partition in one batch.
          stat.executeBatch()
        } finally {
          stat.close()
        }
      } finally {
        con.close()
      }
    }

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .appName("ds")
      .master("local[2]")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    val sc: SparkContext = spark.sparkContext
    val ssc = new StreamingContext(sc, Durations.seconds(5))

    /**
     * Read the checkpoint vehicle-passing records.
     */
    val linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("master", 8888)

    /**
     * 1. Parse the JSON records; unparsable lines are dropped.
     */
    val cardAndSpeedDS: DStream[(Long, (Double, Int))] =
      linesDS.flatMap(line => parseRecord(line).iterator)

    /**
     * 2. Per checkpoint, sum the speeds and count the vehicles seen in the
     * last 15 seconds, recomputed every 5 seconds.
     */
    val sumSpeedAndNUmDS: DStream[(Long, (Double, Int))] = cardAndSpeedDS
      .reduceByKeyAndWindow(
        (kv1: (Double, Int), kv2: (Double, Int)) => (kv1._1 + kv2._1, kv1._2 + kv2._2),
        Durations.seconds(15),
        Durations.seconds(5)
      )

    /**
     * 3. Compute the average speed. `num` is at least 1 because every key
     * originates from at least one record.
     */
    val avgSpeedAndNumDs: DStream[(Long, Int, Double)] = sumSpeedAndNUmDS.map {
      case (card, (sumSpeed, num)) => (card, num, sumSpeed / num)
    }

    /**
     * 4. Save the result to MySQL.
     *
     * Pitfalls when writing to a database:
     *  - Using RDD.foreach directly opens one connection per record: slow, and it
     *    overloads the database.
     *  - Creating the connection outside the operator fails, because a connection
     *    cannot be serialized and shipped to the executors.
     *
     * foreachPartition handles a whole partition per call, so only one connection
     * per partition is needed.
     */
    avgSpeedAndNumDs.foreachRDD(rdd => {
      // Computed once per batch on the driver so that every partition of the same
      // batch stores the same com_date.
      val comDate: String = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
      rdd.foreachPartition(iter => savePartition(comDate, iter))
    })

    avgSpeedAndNumDs.print()

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}
标签:code,val,案例,county,streaming,spark,speed,id,card 来源: https://www.cnblogs.com/atao-BigData/p/16496896.html