Example: combining the simple logic for multiple reports in a single task
Author: Internet
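The listing below wires every report into one Spark job: each report's handler method name is registered in funcMap inside init(), and main() then hands control to start(spark, args), whose body is not part of this excerpt. As a rough illustration only, a dispatcher like the following could walk funcMap and invoke each registered process* method by name via reflection; the reflective lookup, the use of args(0) as the inc_day partition, and the way yesterday is derived are assumptions inferred from the code below, not the author's actual implementation.

  // Hypothetical sketch of the start() dispatcher (the real one is not included in this excerpt).
  // Assumes each funcMap value names a public method with signature (SparkSession, String, String) => Unit.
  def start(spark: SparkSession, args: Array[String]): Unit = {
    import scala.collection.JavaConverters._
    val incDay = args(0)                     // partition date, e.g. "20210710" (assumed)
    val fmt = new SimpleDateFormat("yyyyMMdd")
    val cal = java.util.Calendar.getInstance()
    cal.setTime(fmt.parse(incDay))
    cal.add(java.util.Calendar.DAY_OF_MONTH, -1)
    val yesterday = fmt.format(cal.getTime)  // assumed to be incDay minus one day
    funcMap.asScala.foreach { case (reportName, methodName) =>
      logger.error(s"running report $reportName via $methodName")
      // reflective dispatch so all reports run in the same job against one SparkSession
      val m = this.getClass.getMethod(methodName, classOf[SparkSession], classOf[String], classOf[String])
      m.invoke(this, spark, incDay, yesterday)
    }
  }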
package com.fengtu.sparktest.eta

import java.text.SimpleDateFormat
import java.util
import java.util.Date

import com.alibaba.fastjson.JSONObject
import com.fengtu.sparktest.utils.{JSONUtils, MD5Util, SparkUtils}
import com.fengtu.sparktest.utils2.DateUtil
import org.apache.commons.lang
import org.apache.commons.lang.StringUtils
import org.apache.log4j.Logger
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.storage.StorageLevel

/*
 * Navigation SDK - ETA metric monitoring requirement (陈晨)
 */
object NaviSdkEtaIndexMonitor extends Serializable {

  val appName: String = this.getClass.getSimpleName.replace("$", "")
  val logger: Logger = Logger.getLogger(appName)
  val funcMap = new util.HashMap[String, String]()

  val descMysqlUserName = "gis_oms_pns"
  val descMysqlPassWord = "gis_oms_pns@123@"
  val descMysqlUrl = "jdbc:mysql://10.119.72.209:3306/gis_oms_lip_pns?characterEncoding=utf-8"

  /*p1*/
  // click rate / route-selection rate
  val clickRateStatisticsSourTable = "dm_gis.gis_navi_top3_click_route"
  // comment out when testing
  val clickRateStatisticsDescTable = "ETA_INDEX_CLICK_RATE_STATISTICS"
  //val clickRateStatisticsDescTable = "ETA_INDEX_CLICK_RATE_STATISTICS_test"
  // yaw
  val yawStatisticsSourTable = "dm_gis.gis_navi_result_union"
  // comment out when testing
  val yawStatisticsDescTable = "ETA_INDEX_YAW_STATISTICS"
  //val yawStatisticsDescTable = "ETA_INDEX_YAW_STATISTICS_test"
  // accuracy rate
  val accStatisticsSourTable = "gis_navi_eta_result_tmp"
  val accStatisticsDescTable = "ETA_INDEX_ACCARY_RATE_STATISTICS"
  // consistency rate
  val consistentSourTable = "gis_navi_eta_result_tmp"
  val reqAccStatisticsDescTable = "ETA_INDEX_CONSISTENT_REQACC_STATISTICS"
  //val reqAccStatisticsDescTable = "tmp_ETA_INDEX_CONSISTENT_REQACC_STATISTICS"
  val accConStatisticsDescTable = "ETA_INDEX_CONSISTENT_ACC_STATISTICS"
  // usage rate
  val useRateSourTable = "dm_gis.gis_navi_result_union"
  val useRateDestTable = "ETA_INDEX_USE_RATE_STATISTICS"
  // abnormal-exit monitoring
  val aemStatisticsSourTable = "dm_gis.gis_navi_result_union"
  val aemStatisticsoDescTable = "ETA_INDEX_ABNORMAL_EXIT_MONITOR_STATISTICS"
  // time deviation rate
  val timeDeviationRateSourTable = "gis_navi_eta_result_tmp"
  val timeDeviationRateDescTable = "ETA_INDEX_TIME_DIFF_TIME_STATISTICS"
  // time deviation rate for specific periods
  val timePeriodDeviationRateSourTable = "gis_navi_eta_result_tmp"
  val timePeriodDeviationRateDescTable = "ETA_INDEX_TIME_PERIOD_DIFF_TIME_STATISTICS"

  /*p2*/
  // task volume
  val taskAmountStatSourTable = "gis_navi_eta_result_tmp"
  val taskAmountStatDescTable = "ETA_INDEX_TASK_AMOUNT_STATISTICS"
  // service metric: response time
  var serviceCostTimeRdd = null: RDD[((String, String), JSONObject)]
  val serviceResponseDescTable = "ETA_INDEX_SERVICE_RESPONSE_TIME_STATISTICS"
  // service metric: performance statistics
  val servicePerformanceDescTable = "ETA_INDEX_SERVICE_PERFORMANCE_TIME_STATISTICS"
  // reuse rate
  val reuseStatSourTable = "dm_gis.gis_navi_result_union"
  val reuseStatDestTable = "ETA_INDEX_REUSE_RATE_STATISTICS"
  // questionnaire accuracy rate
  var questionnaireRdd = null: RDD[((String, String), JSONObject)]
  val questionnaireAccDestTable = "ETA_INDEX_QUESTIONNAIRE_ACC_RATE_STATISTICS"
  // comment out when testing
  //val questionnaireAccDestTable = "ETA_INDEX_QUESTIONNAIRE_ACC_RATE_STATISTICS_test"
  // questionnaire driver error share
  val questionnaireErrDestTable = "ETA_INDEX_QUESTIONNAIRE_ERR_RATE_STATISTICS"

  def init(incDay: String) = {
    val spark = SparkSession
      .builder()
      .appName("SparkDecode")
      .master("yarn")
      .enableHiveSupport()
      .config("hive.exec.dynamic.partition", true)
      .config("hive.exec.dynamic.partition.mode", "nonstrict")
      .getOrCreate()
    //val spark = SparkSession.builder().config(new SparkConf().setMaster("local[10]").setAppName(appName)).getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    //p1
    funcMap.put("点击率", "processClickRateStatistics")
    funcMap.put("偏航", "processYawStatistics")
    funcMap.put("准确率", "processAccuracyStatistics")
    funcMap.put("一致率", "processConsistentStatistics")
    funcMap.put("使用率", "processUseRateStatistics")
    funcMap.put("异常退出监控", "processAbnormalExitMonitor")
    funcMap.put("时间偏差率", "processTimeDeviationRate")
    funcMap.put("特定时间偏差率", "processTimePeriodDeviationRate")
    //p2
    funcMap.put("任务量", "processTaskAmountStatistics")
    funcMap.put("复用率", "processReuseRateStatistics")
    funcMap.put("问卷调查正确率", "processQuestionnaireAccRateStatistics")
    funcMap.put("问卷调查司机错误占比", "processQuestionnaireDriverErrPerStatistics")
    funcMap.put("服务指标-响应时间", "processServiceResponseStatistics")
    funcMap.put("服务指标-性能统计", "processServicePerformanceStatistics")

    // dm_gis.gis_navi_eta_result has been split into two tables, so join them and register a temp view
    logger.error("合并导航结果表")
    val querySql =
      s"""
         |select
         | src_province,
         | src_citycode,
         | src_deptcode,
         | dest_province,
         | dest_citycode,
         | dest_deptcode,
         | ft_right,
         | tl_ft_right,
         | src,
         | duration,
         | plan_date,
         | t1.routeid as routeId,
         | req_order,
         | similarity1,
         | similarity5,
         | navi_endstatus,
         | t1.req_type,
         | diff_time,
         | navi_time,
         | t1.request_id,
         | t1.req_time,
         | t1.navi_endtime,
         | t1.inc_day,
         | req_status,
         | distance,
         | route_order,
         | req_order,
         | t1.navi_id,
         | t1.task_id
         |from (
         | select * from dm_gis.gis_navi_eta_result1 where inc_day='$incDay'
         |) t1
         |inner join (
         | select * from dm_gis.gis_navi_eta_result2 where inc_day='$incDay'
         |) t2
         |on t1.id = t2.id
         |""".stripMargin
    logger.error(querySql)
    spark.sql(querySql).createOrReplaceTempView("gis_navi_eta_result_tmp")
    spark
  }

  def main(args: Array[String]): Unit = {
    val spark = init(args(0))
    start(spark, args)
    spark.close()
  }

  /*
   *p1
   */
  // route-selection rate schema
  val clickRateSchema = StructType(List(
    StructField("id", StringType, true), StructField("statdate", StringType, true),
    StructField("province", StringType, true), StructField("citycode", StringType, true),
    StructField("sitecode", StringType, true), StructField("navi_amount", IntegerType, true),
    StructField("firstWay_amount", IntegerType, true), StructField("secondWay_amount", IntegerType, true),
    StructField("thirdWay_amount", IntegerType, true), StructField("traditionType_amount", IntegerType, true),
    StructField("experienceType_amount", IntegerType, true), StructField("gdType_amount", IntegerType, true)
  ))

  // yaw statistics schema
  val yawSchema = StructType(List(
    StructField("id", StringType, true), StructField("statdate", StringType, true),
    StructField("province", StringType, true), StructField("citycode", StringType, true),
    StructField("sitecode", StringType, true), StructField("sdkversion", StringType, true),
    StructField("system", StringType, true), StructField("navi_amount", IntegerType, true),
    StructField("yawpredict_avg", IntegerType, true), StructField("yawfinaljudgment_avg", IntegerType, true),
    StructField("percent99_yaw_amount", IntegerType, true), StructField("percent95_yaw_amount", IntegerType, true),
    StructField("percent90_yaw_amount", IntegerType, true), StructField("experienceYaw_amount", IntegerType, true),
    StructField("experienceUse_amount", IntegerType, true), StructField("traditionalYaw_amount", IntegerType, true),
    StructField("traditionalUse_amount", IntegerType, true), StructField("gdyaw_amount", IntegerType, true),
    StructField("gduse_amount", IntegerType, true)
  ))

  // accuracy statistics schema
  val accSchema = StructType(List(
    StructField("id", StringType, true), StructField("statdate", StringType, true),
StructField("province", StringType, true), StructField("citycode", StringType, true), StructField("sitecode", StringType, true), StructField("compensate", StringType, true), StructField("src", StringType, true), StructField("reqamount", IntegerType, true), StructField("correctamount", IntegerType, true), StructField("halfhour_reqamount", IntegerType, true), StructField("halfhour_correctamount", IntegerType, true), StructField("onehour_reqamount", IntegerType, true), StructField("onehour_correctamount", IntegerType, true), StructField("req1_amount", IntegerType, true), StructField("req2_amount", IntegerType, true), StructField("req3_amount", IntegerType, true), StructField("req4_amount", IntegerType, true), StructField("req5_amount", IntegerType, true), StructField("req6_amount", IntegerType, true), StructField("req7_amount", IntegerType, true), StructField("req8_amount", IntegerType, true), StructField("req9_amount", IntegerType, true), StructField("req10_amount", IntegerType, true), StructField("req11_amount", IntegerType, true), StructField("req12_amount", IntegerType, true), StructField("req13_amount", IntegerType, true), StructField("req14_amount", IntegerType, true), StructField("req15_amount", IntegerType, true), StructField("req16_amount", IntegerType, true), StructField("req17_amount", IntegerType, true), StructField("req18_amount", IntegerType, true), StructField("req19_amount", IntegerType, true), StructField("req20_amount", IntegerType, true), StructField("req21_amount", IntegerType, true), StructField("req22_amount", IntegerType, true), StructField("req23_amount", IntegerType, true), StructField("req24_amount", IntegerType, true), StructField("correct1_amount", IntegerType, true), StructField("correct2_amount", IntegerType, true), StructField("correct3_amount", IntegerType, true), StructField("correct4_amount", IntegerType, true), StructField("correct5_amount", IntegerType, true), StructField("correct6_amount", IntegerType, true), StructField("correct7_amount", IntegerType, true), StructField("correct8_amount", IntegerType, true), StructField("correct9_amount", IntegerType, true), StructField("correct10_amount", IntegerType, true), StructField("correct11_amount", IntegerType, true), StructField("correct12_amount", IntegerType, true), StructField("correct13_amount", IntegerType, true), StructField("correct14_amount", IntegerType, true), StructField("correct15_amount", IntegerType, true), StructField("correct16_amount", IntegerType, true), StructField("correct17_amount", IntegerType, true), StructField("correct18_amount", IntegerType, true), StructField("correct19_amount", IntegerType, true), StructField("correct20_amount", IntegerType, true), StructField("correct21_amount", IntegerType, true), StructField("correct22_amount", IntegerType, true), StructField("correct23_amount", IntegerType, true), StructField("correct24_amount", IntegerType, true) )) //一致率统计映射 val reqAccSchema = StructType(List( StructField("id", StringType, true), StructField("statdate", StringType, true), StructField("province", StringType, true), StructField("citycode", StringType, true), StructField("sitecode", StringType, true), StructField("compensate", StringType, true), StructField("src", StringType, true), StructField("simtype", StringType, true), StructField("distance", StringType, true), StructField("reqamount", IntegerType, true), StructField("nullreq_amount", IntegerType, true), StructField("percent100_req_amount", IntegerType, true), StructField("percent100_acc_amount", IntegerType, true), 
StructField("percent99_req_amount", IntegerType, true), StructField("percent99_acc_amount", IntegerType, true), StructField("percent98_req_amount", IntegerType, true), StructField("percent98_acc_amount", IntegerType, true), StructField("percent95_req_amount", IntegerType, true), StructField("percent95_acc_amount", IntegerType, true), StructField("percent90_req_amount", IntegerType, true), StructField("percent90_acc_amount", IntegerType, true), StructField("percent85_req_amount", IntegerType, true), StructField("percent85_acc_amount", IntegerType, true), StructField("percent80_req_amount", IntegerType, true), StructField("percent80_acc_amount", IntegerType, true), StructField("percent60_req_amount", IntegerType, true), StructField("percent60_acc_amount", IntegerType, true) )) val reqWaySimSchema = StructType(List( StructField("id", StringType, true), StructField("statdate", StringType, true), StructField("province", StringType, true), StructField("citycode", StringType, true), StructField("sitecode", StringType, true), StructField("compensate", StringType, true), StructField("index_type", StringType, true), StructField("src", StringType, true), StructField("simtype", StringType, true), StructField("reqamount", IntegerType, true), StructField("correctamount", IntegerType, true), StructField("halfhour_reqamount", IntegerType, true), StructField("halfhour_correctamount", IntegerType, true), StructField("onehour_reqamount", IntegerType, true), StructField("onehour_correctamount", IntegerType, true), StructField("req1_amount", IntegerType, true), StructField("req2_amount", IntegerType, true), StructField("req3_amount", IntegerType, true), StructField("req4_amount", IntegerType, true), StructField("req5_amount", IntegerType, true), StructField("req6_amount", IntegerType, true), StructField("req7_amount", IntegerType, true), StructField("req8_amount", IntegerType, true), StructField("req9_amount", IntegerType, true), StructField("req10_amount", IntegerType, true), StructField("req11_amount", IntegerType, true), StructField("req12_amount", IntegerType, true), StructField("req13_amount", IntegerType, true), StructField("req14_amount", IntegerType, true), StructField("req15_amount", IntegerType, true), StructField("req16_amount", IntegerType, true), StructField("req17_amount", IntegerType, true), StructField("req18_amount", IntegerType, true), StructField("req19_amount", IntegerType, true), StructField("req20_amount", IntegerType, true), StructField("req21_amount", IntegerType, true), StructField("req22_amount", IntegerType, true), StructField("req23_amount", IntegerType, true), StructField("req24_amount", IntegerType, true), StructField("correct1_amount", IntegerType, true), StructField("correct2_amount", IntegerType, true), StructField("correct3_amount", IntegerType, true), StructField("correct4_amount", IntegerType, true), StructField("correct5_amount", IntegerType, true), StructField("correct6_amount", IntegerType, true), StructField("correct7_amount", IntegerType, true), StructField("correct8_amount", IntegerType, true), StructField("correct9_amount", IntegerType, true), StructField("correct10_amount", IntegerType, true), StructField("correct11_amount", IntegerType, true), StructField("correct12_amount", IntegerType, true), StructField("correct13_amount", IntegerType, true), StructField("correct14_amount", IntegerType, true), StructField("correct15_amount", IntegerType, true), StructField("correct16_amount", IntegerType, true), StructField("correct17_amount", IntegerType, true), 
StructField("correct18_amount", IntegerType, true), StructField("correct19_amount", IntegerType, true), StructField("correct20_amount", IntegerType, true), StructField("correct21_amount", IntegerType, true), StructField("correct22_amount", IntegerType, true), StructField("correct23_amount", IntegerType, true), StructField("correct24_amount", IntegerType, true) )) //使用率统计映射 val useSchema = StructType(List( StructField("id", StringType, true), StructField("statdate", StringType, true), StructField("province", StringType, true), StructField("citycode", StringType, true), StructField("sitecode", StringType, true), StructField("sdkversion", StringType, true), StructField("system", StringType, true), StructField("navi_task_amount", IntegerType, true), StructField("navi_use_amount", IntegerType, true), StructField("gd_use_amount", IntegerType, true), StructField("sf_use_amount", IntegerType, true), StructField("whole_use_amount", IntegerType, true), StructField("driver_amount", IntegerType, true), StructField("driver_use_amount", IntegerType, true) )) //异常结束监控映射 val aemSchema = StructType(List( StructField("id", StringType, true), StructField("statdate", StringType, true), StructField("province", StringType, true), StructField("citycode", StringType, true), StructField("sitecode", StringType, true), StructField("sdkVersion", StringType, true), StructField("system", StringType, true), StructField("intosdk_nonavi_amount", IntegerType, true), StructField("navi_amount", IntegerType, true), StructField("exception_amount", IntegerType, true), StructField("halfway_end_mount", IntegerType, true), StructField("exception_notend_amount", IntegerType, true), StructField("falshback_amount", IntegerType, true), StructField("normal_amount", IntegerType, true), StructField("autoend_amount", IntegerType, true), StructField("manualend_amount", IntegerType, true) )) //时间偏差率映射 val timeDeviationRateSchema = StructType(List( StructField("id", StringType, true), StructField("statdate", StringType, true), StructField("province", StringType, true), StructField("citycode", StringType, true), StructField("sitecode", StringType, true), StructField("src", StringType, true), StructField("req_amount", StringType, true), StructField("diff_amount", StringType, true), StructField("diff_per_amount", StringType, true), StructField("req_0_10", StringType, true), StructField("diff_0_10", StringType, true), StructField("diff_per_0_10", StringType, true), StructField("req_10_20", StringType, true), StructField("diff_10_20", StringType, true), StructField("diff_per_10_20", StringType, true), StructField("req_20_40", StringType, true), StructField("diff_20_40", StringType, true), StructField("diff_per_20_40", StringType, true), StructField("req_40_50", StringType, true), StructField("diff_40_50", StringType, true), StructField("diff_per_40_50", StringType, true), StructField("req_50_70", StringType, true), StructField("diff_50_70", StringType, true), StructField("diff_per_50_70", StringType, true), StructField("req_70_90", StringType, true), StructField("diff_70_90", StringType, true), StructField("diff_per_70_90", StringType, true), StructField("req_90_120", StringType, true), StructField("diff_90_120", StringType, true), StructField("diff_per_90_120", StringType, true), StructField("req_120_150", StringType, true), StructField("diff_120_150", StringType, true), StructField("diff_per_120_150", StringType, true), StructField("req_150_180", StringType, true), StructField("diff_150_180", StringType, true), StructField("diff_per_150_180", 
StringType, true), StructField("req_180_240", StringType, true), StructField("diff_180_240", StringType, true), StructField("diff_per_180_240", StringType, true), StructField("req_240_350", StringType, true), StructField("diff_240_350", StringType, true), StructField("diff_per_240_350", StringType, true), StructField("req_350_370", StringType, true), StructField("diff_350_370", StringType, true), StructField("diff_per_350_370", StringType, true), StructField("req_370", StringType, true), StructField("diff_370", StringType, true), StructField("diff_per_370", StringType, true) )) //特定时间偏差率 val timePeriodDeviationRateSchema =StructType(List( StructField("id", StringType, true), StructField("statdate", StringType, true), StructField("province", StringType, true), StructField("citycode", StringType, true), StructField("sitecode", StringType, true), StructField("src", StringType, true), StructField("req_amount", StringType, true), StructField("diff_amount", StringType, true), StructField("navi_amount", StringType, true), StructField("req_half", StringType, true), StructField("diff_half", StringType, true), StructField("navi_half", StringType, true), StructField("req_1", StringType, true), StructField("diff_1", StringType, true), StructField("navi_1", StringType, true), StructField("req_2", StringType, true), StructField("diff_2", StringType, true), StructField("navi_2", StringType, true), StructField("req_3", StringType, true), StructField("diff_3", StringType, true), StructField("navi_3", StringType, true), StructField("req_4", StringType, true), StructField("diff_4", StringType, true), StructField("navi_4", StringType, true), StructField("req_5", StringType, true), StructField("diff_5", StringType, true), StructField("navi_5", StringType, true), StructField("req_6", StringType, true), StructField("diff_6", StringType, true), StructField("navi_6", StringType, true) )) /* *p2 */ //任务量统计映射 val taskSchema = StructType(List( StructField("id", StringType, true), StructField("statdate", StringType, true), StructField("province", StringType, true), StructField("citycode", StringType, true), StructField("sitecode", StringType, true), StructField("req_status", StringType, true), StructField("task_amount", IntegerType, true), StructField("navi_amount", IntegerType, true), StructField("dist_0", IntegerType, true), StructField("dist_50", IntegerType, true), StructField("dist_200", IntegerType, true), StructField("dist_500", IntegerType, true), StructField("city_interior", IntegerType, true), StructField("province_interior", IntegerType, true), StructField("interprovincial", IntegerType, true) )) //复用率统计映射 val reuseSchema = StructType(List( StructField("id", StringType, true), StructField("statdate", StringType, true), StructField("province", StringType, true), StructField("citycode", StringType, true), StructField("sitecode", StringType, true), StructField("sdkversion", StringType, true), StructField("system", StringType, true), StructField("driver_amount", IntegerType, true), StructField("reuse_0", IntegerType, true), StructField("reuse_1", IntegerType, true), StructField("reuse_2", IntegerType, true), StructField("reuse_5", IntegerType, true), StructField("reuse_10", IntegerType, true), StructField("last_navitask_amount", IntegerType, true), StructField("driver_loss_amount", IntegerType, true), StructField("driver_keep_amount", IntegerType, true), StructField("last_no_navitask_amount", IntegerType, true), StructField("driver_add_amount", IntegerType, true) )) //问卷调查准确率映射 val qAccSchema = StructType(List( 
StructField("id", StringType, true), StructField("statdate", StringType, true), StructField("app_version", StringType, true), StructField("sysytem", StringType, true), StructField("questionnaire_amount", IntegerType, true), StructField("driver_amount", IntegerType, true), StructField("q1_amount", IntegerType, true), StructField("q1_acc_amount", IntegerType, true), StructField("q2_amount", IntegerType, true), StructField("q2_acc_amount", IntegerType, true), StructField("q3_amount", IntegerType, true), StructField("q3_acc_amount", IntegerType, true), StructField("q4_amount", IntegerType, true), StructField("q4_acc_amount", IntegerType, true), StructField("q5_amount", IntegerType, true), StructField("q5_acc_amount", IntegerType, true), StructField("q6_amount", IntegerType, true), StructField("q6_acc_amount", IntegerType, true), StructField("q7_amount", IntegerType, true), StructField("q7_acc_amount", IntegerType, true), StructField("q8_amount", IntegerType, true), StructField("q8_acc_amount", IntegerType, true), StructField("q9_amount", IntegerType, true), StructField("q9_acc_amount", IntegerType, true), StructField("q10_amount", IntegerType, true), StructField("q10_acc_amount", IntegerType, true) )) //问卷调查错误占比映射 val qErrSchema = StructType(List( StructField("id", StringType, true), StructField("statdate", StringType, true), StructField("app_version", StringType, true), StructField("sysytem", StringType, true), StructField("questionnaire_amount", IntegerType, true), StructField("driver_amount", IntegerType, true), StructField("q1_err_amount", IntegerType, true), StructField("q1_max_driver_amount", IntegerType, true), StructField("q1_max_driverId", StringType, true), StructField("q2_err_amount", IntegerType, true), StructField("q2_max_driver_amount", IntegerType, true), StructField("q2_max_driverId", StringType, true), StructField("q3_err_amount", IntegerType, true), StructField("q3_max_driver_amount", IntegerType, true), StructField("q3_max_driverid", StringType, true), StructField("q3_max_err_type", StringType, true), StructField("q3_max_err_type_amount", IntegerType, true), StructField("q4_err_amount", IntegerType, true), StructField("q4_max_driver_amount", IntegerType, true), StructField("q4_max_driverId", StringType, true), StructField("q5_err_amount", IntegerType, true), StructField("q5_max_driver_amount", IntegerType, true), StructField("q5_max_driverId", StringType, true), StructField("q6_err_amount", IntegerType, true), StructField("q6_max_driver_amount", IntegerType, true), StructField("q6_max_driverId", StringType, true), StructField("q7_err_amount", IntegerType, true), StructField("q7_max_driver_amount", IntegerType, true), StructField("q7_max_driverId", StringType, true), StructField("q8_err_amount", IntegerType, true), StructField("q8_max_driver_amount", IntegerType, true), StructField("q8_max_driverid", StringType, true), StructField("q8_max_err_type", StringType, true), StructField("q8_max_err_type_amount", IntegerType, true), StructField("q9_err_amount", IntegerType, true), StructField("q9_max_driver_amount", IntegerType, true), StructField("q9_max_driverId", StringType, true) )) //服务指标-响应时间映射 val respTimeSchema = StructType(List( StructField("id", StringType, true), StructField("statdate", StringType, true), StructField("module", StringType, true), StructField("service", StringType, true), StructField("resp_0_200", IntegerType, true), StructField("resp_200_500", IntegerType, true), StructField("resp_500_1000", IntegerType, true), StructField("resp_1000_1500", IntegerType, 
    true), StructField("resp_1500_2000", IntegerType, true),
    StructField("resp_2000_3000", IntegerType, true), StructField("res_3000", IntegerType, true)
  ))

  // service metric: performance statistics schema
  val performanceSchema = StructType(List(
    StructField("id", StringType, true), StructField("statdate", StringType, true),
    StructField("module", StringType, true), StructField("service", StringType, true),
    StructField("minute", StringType, true), StructField("req_peak", IntegerType, true),
    StructField("minu_req_amount", IntegerType, true), StructField("avg_cost_time", IntegerType, true),
    StructField("minu_avg_cost_time", IntegerType, true), StructField("per99_cost_time", IntegerType, true),
    StructField("minu_per99_cost_time", IntegerType, true)
  ))

  // map the raw src value to its route-source category
  val getSrcMap = (json: JSONObject) => json.getString("src") match {
    case "rp-my" => "传统"
    case "sf" => "传统"
    case "rp-jy-full" => "经验"
    case "rp-jy" => "经验"
    case "rp-jy-art" => "经验"
    case "rp-jy-fixed" => "经验"
    case "jy" => "经验"
    case "gd" => "高德"
    case "rp-gd" => "高德"
    case _ => ""
  }

  // keep only records with similarity >= 0.9
  val getReqOrWayRdd = (rdd: RDD[(Seq[String], JSONObject)], compensate: String, indexType: String, simType: String) => {
    rdd.map(obj => {
      val (dest_province, dest_citycode, dest_deptcode, srcMap) = (obj._1(0), obj._1(1), obj._1(2), obj._1(3))
      obj._2.put("IndexType", indexType)
      if ("未修正指标".equals(indexType)) obj._2.put("IndexValue", obj._2.getString("ft_right"))
      else obj._2.put("IndexValue", obj._2.getString("tl_ft_right"))
      (Seq(dest_province, dest_citycode, dest_deptcode, compensate, indexType, srcMap, simType), obj._2)
    }).filter(obj => {
      obj._1(6) match {
        case "sim1" => 0.9 <= obj._2.getDouble("similarity1")
        case "sim5" => 0.9 <= obj._2.getDouble("similarity5")
        case "sim1And5" => 0.9 <= obj._2.getDouble("similarity1") || 0.9 <= obj._2.getDouble("similarity5")
      }
    })
  }

  /*process*/
  /*p0*/
  def processClickRateStatistics(spark: SparkSession, incDay: String, yesterday: String): Unit = {
    logger.error(">>>开始统计:ETA指标-点击率<<<")
    // read data from the ETA result table of top-3 selected routes
    val querysql =
      s"""select
         | dest_province,
         | dest_citycode,
         | dest_deptcode,
         | navi_id,
         | src,
         | route_index
         |FROM ${clickRateStatisticsSourTable}
         |where inc_day='$incDay'
         |""".stripMargin
    logger.error(querysql)
    val sourRdd = spark.sql(querysql).rdd.repartition(100).map(obj => {
      val jsonObj = new JSONObject()
      jsonObj.put("naviId", obj.getString(3))
      jsonObj.put("src", obj.getString(4))
      jsonObj.put("routeindex", obj.getString(5))
      ((obj.getString(0), obj.getString(1), obj.getString(2)), jsonObj)
    }).persist(StorageLevel.DISK_ONLY)
    logger.error(s"共获取从Top3点选线路的ETA数据共:${sourRdd.count()}")
    // compute the route-selection metrics
    val clickRateRdd = doClickRateStatistics(sourRdd, incDay)
    // save to hive
    SparkUtils.df2Hive(spark, clickRateRdd, clickRateSchema, "append", "dm_gis." + clickRateStatisticsDescTable, "statdate", incDay, logger)
    // comment out when testing
    // save to mysql
    SparkUtils.df2Mysql(spark, clickRateRdd, clickRateSchema, descMysqlUserName, descMysqlPassWord,
      "append", descMysqlUrl, clickRateStatisticsDescTable, incDay, logger)
    clickRateRdd.unpersist()
  }

  def processYawStatistics(spark: SparkSession, incDay: String, yesterday: String): Unit = {
    logger.error(">>>开始统计:ETA指标-偏航统计<<<")
    // read navigation table data
    val querySql =
      s"""
         |select
         |*
         |from
         |(
         |select
         | dest_province,
         | dest_citycode,
         | dest_deptcode,
         | sdk_version,
         | system,
         | navi_id,
         | route_src,
         | navi_starttime,
         | route_order,
         | hasYaw,
         | route_type,
         | route_count,
         | row_number() over(partition by navi_id order by route_order asc) num
         |FROM ${yawStatisticsSourTable}
         |where inc_day='$incDay'
         | and navi_starttime is not null and navi_starttime <> '' and navi_starttime != 'null'
         | and routeid is not null and routeid <>'' and routeid
<>'null' | and navi_endtime != '' | and navi_endtime is not null | ) t | -- where num = 1 |""".stripMargin logger.error(querySql) val sourRdd = spark.sql(querySql).rdd.repartition(100).map( obj => { val jsonObj = new JSONObject() jsonObj.put("naviId",obj.getString(5)) jsonObj.put("src",obj.getString(6)) jsonObj.put("naviStartTime",obj.getString(7)) jsonObj.put("route_order",obj.getString(8)) jsonObj.put("hasYaw",obj.getString(9)) jsonObj.put("route_type",obj.getString(10)) jsonObj.put("route_count",obj.getString(11)) ((obj.getString(0),obj.getString(1),obj.getString(2),obj.getString(3),obj.getString(4)),jsonObj) } ).persist(StorageLevel.DISK_ONLY) logger.error(s"共查询eta导航数据:${sourRdd.count()}") //开始进行偏航统计 val yawRdd = doYawStatistics(sourRdd,incDay) //保存到hive SparkUtils.df2Hive(spark,yawRdd,yawSchema,"append","dm_gis."+yawStatisticsDescTable,"statdate",incDay,logger) // 测试需注释掉 //保存到mysql SparkUtils.df2Mysql(spark,yawRdd,yawSchema,descMysqlUserName,descMysqlPassWord, "append",descMysqlUrl,yawStatisticsDescTable,incDay,logger) yawRdd.unpersist() } def processAccuracyStatistics( spark:SparkSession,incDay:String,yesterday:String ):Unit = { logger.error(">>>开始统计:ETA指标-准确率统计<<<") //读取ETA结果汇总表 val querySql = s""" |select | dest_province, | dest_citycode, | dest_deptcode, | ft_right, | tl_ft_right, | src, | duration, | plan_date |FROM $accStatisticsSourTable |where inc_day='$incDay' | and req_status = '0' |""".stripMargin logger.error(querySql) val sourUnFixRdd = spark.sql(querySql).rdd.repartition(100).map ( obj => { val jsonObj = new JSONObject() jsonObj.put("IndexType","未修正指标") jsonObj.put("IndexValue",obj.getString(3)) jsonObj.put("src",obj.getString(5)) jsonObj.put("duration",obj.getString(6)) jsonObj.put("planDate",obj.getString(7)) val srcMap = getSrcMap( jsonObj ) (Seq( obj.getString(0), obj.getString(1), obj.getString(2), srcMap ), jsonObj) } ).persist( StorageLevel.DISK_ONLY ) logger.error( s"获取未修正数据共:${ sourUnFixRdd.count() }" ) val sourFixRdd = spark.sql(querySql).rdd.repartition(100).map( obj => { val jsonObj = new JSONObject() jsonObj.put("IndexType","修正指标") jsonObj.put("IndexValue",obj.getString(4)) jsonObj.put("src",obj.getString(5)) jsonObj.put("duration",obj.getString(6)) jsonObj.put("planDate",obj.getString(7)) val srcMap = getSrcMap( jsonObj ) ( Seq( obj.getString(0), obj.getString(1), obj.getString(2), srcMap ), jsonObj ) } ).persist( StorageLevel.DISK_ONLY ) logger.error(s"获取修正数据共:${ sourFixRdd.count() }") //开始进行准确率统计 val accRdd = doAccuracyStatistics(sourUnFixRdd,incDay ).union(doAccuracyStatistics( sourFixRdd, incDay )) //保存到hive SparkUtils.df2Hive(spark,accRdd,accSchema,"append","dm_gis."+accStatisticsDescTable,"statdate",incDay,logger) //测试注释 //保存到mysql SparkUtils.df2Mysql(spark,accRdd,accSchema,descMysqlUserName,descMysqlPassWord, "append",descMysqlUrl,accStatisticsDescTable,incDay,logger) accRdd.unpersist() } def processConsistentStatistics( spark:SparkSession,incDay:String,yesterday:String ):Unit={ logger.error(">>>开始统计:ETA指标-一致率统计<<<") //读取ETA结果汇总表 val querySql = s""" |select | dest_province, | dest_citycode, | dest_deptcode, | ft_right, | tl_ft_right, | src, | duration, | routeId, | req_order, | similarity1, | similarity5, | plan_date, | navi_endstatus, | req_type, | case when distance >=500 and distance < 1000 then '500' | when distance >= 1000 and distance < 5000 then '1000' | when distance >= 5000 and distance < 10000 then '5000' | when distance >= 10000 then '10000' | else '0' | end as distance |FROM $consistentSourTable |where inc_day='$incDay' | -- and 
req_status = '0' | and plan_date is not null and plan_date <>'' | and cast (similarity1 as float) >= 0 and cast (similarity1 as float) <= 1 | and cast (similarity5 as float) >= 0 and cast (similarity5 as float) <= 1 | and routeId is not null and routeId <>'' and routeId <>'null' |""".stripMargin logger.error(querySql) logger.error(">>>开始统计相似度不同区间的请求量和准确量<<<") val sourRdd = spark.sql(querySql).rdd.repartition(100).map(obj => { val jsonObj = new JSONObject() jsonObj.put("ft_right",obj.getString(3)) jsonObj.put("tl_ft_right",obj.getString(4)) jsonObj.put("src",obj.getString(5)) jsonObj.put("duration",obj.getString(6)) jsonObj.put("routeId",obj.getString(7)) jsonObj.put("req_order",obj.getString(8)) jsonObj.put("similarity1",obj.getString(9)) jsonObj.put("similarity5",obj.getString(10)) jsonObj.put("planDate",obj.getString(11)) jsonObj.put("navi_endstatus",obj.getString(12)) jsonObj.put("req_type",obj.getString(13)) //20210709增加字段 jsonObj.put("distance",obj.getString(14)) val srcMap = getSrcMap(jsonObj) (Seq(obj.getString(0),obj.getString(1),obj.getString(2),srcMap,obj.getString(14)),jsonObj) }).persist(StorageLevel.DISK_ONLY) logger.error(s"按请求获取汇总结果:${sourRdd.count()}") //按请求统计请求量和准确量 val reqRdd = doReqAccStatistics(sourRdd,incDay,"ReqAccIndex","开始按请求统计请求量和准确率") //top3请求统计 val wayReqRdd = sourRdd.filter(json => { "top3".equals(JSONUtils.getJsonValue(json._2,"req_type","")) }).persist(StorageLevel.DISK_ONLY) //按线路统计请求量和准确量 // val wayReqRdd = sourRdd.map(obj => { // ((obj._1(0),obj._1(1),obj._1(2),obj._1(3),obj._2.getString("routeId")),obj._2) // }).groupByKey() // .map(obj => { // val (dest_province,dest_citycode,dest_deptcode,srcMap,_) = obj._1 // // //取数条件为:navi_endstatus属于1-9下,req_type=top3或yaw,并且满足同routeId下req_order最小的请求记录 // val resList = obj._2.toList.filter(json => { // Range(1,9).toString().contains(json.getString("navi_endstatus")) && // Array("top3","yaw").contains(json.getString("req_type")) // }) // // val value = if (resList != null && resList.size > 0) resList.minBy( _.getString("req_order")) else new JSONObject() // (Seq(dest_province,dest_citycode,dest_deptcode,srcMap),value) // }).filter(json => json._2 != null && json._2.size() > 0).persist(StorageLevel.DISK_ONLY) logger.error(s"按线路获取汇总结果:${wayReqRdd.count()}") val wayRdd = doReqAccStatistics(wayReqRdd,incDay,"WayAccIndex","开始按线路统计请求量和准确率") //合并线路和请求方式同记得请求量和准确量 val reqAccRdd = reqRdd.union(wayRdd).persist(StorageLevel.DISK_ONLY) //保存到hive SparkUtils.df2Hive(spark,reqAccRdd,reqAccSchema,"append","dm_gis."+reqAccStatisticsDescTable,"statdate",incDay,logger) //测试注释 // 保存到mysql SparkUtils.df2Mysql(spark,reqAccRdd,reqAccSchema,descMysqlUserName,descMysqlPassWord, "append",descMysqlUrl,reqAccStatisticsDescTable,incDay,logger) reqRdd.unpersist() logger.error(s"共统计请求和访问:${reqAccRdd.count()}") //按请求统计相似度≥0.9的区间的准确率 val reqRdd2 = getReqOrWayRdd(sourRdd,"ReqAccIndex","未修正指标","sim1").union(getReqOrWayRdd(sourRdd,"ReqAccIndex","未修正指标","sim5")) .union(getReqOrWayRdd(sourRdd,"ReqAccIndex","未修正指标","sim1And5")).union(getReqOrWayRdd(sourRdd,"ReqAccIndex","修正指标","sim1")) .union(getReqOrWayRdd(sourRdd,"ReqAccIndex","修正指标","sim5")).union(getReqOrWayRdd(sourRdd,"ReqAccIndex","修正指标","sim1And5")) val reqSimRdd = doAccuracyStatistics(reqRdd2,incDay) //按路线统计相似度≥0.9的区间的准确率 val wayRdd2 = getReqOrWayRdd(wayReqRdd,"WayAccIndex","未修正指标","sim1").union(getReqOrWayRdd(wayReqRdd,"WayAccIndex","未修正指标","sim5")) .union(getReqOrWayRdd(wayReqRdd,"WayAccIndex","未修正指标","sim1And5")).union(getReqOrWayRdd(wayReqRdd,"WayAccIndex","修正指标","sim1")) 
.union(getReqOrWayRdd(wayReqRdd,"WayAccIndex","修正指标","sim5")).union(getReqOrWayRdd(wayReqRdd,"WayAccIndex","修正指标","sim1And5")) val waySimRdd = doAccuracyStatistics(wayRdd2,incDay) //合并 val reqWaySimRdd = reqSimRdd.union(waySimRdd) //保存到hive SparkUtils.df2Hive(spark,reqWaySimRdd,reqWaySimSchema,"append","dm_gis."+accConStatisticsDescTable,"statdate",incDay,logger) //保存到mysql SparkUtils.df2Mysql(spark,reqWaySimRdd,reqWaySimSchema,descMysqlUserName,descMysqlPassWord, "append",descMysqlUrl,accConStatisticsDescTable,incDay,logger) sourRdd.unpersist() reqWaySimRdd.unpersist() reqRdd2.unpersist() wayRdd2.unpersist() wayRdd.unpersist() } def processUseRateStatistics( spark : SparkSession ,incDay:String,yesterday:String ):Unit={ logger.error(">>>开始统计:ETA指标-导航使用率统计<<<") //获取导航数据 val querySql = s""" |select | dest_province, | dest_citycode, | dest_deptcode, | sdk_version, | system, | task_id, | src_deptcode, | dest_deptcode, | routeid, | navi_endstatus, | navi_starttime, | navi_id |from ${useRateSourTable} |where inc_day= '${incDay}' |""".stripMargin logger.error(querySql) val naviRdd = spark.sql(querySql).rdd.repartition(100).map( obj => { val jsonObj = new JSONObject() jsonObj.put("dest_province",obj.getString(0)) jsonObj.put("dest_citycode",obj.getString(1)) jsonObj.put("dest_deptcode",obj.getString(2)) jsonObj.put("sdk_version",obj.getString(3)) jsonObj.put("system",obj.getString(4)) jsonObj.put("task_id",obj.getString(5)) jsonObj.put("src_dept_code",obj.getString(6)) jsonObj.put("dest_dept_code",obj.getString(7)) jsonObj.put("route_id",obj.getString(8)) jsonObj.put("navi_end_status",obj.getString(9)) jsonObj.put("navi_starttime",obj.getString(10)) jsonObj.put("navi_id",obj.getString(11)) jsonObj.put("navi_endtime",obj.getString(10)) ((obj.getString(5),obj.getString(7)),jsonObj) }).persist(StorageLevel.DISK_ONLY) logger.error(s"获取导航数据共${naviRdd.count()}") //获取顺路数据 val sLRdd = getSLTask(spark,incDay,yesterday) //获取关联经停数据 val joinTaskPassRdd = getTaskPass(spark,sLRdd,incDay,yesterday) //关联司机操作日志记录 val joinDriverTaskRdd = joinDriverTask(spark,joinTaskPassRdd,incDay,yesterday) //关联导航信息 val joinNaviInfoRdd = joinNaviInfo(spark,joinDriverTaskRdd,incDay,yesterday) //导航关联顺路数据 val totalRdd = joinSlNavi(joinNaviInfoRdd,naviRdd) //统计指标 val useDf = doUseRateStatistics(totalRdd,incDay) //保存到hive SparkUtils.df2Hive(spark,useDf,useSchema,"append","dm_gis."+useRateDestTable,"statdate",incDay,logger) //保存到mysql // SparkUtils.df2Mysql(spark,useDf,useSchema,descMysqlUserName,descMysqlPassWord, // "append",descMysqlUrl,useRateDestTable,incDay,logger) useDf.unpersist() } def processAbnormalExitMonitor( spark : SparkSession,incDay:String,yesterday:String ): Unit = { logger.error(">>>开始统计:ETA指标-异常退出监控<<<") //从导航表查询数据 val querySql = s""" |select | dest_province, | dest_citycode, | dest_deptcode, | sdk_version, | system, | navi_id, | navi_endstatus, | routeid, | navi_starttime |FROM ${aemStatisticsSourTable} |where inc_day='$incDay' |and navi_endtime <> '' |and navi_endtime is not null |""".stripMargin logger.error(querySql) val sourRdd = spark.sql(querySql).rdd.repartition(100).map( obj => { val jsonObj = new JSONObject() jsonObj.put("naviId",obj.getString(5)) jsonObj.put("naviEndStatus",obj.getString(6)) jsonObj.put("route_id",obj.getString(7)) jsonObj.put("naviStartTime",obj.getString(8)) ((obj.getString(0),obj.getString(1),obj.getString(2),obj.getString(3),obj.getString(4)),jsonObj) } ).persist(StorageLevel.DISK_ONLY) logger.error(s"共获取导航数据:${sourRdd.count}") //开始进行监控统计 val aemRdd = 
doAbnormalExitMonitor(sourRdd,incDay) //保存到hive中 SparkUtils.df2Hive(spark,aemRdd,aemSchema,"append","dm_gis."+aemStatisticsoDescTable,"statdate",incDay,logger) //保存到mysql中 SparkUtils.df2Mysql(spark,aemRdd,aemSchema,descMysqlUserName,descMysqlPassWord, "append",descMysqlUrl,aemStatisticsoDescTable,incDay,logger) aemRdd.unpersist() } def processTimeDeviationRate(spark : SparkSession,incDay:String,yesterday:String): Unit ={ logger.error("开始统计eta指标-时间偏差率") val querySql = s""" |select | dest_province, | dest_citycode, | dest_deptcode, | src, | diff_time, | navi_time, | duration, | request_id |from $timeDeviationRateSourTable |where inc_day='$incDay' | and req_status = '0' | and duration is not null and duration <> '' and duration <> 'null' | and diff_time is not null and diff_time <> '' and diff_time <> 'null' | and navi_time is not null and navi_time <> '' and navi_time <> 'null' |""".stripMargin logger.error(querySql) val sourRdd = spark.sql(querySql).rdd.repartition(100) .map( obj => { val jsonObj = new JSONObject() jsonObj.put("src",obj.getString(3)) jsonObj.put("diff_time",obj.getString(4)) jsonObj.put("navi_time",obj.getString(5)) jsonObj.put("duration",obj.getString(6)) jsonObj.put("request_id",obj.getString(7)) val srcMap = getSrcMap(jsonObj) ((obj.getString(0),obj.getString(1),obj.getString(2),srcMap),jsonObj) }).persist(StorageLevel.DISK_ONLY) logger.error(s"获取ETA总汇总结果:${sourRdd.count()}") //统计时间偏差率 val timeDeviationRateRdd= doTimeDeviationRate(spark,sourRdd,incDay) logger.error("时间偏差率总数据量为:" + timeDeviationRateRdd.count()) timeDeviationRateRdd.take(1).foreach(println(_)) if(timeDeviationRateRdd.count() > 1) { //保存到hive中 SparkUtils.df2Hive(spark,timeDeviationRateRdd,timeDeviationRateSchema,"append", "dm_gis."+timeDeviationRateDescTable,"statdate",incDay,logger) //保存到mysql中 SparkUtils.df2Mysql(spark,timeDeviationRateRdd,timeDeviationRateSchema,descMysqlUserName,descMysqlPassWord, "append",descMysqlUrl,timeDeviationRateDescTable,incDay,logger) } timeDeviationRateRdd.unpersist() } def processTimePeriodDeviationRate(spark : SparkSession,incDay:String,yesterday:String): Unit ={ logger.error("开始统计特定时间的时间偏差率") val querySql = s""" |select | dest_province, | dest_citycode, | dest_deptcode, | src, | diff_time, | navi_time, | req_time, | navi_endtime |from $timePeriodDeviationRateSourTable |where inc_day='$incDay' | and req_status = '0' | and diff_time is not null and diff_time <> '' and diff_time <> 'null' | and navi_time is not null and navi_time <> '' and navi_time <> 'null' | and req_time is not null and req_time <> '' and req_time <> 'null' | and navi_endtime is not null and navi_endtime <> '' and navi_endtime <> 'null' |""".stripMargin logger.error(querySql) val sourRdd = spark.sql(querySql).rdd.repartition(100) .map(obj => { val jsonObj = new JSONObject() jsonObj.put("src",obj.getString(3)) jsonObj.put("diff_time",obj.getString(4)) jsonObj.put("navi_time",obj.getString(5)) jsonObj.put("req_time",obj.getString(6)) jsonObj.put("navi_endtime",obj.getString(7)) val srcMap = getSrcMap(jsonObj) ((obj.getString(0),obj.getString(1),obj.getString(2),srcMap),jsonObj) }).persist(StorageLevel.DISK_ONLY) logger.error(s"获取ETA总汇总结果:${sourRdd.count}") //统计时段时间偏差率 val timePeriodDeviationRateRdd = doTimePeriodDeviationRate(spark,sourRdd,incDay) logger.error("时间偏差率总数据量为:" + timePeriodDeviationRateRdd.count()) timePeriodDeviationRateRdd.take(1).foreach(println(_)) if(timePeriodDeviationRateRdd.count() > 1) { //保存到hive中 SparkUtils.df2Hive(spark, timePeriodDeviationRateRdd, timePeriodDeviationRateSchema, 
"append", "dm_gis." + timePeriodDeviationRateDescTable, "statdate", incDay, logger) //入库mysql SparkUtils.df2Mysql(spark, timePeriodDeviationRateRdd, timePeriodDeviationRateSchema, descMysqlUserName, descMysqlPassWord, "append", descMysqlUrl, timePeriodDeviationRateDescTable, incDay, logger) } timePeriodDeviationRateRdd.unpersist() } /*p2*/ def processTaskAmountStatistics( spark:SparkSession,incDay:String,yesterday:String ): Unit ={ logger.error("开始统计任务量") //20210429变更sql val querySql = s""" |select | src_province, | src_citycode, | src_deptcode, | dest_province, | dest_citycode, | dest_deptcode, | a.navi_endStatus navi_endStatus, | b.distance distance, | a.navi_id navi_id, | task_id |from |( |select * |from |( | select | src_province, | src_citycode, | src_deptcode, | dest_province, | dest_citycode, | dest_deptcode, | navi_endStatus, | navi_id, | task_id, | row_number() over(partition by navi_id order by route_order asc) num | from dm_gis.gis_navi_result_union | where | inc_day = '$incDay' | -- and | -- navi_endStatus <> '' | and | route_type='top3' |) t |where num = 1 |) a |left outer join |( | |select | navi_id,distance |from |( |select | navi_id, | distance, | row_number() over(partition by navi_id order by route_order asc,req_order asc) num |from | gis_navi_eta_result_tmp |where | inc_day = '$incDay' |and | navi_endStatus <> '' | )c |where | c.num = 1 |) b |on a.navi_id = b.navi_id """.stripMargin // val querySql = // s""" // |select // | src_province, // | src_citycode, // | src_deptcode, // | dest_province, // | dest_citycode, // | dest_deptcode, // | navi_endStatus, // | distance, // | navi_id, // | task_id // |from ( // | select // | src_province, // | src_citycode, // | src_deptcode, // | dest_province, // | dest_citycode, // | dest_deptcode, // | navi_endStatus, // | distance, // | navi_id, // | task_id, // | row_number() over(partition by navi_id order by route_order asc,req_order asc) num // | from $taskAmountStatSourTable // | where inc_day = '$incDay' and navi_endStatus <> '' // |) t // |where num = 1 // |""".stripMargin logger.error( querySql ) val sourRdd = spark.sql( querySql ).rdd.repartition(100 ).map( obj => { val jsonObj = new JSONObject() jsonObj.put("src_province",obj.getString(0)) jsonObj.put("src_citycode",obj.getString(1)) jsonObj.put("src_deptcode",obj.getString(2)) jsonObj.put("dest_province",obj.getString(3)) jsonObj.put("dest_citycode",obj.getString(4)) jsonObj.put("dest_deptcode",obj.getString(5)) jsonObj.put("distance",obj.getString(7)) jsonObj.put("navi_id",obj.getString(8)) jsonObj.put("task_id",obj.getString(9)) val status= try {obj.getInt(6)} catch {case e:Exception => 0} val naviEndStatus = status match { case status if (status.toInt == 0) => 0 case status if (status.toInt >= 1 && status.toInt <= 9) => 1 case status if(status.toInt >= 10) => 2 } ((obj.getString(3),obj.getString(4),obj.getString(5),naviEndStatus.toString),jsonObj) }).persist( StorageLevel.DISK_ONLY ) logger.error(s"获取导航任务数据共:${ sourRdd.count() }") //进行指标统计 val taskAmountDf = doTaskAmountStatistics( sourRdd, incDay ) //存入hive SparkUtils.df2Hive( spark,taskAmountDf,taskSchema,"append","dm_gis."+taskAmountStatDescTable,"statdate",incDay,logger ) // 测试需注释掉 //保存到mysql SparkUtils.df2Mysql( spark,taskAmountDf,taskSchema,descMysqlUserName,descMysqlPassWord, "append",descMysqlUrl,taskAmountStatDescTable,incDay,logger ) taskAmountDf.unpersist() } def processReuseRateStatistics ( spark:SparkSession,incDay:String,yesterday:String ): Unit = { logger.error("开始统计复用率") //关联查询历史司机复用和当天复用情况 val 
queryHisReuseSql = s""" |select | nvl(t1.dest_province,t2.province) as province, | nvl(t1.dest_citycode,t2.citycode) as citycode, | nvl(t1.dest_deptcode,t2.sitecode) as sitecode, | nvl(t1.sdk_version,t2.sdkversion) as sdkversion, | nvl(t1.system,t2.system) as system, | nvl(t1.driver_id,t2.driver_id) as driver_id, | nvl(t1.use_amount,0) + nvl(t2.his_use_amount,0) as his_use_amount, | '$incDay' as statdate |from ( | select | dest_province, | dest_citycode, | dest_deptcode, | sdk_version, | system, | driver_id, | count(1) as use_amount | from ${reuseStatSourTable} | where inc_day='$incDay' | group by dest_province,dest_citycode,dest_deptcode,sdk_version,system,driver_id |) t1 |FULL JOIN |( | SELECT | province, | citycode, | sitecode, | sdkversion, | system, | driver_id, | his_use_amount | from dm_gis.eta_index_reuse_his_i | where statdate='$yesterday' |) t2 |on t1.dest_province = t2.province | and t1.dest_citycode = t2.citycode | and t1.dest_deptcode = t2.sitecode | and t1.sdk_version = t2.sdkversion | and t1.system = t2.system | and t1.driver_id = t2.driver_id |""".stripMargin logger.error(queryHisReuseSql) val hisReuseDf = spark.sql(queryHisReuseSql).repartition(100).persist(StorageLevel.DISK_ONLY) logger.error(s"获取历史复用数据共${hisReuseDf.count()}") logger.error("保存复用情况到hive历史复用情况中") SparkUtils.df2Hive( spark,hisReuseDf,"append","dm_gis.eta_index_reuse_his_i","statdate",incDay,logger ) //统计司机复用情况 val driverReuseRdd = doDriverReuseStatistics(hisReuseDf) //统计司机变更情况 val driverChangeRdd = doDriverChangeStatistics(spark,incDay,yesterday) //关联司机复用和司机变更 val driverReuseChangeDf = joinDriverReuseChange(driverChangeRdd,driverReuseRdd,incDay) //存入hive SparkUtils.df2Hive(spark,driverReuseChangeDf,reuseSchema,"append","dm_gis."+reuseStatDestTable,"statdate",incDay,logger) //存入mysql SparkUtils.df2Mysql(spark,driverReuseChangeDf,reuseSchema,descMysqlUserName,descMysqlPassWord, "append",descMysqlUrl,reuseStatDestTable,incDay,logger) driverReuseChangeDf.unpersist() } def processQuestionnaireAccRateStatistics( spark:SparkSession,incDay:String,yesterday:String ): Unit = { logger.error("开始统计问卷调查率") //获取问卷调查数据 val questionnaireDataRdd = Option(questionnaireRdd).getOrElse( getQuestionnaireData(spark,incDay,yesterday) ) //问卷调查正确率 val questionnaireAccRateDf = doQuestionnaireAccRateStatistics(questionnaireDataRdd,incDay,yesterday) //存入hive SparkUtils.df2Hive(spark,questionnaireAccRateDf,qAccSchema,"append","dm_gis."+questionnaireAccDestTable, "statdate",incDay,logger) //测试注释 //存入mysql SparkUtils.df2Mysql(spark,questionnaireAccRateDf,qAccSchema,descMysqlUserName,descMysqlPassWord, "append",descMysqlUrl,questionnaireAccDestTable,incDay,logger) questionnaireAccRateDf.unpersist() } def processQuestionnaireDriverErrPerStatistics( spark:SparkSession,incDay:String,yesterday:String ): Unit = { logger.error("开始统计问卷调查司机错误占比") val questionnaireDataRdd = Option(questionnaireRdd).getOrElse( getQuestionnaireData(spark,incDay,yesterday) ) //统计问卷调查司机错误占比 val questionnaireErrRateDf = doQuestionnaireErrRateStatistics(questionnaireDataRdd,incDay,yesterday) //存入hive SparkUtils.df2Hive(spark,questionnaireErrRateDf,qErrSchema,"append","dm_gis."+questionnaireErrDestTable, "statdate",incDay,logger) //存入mysql SparkUtils.df2Mysql(spark,questionnaireErrRateDf,qErrSchema,descMysqlUserName,descMysqlPassWord, "append",descMysqlUrl,questionnaireErrDestTable,incDay,logger) questionnaireErrRateDf.unpersist() } def processServiceResponseStatistics( spark:SparkSession,incDay:String,yesterday:String ): Unit ={ logger.error("开始统计服务指标-响应时间") val 
serviceCostTimeDataRdd = Option(serviceCostTimeRdd).getOrElse( getServiceCostTimeData(spark,incDay,yesterday)) //统计服务指标-响应时间 val serviceCostTimeDf = doServiceResponseStatistics(serviceCostTimeDataRdd,incDay) //存入hive SparkUtils.df2Hive(spark,serviceCostTimeDf,respTimeSchema,"append","dm_gis."+serviceResponseDescTable, "statdate",incDay,logger) //存入mysql SparkUtils.df2Mysql(spark,serviceCostTimeDf,respTimeSchema,descMysqlUserName,descMysqlPassWord, "append",descMysqlUrl,serviceResponseDescTable,incDay,logger) serviceCostTimeDf.unpersist() } def processServicePerformanceStatistics( spark:SparkSession,incDay:String,yesterday:String ): Unit ={ logger.error("开始统计服务指标-性能统计") val df = new SimpleDateFormat("yyyyMMdd HH:mm:ss") val tm1 = df.parse(incDay + " 00:00:00").getTime //获取请求时间戳所属的分钟 val serviceCostTimeDataRdd = Option(serviceCostTimeRdd).getOrElse( getServiceCostTimeData(spark,incDay,yesterday)) .filter(json => StringUtils.isNotBlank(json._2.getString("req_time"))) .map(obj => { val (module,service) = obj._1 val minute = DateUtil.getCurrentMinDiff(tm1,obj._2.getLong("req_time")) ((module,service,minute),obj._2) }).persist(StorageLevel.DISK_ONLY) logger.error(s"获取查询时间和响应时间都部位空的数据共:${serviceCostTimeDataRdd.count}") //统计服务指标-性能统计 val serviceCostTimeDf = doServicePerformanceStatistics(spark,serviceCostTimeDataRdd,incDay) //存入hive SparkUtils.df2Hive(spark,serviceCostTimeDf,performanceSchema,"append","dm_gis."+servicePerformanceDescTable, "statdate",incDay,logger) //存入mysql SparkUtils.df2Mysql(spark,serviceCostTimeDf,performanceSchema,descMysqlUserName,descMysqlPassWord, "append",descMysqlUrl,servicePerformanceDescTable,incDay,logger) serviceCostTimeDf.unpersist() } /*do anything*/ /*p1*/ def doClickRateStatistics( sourRdd: RDD[((String, String, String), JSONObject)],incDay: String) = { //按照省份(dest_province)、城市(dest_citycode)、场地(dest_deptcode)、日期(inc_day)聚合 val clickRateDf = sourRdd.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp) .map(obj => { val (dest_province,dest_citycode,dest_deptcode) = obj._1 val resList = obj._2 val md5Instance = MD5Util.getMD5Instance val id = MD5Util.getMD5(md5Instance, Array(incDay,dest_province,dest_citycode,dest_deptcode).mkString("_")) //val id = Base64.getEncoder().encodeToString(Array(incDay,dest_province,dest_citycode,dest_deptcode).mkString("_").getBytes("utf-8")) //总导航量 val naviAmount = resList.length //选择了第几条线路统计 val routeindexMap = resList.groupBy(_.getString("routeindex")) val firstWayAmount = routeindexMap.getOrElse("0",List()).length val secondWayAmount = routeindexMap.getOrElse("1",List()).length val thirdWayAmount = routeindexMap.getOrElse("2",List()).length //传统/经验统计 val srcTypeMap = resList.map(json => { val src = getSrcMap(json) json.put("src",src) json }).groupBy(_.getString("src")) val traditionTypeAmount = srcTypeMap.getOrElse("传统",List()).length val experienceTypeAmount = srcTypeMap.getOrElse("经验",List()).length //20210429新增gd统计 val gdTypeAmount = srcTypeMap.getOrElse("高德",List()).length Row(id,incDay,dest_province,dest_citycode,dest_deptcode,naviAmount,firstWayAmount, secondWayAmount,thirdWayAmount,traditionTypeAmount,experienceTypeAmount,gdTypeAmount) }).persist(StorageLevel.DISK_ONLY) sourRdd.unpersist() logger.error(s"共统计点选率指标:${clickRateDf.count()}") clickRateDf } def doYawStatistics(sourRdd: RDD[((String, String, String, String, String), JSONObject)],incDay: String)={ logger.error(s"开始进行偏航统计") val yawDf = sourRdd.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp) .map(obj => { val ( 
dest_province,dest_citycode,dest_deptcode,sdkVersion,system ) = obj._1 val resList = obj._2 val md5Instance = MD5Util.getMD5Instance val id = MD5Util.getMD5(md5Instance, Array(incDay,dest_province,dest_citycode,dest_deptcode,sdkVersion,system).mkString("_")) //导航次数 val naviDistinctList = resList.map(_.getString("naviId")).distinct val naviAmount = naviDistinctList.length //偏航预判平均次数 val yawPredictAvg = resList.length / naviDistinctList.length //偏航终判平均次数 val hasYawList = resList.filter(json => "true".equals(json.getString("hasYaw"))) val yawFinalJudgmentAvg = hasYawList.length / naviDistinctList.length //99%偏航 val hasYawRouteCountList = hasYawList.groupBy(_.getString("naviId")).map(_._2.head).toList.sortBy(_.getString("route_count")) val _99PercentYaw = if (hasYawRouteCountList.length == 0 ) 0 else hasYawRouteCountList(Math.round((hasYawRouteCountList.length -1 ) * 0.99).toInt).getInteger("route_count") //95%偏航 val _95PercentYaw = if (hasYawRouteCountList.length == 0) 0 else hasYawRouteCountList(Math.round((hasYawRouteCountList.length -1 ) * 0.95).toInt).getInteger("route_count") //90%偏航 val _90PercentYaw = if (hasYawRouteCountList.length == 0) 0 else hasYawRouteCountList(Math.round((hasYawRouteCountList.length -1 ) * 0.90).toInt).getInteger("route_count") //经验偏航次数 var yawList = List[String]() var useList = List[String]() var gdList = List[String]() resList.filter(json => { ( "true".equals(json.getString("hasYaw")) || "top3".equals(json.getString("route_type"))) && lang.StringUtils.isNotBlank(json.getString("naviStartTime")) }) .groupBy(_.getString("naviId")).foreach(obj => { val maxSrc = getSrcMap(obj._2.maxBy(_.getString("route_order"))) yawList = maxSrc :: yawList //obj._2.foreach(elem => useList = getSrcMap(elem) :: useList) }) resList.filter(json => { ( "true".equals(json.getString("hasYaw"))) && lang.StringUtils.isNotBlank(json.getString("naviStartTime")) }).groupBy(_.getString("naviId")).foreach(obj => { obj._2.foreach(elem => useList = getSrcMap(elem) :: useList) }) val experienceYawAmount = yawList.count(src => "经验".equals(src)) //经验线路使用总量 val experienceUseAmount = useList.count(src => "经验".equals(src)) //传统偏航次数 val traditionalYawAmount = yawList.count(src => "传统".equals(src)) //传统线路使用总量 val traditionalUseAmount = useList.count(src => "传统".equals(src)) //新增gd线路使用总量 val gdYawAmount = yawList.count(src => "高德".equals(src)) val gdUseAmount = useList.count(src => "高德".equals(src)) Row(id,incDay,dest_province,dest_citycode,dest_deptcode,sdkVersion,system,naviAmount,yawPredictAvg, yawFinalJudgmentAvg,_99PercentYaw,_95PercentYaw,_90PercentYaw,experienceYawAmount,experienceUseAmount, traditionalYawAmount,traditionalUseAmount,gdYawAmount,gdUseAmount) }).persist(StorageLevel.DISK_ONLY) sourRdd.unpersist() logger.error( s"共统计点选率指标:${yawDf.count()}" ) yawDf } def doAccuracyStatistics( sourRdd: RDD[(Seq[String],JSONObject)],incDay:String)= { logger.error(s"开始进行准确率统计") val accDf = sourRdd.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp).map( obj => { val resList = obj._2 var rowSeq = null:Row val md5Instance = MD5Util.getMD5Instance //总请求量 val reqAmount = resList.length //正确量 val accList = resList.filter(json => "1".equals(json.getString("IndexValue"))) val accAmount = accList.length //半小时请求量 val durationReqList = resList.filter( json => StringUtils.isNotBlank(json.getString("duration"))) val durationAccList = accList.filter( json => StringUtils.isNotBlank(json.getString("duration"))) val halfHourReqAmount = durationReqList.filter( json => 20*60 <= 
json.getString("duration").toDouble && json.getString("duration").toDouble < 40*60).length //半小时正确量 val halfHourAccAmount = durationAccList.filter( json => 20*60 <= json.getString("duration").toDouble && json.getString("duration").toDouble < 40*60).length //一小时请求量:50≤duration<70时的数据量 val oneHourReqAmount = durationReqList.filter( json => 50*60 <= json.getString("duration").toDouble && json.getString("duration").toDouble < 70*60).length //一小时正确量 val oneHourAccAmount = durationAccList.filter( json => 50*60 <= json.getString("duration").toDouble && json.getString("duration").toDouble < 70*60).length //分时段请求量 obj._1.length match { case 4 => { val (dest_province, dest_citycode, dest_deptcode, srcMap) = (obj._1(0),obj._1(1),obj._1(2),obj._1(3)) val id = MD5Util.getMD5(md5Instance, Array(incDay, dest_province, dest_citycode , dest_deptcode, srcMap, resList(0).getString("IndexType")).mkString("_")) rowSeq = Row(id, incDay, dest_province, dest_citycode, dest_deptcode, resList(0).getString("IndexType"), srcMap, reqAmount , accAmount, halfHourReqAmount, halfHourAccAmount, oneHourReqAmount, oneHourAccAmount) } case 7 => { val (dest_province,dest_citycode,dest_deptcode,compensate,indexType,srcMap,simType) = (obj._1(0),obj._1(1),obj._1(2),obj._1(3),obj._1(4),obj._1(5),obj._1(6)) val id = MD5Util.getMD5(md5Instance,Array(incDay,dest_province,dest_citycode , dest_deptcode,compensate,indexType,srcMap,simType).mkString("_")) rowSeq = Row(id,incDay,dest_province,dest_citycode,dest_deptcode,compensate,indexType,srcMap,simType,reqAmount , accAmount, halfHourReqAmount, halfHourAccAmount, oneHourReqAmount, oneHourAccAmount) } } var timeReqList = List[Int]() val timeReqMap = resList.filter(json => StringUtils.isNotBlank(json.getString("planDate"))) .map(_.getString("planDate").substring(8,10).replaceAll("^(0?)","").toInt) .groupBy(str => str) for (i <- 0 until 24){ val timeReqValue = timeReqMap.getOrElse(i,List()) timeReqList = timeReqList :+ timeReqValue.length } //rowSeq = Row.merge(rowSeq,Row.fromSeq(timeReqList)) //分时段准确量 val timeAccMap = accList.filter(json => StringUtils.isNotBlank(json.getString("planDate"))) .map(_.getString("planDate").substring(8,10).replaceAll("^(0?)","").toInt) .groupBy(str => str) for (i <- 0 until 24){ val timeReqValue = timeAccMap.getOrElse(i,List()) timeReqList = timeReqList :+ timeReqValue.length } Row.merge(rowSeq,Row.fromSeq(timeReqList)) } ).persist(StorageLevel.DISK_ONLY) //sourRdd.unpersist() logger.error(s"共统计指标:${accDf.count}") sourRdd.unpersist() accDf } def doReqAccStatistics(sourRdd: RDD[(Seq[String], JSONObject)],incDay:String,indexString:String,reqString:String)={ logger.error(s"$reqString") val perArray = Array(1,0.99,0.98,0.95,0.9,0.85,0.8,0.6,0) val reqRdd = sourRdd.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp).flatMap(obj => { val (dest_province,dest_citycode,dest_deptcode,srcMap,distance) = (obj._1(0),obj._1(1),obj._1(2),obj._1(3),obj._1(4)) val resList = obj._2 val md5Instance = MD5Util.getMD5Instance val sim1id = MD5Util.getMD5(md5Instance, Array(incDay,dest_province,dest_citycode,dest_deptcode,srcMap,distance,indexString,"sim1").mkString("_")) val sim5id = MD5Util.getMD5(md5Instance, Array(incDay,dest_province,dest_citycode,dest_deptcode,srcMap,distance,indexString,"sim5").mkString("_")) val sim1And5Rowid = MD5Util.getMD5(md5Instance, Array(incDay,dest_province,dest_citycode,dest_deptcode,distance,srcMap,indexString,"sim1And5").mkString("_")) //总请求量 val reqAmount = resList.length //空值请求量 val sim1NullReqAmount = resList.filter( json => 
{ StringUtils.isBlank(json.getString("similarity1"))} ).length val sim5NullReqAmount = resList.filter( json => { StringUtils.isBlank(json.getString("similarity5"))} ).length val sim1And5NullReqAmount = resList.filter( json => { StringUtils.isBlank(json.getString("similarity1")) && StringUtils.isBlank(json.getString("similarity5")) } ).length //[1-0]请求量 val sim1Row = Row(sim1id,incDay,dest_province,dest_citycode,dest_deptcode,indexString,srcMap,"sim1",distance,reqAmount,sim1NullReqAmount) val sim5Row = Row(sim5id,incDay,dest_province,dest_citycode,dest_deptcode,indexString,srcMap,"sim5",distance,reqAmount,sim5NullReqAmount) val sim1And5Row = Row(sim1And5Rowid,incDay,dest_province,dest_citycode,dest_deptcode,indexString,srcMap,"sim1And5",distance,reqAmount,sim1And5NullReqAmount) var sim1PerList = List[Int]() var sim5PerList = List[Int]() var sim1And5PerList = List[Int]() for ( i <- 0 until perArray.length-1) { //sim1请求量 val sim1ReqList = resList.filter( json => { json.getDouble("similarity1") < perArray(i) && json.getDouble("similarity1") >= perArray(i+1) }) sim1PerList = sim1PerList :+ sim1ReqList.length //sim1正确量 sim1PerList = sim1PerList :+ sim1ReqList.filter( json => "1".equals(json.getString("tl_ft_right")) ).length //sim5请求量 val sim5ReqList = resList.filter( json => { json.getDouble("similarity5") < perArray(i) && json.getDouble("similarity5") >= perArray(i+1) }) sim5PerList = sim5PerList :+ sim5ReqList.length //sim5正确量 sim5PerList = sim5PerList :+ sim5ReqList.filter( json => "1".equals(json.getString("tl_ft_right")) ).length //sim1And5请求量 val sim1And5ReqList = resList.filter( json => { ( json.getDouble("similarity1") < perArray(i) && json.getDouble("similarity1") >= perArray(i+1)) || ( json.getDouble("similarity5") < perArray(i) && json.getDouble("similarity5") >= perArray(i+1)) }) sim1And5PerList = sim1And5PerList :+ sim1And5ReqList.length //sim1And5请求量 sim1And5PerList = sim1And5PerList :+ sim1And5ReqList.filter( json => "1".equals(json.getString("tl_ft_right")) ).length } //增加distance统计 var rowList = List[Row]() rowList = rowList :+ Row.merge(sim1Row,Row.fromSeq(sim1PerList)) rowList = rowList :+ Row.merge(sim5Row,Row.fromSeq(sim5PerList)) rowList = rowList :+ Row.merge(sim1And5Row,Row.fromSeq(sim1And5PerList)) rowList }).persist(StorageLevel.DISK_ONLY) logger.error(s"共统计指标${reqRdd.count()}") sourRdd.unpersist() reqRdd } def doAbnormalExitMonitor(sourRdd: RDD[((String, String, String,String, String), JSONObject)],incDay:String)={ logger.error("开始进行异常退出监控指标统计") val aemDf = sourRdd.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp).map( obj => { val (dest_province,dest_citycode,dest_deptcode,sdkVersion,system) = obj._1 val resList = obj._2 val md5Instance = MD5Util.getMD5Instance val id = MD5Util.getMD5(md5Instance, Array(incDay,dest_province,dest_citycode,dest_deptcode,sdkVersion,system).mkString("_")) //进入SDK未导航量 val intoSdkNoNaviAmount = resList.filter(json => StringUtils.isBlank(json.getString("naviStartTime"))) .map(_.getString("naviId")).distinct.length //进入SDK导航总次数 val naviAmount = resList.filter(json => StringUtils.isNotBlank(json.getString("naviStartTime"))) .map(_.getString("naviId")).distinct.length //异常总量 val exceptionAmount = resList.filter(json => Array("10","11","12").contains(json.getString("naviEndStatus"))) .map(_.getString("naviId")).distinct.length //中途结束量 val halfWayEndAmount = resList.filter(json => "10".equals(json.getString("naviEndStatus"))) .map(_.getString("naviId")).distinct.length //异常未结束 val exceptionNotEndAmount = resList.filter(json 
=> "12".equals(json.getString("naviEndStatus"))) .map(_.getString("naviId")).distinct.length - resList.filter(json => StringUtils.isBlank(json.getString("naviStartTime"))) .map(_.getString("naviId")).distinct.length //闪退量 val falshBackAmount = resList.filter(json => "11".equals(json.getString("naviEndStatus"))) .map(_.getString("naviId")).distinct.length //正常总量 val normalAmount = resList.filter(json => Array("1","2","3").contains(json.getString("naviEndStatus"))) .map(_.getString("naviId")).distinct.length //自动结束 val autoEndAmount = resList.filter(json => "2".equals(json.getString("naviEndStatus"))) .map(_.getString("naviId")).distinct.length //手动结束 val manualEndAmount = resList.filter(json => "1".equals(json.getString("naviEndStatus"))) .map(_.getString("naviId")).distinct.length Row(id,incDay,dest_province,dest_citycode,dest_deptcode,sdkVersion,system,intoSdkNoNaviAmount,naviAmount,exceptionAmount, halfWayEndAmount,exceptionNotEndAmount,falshBackAmount,normalAmount,autoEndAmount,manualEndAmount) } ).persist(StorageLevel.DISK_ONLY) sourRdd.unpersist() logger.error(s"共统计指标:${aemDf.count}") aemDf } def getTaskPass( spark:SparkSession,taskAmountRdd:RDD[JSONObject],incDay:String,yesterday:String ) ={ //获取经停表的数据 val passSrcQuerySql = s""" |select |* |from( | select | task_id, | pass_zone_code, | actual_depart_tm, | row_number() over (partition by task_id,pass_zone_code order by actual_depart_tm desc ) num | from ods_russtask.tt_vehicle_task_pass_zone_monitor | where | inc_day >= '${yesterday}' and inc_day <= '${incDay}' | and actual_depart_tm != '' and actual_depart_tm is not null and actual_depart_tm != 'null' | and pass_zone_code <>'' and pass_zone_code is not null and pass_zone_code <> 'null' |) t |where t.num=1 |""".stripMargin logger.error(passSrcQuerySql) //关联获取实际出发时间 val passSrcRdd = spark.sql(passSrcQuerySql).rdd.repartition(100).map(obj => { val jsonObj = new JSONObject() jsonObj.put("actual_depart_tm",obj.getInt(3).toString) ((obj.getString(0),obj.getString(1)),jsonObj) }).persist(StorageLevel.DISK_ONLY) logger.error(s"获取经停实际出发时间非空:${passSrcRdd.count}") val passDestQuerySql = s""" |select |* |from( | select | task_id, | pass_zone_code, | actual_arrive_tm, | row_number() over (partition by task_id,pass_zone_code order by actual_depart_tm desc ) num | from ods_russtask.tt_vehicle_task_pass_zone_monitor | where | inc_day >= '${yesterday}' and inc_day <= '${incDay}' | and actual_arrive_tm != '' and actual_arrive_tm is not null and actual_arrive_tm != 'null' | and pass_zone_code <>'' and pass_zone_code is not null and pass_zone_code <> 'null' |) t |where t.num=1 |""".stripMargin logger.error(passSrcQuerySql) //关联获取实际出发时间 val passDesrRdd = spark.sql(passDestQuerySql).rdd.repartition(100).map(obj => { val jsonObj = new JSONObject() jsonObj.put("actual_arrive_tm",obj.getInt(3).toString) ((obj.getString(0),obj.getString(1)),jsonObj) }).persist(StorageLevel.DISK_ONLY) logger.error(s"获取经停实际到达时间非空:${passDesrRdd.count}") val merge = (obj: ((String, String), (JSONObject, Option[JSONObject])),flag:Int) => { val leftBody = obj._2._1 val rightBody = obj._2._2 if ( rightBody.nonEmpty ) leftBody.fluentPutAll(rightBody.get) val res = flag match { case 2 => ( (leftBody.getString("task_id"),leftBody.getString("dest_zone_code")) ,leftBody ) case 1 => ( (leftBody.getString("task_id"),"") ,leftBody ) } res } //关联任务起点和终点数据 val joinTaskPassRdd = taskAmountRdd.map(json => { ((json.getString("task_id"),json.getString("src_zone_code")),json) }).leftOuterJoin(passSrcRdd).map( obj => { merge(obj,2) 
}).leftOuterJoin(passDesrRdd).map( obj => { merge(obj,1) }).persist(StorageLevel.DISK_ONLY) logger.error(s"共关联经停数据:${joinTaskPassRdd.count()}") taskAmountRdd.unpersist() passSrcRdd.unpersist() passDesrRdd.unpersist() joinTaskPassRdd } def joinDriverTask( spark:SparkSession ,joinTaskPassRdd : RDD[((String, String),JSONObject)],incDay:String,yesterday:String )={ val querySql = s""" |select | concat(tt.own_dept_code,tt.driver_task_id) as `task_id`, | tt.user_code, | max(tt.operate_time ) as `max_time`, | min(tt.operate_time ) as `min_time`, | max(tt.operate_time_0) as `geton_operate_time`, | max(tt.operate_time_1) as `start_operate_time` |from ( | select | own_dept_code, | driver_task_id, | user_code, | operate_time, | case when operate_type='0' then operate_time else "" end as `operate_time_0`, | case when operate_type='1' then operate_time else "" end as `operate_time_1` | from ods_shiva_ground.tm_driver_task_log | Where | inc_day >= '${yesterday}' and inc_day <= '${incDay}' |) tt |group by tt.own_dept_code,tt.driver_task_id,tt.user_code |""".stripMargin logger.error(querySql) val driverTaskRdd = spark.sql(querySql).rdd.repartition(100).map( obj => { val jsonObj = new JSONObject() jsonObj.put("user_code",obj.getString(1)) jsonObj.put("max_time",obj.getString(2)) jsonObj.put("min_time",obj.getString(3)) jsonObj.put("geton_operate_time",obj.getString(3)) jsonObj.put("start_operate_time",obj.getString(4)) ((obj.getString(0),""),jsonObj) }).persist(StorageLevel.DISK_ONLY) logger.error(s"共查询司机操作日志记录:${driverTaskRdd.count()}") val joinDriverTaskRdd = joinTaskPassRdd.leftOuterJoin(driverTaskRdd).map( obj => { val leftBody = obj._2._1 val rightBody = obj._2._2 if ( rightBody.nonEmpty ) leftBody.fluentPutAll(rightBody.get) (leftBody.getString("user_code"),leftBody) }).persist(StorageLevel.DISK_ONLY) logger.error(s"共关联司机操作日志记录数据:${joinDriverTaskRdd.count()}") joinTaskPassRdd.unpersist() driverTaskRdd.unpersist() joinDriverTaskRdd } def joinNaviInfo(spark:SparkSession ,joinDriverTaskRdd: RDD[(String, JSONObject)],incDay:String,yesterday:String)={ val querySql = s""" |select | properties_username, | from_unixtime(cast (time / 1000 as int),'yyyy-MM-dd HH:mm:ss') as time, | event_id, | model, | dt |from ods_inc_ubas.product_inc_ubas_dev_shiva_trtms_driver |where | dt >= '${yesterday}' and dt <= '${incDay}' | and event_id='ground_tbp_navigate_dialog_confirm' | and properties_username is not null and properties_username <> '' | and properties_username <> 'null' | and time <> '' and time is not null |""".stripMargin logger.error(querySql) val naviInfoRdd = spark.sql(querySql).rdd.repartition(100).map( obj => { val jsonObj = new JSONObject() jsonObj.put("properties_username",obj.getString(0)) jsonObj.put("time",obj.getString(1)) jsonObj.put("event_id",obj.getString(2)) jsonObj.put("model",obj.getString(3)) jsonObj.put("dt",obj.getString(4)) (obj.getString(0),jsonObj) }).persist(StorageLevel.DISK_ONLY) logger.error(s"共获取导航信息${naviInfoRdd.count()}") //关联导航信息 val jsoinNaviInfoRdd = joinDriverTaskRdd.leftOuterJoin(naviInfoRdd).map(obj => { val leftBody = obj._2._1 val rightBody = obj._2._2 try { if (rightBody.nonEmpty){ if ( leftBody.getDate("actual_arrive_tm").compareTo(leftBody.getDate("max_time")) > 0 ){ if ( leftBody.getDate("actual_depart_tm").compareTo(leftBody.getDate("min_time")) < 1 && leftBody.getDate("min_time").compareTo(leftBody.getDate("time")) < 1 && leftBody.getDate("time").compareTo(leftBody.getDate("max_time")) < 1) leftBody.fluentPutAll(rightBody.get) } else { if 
(leftBody.getDate("actual_depart_tm").compareTo(leftBody.getDate("min_time")) < 1 && leftBody.getDate("min_time").compareTo(leftBody.getDate("time")) < 1 && leftBody.getDate("time").compareTo(leftBody.getDate("actual_arrive_tm")) < 1) leftBody.fluentPutAll(rightBody.get) } } } catch {case e:Exception => logger.error(leftBody.toString ) ;throw e} ((leftBody.getString("task_id"),leftBody.getString("dest_zone_code")),leftBody) }).persist(StorageLevel.DISK_ONLY) logger.error( s"关联导航信息共${jsoinNaviInfoRdd.count()}" ) joinDriverTaskRdd.unpersist() naviInfoRdd.unpersist() jsoinNaviInfoRdd } def getSLTask(spark:SparkSession,incDay:String,yesterday:String) ={ //获取顺陆任务量数据 val taskAmountQuerySql = s""" | select | task_id, | src_city_code, | dest_city_code, | stop_over_zone_code, | main_driver_account, | is_stop, | carrier_name, | carrier_id, | carrier_type, | driver_source, | dest_province, | src_zone_code, | dest_zone_code | from dm_grd.grd_new_task_detail | where | main_driver_account != '' and main_driver_account is not null and main_driver_account <> 'null' | and driver_source='0' | and inc_day >= '${yesterday}' and inc_day <= '${incDay}' |""".stripMargin logger.error(taskAmountQuerySql) val taskAmountTmpRdd = spark.sql(taskAmountQuerySql).rdd.repartition(100) .flatMap( obj => { val jsonObj = new JSONObject() jsonObj.put("task_id",obj.getString(0)) jsonObj.put("src_city_code",obj.getString(1)) jsonObj.put("dest_city_code",obj.getString(2)) jsonObj.put("stop_over_zone_code",obj.getString(3)) jsonObj.put("main_driver_account",obj.getString(4)) jsonObj.put("is_stop",obj.getInt(5).toString) jsonObj.put("carrier_name",obj.getString(6)) jsonObj.put("carrier_id",obj.getLong(7).toString) jsonObj.put("carrier_type",obj.getInt(8).toString) jsonObj.put("driver_source",obj.getString(9)) jsonObj.put("dest_province",obj.getString(10)) jsonObj.put("src_zone_code",obj.getString(11)) jsonObj.put("dest_zone_code",obj.getString(12)) val deptCodeArr = obj.getString(3).split(",") var jsonList = List[JSONObject]() if ( deptCodeArr.length > 1) { for ( i <- 0 until deptCodeArr.length -1 ){ val jsonTmp = new JSONObject().fluentPutAll(jsonObj) jsonTmp.put("src_zone_code",deptCodeArr(i)) jsonTmp.put("dest_zone_code",deptCodeArr(i+1)) jsonTmp.put("task_id_jt",obj.getString(0)+"_"+deptCodeArr(i+1)+"_"+deptCodeArr(i+1)) jsonList = jsonList :+ jsonTmp } } else jsonList = jsonList :+ jsonObj jsonList })/*.map(json => { (json.getString("task_id"),json) })*/.persist(StorageLevel.DISK_ONLY) logger.error(s"共获取顺陆任务量数据:${taskAmountTmpRdd.count()}") taskAmountTmpRdd } def joinSlNavi( joinNaviInfoRdd: RDD[((String, String), JSONObject)],naviRdd: RDD[((String, String), JSONObject)])={ val totalRdd = joinNaviInfoRdd.leftOuterJoin(naviRdd).map( obj => { val leftBody = obj._2._1 val rightBoby = obj._2._2.getOrElse(new JSONObject()) leftBody.fluentPutAll(rightBoby) val toString = (str:String) => { if (StringUtils.isEmpty(str)) "" else str } ((toString(leftBody.getString("dest_province")),toString(leftBody.getString("dest_city_code")), toString(leftBody.getString("dest_zone_code")),toString(leftBody.getString("sdk_version")), toString(leftBody.getString("system"))),leftBody) }).persist(StorageLevel.DISK_ONLY) logger.error(s"共关联顺路导航数据:${totalRdd.count()}") joinNaviInfoRdd.unpersist() naviRdd.unpersist() totalRdd } def doUseRateStatistics(sourRdd: RDD[((String, String, String,String, String), JSONObject)],incDay:String) ={ val useDf = sourRdd.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp).map( obj => { val ( 
dest_province,dest_citycode,dest_deptcode,sdkVersion,system ) = obj._1 val resList = obj._2 val md5Instance = MD5Util.getMD5Instance val id = MD5Util.getMD5(md5Instance, Array(incDay,dest_province,dest_citycode,dest_deptcode,sdkVersion,system).mkString("_")) //reqid&destcode&src_deptcode //导航任务总量 val naviTaskAmount = resList.map(_.getString("task_id_jt")).distinct.length //高德使用量 val gdUseAmount = resList.count( json => "ground_tbp_navigate_dialog_confirm".equals(json.getString("event_id")) ) //顺丰导航使用量 val sfUseAmount = resList.filter(json => StringUtils.isNotBlank(json.getString("route_id"))) .map(json => (json.getString("route_id"),json.getString("src_dept_code"),json.getString("dest_dept_code")) ).distinct.length //导航使用量 val naviUseAmount = gdUseAmount + sfUseAmount //全程使用量 var wholeUseAmount = 0 resList.filter(json => StringUtils.isNotBlank(json.getString("navi_end_status"))).groupBy(_.getString("task_id")).map( elem =>{ val naviIdAmount = elem._2.map(_.getString("navi_id")).length if (naviIdAmount == 1) wholeUseAmount += elem._2.filter(json => {Array("1","2","3").contains(json.getString("navi_end_status"))}) .map(_.getString("navi_id")).distinct.length else { val tmpList = elem._2.sortBy(_.getString("navi_starttime")) if (Array("1","2","3").contains(tmpList.last.getString("navi_end_status"))) for( i <- 0 until tmpList.length -1 ){ if (tmpList(i+1).getLong("navi_starttime") - tmpList(i).getLong("navi_endtime") <= 300) wholeUseAmount += 1 } wholeUseAmount } }) //司机总量 val driverAmount = resList.map(_.getString("main_driver_account")).distinct.length //司机使用量 val driverUseAmount = resList.filter(json => StringUtils.isNotBlank(json.getString("route_id"))).map(_.getString("main_driver_account")).distinct.length Row(id,incDay,dest_province,dest_citycode,dest_deptcode,sdkVersion,system,naviTaskAmount,naviUseAmount,gdUseAmount,sfUseAmount,wholeUseAmount,driverAmount,driverUseAmount) }).persist(StorageLevel.DISK_ONLY) sourRdd.unpersist() logger.error( s"共统计使用率率指标:${useDf.count()}" ) useDf } def doTimeDeviationRate(spark:SparkSession,sourRdd: RDD[((String, String, String, String), JSONObject)],incDay:String) ={ val timeDeviationRateRdd = sourRdd.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp) .map( obj => { val (destProvince,destPitycode,destDeptcode,src) = obj._1 val resList = obj._2 val md5Instance = MD5Util.getMD5Instance val id = MD5Util.getMD5(md5Instance, Array(incDay,destProvince,destPitycode,destDeptcode,src).mkString("_")) //总请求量 val reqAmount = resList.length //统计每个请求的偏差量和导航量 var diffAmount = 0:Double var diffPerAmount = 0:Double resList.map(elem => { diffAmount += Math.abs(elem.getDouble("diff_time")) diffPerAmount += Math.abs(elem.getDouble("diff_time")) / elem.getDouble("navi_time") }) //区间 var rowList = Array[String](id,incDay,destProvince,destPitycode,destDeptcode,src,reqAmount.toString ,(diffAmount / resList.size).toString,(diffPerAmount/resList.size).toString) val timeArr = Array(0,10,20,40,50,70,90,120,150,180,240,350,370,Int.MaxValue) for(i <- 0 until timeArr.length -1 ){ val tmpList = resList.filter(json => { json.getDouble("duration") >= timeArr(i)*60 && json.getDouble("duration") <= timeArr(i+1)*60 }) val tmpReqAmount = tmpList.length rowList = rowList :+ tmpReqAmount.toString var diffTempAmount = 0:Double var diffPerTempAmount = 0:Double tmpList.map(elem => { diffTempAmount += Math.abs(elem.getDouble("diff_time")) diffPerTempAmount += Math.abs(elem.getDouble("diff_time")) / elem.getDouble("navi_time") }) rowList = rowList :+ (diffTempAmount / 
tmpList.size).toString rowList = rowList :+ (diffPerTempAmount / tmpList.size).toString } Row.fromSeq(rowList) }).persist(StorageLevel.DISK_ONLY) sourRdd.unpersist() logger.error( s"共统计时间偏差率指标:${timeDeviationRateRdd.count()}" ) val (allDf,provinceDf,cityDf,deptDf) = calByOtherDmiension(incDay,timeDeviationRateRdd,spark) sourRdd.unpersist() timeDeviationRateRdd.union(allDf).union(provinceDf).union(cityDf).union(deptDf) } def calByOtherDmiension(incDay:String,timeDeviationRateRdd:RDD[Row],spark:SparkSession) ={ val calLogic = (tup:Tuple4[String,String,String,String],resList:List[Row]) => { val md5Instance = MD5Util.getMD5Instance val id = MD5Util.getMD5(md5Instance, Array(incDay,tup._1,tup._2,tup._3,tup._4).mkString("_")) val rowSeq = Row(id,incDay,tup._1,tup._2,tup._3,tup._4) var dataList =Array[String]() if(resList != null && resList.size > 0){ dataList = Array.fill(resList.head.size)("0") for (elem <- resList) { for(i <- 0 until elem.length-1){ dataList(i) = (dataList(i).toDouble + elem(i).toString.toDouble).toString } } } Row.merge(rowSeq,Row.fromSeq(dataList)) } //按天维度统计 val dayList = timeDeviationRateRdd.map( obj => { Row.fromSeq(obj.toSeq.drop(6)) } ).collect().toList val dayRow = calLogic(("all","all","all","all"),dayList) val allDf = spark.sparkContext.parallelize(Array(dayRow)).persist(StorageLevel.DISK_ONLY) allDf.count() logger.error("按天聚合完毕") //按省聚合 val provinceDf = timeDeviationRateRdd.map( obj => { (obj.getString(2),Row.fromSeq(obj.toSeq.drop(6))) }) .aggregateByKey(List[Row]())(SparkUtils.seqOpRow,SparkUtils.combOpRow) .map( obj => { val tup4 = (obj._1,"all","all","all") val resList = obj._2 calLogic(tup4,resList) }).persist(StorageLevel.DISK_ONLY) logger.error(s"按照省维度统计共:${provinceDf.count()}") //按照城市维度统计 val cityDf = timeDeviationRateRdd.map( obj => { ((obj.getString(2),obj.getString(3)),Row.fromSeq(obj.toSeq.drop(6))) } ) .aggregateByKey(List[Row]())(SparkUtils.seqOpRow,SparkUtils.combOpRow) .map( obj => { val tup4 = (obj._1._1,obj._1._2,"all","all") val resList = obj._2 calLogic(tup4,resList) }).persist(StorageLevel.DISK_ONLY) logger.error(s"按照城市维度统计共:${cityDf.count()}") //按照场地维度统计 val deptDf = timeDeviationRateRdd.map( obj => { ((obj.getString(2),obj.getString(3),obj.getString(4)),Row.fromSeq(obj.toSeq.drop(6))) } ) .aggregateByKey(List[Row]())(SparkUtils.seqOpRow,SparkUtils.combOpRow) .map( obj => { val tup4 = (obj._1._1,obj._1._2,obj._1._3,"all") val resList = obj._2 calLogic(tup4,resList) }).persist(StorageLevel.DISK_ONLY) logger.error(s"按照场地维度统计共:${deptDf.count()}") (allDf,provinceDf,cityDf,deptDf) } def doTimePeriodDeviationRate(spark:SparkSession,sourRdd: RDD[((String, String, String, String), JSONObject)],incDay:String) = { val timePeriodDeviationRateRdd = sourRdd.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp) .map(obj => { val (destProvince,destPitycode,destDeptcode,src) = obj._1 val resList = obj._2 val md5Instance = MD5Util.getMD5Instance val id = MD5Util.getMD5(md5Instance, Array(incDay,destProvince,destPitycode,destDeptcode,src).mkString("_")) //总请求量 val reqAmount = resList.length //统计每个请求的偏差量和导航量 var diffAmount = 0:Double var diffPerAmount = 0:Double resList.map(elem => { diffAmount += Math.abs(elem.getDouble("diff_time")) diffPerAmount += Math.abs(elem.getDouble("diff_time")) / elem.getDouble("navi_time") }) //每个小时偏差量和导航量 val timeArr = Array(40,70,130,190,250,310,370) var rowList = Array[String](id,incDay,destProvince,destPitycode,destDeptcode,src,reqAmount.toString, (diffAmount / resList.size).toString(),(diffPerAmount / 
resList.size).toString()) for (time <- timeArr) { val tmpList = resList.filter(json => { json.getLong("req_time") >= json.getLong("navi_endtime") - time * 60 * 1000 && json.getLong("req_time") >= json.getLong("navi_endtime") - (time-20) * 60 * 1000 }) val tmpReqAmount = tmpList.length rowList = rowList :+ tmpReqAmount.toString var diffTempAmount = 0:Double var diffPerTempAmount = 0:Double tmpList.map(elem => { diffTempAmount += Math.abs(elem.getDouble("diff_time")) diffPerTempAmount += Math.abs(elem.getDouble("diff_time")) / elem.getDouble("navi_time") }) rowList = rowList :+ (diffTempAmount / tmpList.size).toString rowList = rowList :+ (diffPerTempAmount / tmpList.size).toString } Row.fromSeq(rowList) }).persist(StorageLevel.DISK_ONLY) sourRdd.unpersist() logger.error( s"共统计时间偏差率指标:${timePeriodDeviationRateRdd.count()}" ) val (allDf,provinceDf,cityDf,deptDf) = calByOtherDmiension(incDay,timePeriodDeviationRateRdd,spark) timePeriodDeviationRateRdd.union(allDf).union(provinceDf).union(cityDf).union(deptDf) } /*p2*/ def doTaskAmountStatistics( sourRdd: RDD[((String, String, String, String), JSONObject)],incDay:String )={ val taskAmountDf = sourRdd.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp).map( obj => { val resList = obj._2 val (src_province,src_citycode,src_deptcode,src_status) = obj._1 val md5Instance = MD5Util.getMD5Instance val id = MD5Util.getMD5(md5Instance, Array(incDay,src_province,src_citycode,src_deptcode,src_status).mkString("_")) //任务量 val taskAmount = resList.map(_.getString("task_id")).distinct.length //导航次数 val naviAmount = resList.length //导航距离分组统计 val naviDistGroup = resList.filter(json => StringUtils.isNotBlank(json.getString("distance"))) .map(json => { val distStr = json.getString("distance").replaceAll("(\\..*$)","") val dist = if(StringUtils.isNotBlank(distStr)) distStr.toLong / 1000 else 0 val distGroup= dist match { case dist if dist >=0 && dist < 50 => "dist_0" case dist if dist >=50 && dist < 200 => "dist_50" case dist if dist >=200 && dist < 500 => "dist_200" case _ => "dist_500" } (distGroup,"") }).groupBy(_._1) //省际、省内、市内统计 val areaGroup = resList.map(json => { val ag = json match { case json if StringUtils.isNotBlank(json.getString("src_citycode")) && json.getString("src_citycode").equals(json.getString("dest_citycode")) => "市内" case json if StringUtils.isNotBlank( json.getString("src_province")) && StringUtils.isNotBlank(json.getString("src_citycode")) && json.getString("src_province").equals(json.getString("dest_province")) => "省内" case json if StringUtils.isNotBlank( json.getString("src_province")) && StringUtils.isNotBlank(json.getString("src_citycode")) => "省际" case _ => "null" } (ag,"") }).groupBy(_._1) Row(id,incDay,src_province,src_citycode,src_deptcode,src_status,taskAmount,naviAmount,naviDistGroup.getOrElse("dist_0",List()).length, naviDistGroup.getOrElse("dist_50",List()).length,naviDistGroup.getOrElse("dist_200",List()).length,naviDistGroup.getOrElse("dist_500",List()).length ,areaGroup.getOrElse("市内",List()).length,areaGroup.getOrElse("省内",List()).length,areaGroup.getOrElse("省际",List()).length) }).persist(StorageLevel.DISK_ONLY) logger.error(s"共统计指标:${taskAmountDf.count()}") sourRdd.unpersist() taskAmountDf } def doDriverReuseStatistics( hisReuseDf:DataFrame ) ={ val driverReuseRdd = hisReuseDf.rdd.map( obj => { val jsonObj = new JSONObject() jsonObj.put("driver_id",obj.getString(5)) jsonObj.put("his_use_amount",obj.getLong(6)) 
((obj.getString(0),obj.getString(1),obj.getString(2),obj.getString(3),obj.getString(4)),jsonObj) }).aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp) .map( obj => { val resList = obj._2 //司机总量 val driverAmount = resList.map(_.getString("driver_id")).distinct.length //无复用司机占比 val reuse_0 = resList.count(_.getLong("his_use_amount") == 1) //司机复用一次占比 val reuse_1 = resList.count(_.getLong("his_use_amount") == 2) //司机复用2次占比 val reuse_2 = resList.count(_.getLong("his_use_amount") == 3) //司机复用5次占比 val reuse_5 = resList.count(json => json.getLong("his_use_amount") >= 6 && json.getLong("his_use_amount") < 11) //司机复用10次占比 val reuse_10 = resList.count(json => json.getLong("his_use_amount") >= 11 ) val jsonObj = new JSONObject() jsonObj.put("driver_amount",driverAmount) jsonObj.put("reuse_0",reuse_0) jsonObj.put("reuse_1",reuse_1) jsonObj.put("reuse_2",reuse_2) jsonObj.put("reuse_5",reuse_5) jsonObj.put("reuse_10",reuse_10) (obj._1,jsonObj) }).persist(StorageLevel.DISK_ONLY) logger.error(s"获取司机复用数据共:${driverReuseRdd.count()}") hisReuseDf.unpersist() driverReuseRdd } def doDriverChangeStatistics( spark:SparkSession,incDay:String,yesterday:String)={ logger.error("开始统计司机变更情况") val taskNaviSql = s""" |select | t2.dest_province as dest_province, | t2.dest_citycode as dest_citycode, | t2.dest_deptcode as dest_deptcode, | t2.sdk_version, | t2.system, | t1.main_driver_account, | if(t2.driver_id is null,false,true) as isNavi | from ( | select | * | from ( | select | dest_province, | dest_city_code, | dest_zone_code, | main_driver_account, | row_number() over( partition by main_driver_account order by actual_depart_tm asc ) num | from dm_grd.grd_new_task_detail | where inc_day ='%s' | and actual_depart_tm is not null and actual_depart_tm <> '' and actual_depart_tm <> 'null' | and actual_arrive_tm is not null and actual_arrive_tm <> '' and actual_arrive_tm <> 'null' | and main_driver_account is not null and main_driver_account <> '' and main_driver_account <> 'null' | and driver_source = 0 | ) tmp where num = 1 | ) t1 | LEFT JOIN | ( | select | * | from ( | SELECT | dest_province, | dest_citycode, | dest_deptcode, | sdk_version, | system, | driver_id, | row_number() over( partition by driver_id order by navi_starttime asc ) num | from ${reuseStatSourTable} | where inc_day ='%s' | and dest_province is not null and dest_province <> '' | and dest_citycode is not null and dest_citycode <> '' | and dest_deptcode is not null and dest_deptcode <> '' | and sdk_version is not null and sdk_version <> '' | and system is not null and system <> '' | ) tmp where num = 1 | ) t2 | on t1.main_driver_account = t2.driver_id |""".stripMargin //查询司机前一天的任务和导航情况 val lastTaskNaviSql = taskNaviSql.format(yesterday,yesterday) logger.error(lastTaskNaviSql) val lastTaskNaviDF = spark.sql(lastTaskNaviSql).repartition(100).persist(StorageLevel.DISK_ONLY) logger.error(s"共获取查询司机前一天的任务和导航数据:${lastTaskNaviDF.count()}") //查询司机当天的任务和导航情况 val currentTaskNaviSql = taskNaviSql.format(incDay,incDay) logger.error(currentTaskNaviSql) val currentTaskNaviDF = spark.sql(currentTaskNaviSql).repartition(100).persist(StorageLevel.DISK_ONLY) logger.error(s"共获取查询司机当天的任务和导航数据:${currentTaskNaviDF.count()}") //统计司机流失率,司机新增率,司机存留率 lastTaskNaviDF.registerTempTable("lastTaskNavi"); currentTaskNaviDF.registerTempTable("currentTaskNavi"); val driverChangeRdd = spark.sql(""" |select | nvl(t1.dest_province,"") dest_province_t1, | nvl(t2.dest_province,"") dest_province_t2, | nvl(t1.dest_citycode,"") dest_citycode_t1, | nvl(t2.dest_citycode,"") 
dest_citycode_t2, | nvl(t1.dest_deptcode,"") dest_deptcode_t1, | nvl(t2.dest_deptcode,"") dest_deptcode_t2, | nvl(t1.sdk_version,"") sdk_version_t1, | nvl(t2.sdk_version,"") sdk_version_t2, | nvl(t1.system,"") system_t1, | nvl(t2.system,"") system_t2, | t1.isNavi as isLastNavi, | t2.isNavi as isCurrentNavi |from lastTaskNavi t1 |join currentTaskNavi t2 |on t1.main_driver_account = t2.main_driver_account |""".stripMargin ) .rdd.map( obj =>{ val jsonObj = new JSONObject() jsonObj.put("isLastNavi",obj.getBoolean(10)) jsonObj.put("isCurrentNavi",obj.getBoolean(11)) var key = ("","","","","") if( obj.getBoolean(10) ) key = (obj.getString(0),obj.getString(2),obj.getString(4),obj.getString(6),obj.getString(8)) else key = (obj.getString(1),obj.getString(3),obj.getString(5),obj.getString(7),obj.getString(9)) (key,jsonObj) }) .aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp) .map( obj => { val resList = obj._2 //司机流失率 val lastNaviTaskList = resList.filter( _.getBoolean("isLastNavi") ) val lastNaviTaskAmount = lastNaviTaskList.length val driverLossAmount = lastNaviTaskList.count( !_.getBoolean("isCurrentNavi") ) //司机存留率 val driverKeepAmount = lastNaviTaskList.count( _.getBoolean("isCurrentNavi") ) //司机新增率 val lastNoNaviTaskList = resList.filter( ! _.getBoolean("isLastNavi") ) val lastNoNaviTaskAmount = lastNoNaviTaskList.length val driverAddAmount = lastNoNaviTaskList.count( _.getBoolean("isCurrentNavi") ) val jsonObj = new JSONObject() jsonObj.put("lastNaviTaskAmount",lastNaviTaskAmount) jsonObj.put("driverLossAmount",driverLossAmount) jsonObj.put("driverKeepAmount",driverKeepAmount) jsonObj.put("lastNoNaviTaskAmount",lastNoNaviTaskAmount) jsonObj.put("driverAddAmount",driverAddAmount) (obj._1,jsonObj) }).persist(StorageLevel.DISK_ONLY) logger.error(s"统计司机变更指标共:${driverChangeRdd.count()}") lastTaskNaviDF.unpersist() currentTaskNaviDF.unpersist() driverChangeRdd } def joinDriverReuseChange( driverChangeRdd: RDD[((String, String, String, String, String), JSONObject)], driverReuseRdd: RDD[((String, String, String, String, String), JSONObject)],incDay:String) ={ val driverReuseChangeDf = driverReuseRdd./*leftOuterJoin*/fullOuterJoin(driverChangeRdd).map( obj => { /*val leftBody = obj._2._1 val rightBody = obj._2._2*/ var leftBody = new JSONObject() var rightBody = new JSONObject() val (dest_province,dest_citycode,dest_deptcode,sdk_version,system) = obj._1 val md5Instance = MD5Util.getMD5Instance val id = MD5Util.getMD5(md5Instance, Array(incDay,dest_province,dest_citycode,dest_deptcode,sdk_version,system).mkString("_")) /*if ( rightBody.nonEmpty ) leftBody.fluentPutAll(rightBody.get)*/ if ( obj._2._1.nonEmpty && obj._2._2.nonEmpty) leftBody = obj._2._1.get.fluentPutAll(obj._2._2.get) else if(obj._2._1.isEmpty && obj._2._2.nonEmpty) leftBody = obj._2._2.get else leftBody = obj._2._1.get Row(id,incDay,dest_province,dest_citycode,dest_deptcode,sdk_version,system,leftBody.getInteger("driver_amount"),leftBody.getInteger("reuse_0"), leftBody.getInteger("reuse_1"),leftBody.getInteger("reuse_2"),leftBody.getInteger("reuse_5"),leftBody.getInteger("reuse_10"), leftBody.getInteger("lastNaviTaskAmount"),leftBody.getInteger("driverLossAmount"),leftBody.getInteger("driverKeepAmount"), leftBody.getInteger("lastNoNaviTaskAmount"),leftBody.getInteger("driverAddAmount")) }).persist(StorageLevel.DISK_ONLY) logger.error(s"共关联司机重用和变更情况共${driverReuseChangeDf.count()}") driverChangeRdd.unpersist() driverReuseRdd.unpersist() driverReuseChangeDf } def getQuestionnaireData( 
spark:SparkSession,incDay:String,yesterday:String ) = { logger.error("开始获取问卷调查数据") //20210428修改sql val querySql = s""" |select | rel_id,question_seq,answer_txt,template_id,id,account,file_name,device_type,create_time,app_version |from ( | select | a.rel_id, | a.question_seq, | a.answer_txt, | a.template_id, | b.id, | b.account, | b.file_name, | nvl(b.device_type,'') device_type, | b.create_time, | nvl(b.app_version,'') app_version, | b.inc_day, | row_number() over( PARTITION by a.rel_id,a.question_seq,b.file_name,b.account order by a.rel_id desc,a.question_seq desc) num | from | ( | select | * | from | dm_gis.record_tt_cm_question_answer | where | inc_day = '$incDay' | ) a | left JOIN ( | select | * | from | dm_gis.record_tt_cm_question | where | inc_day = '$incDay' | ) b | on a.rel_id = b.id |) tmp |where num = 1 and question_seq < 11 |""".stripMargin logger.error(querySql) questionnaireRdd = spark.sql(querySql).repartition(100).rdd.map( obj =>{ val jsonObj = new JSONObject() jsonObj.put("rel_id",obj.getString(0)) jsonObj.put("question_seq",obj.getString(1)) jsonObj.put("answer_txt",obj.getString(2)) jsonObj.put("template_id",obj.getString(3)) jsonObj.put("id",obj.getString(4)) jsonObj.put("account",obj.getString(5)) jsonObj.put("file_name",obj.getString(6)) jsonObj.put("create_time",obj.getString(8)) ((obj.getString(9),obj.getString(7)),jsonObj) }).persist(StorageLevel.DISK_ONLY) logger.error(s"获取问卷调查数据共:${questionnaireRdd.count()}") questionnaireRdd } def doQuestionnaireAccRateStatistics( questionnaireDataRdd: RDD[((String, String), JSONObject)],incDay:String,yesterday:String ) ={ val questionnaireAccRateDf = questionnaireDataRdd.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp) .map( obj => { val (app_version,device_type) = obj._1 val resList = obj._2 val md5Instance = MD5Util.getMD5Instance val id = MD5Util.getMD5(md5Instance, Array(incDay,app_version,device_type).mkString("_")) val getAcc = (qSeq:String,qtype:String) => { val qList = resList.filter(json => qSeq.equals(json.getString("question_seq"))) val qAmount = qList.length val qAccAmount = qList.count( json => qtype.equals(json.getString("answer_txt")) ) (qAmount,qAccAmount) } //问卷数量 val questionnaireAmount = resList.map(_.getString("rel_id")).distinct.length //司机数量 val driverAmount = resList.map(_.getString("account")).distinct.length //问题1正确率 val (q1Amount,q1AccAmount) = getAcc("1","B") //问题2正确率 val (q2Amount,q2AccAmount) = getAcc("2","B") //问题3正确率 val (q3Amount,q3AccAmount) = getAcc("3","A") //问题4正确率 val q4List = resList.filter(json => "4".equals(json.getString("question_seq"))) val q4Amount = q4List.count(json => ! 
"C".equals(json.getString("answer_txt"))) val q4AccAmount = q4List.count(json => "A".equals(json.getString("answer_txt"))) //问题5正确率 val (q5Amount,q5AccAmount) = getAcc("5","A") //问题6正确率 val (q6Amount,q6AccAmount) = getAcc("6","A") //问题7正确率 val (q7Amount,q7AccAmount) = getAcc("7","A") //问题8正确率 val (q8Amount,q8AccAmount) = getAcc("8","A") //问题9正确率 val (q9Amount,q9AccAmount) = getAcc("9","A") //20210429新增问题10正确率 val q10List = resList.filter(json => "10".equals(json.getString("question_seq"))) var q10Amount = 0d q10List.map(json => {(JSONUtils.getJsonValueDouble(json,"answer_txt",0))}).toStream.foreach(x => {q10Amount = q10Amount + x}) val q10AccAmount = q10List.length //val (q10Amount,q10AccAmount) = getAcc("10","B") Row(id,incDay,app_version,device_type,questionnaireAmount,driverAmount,q1Amount,q1AccAmount,q2Amount,q2AccAmount,q3Amount,q3AccAmount, q4Amount,q4AccAmount, q5Amount,q5AccAmount,q6Amount,q6AccAmount,q7Amount,q7AccAmount,q8Amount,q8AccAmount,q9Amount,q9AccAmount,q10Amount.toInt,q10AccAmount) }).persist(StorageLevel.DISK_ONLY) logger.error(s"统计问卷调查准确率指标共:${questionnaireAccRateDf.count()}") questionnaireDataRdd.unpersist() questionnaireAccRateDf } def doQuestionnaireErrRateStatistics( questionnaireDataRdd: RDD[((String, String), JSONObject)],incDay:String,yesterday:String ) = { val questionnaireErrRateDf = questionnaireDataRdd.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp) .map( obj => { val (app_version,device_type) = obj._1 val resList = obj._2 val md5Instance = MD5Util.getMD5Instance val id = MD5Util.getMD5(md5Instance, Array(incDay,app_version,device_type).mkString("_")) val gerErrDriver = ( qList: List[JSONObject]) => { val qErrAmount = qList.length val (maxDriverId,maxList) = if (qList.isEmpty) ("",List()) else qList.groupBy(_.getString("account")).maxBy(_._2.length) (qErrAmount,maxList.length,maxDriverId) } val getErr1 = (qSeq:String) => { val qList = resList.filter( json => qSeq.equals(json.getString("question_seq")) && "B".equals( json.getString("answer_txt")) ) val (qErrAmount,maxDriverAmount,maxDriverId) = gerErrDriver( qList ) (qErrAmount,maxDriverAmount,maxDriverId) } val getErr3 = (qSeq:String) => { val qList = resList.filter( json => qSeq.equals(json.getString("question_seq")) && ! 
"A".equals(json.getString("answer_txt")) ) val (qErrAmount,maxDriverAmount,maxDriverId) = gerErrDriver( qList ) val (maxErrType,maxErrTypeList) = if (qList.isEmpty) ("",List()) else qList.groupBy(_.getString("answer_txt")).maxBy(_._2.length) (qErrAmount,maxDriverAmount,maxDriverId,maxErrType,maxErrTypeList.length) } //问卷数量 val questionnaireAmount = resList.map(_.getString("rel_id")).distinct.length //司机数量 val driverAmount = resList.map(_.getString("account")).distinct.length //问题1错误量 val (q1ErrAmount,q1MaxDriverAmount,q1MaxDriverId) = getErr1("1") //问题2错误量 val (q2ErrAmount,q2MaxDriverAmount,q2MaxDriverId) = getErr1("2") //问题3错误量 val (q3ErrAmount,maxQ3DriverAmount,maxQ3DriverId,maxQ3ErrType,maxQ3ErrTypeAmount) =getErr3("3") //问题4错误量 val (q4ErrAmount,q4MaxDriverAmount,q4MaxDriverId) = getErr1("4") //问题5错误量 val (q5ErrAmount,q5MaxDriverAmount,q5MaxDriverId) = getErr1("5") //问题6错误量 val (q6ErrAmount,q6MaxDriverAmount,q6MaxDriverId) = getErr1("6") //问题7错误量 val (q7ErrAmount,q7MaxDriverAmount,q7MaxDriverId) = getErr1("7") //问题8错误量 val (q8ErrAmount,maxQ8DriverAmount,maxQ8DriverId,maxQ8ErrType,maxQ8ErrTypeAmount) =getErr3("8") //问题9错误量 val (q9ErrAmount,q9MaxDriverAmount,q9MaxDriverId) = getErr1("9") Row(id,incDay,app_version,device_type,questionnaireAmount,driverAmount,q1ErrAmount,q1MaxDriverAmount,q1MaxDriverId,q2ErrAmount,q2MaxDriverAmount,q2MaxDriverId, q3ErrAmount,maxQ3DriverAmount,maxQ3DriverId,maxQ3ErrType,maxQ3ErrTypeAmount,q4ErrAmount,q4MaxDriverAmount,q4MaxDriverId,q5ErrAmount,q5MaxDriverAmount,q5MaxDriverId, q6ErrAmount,q6MaxDriverAmount,q6MaxDriverId,q7ErrAmount,q7MaxDriverAmount,q7MaxDriverId,q8ErrAmount,maxQ8DriverAmount,maxQ8DriverId,maxQ8ErrType,maxQ8ErrTypeAmount, q9ErrAmount,q9MaxDriverAmount,q9MaxDriverId) }).persist(StorageLevel.DISK_ONLY) logger.error(s"统计问卷调查错误占比指标共:${questionnaireErrRateDf.count()}") questionnaireDataRdd.unpersist() questionnaireErrRateDf } def getServiceCostTimeData(spark:SparkSession,incDay:String,yesterday:String )={ logger.error("开始查询服务响应时间") val querySql = s""" |--top3-eta |select | "top3" as `module`, | "top3-eta" as `service`, | req_costtime as `cost_time`, | req_starttime as `req_time` |from dm_gis.gis_navi_top3_parse |where inc_day='$incDay' | and req_costtime is not null and req_costtime <> '' and req_costtime <> 'null' |union all |--Top3-pns |select | "top3" as `module`, | "top3-pns" as `service`, | pnstop3_costtime as `cost_time`, | pnstop3_starttime as `req_time` |from dm_gis.gis_navi_top3_parse |where inc_day='$incDay' | and pnstop3_costtime is not null and pnstop3_costtime <> '' and pnstop3_costtime <> 'null' |union all |--noYaw-eta |select | "noYaw" as `module`, | "noYaw-eta" as `service`, | req_costtime as `cost_time`, | req_starttime as `req_time` |from dm_gis.gis_navi_no_yaw_parse |where inc_day='$incDay' | and req_costtime is not null and req_costtime <> '' and req_costtime <> 'null' |union all |--noYaw-pns |select | "noYaw" as `module`, | "noYaw-pns" as `service`, | qmpoint_costtime as `cost_time`, | qmpoint_starttime as `req_time` |from dm_gis.gis_navi_no_yaw_parse |where inc_day='$incDay' | and qmpoint_costtime is not null and qmpoint_costtime <> '' and qmpoint_costtime <> 'null' |union all |--Yaw-eta |select | "Yaw" as `module`, | "Yaw-eta" as `service`, | req_costtime as `cost_time`, | req_start_time as `req_time` |from dm_gis.gis_navi_yaw_parse |where inc_day='$incDay' | and req_costtime is not null and req_costtime <> '' and req_costtime <> 'null' |union all |--Yaw-eta-sf |select | "Yaw" as `module`, | "Yaw-eta-sf" as `service`, | 
req_costtime as `cost_time`, | req_start_time as `req_time` |from dm_gis.gis_navi_yaw_parse |where inc_day='$incDay' | and req_costtime is not null and req_costtime <> '' and req_costtime <> 'null' | and sfpnstop3_costtime is not null and sfpnstop3_costtime <> '' and sfpnstop3_costtime <> 'null' | and (jypnstop3_costtime is null or jypnstop3_costtime = '' or jypnstop3_costtime = 'null') |union all |--Yaw-eta-jy |select | "Yaw" as `module`, | "Yaw-eta-jy" as `service`, | req_costtime as `cost_time`, | req_start_time as `req_time` |from dm_gis.gis_navi_yaw_parse |where inc_day='$incDay' | and req_costtime is not null and req_costtime <> '' and req_costtime <> 'null' | and jypnstop3_costtime is not null and jypnstop3_costtime <> '' and jypnstop3_costtime <> 'null' | and (sfpnstop3_costtime is null or sfpnstop3_costtime = '' or sfpnstop3_costtime = 'null') |union all |--Yaw-eta-jy-sf |select | "Yaw" as `module`, | "Yaw-eta-jy-sf" as `service`, | req_costtime as `cost_time`, | req_start_time as `req_time` |from dm_gis.gis_navi_yaw_parse |where inc_day='$incDay' | and req_costtime is not null and req_costtime <> '' and req_costtime <> 'null' | and sfpnstop3_costtime is not null and sfpnstop3_costtime <> '' and sfpnstop3_costtime <> 'null' | and jypnstop3_costtime is not null and jypnstop3_costtime <> '' and jypnstop3_costtime <> 'null' |union all |--Yaw-pns-1 |select | "Yaw" as `module`, | "Yaw-pns" as `service`, | sfpnstop3_costtime as `cost_time`, | sfpnstop3_starttime as `req_time` |from dm_gis.gis_navi_yaw_parse |where inc_day='$incDay' | and sfpnstop3_costtime is not null and sfpnstop3_costtime <> '' and sfpnstop3_costtime <> 'null' |union all |-- Yaw-pns-2 |select | "Yaw" as `module`, | "Yaw-pns" as `service`, | jypnstop3_costtime as `cost_time`, | jypnstop3_starttime as `req_time` |from dm_gis.gis_navi_yaw_parse |where inc_day='$incDay' | and jypnstop3_costtime is not null and jypnstop3_costtime <> '' and jypnstop3_costtime <> 'null' |union all |--Yaw-pns-sf |select | "Yaw" as `module`, | "Yaw-pns-sf" as `service`, | sfpnstop3_costtime as `cost_time`, | jypnstop3_starttime as `req_time` |from dm_gis.gis_navi_yaw_parse |where inc_day='$incDay' | and sfpnstop3_costtime is not null and sfpnstop3_costtime <> '' and sfpnstop3_costtime <> 'null' |union all |--Yaw-pns-jy |select | "Yaw" as `module`, | "Yaw-pns-jy" as `service`, | jypnstop3_costtime as `cost_time`, | jypnstop3_starttime as `req_time` |from dm_gis.gis_navi_yaw_parse |where inc_day='$incDay' | and jypnstop3_costtime is not null and jypnstop3_costtime <> '' and jypnstop3_costtime <> 'null' |""".stripMargin logger.error(querySql) serviceCostTimeRdd = spark.sql(querySql).repartition(100).rdd.map( obj =>{ val jsonObj = new JSONObject() jsonObj.put("module",obj.getString(0)) jsonObj.put("service",obj.getString(1)) jsonObj.put("cost_time",obj.getString(2)) jsonObj.put("req_time",obj.getString(3)) ((obj.getString(0),obj.getString(1)),jsonObj) }).persist(StorageLevel.DISK_ONLY) logger.error(s"获取问卷调查数据共:${serviceCostTimeRdd.count()}") serviceCostTimeRdd } def doServiceResponseStatistics( serviceCostTimeRdd: RDD[((String, String), JSONObject)],incDay:String) ={ logger.error("开始执行统计服务指标-响应时间") val serviceCostTimeDf = serviceCostTimeRdd.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp) .map( obj => { val (module,service) = obj._1 val resList = obj._2 val md5Instance = MD5Util.getMD5Instance val id = MD5Util.getMD5(md5Instance, Array(incDay,module,service).mkString("_")) val segMap = resList.map(json => { json match { case 
json if (json.getLong("cost_time") < 200) => json.put("seg_time","resp_0_200") case json if (json.getLong("cost_time") >= 200 && json.getLong("cost_time") < 500) => json.put("seg_time","resp_200_500") case json if (json.getLong("cost_time") >= 500 && json.getLong("cost_time") < 1000) => json.put("seg_time","resp_500_1000") case json if (json.getLong("cost_time") >= 1000 && json.getLong("cost_time") < 1500) => json.put("seg_time","resp_1000_1500") case json if (json.getLong("cost_time") >= 1500 && json.getLong("cost_time") < 2000) => json.put("seg_time","resp_1500_2000") case json if (json.getLong("cost_time") >= 2000 && json.getLong("cost_time") < 3000) => json.put("seg_time","resp_2000_3000") case json if (json.getLong("cost_time") >= 3000) => json.put("seg_time","res_3000") } json }).groupBy(_.getString("seg_time")) Row(id,incDay,module,service,segMap.getOrElse("resp_0_200",List()).length,segMap.getOrElse("resp_200_500",List()).length, segMap.getOrElse("resp_500_1000",List()).length,segMap.getOrElse("resp_1000_1500",List()).length, segMap.getOrElse("resp_1500_2000",List()).length, segMap.getOrElse("resp_2000_3000",List()).length, segMap.getOrElse("res_3000",List()).length ) }).persist(StorageLevel.DISK_ONLY) logger.error(s"统计服务指标-响应时间共:${serviceCostTimeDf.count}") serviceCostTimeDf } def doServicePerformanceStatistics( spark:SparkSession,serviceCostTimeDataRdd:RDD[((String, String, Long),JSONObject)],incDay:String) = { logger.error("开始执行统计服务性能-指标统计") val commonCal = (resList: List[JSONObject]) => { val md5Instance = MD5Util.getMD5Instance //请求峰值 val reqPeak = resList.maxBy(_.getInteger("minuReqAmount")).getInteger("minuReqAmount") //平均响应时间 val avgCostTime = resList.map(json => {json.getInteger("minAvgCostTime").toInt}).sum / resList.length //99%响应时间 var costTimeList = List[String]() resList.map(json => costTimeList = json.getString("costTimeList").split(",").toList ::: costTimeList) costTimeList = costTimeList.sortBy(_.toLong) val per99CostTime = costTimeList (Math.round((costTimeList.length - 1 ) * 0.99).toInt).toInt (md5Instance,reqPeak,avgCostTime,per99CostTime) } val serviceCostTimeDfTmp = serviceCostTimeDataRdd .aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp) .map( obj => { val sdf = new SimpleDateFormat("yyyyMMddHHmmss"); val date = sdf.parse(incDay+"000000") val (module,service,minute) = obj._1 val resList = obj._2 //每分钟访问量 val minuReqAmount = resList.length //每分钟平均响应时间 val minAvgCostTime = resList.map(_.getInteger("cost_time").toInt).sum / minuReqAmount //每分钟99%响应时间 resList.sortBy(_.getLong("cost_time")) val minPer99CostTime = resList (Math.round((resList.length - 1 ) * 0.99).toInt).getInteger("cost_time") val jsonObj= new JSONObject() jsonObj.put("minute",sdf.format(new Date(date .getTime() + minute *60 * 1000 ) ).dropRight(2)) jsonObj.put("minuReqAmount",minuReqAmount) jsonObj.put("minAvgCostTime",minAvgCostTime) jsonObj.put("minPer99CostTime",minPer99CostTime) jsonObj.put("costTimeList", resList.map(_.getLong("cost_time")).mkString(",")) ((module,service),jsonObj) } ).persist(StorageLevel.DISK_ONLY) val serviceCostTimeDf = serviceCostTimeDfTmp.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp) .flatMap(obj => { val (module,service) = obj._1 val resList = obj._2 val (md5Instance,reqPeak,avgCostTime,per99CostTime)= commonCal(resList) for ( jsonObj <- resList ) yield { val id = MD5Util.getMD5(md5Instance, Array(incDay,module,service,jsonObj.getString("minute")).mkString("_")) 
Row(id,incDay,module,service,jsonObj.getString("minute"),reqPeak ,jsonObj.getInteger("minuReqAmount"),avgCostTime ,jsonObj.getInteger("minAvgCostTime"),per99CostTime ,jsonObj.getInteger("minPer99CostTime") ) } }).persist(StorageLevel.DISK_ONLY) logger.error(s"统计服务指标-响应时间共:${serviceCostTimeDf.count}") /*按时间维度统计*/ val dateList = serviceCostTimeDfTmp.values.collect() val (md5Instance,reqPeak,avgCostTime,per99CostTime)= commonCal(dateList.toList) val id = MD5Util.getMD5(md5Instance, Array(incDay,"all","all","all").mkString("_")) val allDf = spark.sparkContext.parallelize(Array(Row( id,incDay,"all","all","all",reqPeak,-1,avgCostTime,-1,per99CostTime,-1 ))).persist(StorageLevel.DISK_ONLY) logger.error("按天聚合完毕") /*按模块维度统计*/ val moduleRdd = serviceCostTimeDfTmp.map(obj => (obj._1._1,obj._2)) .aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp) .map(obj => { val module = obj._1 val resList = obj._2 val (md5Instance,reqPeak,avgCostTime,per99CostTime)= commonCal(resList) val id = MD5Util.getMD5(md5Instance, Array(incDay,module,"all","all").mkString("_")) Row(id,incDay,module,"all","all",reqPeak,-1,avgCostTime,-1,per99CostTime,-1) }).persist(StorageLevel.DISK_ONLY) logger.error(s"按照模块维度统计共${moduleRdd.count}") //按照服务维度统计 val serviceRdd = serviceCostTimeDfTmp.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp) .map( obj => { val (module,service) = obj._1 val resList = obj._2 val (md5Instance,reqPeak,avgCostTime,per99CostTime)= commonCal(resList) val id = MD5Util.getMD5(md5Instance, Array(incDay,module,service,"all").mkString("_")) Row(id,incDay,module,service,"all",reqPeak,-1,avgCostTime,-1,per99CostTime,-1) }).persist(StorageLevel.DISK_ONLY) logger.error(s"按照服务维度统计共${serviceRdd.count}") serviceCostTimeDfTmp.unpersist() allDf.unpersist() serviceCostTimeDataRdd.unpersist() moduleRdd.unpersist() serviceCostTimeDf.union(allDf).union(moduleRdd).union(serviceRdd) } def start(spark: SparkSession,args: Array[String] ): Unit = { var incDay = "" var yesterday = "" if (args.length >2 ){ incDay = args(0) yesterday = args(1) val funcArr = args.drop(2) println(incDay,yesterday) //反射调用任意统计方法 funcArr.foreach( index => { val funcName = funcMap.get(index) println("\n\n\n"+s">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>${funcName}开始<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<") this.getClass.getDeclaredMethod(funcName, classOf[SparkSession], classOf[String],classOf[String]).invoke(this, spark, incDay,yesterday) println("\n"+s">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>${funcName}结束<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<"+"\n\n\n") } ) logger.error(">>>统计结束!<<<") } else { logger.error("参数长度异常") System.exit(1) } } }
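A note on the recurring pattern above: nearly every per-index statistic is built the same way — the source RDD is keyed by a dimension tuple, aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp) collects all records of one key into a List[JSONObject], and each group is then mapped to a Row that is later persisted and written out. SparkUtils.seqOp and SparkUtils.combOp themselves are not shown in this post; the sketch below assumes they are a plain list-append / list-concatenation pair, which is consistent with how they are used here but is an inference, not the original implementation. The object name, keys and values in the sketch are illustrative only.

import com.alibaba.fastjson.JSONObject
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object AggregateByKeySketch {
  // Assumed behaviour of SparkUtils.seqOp: fold one record into the per-partition list.
  val seqOp: (List[JSONObject], JSONObject) => List[JSONObject] = (acc, json) => json :: acc
  // Assumed behaviour of SparkUtils.combOp: merge the lists of two partitions.
  val combOp: (List[JSONObject], List[JSONObject]) => List[JSONObject] = (a, b) => a ::: b

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("AggregateByKeySketch").master("local[2]").getOrCreate()

    // Illustrative records keyed by (dest_province, dest_citycode), mirroring the keying used above.
    val data = Seq(("广东省", "755"), ("广东省", "755"), ("北京市", "010")).map { case (prov, city) =>
      val json = new JSONObject()
      json.put("dest_province", prov)
      json.put("dest_citycode", city)
      ((prov, city), json)
    }

    val grouped: RDD[((String, String), List[JSONObject])] =
      spark.sparkContext.parallelize(data).aggregateByKey(List[JSONObject]())(seqOp, combOp)

    // Each key now carries its full record list, which the real job turns into one Row per dimension.
    grouped.mapValues(_.length).collect().foreach(println)
    spark.stop()
  }
}

For running the whole job: main() takes inc_day as args(0) for init(), and start() additionally expects yesterday as args(1), followed by one or more index names that are looked up in funcMap and dispatched by reflection; with fewer than three arguments the job logs "参数长度异常" and exits.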
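One more detail worth calling out: doServicePerformanceStatistics derives the 99% response time by sorting all cost_time values and indexing the sorted list at Math.round((n - 1) * 0.99), i.e. a nearest-rank style percentile rather than an interpolated one. A tiny self-contained check of that formula (the latency values are made up for illustration):

// Nearest-rank p99 as used in doServicePerformanceStatistics: sort ascending, index at round((n - 1) * 0.99).
val costTimes: List[Long] = (1L to 200L).toList                 // illustrative latencies in ms
val sorted = costTimes.sortBy(identity)
val p99Index = Math.round((sorted.length - 1) * 0.99).toInt     // (200 - 1) * 0.99 = 197.01 -> 197
val p99 = sorted(p99Index)                                      // 198 ms for this made-up data
println(s"p99 index = $p99Index, p99 = ${p99}ms")

Note that in the per-minute branch above, the result of resList.sortBy(_.getLong("cost_time")) is never assigned back (List is immutable), so minPer99CostTime is effectively read from the unsorted list; assigning the sorted list to a val first would match the intent of the "每分钟99%响应时间" comment. The commonCal helper does assign its sorted list, so the per-day/module/service p99 values are unaffected.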