Hudi: Analyzing Didi Ride-Hailing Data with Spark
Utility class
package com.zhen.hudi.didi

import org.apache.spark.sql.SparkSession

/**
 * @Author FengZhen
 * @Date 3/1/22 9:34 PM
 * @Description Utility class for SparkSQL data operations (load/read and save/write),
 *              e.g. building a SparkSession instance.
 */
object SparkUtils {

  /**
   * Build a SparkSession instance; runs in local mode by default.
   * @return
   */
  def createSparkSession(clazz: Class[_], master: String = "local[4]", partitions: Int = 4): SparkSession = {
    SparkSession.builder()
      .appName(clazz.getSimpleName.stripSuffix("$"))
      .master(master)
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.shuffle.partitions", partitions)
      .getOrCreate()
  }

  def main(args: Array[String]): Unit = {
    val spark = createSparkSession(this.getClass)
    println(spark)
    Thread.sleep(1000 * 100)
    spark.stop()
  }
}
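The builder above only wires in the Kryo serializer and the shuffle parallelism. For newer Hudi releases (roughly 0.9+ on Spark 3), the Hudi quick-start additionally recommends registering the Hudi SQL session extension. A minimal sketch of such an extended builder, assuming the matching hudi-spark bundle jar is already on the classpath (app name and master below are arbitrary illustrative values):

import org.apache.spark.sql.SparkSession

// Sketch only: same Kryo setting as above plus the Hudi SQL session extension.
// Assumes the hudi-spark bundle matching your Spark version is on the classpath.
val spark: SparkSession = SparkSession.builder()
  .appName("hudi-demo")
  .master("local[4]")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
  .getOrCreate()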
Loading the CSV file into Hudi
package com.zhen.hudi.didi

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.functions._

/**
 * @Author FengZhen
 * @Date 3/1/22 9:29 PM
 * @Description Operational analysis of Didi Haikou trip data. Uses SparkSQL to read the
 *              raw CSV file and save it into a Hudi table.
 *              1. Build the SparkSession instance (integrated with Hudi and HDFS)
 *              2. Load the local CSV-formatted Didi trip data
 *              3. ETL the Didi trip data
 *              4. Save the transformed data into the Hudi table
 *              5. Close resources when the application finishes
 */
object DidiStorageSpark {

  val datasPath: String = "file:////Users/FengZhen/Desktop/accumulate/0_project/hudi-learning/datas/didi/dwv_order_make_haikou_1.txt"
  val hudiTableName: String = "tbl_didi_haikou"
  val hudiTablePath: String = "/hudi-warehouse/tbl_didi_haikou"

  def main(args: Array[String]): Unit = {
    // 1. Build the SparkSession instance (integrated with Hudi and HDFS)
    val spark: SparkSession = SparkUtils.createSparkSession(this.getClass)

    // 2. Load the local CSV-formatted Didi trip data
    val didiDF = readCsvFile(spark, datasPath)
    // didiDF.printSchema()
    // didiDF.show(10, false)

    // 3. ETL the Didi trip data
    val etlDF: DataFrame = process(didiDF)
    // etlDF.printSchema()
    // etlDF.show(10, false)

    // 4. Save the transformed data into the Hudi table
    saveToHudi(etlDF, hudiTableName, hudiTablePath)

    // 5. Close resources when the application finishes
    spark.stop()
  }

  /**
   * Read the CSV-formatted text file and wrap it in a DataFrame.
   * @param spark
   * @param path
   * @return
   */
  def readCsvFile(spark: SparkSession, path: String): DataFrame = {
    spark.read
      // The separator is a tab character
      .option("sep", "\\t")
      // The first line of the file holds the column names
      .option("header", "true")
      // Infer column types from the values
      .option("inferSchema", "true")
      // File path
      .csv(path)
  }

  /**
   * ETL the Didi Haikou trip data: derive the ts and partitionpath fields.
   * @param dataFrame
   * @return
   */
  def process(dataFrame: DataFrame): DataFrame = {
    dataFrame
      // Add the Hudi partition field, a three-level partition -> yyyy-MM-dd
      .withColumn(
        "partitionpath",
        concat_ws("-", col("year"), col("month"), col("day"))
      )
      // Drop the original date columns
      .drop("year", "month", "day")
      // Add a timestamp column used as the Hudi pre-combine field, based on the departure time
      .withColumn(
        "ts",
        unix_timestamp(col("departure_time"), "yyyy-MM-dd HH:mm:ss")
      )
  }

  /**
   * Save the DataFrame into a Hudi table. The table type is COW (copy on write),
   * suited to batch writes with a write-less/read-more workload.
   * @param dataFrame
   * @param table
   * @param path
   */
  def saveToHudi(dataFrame: DataFrame, table: String, path: String): Unit = {
    import org.apache.hudi.DataSourceWriteOptions._
    import org.apache.hudi.config.HoodieWriteConfig._

    // Save the data
    dataFrame.write
      .mode(SaveMode.Overwrite)
      .format("hudi")
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      // Hudi table properties
      // Record key
      .option(RECORDKEY_FIELD.key(), "order_id")
      // Pre-combine field
      .option(PRECOMBINE_FIELD.key(), "ts")
      // Partition field
      .option(PARTITIONPATH_FIELD.key(), "partitionpath")
      // Table name
      .option(TBL_NAME.key(), table)
      .save(path)
  }
}
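Once saveToHudi has run, a quick way to confirm the write is to load the table back through the same Hudi data source and look at the metadata columns Hudi adds to every record. A small verification sketch, reusing hudiTablePath from above:

// Snapshot-read the freshly written table and inspect Hudi's metadata columns.
val verifyDF = spark.read.format("hudi").load(hudiTablePath)
verifyDF
  .select("_hoodie_commit_time", "_hoodie_record_key", "_hoodie_partition_path", "order_id", "ts")
  .show(5, false)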
Business analysis
package com.zhen.hudi.didi

import java.util.{Calendar, Date}

import org.apache.commons.lang3.time.FastDateFormat
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.functions._

/**
 * @Author FengZhen
 * @Date 3/1/22 9:31 PM
 * @Description Operational analysis of Didi Haikou trip data. Uses SparkSQL to load the
 *              Hudi table and compute the business metrics.
 *              Spark DataSource API:
 *              spark.read.format("hudi")...
 *              dataframe.write.format("hudi")...
 *
 *              Since Spark 2.x, the program entry point is SparkSession.
 */
object DidiAnalysisSpark {

  // Hudi table property: HDFS path where the data is stored
  val hudiTablePath: String = "/hudi-warehouse/tbl_didi_haikou"

  def main(args: Array[String]): Unit = {
    // 1. Build the SparkSession instance (integrated with Hudi and HDFS)
    val spark: SparkSession = SparkUtils.createSparkSession(this.getClass, partitions = 8)

    // 2. Load the Hudi table data, selecting only the needed fields
    val hudiDF: DataFrame = readFromHudi(spark, hudiTablePath)
    hudiDF.printSchema()
    hudiDF.show(10, false)

    // The data is used several times, so cache it
    hudiDF.persist(StorageLevel.MEMORY_AND_DISK)

    // 3. Compute the business metrics
    // Metric 1: order count per product line (product_id)
    //reportProduct(hudiDF)
    // Metric 2: order count per timeliness type
    //reportType(hudiDF)
    // Metric 3: order count per traffic type
    //reportTraffic(hudiDF)
    // Metric 4: order count per price range
    //reportPrice(hudiDF)
    // Metric 5: order count per trip distance range
    //reportDistance(hudiDF)
    // Metric 6: order count per day of week
    reportWeek(hudiDF)

    // Release the cache once the data is no longer needed
    hudiDF.unpersist()

    // 4. Close resources when the application finishes
    spark.stop()
  }

  /**
   * Load the Hudi table data into a DataFrame.
   * @param spark
   * @param path
   * @return
   */
  def readFromHudi(spark: SparkSession, path: String): DataFrame = {
    val didiDF: DataFrame = spark.read.format("hudi").load(path)
    // Select the fields used by the reports
    didiDF.select(
      "product_id", "type", "traffic_type",
      "pre_total_fee", "start_dest_distance", "departure_time"
    )
  }

  /**
   * Order count per product line, field: product_id
   * @param dataFrame
   */
  def reportProduct(dataFrame: DataFrame): Unit = {
    // 1 = 滴滴专车, 2 = 滴滴企业专车, 3 = 滴滴快车, 4 = 滴滴企业快车
    // a. Group by product line id and count
    val reportDF: DataFrame = dataFrame.groupBy("product_id").count()

    // b. UDF mapping the id to a readable name
    val to_name = udf(
      (productId: Int) => {
        productId match {
          case 1 => "滴滴专车"
          case 2 => "滴滴企业专车"
          case 3 => "滴滴快车"
          case 4 => "滴滴企业快车"
        }
      }
    )

    // c. Convert the id to its name
    val resultDF: DataFrame = reportDF.select(
      to_name(col("product_id")).as("order_type"),
      col("count").as("total")
    )
    resultDF.printSchema()
    resultDF.show(10, false)
  }

  /**
   * Order count per timeliness type, field: type
   * @param dataFrame
   */
  def reportType(dataFrame: DataFrame): Unit = {
    // a. Group by timeliness id and count
    val reportDF: DataFrame = dataFrame.groupBy("type").count()

    // b. UDF mapping the id to a readable name
    val to_name = udf(
      (realtimeType: Int) => {
        realtimeType match {
          case 0 => "实时"
          case 1 => "预约"
        }
      }
    )

    // c. Convert the id to its name
    val resultDF: DataFrame = reportDF.select(
      to_name(col("type")).as("order_realtime"),
      col("count").as("total")
    )
    resultDF.printSchema()
    resultDF.show(10, false)
  }

  /**
   * Order count per traffic type, field: traffic_type
   * @param dataFrame
   */
  def reportTraffic(dataFrame: DataFrame): Unit = {
    // a. Group by traffic type and count
    val reportDF: DataFrame = dataFrame.groupBy("traffic_type").count()

    // b. UDF mapping the id to a readable name
    val to_name = udf(
      (trafficType: Int) => {
        trafficType match {
          case 0 => "普通散客"
          case 1 => "企业时租"
          case 2 => "企业接机套餐"
          case 3 => "企业送机套餐"
          case 4 => "拼车"
          case 5 => "接机"
          case 6 => "送机"
          case 302 => "跨域拼车"
          case _ => "未知"
        }
      }
    )

    // c. Convert the id to its name
    val resultDF: DataFrame = reportDF.select(
      to_name(col("traffic_type")).as("traffic_type"),
      col("count").as("total")
    )
    resultDF.printSchema()
    resultDF.show(10, false)
  }

  /**
   * Order count per price range: bucket the order price, then count each bucket.
   * Field: pre_total_fee
   * @param dataFrame
   */
  def reportPrice(dataFrame: DataFrame): Unit = {
    val resultDF: DataFrame = dataFrame
      .agg(
        // Price 0-15
        sum(
          when(col("pre_total_fee").between(0, 15), 1).otherwise(0)
        ).as("0-15"),
        // Price 16-30
        sum(
          when(col("pre_total_fee").between(16, 30), 1).otherwise(0)
        ).as("16-30"),
        // Price 31-50
        sum(
          when(col("pre_total_fee").between(31, 50), 1).otherwise(0)
        ).as("31-50"),
        // Price 51-100
        sum(
          when(col("pre_total_fee").between(51, 100), 1).otherwise(0)
        ).as("51-100"),
        // Price 100+
        sum(
          when(col("pre_total_fee").gt(100), 1).otherwise(0)
        ).as("100+")
      )
    resultDF.printSchema()
    resultDF.show(10, false)
  }

  /**
   * Order count per trip distance range: bucket the trip distance (in metres),
   * then count each bucket. Field: start_dest_distance
   * @param dataFrame
   */
  def reportDistance(dataFrame: DataFrame): Unit = {
    val resultDF: DataFrame = dataFrame
      .agg(
        // Distance 0-10 km
        sum(
          when(col("start_dest_distance").between(0, 10000), 1).otherwise(0)
        ).as("0-10km"),
        // Distance 10-20 km
        sum(
          when(col("start_dest_distance").between(10001, 20000), 1).otherwise(0)
        ).as("10-20km"),
        // Distance 20-30 km
        sum(
          when(col("start_dest_distance").between(20001, 30000), 1).otherwise(0)
        ).as("20-30km"),
        // Distance 30-50 km
        sum(
          when(col("start_dest_distance").between(30001, 50000), 1).otherwise(0)
        ).as("30-50km"),
        // Distance 50 km+
        sum(
          when(col("start_dest_distance").gt(50000), 1).otherwise(0)
        ).as("50km+")
      )
    resultDF.printSchema()
    resultDF.show(10, false)
  }

  /**
   * Convert the departure time to a day of week and count orders per day.
   * Field: departure_time
   * @param dataFrame
   */
  def reportWeek(dataFrame: DataFrame): Unit = {
    // UDF converting the departure time string to a day-of-week label
    val to_week = udf(
      (departureTime: String) => {
        val format: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd")
        val calendar: Calendar = Calendar.getInstance()

        val date: Date = format.parse(departureTime)
        calendar.setTime(date)

        val dayWeek = calendar.get(Calendar.DAY_OF_WEEK) match {
          case 1 => "星期日"
          case 2 => "星期一"
          case 3 => "星期二"
          case 4 => "星期三"
          case 5 => "星期四"
          case 6 => "星期五"
          case 7 => "星期六"
        }
        dayWeek
      }
    )

    val resultDF: DataFrame = dataFrame
      .select(
        to_week(col("departure_time")).as("week")
      )
      .groupBy(col("week")).count()
      .select(
        col("week"), col("count").as("total")
      )
      .orderBy(col("total").desc)

    resultDF.printSchema()
    resultDF.show(10, false)
  }
}
Source: https://www.cnblogs.com/EnzoDin/p/15957357.html