spark学习进度22(column对象、缺省值处理)
作者:互联网
column对象:
分类 | 操作 | 解释 |
---|---|---|
创建 |
|
单引号
|
|
同理,
|
|
|
|
|
|
|
|
|
前面的
|
|
|
可以通过
|
|
别名和转换 |
|
|
|
通过
|
|
添加列 |
|
通过
|
操作 |
|
通过
|
|
通过
|
|
|
在排序的时候, 可以通过
|
// 1. 创建 spark 对象 val spark = SparkSession.builder() .master("local[6]") .appName("column") .getOrCreate() import spark.implicits._ @Test def creation(): Unit = { val ds: Dataset[Person] = Seq(Person("zhangsan", 15), Person("lisi", 10)).toDS() val ds1: Dataset[Person] = Seq(Person("zhangsan", 15), Person("lisi", 10)).toDS() val df: DataFrame = Seq(("zhangsan", 15), ("lisi", 10)).toDF("name", "age") // 2. ' 必须导入spark的隐式转换才能使用 str.intern() val column: Symbol = 'name // 3. $ 必须导入spark的隐式转换才能使用 val column1: ColumnName = $"name" // 4. col 必须导入 functions import org.apache.spark.sql.functions._ val column2: sql.Column = col("name") // 5. column 必须导入 functions val column3: sql.Column = column("name") // 这四种创建方式, 有关联的 Dataset 吗? ds.select(column).show() // Dataset 可以, DataFrame 可以使用 Column 对象选中行吗? df.select(column).show() // select 方法可以使用 column 对象来选中某个列, 那么其他的算子行吗? df.where(column === "zhangsan").show() // column 有几个创建方式, 四种 // column 对象可以用作于 Dataset 和 DataFrame 中 // column 可以和命令式的弱类型的 API 配合使用 select where // 6. dataset.col // 使用 dataset 来获取 column 对象, 会和某个 Dataset 进行绑定, 在逻辑计划中, 就会有不同的表现 val column4: sql.Column = ds.col("name") val column5: sql.Column = ds1.col("name") // 这会报错 // ds.select(column5).show() // 为什么要和 dataset 来绑定呢? // ds.join(ds1, ds.col("name") === ds1.col("name")) // 7. dataset.apply val column6: sql.Column = ds.apply("name") val column7: sql.Column = ds("name") }
@Test def as(): Unit = { val ds: Dataset[Person] = Seq(Person("zhangsan", 15), Person("lisi", 10)).toDS() // select name, count(age) as age from table group by name ds.select('name as "new_name").show() ds.select('age.as[Long]).show() }
@Test def api(): Unit = { val ds: Dataset[Person] = Seq(Person("zhangsan", 15), Person("lisi", 10)).toDS() // 需求一, ds 增加列, 双倍年龄 // 'age * 2 其实本质上就是将一个表达式(逻辑计划表达式) 附着到 column 对象上 // 表达式在执行的时候对应每一条数据进行操作 ds.withColumn("doubled", 'age * 2).show() // 需求二, 模糊查询 // select * from table where name like zhang% ds.where('name like "zhang%").show() // 需求三, 排序, 正反序 ds.sort('age asc).show() // 需求四, 枚举判断 ds.where('name isin ("zhangsan", "wangwu", "zhaoliu")).show() }
缺省值处理:
缺失值的处理思路
如果想探究如何处理无效值, 首先要知道无效值从哪来, 从而分析可能产生的无效值有哪些类型, 在分别去看如何处理无效值
什么是缺失值
-
一个值本身的含义是这个值不存在则称之为缺失值, 也就是说这个值本身代表着缺失, 或者这个值本身无意义, 比如说
null
, 比如说空字符串关于数据的分析其实就是统计分析的概念, 如果这样的话, 当数据集中存在缺失值, 则无法进行统计和分析, 对很多操作都有影响
缺失值如何产生的
-
Spark 大多时候处理的数据来自于业务系统中, 业务系统中可能会因为各种原因, 产生一些异常的数据
例如说因为前后端的判断失误, 提交了一些非法参数. 再例如说因为业务系统修改
MySQL
表结构产生的一些空值数据等. 总之在业务系统中出现缺失值其实是非常常见的一件事, 所以大数据系统就一定要考虑这件事.
缺失值的类型
-
常见的缺失值有两种
-
null
,NaN
等特殊类型的值, 某些语言中null
可以理解是一个对象, 但是代表没有对象,NaN
是一个数字, 可以代表不是数字针对这一类的缺失值,
Spark
提供了一个名为DataFrameNaFunctions
特殊类型来操作和处理 -
"Null"
,"NA"
," "
等解析为字符串的类型, 但是其实并不是常规字符串数据针对这类字符串, 需要对数据集进行采样, 观察异常数据, 总结经验, 各个击破
DataFrameNaFunctions
当数据集中出现缺失值的时候, 大致有两种处理方式, 一个是丢弃, 一个是替换为某值,
DataFrameNaFunctions
中包含一系列针对空值数据的方案-
DataFrameNaFunctions.drop
可以在当某行中包含null
或NaN
的时候丢弃此行 -
DataFrameNaFunctions.fill
可以在将null
和NaN
充为其它值 -
DataFrameNaFunctions.replace
可以把null
或NaN
替换为其它值, 但是和fill
略有一些不同, 这个方法针对值来进行替换 -
// 1. 创建 SparkSession val spark = SparkSession.builder() .master("local[6]") .appName("null processor") .getOrCreate() @Test def nullAndNaN(): Unit = { // 2. 导入数据集 // 3. 读取数据集 // 1. 通过Saprk-csv自动的推断类型来读取, 推断数字的时候会将 NaN 推断为 字符串 // spark.read // .option("header", true) // .option("inferSchema", true)//这里是推断 // .csv(...) // 2. 直接读取字符串, 在后续的操作中使用 map 算子转类型 // spark.read.csv().map( row => row... ) // 3. 指定 Schema, 不要自动推断 val schema = StructType( List( StructField("id", LongType), StructField("year", IntegerType), StructField("month", IntegerType), StructField("day", IntegerType), StructField("hour", IntegerType), StructField("season", IntegerType), StructField("pm", DoubleType) ) ) val sourceDF = spark.read .option("header", value = true) .schema(schema) .csv("dataset/beijingpm_with_nan.csv") sourceDF.show() // 4. 丢弃 // 2019, 12, 12, NaN // 规则: // 1. any, 只有有一个 NaN 就丢弃 sourceDF.na.drop("any").show() sourceDF.na.drop().show() // 2. all, 所有数据都是 NaN 的行才丢弃 sourceDF.na.drop("all").show() // 3. 某些列的规则 sourceDF.na.drop("any", List("year", "month", "day", "hour")).show() // 5. 填充 // 规则: // 1. 针对所有列数据进行默认值填充 sourceDF.na.fill(0).show() // 2. 针对特定列填充 sourceDF.na.fill(0, List("year", "month")).show() }
遇到字符串的时候:
@Test def strProcessor(): Unit = { // 读取数据集 val sourceDF = spark.read .option("header", value = true) .option("inferSchema", value = true) .csv("dataset/BeijingPM20100101_20151231.csv") // sourceDF.show() // 1. 丢弃 import spark.implicits._ // sourceDF.where('PM_Dongsi =!= "NA").show() // 2. 替换 import org.apache.spark.sql.functions._ // select name, age, case // when ... then ... // when ... then ... // else sourceDF.select( 'No as "id", 'year, 'month, 'day, 'hour, 'season, when('PM_Dongsi === "NA", Double.NaN)//当啥的时候进行替换 .otherwise('PM_Dongsi cast DoubleType)//转换类型 .as("pm") ).show() // 原类型和转换过后的类型, 必须一致 sourceDF.na.replace("PM_Dongsi", Map("NA" -> "NaN", "NULL" -> "null")).show() //指定哪一行进行怎样的替换 }
-
-
标签:22,val,show,column,Person,缺省值,spark,name 来源: https://www.cnblogs.com/dazhi151/p/14274848.html