
spark_subject Collection


spark subject

subject_1: Use cases for the lag function (flexible)

Company code, year, and the revenue amounts for January through December:
burk,year,tsl01,tsl02,tsl03,tsl04,tsl05,tsl06,tsl07,tsl08,tsl09,tsl10,tsl11,tsl12
853101,2010,100200,25002,19440,20550,14990,17227,40990,28778,19088,29889,10990,20990
853101,2011,19446,20556,14996,17233,40996,28784,19094,28779,19089,29890,10991,20991
853101,2012,19447,20557,14997,17234,20560,15000,17237,28780,19090,29891,10992,20992
853101,2013,20560,15000,17237,41000,17234,20560,15000,17237,41000,29892,10993,20993
853101,2014,19449,20559,14999,17236,41000,28788,28786,19096,29897,41000,28788,20994
853101,2015,100205,25007,19445,20555,17236,40999,28787,19097,29898,29894,10995,20995
853101,2016,100206,25008,19446,20556,17237,41000,28788,19098,29899,29895,10996,20996
853101,2017,100207,25009,17234,20560,15000,17237,41000,15000,17237,41000,28788,20997
853101,2018,100208,25010,41000,28788,28786,19096,29897,28786,19096,29897,10998,20998
853101,2019,100209,25011,17236,40999,28787,19097,29898,28787,19097,29898,10999,20999
846271,2010,100210,25012,17237,41000,28788,19098,29899,28788,19098,29899,11000,21000
846271,2011,100211,25013,19451,20561,15001,17238,41001,28789,19099,29900,11001,21001
846271,2012,100212,100213,20190,6484,46495,86506,126518,166529,206540,246551,286562,326573
846271,2013,100213,100214,21297,5008,44466,83924,123382,162839,202297,241755,281213,320671
846271,2014,100214,100215,22405,3531,42436,81341,120245,159150,198055,236959,275864,314769
846271,2015,100215,100216,23512,2055,19096,29897,28786,19096,29897,41000,29892,308866
846271,2016,100216,100217,24620,579,38377,76175,28788,28786,19096,29897,41000,302964
846271,2017,100217,100218,25727,898,36347,73592,40999,28787,19097,29898,29894,297062
846271,2018,100218,100219,26835,2374,34318,71009,41000,28788,19098,29899,29895,291159
846271,2019,100219,100220,27942,3850,32288,68427,17237,41000,15000,17237,41000,285257


1. For each company and year, compute the cumulative revenue by month. Unpivot the monthly columns into rows --> sum window function

Expected output:
company code, year, month, monthly revenue, cumulative revenue
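For example, company 853101's 2010 row unpivots to (853101, 2010, 1, 100200, 100200), (853101, 2010, 2, 25002, 125202), (853101, 2010, 3, 19440, 144642), and so on.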


2. For each company, compute each month's year-over-year growth rate. Unpivot the monthly columns into rows --> lag window function
company code, year, month, growth rate (current month's revenue / same month last year's revenue - 1)
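For example, company 853101 earned 100200 in January 2010 and 19446 in January 2011, so its January 2011 growth rate is 19446 / 100200 - 1 ≈ -0.80593.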

answer

package com.sql

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.{Column, DataFrame, SparkSession}

object Demo8Burks {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder
      .master("local")
      .appName("burk")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    import org.apache.spark.sql.functions._
    import spark.implicits._

    // read the data
    val burkDF: DataFrame = spark
      .read
      .format("csv")
      .option("sep", ",")
      .schema("burk STRING,year STRING,tsl01 DOUBLE,tsl02 DOUBLE,tsl03 DOUBLE,tsl04 DOUBLE,tsl05 DOUBLE,tsl06 DOUBLE,tsl07 DOUBLE,tsl08 DOUBLE,tsl09 DOUBLE,tsl10 DOUBLE,tsl11 DOUBLE,tsl12 DOUBLE")
      .load("data/burks.txt")

    burkDF.show()

    /**
     * 1. For each company and year, compute the cumulative revenue by month.
     * Unpivot the monthly columns into rows --> sum window function.
     * Expected output: company code, year, month, monthly revenue, cumulative revenue
     */
    /**
     * First, the SQL version
     */

    // register a temp view first so we can write SQL against it
    burkDF.createOrReplaceTempView("burks")

    spark.sql(
      """
        |select
        |  burk, year, month, plc,
        |  sum(plc) over (partition by burk, year order by month) as leiji
        |from (
        |  select burk, year, month, plc
        |  from burks
        |  lateral view explode(map(1,tsl01,2,tsl02,3,tsl03,4,tsl04,5,tsl05,6,tsl06,7,tsl07,8,tsl08,9,tsl09,10,tsl10,11,tsl11,12,tsl12)) T as month,plc
        |) t1
        |""".stripMargin).show()



    // a map from month number to that month's revenue column; exploding it unpivots the table
    val m: Column = map(
      expr("1"), $"tsl01",
      expr("2"), $"tsl02",
      expr("3"), $"tsl03",
      expr("4"), $"tsl04",
      expr("5"), $"tsl05",
      expr("6"), $"tsl06",
      expr("7"), $"tsl07",
      expr("8"), $"tsl08",
      expr("9"), $"tsl09",
      expr("10"), $"tsl10",
      expr("11"), $"tsl11",
      expr("12"), $"tsl12"
    )

    /**
     * The same query with the DataFrame API
     */
    burkDF
      // first unpivot the monthly columns into rows
      .select($"burk",$"year",explode(m) as Array("month","plc"))
      // with orderBy, the sum becomes a running total within each window partition
      .withColumn("leiJi",sum($"plc") over Window.partitionBy($"burk",$"year").orderBy($"month"))
      .show()

    /**
     * 2. For each company, compute each month's year-over-year growth rate.
     * Unpivot the monthly columns into rows --> lag window function.
     * Output: company code, year, month, growth rate (current month's revenue / same month last year's revenue - 1)
     *
     * coalesce: returns the first non-null column (when/otherwise below serves the same purpose)
     */
    burkDF
      // first unpivot the monthly columns into rows
      .select($"burk",$"year",explode(m) as Array("month","plc"))
      // lag fetches the same month's revenue from the previous year
      .withColumn("shangTong",lag($"plc",1) over Window.partitionBy($"burk",$"month").orderBy($"year"))
      // growth rate = current month's revenue / same month last year's revenue - 1
      .withColumn("p", round($"plc" / $"shangTong" - 1, 5))
      // the first year has no prior-year value; fall back to 1.0 as the original does
      .withColumn("p", when($"shangTong".isNotNull, $"p").otherwise(1.0))
      .select($"burk",$"year",$"month",$"p")
      .show()
  }
}
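
Task 2 above is shown only with the DataFrame API. For symmetry with the SQL solution to task 1, here is a minimal SQL sketch of the same lag logic against the burks temp view (a sketch, not verified against the original data; coalesce supplies the same 1.0 fallback that when/otherwise provides in the DataFrame version):

spark.sql(
  """
    |select
    |  burk, year, month,
    |  coalesce(round(plc / lag(plc, 1) over (partition by burk, month order by year) - 1, 5), 1.0) as p
    |from (
    |  select burk, year, month, plc
    |  from burks
    |  lateral view explode(map(1,tsl01,2,tsl02,3,tsl03,4,tsl04,5,tsl05,6,tsl06,7,tsl07,8,tsl08,9,tsl09,10,tsl10,11,tsl11,12,tsl12)) T as month,plc
    |) t1
    |""".stripMargin).show()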

subject_2: Converting between rows and columns

1. Row/column conversion

Table 1
name,item,score
张三,数学,33
张三,英语,77
李四,数学,66
李四,英语,78


Table 2
name,math,english
张三,33,77
李四,66,78

    1. Convert Table 1 into Table 2
    2. Convert Table 2 into Table 1

answer

package com.sql
import org.apache.spark.sql.{Column, DataFrame, SparkSession}

object Demo11Student {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder
      .master("local")
      .appName("stu_sco")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    import org.apache.spark.sql.functions._
    import spark.implicits._

    // read the data
    val stuDF: DataFrame = spark
      .read
      .format("csv")
      .option("sep", ",")
      .schema("name STRING,subject STRING, score DOUBLE")
      .load("data/stu_sco.txt")

    /**
     * Rows to columns: pivot Table 1 into Table 2
     */

    val hang_lieDF: DataFrame = stuDF
      .groupBy($"name")
      .agg(
        sum(when($"subject" === "数学", $"score").otherwise(0)) as "math",
        sum(when($"subject" === "英语", $"score").otherwise(0)) as "english"
      )
    hang_lieDF.show()

    /**
     * Columns back to rows: unpivot Table 2 into Table 1
     */
    val m: Column = map(
      expr("'数学'"), $"math",
      expr("'英语'"), $"english"
    )

    hang_lieDF
      .select($"name",explode(m) as Array("subject","score"))
      .show()
  }
}
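
As a side note, Spark's built-in pivot can express the rows-to-columns direction more directly than the sum(when(...)) pattern; a minimal sketch, assuming the same stuDF:

stuDF
  .groupBy($"name")
  .pivot("subject", Seq("数学", "英语")) // enumerating the pivot values avoids an extra pass over the data
  .agg(first($"score"))
  .withColumnRenamed("数学", "math")
  .withColumnRenamed("英语", "english")
  .show()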

subject_3: Aggregation along a timeline

91330000733796106P,杭州海康威视数字技术股份有限公司,2020-02-01 00:00:00
91330000733796106P,杭州海康威视数字技术股份有限公司,2020-03-01 00:00:00
91330000733796106P,杭州海康威视数字技术股份有限公司,2020-04-01 00:00:00
91330000733796106P,杭州海康威视数字技术股份有限公司,2020-05-01 00:00:00
91330000733796106P,阿里云计算有限公司,2020-06-01 00:00:00
91330000733796106P,阿里云计算有限公司,2020-07-01 00:00:00
91330000733796106P,阿里云计算有限公司,2020-08-01 00:00:00
91330000733796106P,阿里云计算有限公司,2020-09-01 00:00:00
91330000733796106P,杭州海康威视数字技术股份有限公司,2020-10-01 00:00:00
91330000733796106P,杭州海康威视数字技术股份有限公司,2020-11-01 00:00:00
91330000733796106P,杭州海康威视数字技术股份有限公司,2020-12-01 00:00:00
91330000733796106P,杭州海康威视数字技术股份有限公司,2021-01-01 00:00:00
91330000733796106P,杭州海康威视数字技术股份有限公司,2021-02-01 00:00:00
91330000733796106P,杭州海康威视数字技术股份有限公司,2021-03-01 00:00:00
aaaaaaaaaaaaaaaaaa,杭州海康威视数字技术股份有限公司,2020-02-01 00:00:00
aaaaaaaaaaaaaaaaaa,杭州海康威视数字技术股份有限公司,2020-03-01 00:00:00
aaaaaaaaaaaaaaaaaa,杭州海康威视数字技术股份有限公司,2020-04-01 00:00:00
aaaaaaaaaaaaaaaaaa,杭州海康威视数字技术股份有限公司,2020-05-01 00:00:00
aaaaaaaaaaaaaaaaaa,阿里云计算有限公司,2020-06-01 00:00:00
aaaaaaaaaaaaaaaaaa,阿里云计算有限公司,2020-07-01 00:00:00
aaaaaaaaaaaaaaaaaa,阿里云计算有限公司,2020-08-01 00:00:00
aaaaaaaaaaaaaaaaaa,阿里云计算有限公司,2020-09-01 00:00:00
aaaaaaaaaaaaaaaaaa,杭州海康威视数字技术股份有限公司,2020-10-01 00:00:00
aaaaaaaaaaaaaaaaaa,杭州海康威视数字技术股份有限公司,2020-11-01 00:00:00
aaaaaaaaaaaaaaaaaa,杭州海康威视数字技术股份有限公司,2020-12-01 00:00:00
aaaaaaaaaaaaaaaaaa,杭州海康威视数字技术股份有限公司,2021-01-01 00:00:00
aaaaaaaaaaaaaaaaaa,杭州海康威视数字技术股份有限公司,2021-02-01 00:00:00
aaaaaaaaaaaaaaaaaa,杭州海康威视数字技术股份有限公司,2021-03-01 00:00:00
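
Goal: for each id, merge consecutive months at the same company into one continuous stint with a start date and an end date. For id 91330000733796106P the expected output is three stints: 杭州海康威视数字技术股份有限公司 from 2020-02-01 to 2020-05-01, 阿里云计算有限公司 from 2020-06-01 to 2020-09-01, and 杭州海康威视数字技术股份有限公司 again from 2020-10-01 to 2021-03-01.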

answer

package com.sql

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{DataFrame, SparkSession}

import scala.language.postfixOps

object Demo12SheBao {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder
      .master("local")
      .appName("sheBao")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    import org.apache.spark.sql.functions._
    import spark.implicits._

    // read the data
    val sheBaoDF: DataFrame = spark
      .read
      .format("csv")
      .option("sep", ",")
      .schema("id STRING, burk STRING, sdate STRING")
      .load("data/sheBao.txt")

    sheBaoDF.show()

    /**
     * Group consecutive records along the timeline (gaps and islands)
     */
    sheBaoDF
      // partition by id, order by date, and fetch the previous row's company
      .withColumn("last_burk", lag($"burk", 1) over Window.partitionBy($"id").orderBy($"sdate"))
      // flag each row: 0 if the company matches the previous row, 1 otherwise (a 1 marks the start of a new stint)
      .withColumn("flag", when($"burk" === $"last_burk", 0).otherwise(1))
      // a running sum of the flag along the timeline assigns a group id to each continuous stint
      .withColumn("fenZu", sum($"flag") over Window.partitionBy($"id").orderBy($"sdate"))
      // group by id, company, and fenZu to get each stint's start and end dates
      .groupBy($"id", $"burk", $"fenZu")
      .agg(min($"sdate") as "start_date", max($"sdate") as "end_date")
      .show(1000)
  }
}
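
The same gaps-and-islands logic can also be written in SQL; a minimal sketch, assuming sheBaoDF is first registered as a temp view named sheBao (the view name is ours, not the original author's):

sheBaoDF.createOrReplaceTempView("sheBao")

spark.sql(
  """
    |select id, burk, min(sdate) as start_date, max(sdate) as end_date
    |from (
    |  select id, burk, sdate,
    |         sum(flag) over (partition by id order by sdate) as fenZu
    |  from (
    |    select id, burk, sdate,
    |           case when burk = lag(burk, 1) over (partition by id order by sdate) then 0 else 1 end as flag
    |    from sheBao
    |  ) t1
    |) t2
    |group by id, burk, fenZu
    |""".stripMargin).show(1000)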

Source: https://www.cnblogs.com/atao-BigData/p/16488811.html