DataFrame and DataSet in Spark 2
DataFrame
package sql2

import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

object Spark2DataFrame {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder()
      .appName("SQLTest01")
      .master("local[*]")
      .getOrCreate()

    // Create an RDD of raw lines
    val lines = session.sparkContext.textFile("")

    // Parse each line into a Row
    val rowRDD = lines.map(line => {
      val fields = line.split(",")
      val id = fields(0).toLong
      val name = fields(1)
      Row(id, name)
    })

    // The schema, i.e. the table header, describes the DataFrame;
    // `true` marks a field as nullable
    val sch = StructType(List(
      StructField("id", LongType, true),
      StructField("name", StringType, true)
    ))

    // Create the DataFrame from the RDD[Row] plus the schema
    val df = session.createDataFrame(rowRDD, sch)

    import session.implicits._
    val df2 = df.where($"id" > 8).orderBy($"id".desc, $"name".asc)
    // df.show()

    // Write out as parquet; csv, json, jdbc, etc. work the same way
    df2.write.parquet("")

    session.stop()
  }
}
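The write interface supports the other formats mentioned in the comment by swapping the output method. As a brief follow-up (a sketch assuming the df2 and session values from the example above; the /tmp paths are placeholders, not from the original post):

df2.write.json("/tmp/out_json")                          // one JSON object per line
df2.write.option("header", "true").csv("/tmp/out_csv")   // csv with a header row
val back = session.read.parquet("/tmp/out_parquet")      // read parquet back as a DataFrame
back.show()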
DataFrame wordCount
package sql2

import org.apache.spark.sql.SparkSession

object Spark2WordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Spark2WordCount")
      .master("local[*]")
      .getOrCreate()

    val lines = spark.read.textFile("")

    // Flatten the lines into words; import the implicit conversions first
    import spark.implicits._
    val words = lines.flatMap(_.split(" "))

    // Register a temporary view
    words.createTempView("v_wc")

    // Run the SQL
    val result = spark.sql("select value, COUNT(*) counts from v_wc GROUP BY value ORDER BY counts DESC")
    result.show()

    spark.stop()
  }
}
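The same aggregation can also be expressed through the DataFrame API instead of SQL. A minimal sketch, assuming the words Dataset and the implicits import from the example above:

// DataFrame-API equivalent of the SQL query above
val result2 = words
  .groupBy("value")          // a Dataset[String] exposes a single column named "value"
  .count()                   // adds a "count" column
  .orderBy($"count".desc)
result2.show()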
DataSet-based wordCount
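A word count can also be written directly against the typed Dataset API, without registering a view or writing SQL. Below is a minimal sketch assuming the same whitespace-separated input as above; the object name Spark2DataSetWordCount is a placeholder, and the input path is left empty as in the other snippets. groupByKey(identity).count() yields a Dataset[(String, Long)], which is then renamed and sorted:

package sql2

import org.apache.spark.sql.SparkSession

object Spark2DataSetWordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Spark2DataSetWordCount")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    // read.textFile returns a Dataset[String] rather than a DataFrame
    val lines = spark.read.textFile("")

    // Typed transformations: split into words, group by the word itself,
    // count each group, then rename the columns and sort the result
    val counts = lines
      .flatMap(_.split(" "))
      .groupByKey(identity)
      .count()
      .toDF("word", "counts")
      .orderBy($"counts".desc)

    counts.show()
    spark.stop()
  }
}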
Source: https://blog.51cto.com/u_13985831/2836508