道路卡口摄像头的异常状态统计 (Abnormal-status statistics for road checkpoint cameras)
作者:互联网 (Author: collected from the internet)
package camera
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
// 摄像异常状态的功能代码
object CameraAbnormality {

  /**
   * Detects abnormal (silent) checkpoint cameras: cameras listed in the camera
   * inventory that have no matching record in the traffic-flow data. A camera
   * that captured no vehicle at all is presumed faulty/offline.
   *
   * @param args optional path overrides: args(0) = camera inventory CSV,
   *             args(1) = traffic-flow CSV glob. The original hard-coded HDFS
   *             paths are kept as defaults, so existing invocations still work.
   */
  def main(args: Array[String]): Unit = {
    // Hard-coded paths retained as backward-compatible defaults.
    val cameraPath = if (args.length > 0) args(0) else "hdfs://HadoopCluster/cameras.csv"
    val flowPath   = if (args.length > 1) args(1) else "hdfs://HadoopCluster/project/20220913/*"

    val conf = new SparkConf().setAppName("camera").setMaster("local[*]")
    val session = SparkSession.builder().config(conf).getOrCreate()
    try {
      // FAILFAST: abort immediately on a malformed CSV row;
      // inferSchema: let Spark derive column types from the data.
      val csvOptions = Map[String, String]("mode" -> "FAILFAST", "inferSchema" -> "true")

      // Checkpoint camera inventory.
      //   monitorId: "areaId-streetId-checkpointNo" (e.g. 5-23-63)
      //   cameraId:  1 through 8
      val cameras = session.read.options(csvOptions).csv(cameraPath)
        .toDF("monitorId", "cameraId")

      // Traffic-flow records, one row per captured vehicle, e.g.:
      //   2022-09-13, 5-23-63, 3, 晋KIFWNQ, 2022-09-13 15:47:47, 17, 23, 5
      //   (pass date, monitor id, camera id, plate, capture time, speed, road id, area id)
      val flow = session.read.options(csvOptions).csv(flowPath)
        .toDF("todayDate", "monitorId", "cameraId", "chepai", "actionTime", "speed", "roadId", "areaId")

      // createOrReplaceTempView: createTempView throws TempTableAlreadyExistsException
      // when the session is reused (e.g. re-running in the same shell); replacing is safe here.
      cameras.createOrReplaceTempView("cameras")
      flow.createOrReplaceTempView("flow")

      // Left anti-join pattern: inventory cameras with no flow match (b.chepai is null)
      // are the abnormal ones.
      val abnormal = session.sql(
        """select a.monitorId, a.cameraId
          |from cameras as a
          |left join flow as b
          |  on a.monitorId = b.monitorId and a.cameraId = b.cameraId
          |where b.chepai is null
          |order by a.monitorId""".stripMargin)
      abnormal.show(200)
    } finally {
      // Guarantee Spark resources are released even if a read or query fails.
      session.stop()
    }
  }
}
标签 (Tags): monitorId, 卡口, val, cameras, session, 道路, id, 摄像头 — 来源 (Source): https://www.cnblogs.com/jsqup/p/16695635.html