RDD & Dataset & DataFrame
Creating a Dataset
import org.apache.spark.sql.{Encoders, SparkSession}

object DatasetCreation {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("SparkSessionTest")
      .getOrCreate()

    import spark.implicits._

    //1: range
    val ds1 = spark.range(0, 10, 2, 2)
    ds1.show()

    val dogs = Seq(Dog("jitty", "red"), Dog("mytty", "yellow"))
    val cats = Seq(new Cat("jitty", 2), new Cat("mytty", 4))

    //2: create from a Seq[T]
    val data = dogs
    val ds = spark.createDataset(data)
    ds.show()

    //3: create from an RDD[T]
    val dogRDD = spark.sparkContext.parallelize(dogs)
    val dogDS = spark.createDataset(dogRDD)
    dogDS.show()

    val catRDD = spark.sparkContext.parallelize(cats)
    //Cat is a Java bean rather than a case class, so no implicit Encoder
    //exists for it and the next line would not compile:
    //val catDSWithoutEncoder = spark.createDataset(catRDD)
    val catDS = spark.createDataset(catRDD)(Encoders.bean(classOf[Cat]))
    catDS.show()

    //Encoders handle the conversion between JVM object types and
    //Spark SQL's internal data types
    val intDs = Seq(1, 2, 3).toDS() // implicitly provided (spark.implicits.newIntEncoder)
    val seqIntDs = Seq(Seq(1), Seq(2), Seq(3)).toDS() // implicitly provided (spark.implicits.newIntSeqEncoder)
    val arrayIntDs = Seq(Array(1), Array(2), Array(3)).toDS() // implicitly provided (spark.implicits.newIntArrayEncoder)

    //The supported Encoders include:
    Encoders.product //tuples and case classes
    Encoders.scalaBoolean
    Encoders.scalaByte
    Encoders.scalaDouble
    Encoders.scalaFloat
    Encoders.scalaInt
    Encoders.scalaLong
    Encoders.scalaShort
    Encoders.bean(classOf[Cat])

    spark.stop()
  }
}
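The examples above and below assume that Dog and Cat are already defined elsewhere (the transform example later imports Dog from com.twq.dataset). A minimal sketch of what those definitions might look like; the class bodies are assumptions, but the constructor shapes match how the examples use them:

import scala.beans.BeanProperty

// Dog must be a case class (a Product) so that spark.implicits._
// can derive its Encoder automatically.
case class Dog(name: String, color: String)

// Cat is a plain Java-style bean: Encoders.bean(classOf[Cat]) relies on
// getters/setters and a no-arg constructor, which @BeanProperty and the
// auxiliary constructor provide.
class Cat(@BeanProperty var name: String,
          @BeanProperty var age: Int) extends Serializable {
  def this() = this(null, 0)
}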
Creating a DataFrame
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

object DataFrameCreation {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("SparkSessionTest")
      .getOrCreate()

    //1: create from an RDD[A <: Product]; both case classes and tuples are subclasses of Product
    val rdd = spark.sparkContext.textFile("") // path elided in the original
      .map(line => {
        val splitData = line.split(",")
        Dog(splitData(0), splitData(1))
      })
    val tupleRDD = spark.sparkContext.parallelize(Seq(("jitty", 2), ("mytty", 4)))
    spark.createDataFrame(rdd)
    spark.createDataFrame(tupleRDD)

    val dogRDD = spark.sparkContext.parallelize(Seq(Dog("jitty", "red"), Dog("mytty", "yellow")))
    val dogDf = spark.createDataFrame(dogRDD)
    dogDf.show()

    //2: create from a Seq[A <: Product]
    val dogSeq = Seq(Dog("jitty", "red"), Dog("mytty", "yellow"))
    spark.createDataFrame(dogSeq).show()

    //3: create from an RDD[_] plus a class, where the class is a Java bean
    val catRDD = spark.sparkContext.parallelize(Seq(new Cat("jitty", 2), new Cat("mytty", 4)))
    //Cat is not a Product, so the overload below would not compile without the class:
    //val catDf = spark.createDataFrame(catRDD)
    val catDf = spark.createDataFrame(catRDD, classOf[Cat])
    catDf.show()

    catDf.createOrReplaceTempView("cat")
    spark.sql("select * from cat").show()
    //Note that the order of cat's columns in the query result is not fixed,
    //because it comes from bean-property reflection

    //4: create from an RDD[Row] plus a schema
    val rowSeq = Seq("tom,30", "katy, 46").map(_.split(",")).map(p => Row(p(0), p(1).trim.toInt))
    val rowRDD = spark.sparkContext.parallelize(rowSeq)
    val schema = StructType(
      StructField("name", StringType, false) ::
      StructField("age", IntegerType, true) :: Nil)
    val dataFrame = spark.createDataFrame(rowRDD, schema)
    dataFrame.printSchema
    dataFrame.show()

    //5: create from an external data source
    val df = spark.read.json(s"${BASE_PATH}/IoT_device_info.json")
    df.show()

    spark.stop()
  }
}
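Since the column order of the bean-derived catDf is not guaranteed, one way to get a stable layout is to select the columns explicitly. A small sketch that could follow the createOrReplaceTempView call above, reusing catDf:

// Pin a stable column order instead of relying on the reflection
// order of Cat's bean properties
val orderedCatDf = catDf.select("name", "age")
orderedCatDf.show()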
Converting between RDD, Dataset, and DataFrame
package com.twq.dataset.creation

import com.twq.dataset.Dog
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}

object RDDDatasetTransform {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("RDDDatasetTransform")
      .getOrCreate()

    val dogs = Seq(Dog("jitty", "red"), Dog("mytty", "yellow"))
    val dogRDD = spark.sparkContext.parallelize(dogs)

    //1: RDD to DataFrame
    import spark.implicits._
    val dogDF = dogRDD.toDF()
    dogDF.show()

    val renameSchemaDF = dogRDD.toDF("first_name", "lovest_color")
    renameSchemaDF.show()

    //2: DataFrame to RDD; the schema information is lost
    //(the elements are generic Row objects, no longer Dogs)
    val dogRowRDD: RDD[Row] = dogDF.rdd
    dogRowRDD.collect()
    renameSchemaDF.rdd.collect()

    //3: RDD to Dataset
    val dogDS = dogRDD.toDS()
    dogDS.show()

    //4: Dataset to RDD
    val dogRDDFromDs: RDD[Dog] = dogDS.rdd
    dogRDDFromDs.collect()

    //5: DataFrame to Dataset
    val dogDsFromDf = dogDF.as[Dog]
    dogDsFromDf.show()

    //6: Dataset to DataFrame
    val dogDfFromDs = dogDsFromDf.toDF()
    dogDfFromDs.show()

    spark.stop()
  }
}
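The practical payoff of these conversions is the choice between the typed and untyped APIs: a Dataset[Dog] checks field access at compile time, while a DataFrame resolves column names only when the query is analyzed. A small sketch that could be appended inside main above, reusing dogDS and dogDF:

// Typed API: the compiler verifies that Dog has a `name` field
val dogNames = dogDS.map(_.name) // Dataset[String]
dogNames.show()

// Untyped API: "name" is only a string; a misspelled column name
// fails at runtime with an AnalysisException, not at compile time
val dogNameCol = dogDF.select("name")
dogNameCol.show()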
Defining schemas and working with complex data types
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SaveMode, SparkSession}

object SchemaApiTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("SchemaApiTest")
      .master("local")
      .getOrCreate()

    val iotDeviceDf = spark.read.json(s"${BASE_PATH}/IoT_device_info.json")
    iotDeviceDf.toString()

    //1: displaying a schema
    iotDeviceDf.schema
    iotDeviceDf.printSchema()

    //2: a schema may contain complex data types
    val schema = StructType(
      StructField("name", StringType, false) ::
      StructField("age", IntegerType, true) ::
      StructField("map", MapType(StringType, StringType), true) ::
      StructField("array", ArrayType(StringType), true) ::
      StructField("struct", StructType(Seq(StructField("field1", StringType),
        StructField("field2", StringType)))) :: Nil)

    val people = spark.sparkContext.parallelize(Seq("tom,30", "katy, 46"))
      .map(_.split(","))
      .map(p => Row(p(0), p(1).trim.toInt, Map(p(0) -> p(1)), Seq(p(0), p(1)), Row("value1", "value2")))

    val dataFrame = spark.createDataFrame(people, schema)
    dataFrame.printSchema
    dataFrame.show()

    dataFrame.select("map").collect().map(row => row.getAs[Map[String, String]]("map"))
    dataFrame.select("array").collect().map(row => row.getAs[Seq[String]]("array"))
    dataFrame.select("struct").collect().map(row => row.getAs[Row]("struct"))

    //Uses of a schema
    val exampleSchema = new StructType().add("name", StringType).add("age", IntegerType)
    exampleSchema("name")            // look up the field named "name", with its type
    exampleSchema.fields             // all fields with their type information
    exampleSchema.fieldNames         // all field names
    exampleSchema.fieldIndex("name") // index position of the field

    //1: inspect the schema of a Parquet file
    val sessionDf = spark.read.parquet(s"${BASE_PATH}/trackerSession")
    sessionDf.schema
    sessionDf.printSchema()

    //2: compare whether the schemas of two Parquet files are the same
    val changedSchemaFieldNames = sessionDf.schema.fieldNames.map(fieldName => {
      if (fieldName == "pageview_count") {
        "pv_count"
      } else fieldName
    })
    sessionDf.toDF(changedSchemaFieldNames: _*)
      .write.mode(SaveMode.Overwrite)
      .parquet(s"${BASE_PATH}/trackerSession_changeSchema")

    val schemaChangeSessionDf = spark.read.parquet(s"${BASE_PATH}/trackerSession_changeSchema")
    schemaChangeSessionDf.schema
    schemaChangeSessionDf.printSchema()

    val oldSchema = sessionDf.schema
    val changeSchema = schemaChangeSessionDf.schema
    oldSchema == changeSchema //false

    //3: when two Parquet files have different schemas, they need to be unified
    val allSessionError = spark.read.parquet(s"${BASE_PATH}/trackerSession",
      s"${BASE_PATH}/trackerSession_changeSchema")
    allSessionError.printSchema()
    allSessionError.show()

    val allSessionRight = sessionDf.toDF(changeSchema.fieldNames: _*).union(schemaChangeSessionDf)
    allSessionRight.printSchema()
    allSessionRight.show()

    spark.stop()
  }
}
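For Parquet files whose schemas differ only by added or missing columns (rather than a rename, as above), Spark's built-in Parquet schema merging is an alternative to manual unification. A hedged sketch; note it would not fix the rename case, since pageview_count and pv_count would simply survive as two separate columns:

// mergeSchema takes the union of the columns across all files and
// fills the missing ones with null; renamed columns are not reconciled
val mergedDf = spark.read
  .option("mergeSchema", "true")
  .parquet(s"${BASE_PATH}/trackerSession", s"${BASE_PATH}/trackerSession_changeSchema")
mergedDf.printSchema()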