spark_subject collection
Author: Internet
spark subject
subject_1: Use cases for the lag function (flexible)
Company code, year, and the income amount for each month from January through December
burk,year,tsl01,tsl02,tsl03,tsl04,tsl05,tsl06,tsl07,tsl08,tsl09,tsl10,tsl11,tsl12
853101,2010,100200,25002,19440,20550,14990,17227,40990,28778,19088,29889,10990,20990
853101,2011,19446,20556,14996,17233,40996,28784,19094,28779,19089,29890,10991,20991
853101,2012,19447,20557,14997,17234,20560,15000,17237,28780,19090,29891,10992,20992
853101,2013,20560,15000,17237,41000,17234,20560,15000,17237,41000,29892,10993,20993
853101,2014,19449,20559,14999,17236,41000,28788,28786,19096,29897,41000,28788,20994
853101,2015,100205,25007,19445,20555,17236,40999,28787,19097,29898,29894,10995,20995
853101,2016,100206,25008,19446,20556,17237,41000,28788,19098,29899,29895,10996,20996
853101,2017,100207,25009,17234,20560,15000,17237,41000,15000,17237,41000,28788,20997
853101,2018,100208,25010,41000,28788,28786,19096,29897,28786,19096,29897,10998,20998
853101,2019,100209,25011,17236,40999,28787,19097,29898,28787,19097,29898,10999,20999
846271,2010,100210,25012,17237,41000,28788,19098,29899,28788,19098,29899,11000,21000
846271,2011,100211,25013,19451,20561,15001,17238,41001,28789,19099,29900,11001,21001
846271,2012,100212,100213,20190,6484,46495,86506,126518,166529,206540,246551,286562,326573
846271,2013,100213,100214,21297,5008,44466,83924,123382,162839,202297,241755,281213,320671
846271,2014,100214,100215,22405,3531,42436,81341,120245,159150,198055,236959,275864,314769
846271,2015,100215,100216,23512,2055,19096,29897,28786,19096,29897,41000,29892,308866
846271,2016,100216,100217,24620,579,38377,76175,28788,28786,19096,29897,41000,302964
846271,2017,100217,100218,25727,898,36347,73592,40999,28787,19097,29898,29894,297062
846271,2018,100218,100219,26835,2374,34318,71009,41000,28788,19098,29899,29895,291159
846271,2019,100219,100220,27942,3850,32288,68427,17237,41000,15000,17237,41000,285257
1. For each company and year, compute the cumulative income by month. Unpivot the month columns into rows --> sum window function
Expected output
company code, year, month, current-month income, cumulative income
2. For each company, compute each month's growth rate against the same month of the previous year. Unpivot the month columns into rows --> lag window function
company code, year, month, growth rate (current-month income / same-month income last year - 1)
answer
package com.sql
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
object Demo8Burks {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder
      .master("local")
      .appName("burk")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    import org.apache.spark.sql.functions._
    import spark.implicits._

    // read the data
    val burkDF: DataFrame = spark
      .read
      .format("csv")
      .option("sep", ",")
      .schema("burk STRING,year STRING,tsl01 DOUBLE,tsl02 DOUBLE,tsl03 DOUBLE,tsl04 DOUBLE,tsl05 DOUBLE,tsl06 DOUBLE,tsl07 DOUBLE,tsl08 DOUBLE,tsl09 DOUBLE,tsl10 DOUBLE,tsl11 DOUBLE,tsl12 DOUBLE")
      .load("data/burks.txt")

    burkDF.show()
    /**
     * 1. For each company and year, compute the cumulative income by month.
     *    Unpivot the month columns into rows --> sum window function
     *    Expected output:
     *    company code, year, month, current-month income, cumulative income
     */

    /**
     * First, the SQL version.
     */
    // a temp view is required before SQL can be run against the DataFrame
    burkDF.createOrReplaceTempView("burks")
    spark.sql(
      """
        |select
        |  burk, year, month, plc,
        |  sum(plc) over(partition by burk, year order by month) as leiji
        |from (
        |  select burk, year, month, plc
        |  from burks
        |  lateral view explode(map(1,tsl01,2,tsl02,3,tsl03,4,tsl04,5,tsl05,6,tsl06,7,tsl07,8,tsl08,9,tsl09,10,tsl10,11,tsl11,12,tsl12)) T as month,plc
        |) t1
        |""".stripMargin).show()
    val m: Column = map(
      expr("1"), $"tsl01",
      expr("2"), $"tsl02",
      expr("3"), $"tsl03",
      expr("4"), $"tsl04",
      expr("5"), $"tsl05",
      expr("6"), $"tsl06",
      expr("7"), $"tsl07",
      expr("8"), $"tsl08",
      expr("9"), $"tsl09",
      expr("10"), $"tsl10",
      expr("11"), $"tsl11",
      expr("12"), $"tsl12"
    )
    /**
     * The same query written with the DataFrame API.
     */
    burkDF
      // first unpivot the month columns into rows
      .select($"burk", $"year", explode(m) as Array("month", "plc"))
      // with orderBy, the sum window accumulates month by month
      .withColumn("leiJi", sum($"plc") over Window.partitionBy($"burk", $"year").orderBy($"month"))
      .show()
    /**
     * 2. For each company, compute each month's growth rate against the same month of
     *    the previous year. Unpivot the month columns into rows --> lag window function
     *    company code, year, month, growth rate (current-month income / same-month income last year - 1)
     *
     *    coalesce: returns the first non-null column
     */
    burkDF
      // first unpivot the month columns into rows
      .select($"burk", $"year", explode(m) as Array("month", "plc"))
      // fetch the income of the same month in the previous year
      .withColumn("shangTong", lag($"plc", 1) over Window.partitionBy($"burk", $"month").orderBy($"year"))
      // compute the growth rate: current-month income / same-month income last year - 1
      .withColumn("p", round($"plc" / $"shangTong" - 1, 5))
      // when there is no previous year, fall back to a default value of 1.0
      .withColumn("p", when($"shangTong".isNotNull, $"p").otherwise(1.0))
      .select($"burk", $"year", $"month", $"p")
      .show()
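    // Alternative sketch (not part of the original answer): the coalesce mentioned in the
    // comment above can replace the when/otherwise step. Falling back to plc itself makes
    // the first year's rate 0.0 instead of 1.0; pick whichever default the requirement wants.
    burkDF
      .select($"burk", $"year", explode(m) as Array("month", "plc"))
      .withColumn("shangTong", lag($"plc", 1) over Window.partitionBy($"burk", $"month").orderBy($"year"))
      .withColumn("p", round($"plc" / coalesce($"shangTong", $"plc") - 1, 5))
      .select($"burk", $"year", $"month", $"p")
      .show()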
  }
}
subject_2: Row-column transposition
1. Row-column transposition
Table 1
Name, Subject, Score
name,item,score
张三,数学,33
张三,英语,77
李四,数学,66
李四,英语,78
Table 2
Name, Math, English
name,math,english
张三,33,77
李四,66,78
1. Convert Table 1 into Table 2
2. Convert Table 2 into Table 1
answer
package com.sql
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
object Demo11Student {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder
      .master("local")
      .appName("stu_sco")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    import org.apache.spark.sql.functions._
    import spark.implicits._

    // read the data
    val stuDF: DataFrame = spark
      .read
      .format("csv")
      .option("sep", ",")
      .schema("name STRING,subject STRING, score DOUBLE")
      .load("data/stu_sco.txt")
    /**
     * Rows to columns: pivot each subject into its own column (Table 1 -> Table 2)
     */
    val hang_lieDF: DataFrame = stuDF
      .groupBy($"name")
      .agg(
        sum(when($"subject" === "数学", $"score").otherwise(0)) as "math",
        sum(when($"subject" === "英语", $"score").otherwise(0)) as "english"
      )

    hang_lieDF.show()
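    // Alternative sketch (not part of the original answer): the built-in pivot() performs
    // the same rows-to-columns transformation; listing the pivot values up front avoids an
    // extra pass over the data to discover them.
    stuDF
      .groupBy($"name")
      .pivot("subject", Seq("数学", "英语"))
      .agg(sum($"score"))
      .withColumnRenamed("数学", "math")
      .withColumnRenamed("英语", "english")
      .show()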
    /**
     * Columns back to rows (Table 2 -> Table 1)
     */
    val m: Column = map(
      expr("'数学'"), $"math",
      expr("'英语'"), $"english"
    )

    hang_lieDF
      .select($"name", explode(m) as Array("subject", "score"))
      .show()
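    // Alternative sketch (not part of the original answer): the same columns-to-rows step
    // written with the stack() generator inside a SQL expression.
    hang_lieDF
      .selectExpr("name", "stack(2, '数学', math, '英语', english) as (subject, score)")
      .show()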
  }
}
subject_3: Aggregation along the time axis (for each id, find the start and end date of every continuous period spent at the same company)
91330000733796106P,杭州海康威视数字技术股份有限公司,2020-02-01 00:00:00
91330000733796106P,杭州海康威视数字技术股份有限公司,2020-03-01 00:00:00
91330000733796106P,杭州海康威视数字技术股份有限公司,2020-04-01 00:00:00
91330000733796106P,杭州海康威视数字技术股份有限公司,2020-05-01 00:00:00
91330000733796106P,阿里云计算有限公司,2020-06-01 00:00:00
91330000733796106P,阿里云计算有限公司,2020-07-01 00:00:00
91330000733796106P,阿里云计算有限公司,2020-08-01 00:00:00
91330000733796106P,阿里云计算有限公司,2020-09-01 00:00:00
91330000733796106P,杭州海康威视数字技术股份有限公司,2020-10-01 00:00:00
91330000733796106P,杭州海康威视数字技术股份有限公司,2020-11-01 00:00:00
91330000733796106P,杭州海康威视数字技术股份有限公司,2020-12-01 00:00:00
91330000733796106P,杭州海康威视数字技术股份有限公司,2021-01-01 00:00:00
91330000733796106P,杭州海康威视数字技术股份有限公司,2021-02-01 00:00:00
91330000733796106P,杭州海康威视数字技术股份有限公司,2021-03-01 00:00:00
aaaaaaaaaaaaaaaaaa,杭州海康威视数字技术股份有限公司,2020-02-01 00:00:00
aaaaaaaaaaaaaaaaaa,杭州海康威视数字技术股份有限公司,2020-03-01 00:00:00
aaaaaaaaaaaaaaaaaa,杭州海康威视数字技术股份有限公司,2020-04-01 00:00:00
aaaaaaaaaaaaaaaaaa,杭州海康威视数字技术股份有限公司,2020-05-01 00:00:00
aaaaaaaaaaaaaaaaaa,阿里云计算有限公司,2020-06-01 00:00:00
aaaaaaaaaaaaaaaaaa,阿里云计算有限公司,2020-07-01 00:00:00
aaaaaaaaaaaaaaaaaa,阿里云计算有限公司,2020-08-01 00:00:00
aaaaaaaaaaaaaaaaaa,阿里云计算有限公司,2020-09-01 00:00:00
aaaaaaaaaaaaaaaaaa,杭州海康威视数字技术股份有限公司,2020-10-01 00:00:00
aaaaaaaaaaaaaaaaaa,杭州海康威视数字技术股份有限公司,2020-11-01 00:00:00
aaaaaaaaaaaaaaaaaa,杭州海康威视数字技术股份有限公司,2020-12-01 00:00:00
aaaaaaaaaaaaaaaaaa,杭州海康威视数字技术股份有限公司,2021-01-01 00:00:00
aaaaaaaaaaaaaaaaaa,杭州海康威视数字技术股份有限公司,2021-02-01 00:00:00
aaaaaaaaaaaaaaaaaa,杭州海康威视数字技术股份有限公司,2021-03-01 00:00:00
answer
package com.sql
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.language.postfixOps
object Demo12SheBao {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder
      .master("local")
      .appName("sheBao")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    import org.apache.spark.sql.functions._
    import spark.implicits._

    // read the data
    val sheBaoDF: DataFrame = spark
      .read
      .format("csv")
      .option("sep", ",")
      .schema("id STRING, burk STRING, sdate STRING")
      .load("data/sheBao.txt")

    sheBaoDF.show()
    /**
     * Group consecutive records along the time axis
     */
    sheBaoDF
      // partition by id, order by date, and fetch the company name of the previous record
      .withColumn("last_name", lag($"burk", 1) over Window.partitionBy($"id").orderBy($"sdate"))
      // flag column: 0 if the company is the same as in the previous record, otherwise 1; a 1 marks the start of a new employment
      .withColumn("flag", when($"burk" === $"last_name", 0).otherwise(1))
      // a running sum of the flag along the time axis turns the flags into a group id for each continuous period
      .withColumn("fenZu", sum($"flag") over Window.partitionBy($"id").orderBy($"sdate"))
      // within each group, take the earliest date as the hire date and the latest as the leave date
      .groupBy($"id", $"burk", $"fenZu")
      .agg(min($"sdate") as "start_date", max($"sdate") as "end_date")
      .show(1000)
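    // Alternative sketch (not part of the original answer): the same gaps-and-islands logic
    // in SQL, assuming the DataFrame is registered as a temp view named "sheBao".
    sheBaoDF.createOrReplaceTempView("sheBao")
    spark.sql(
      """
        |select id, burk, min(sdate) as start_date, max(sdate) as end_date
        |from (
        |  select id, burk, sdate,
        |         sum(flag) over(partition by id order by sdate) as fenZu
        |  from (
        |    select id, burk, sdate,
        |           case when burk = lag(burk, 1) over(partition by id order by sdate)
        |                then 0 else 1 end as flag
        |    from sheBao
        |  ) t1
        |) t2
        |group by id, burk, fenZu
        |""".stripMargin).show(1000)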
  }
}
Source: https://www.cnblogs.com/atao-BigData/p/16488811.html