SparkSQL 创建空dataframe
作者:互联网
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{SparkSession}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.functions.monotonically_increasing_id
import org.apache.spark.sql.types.DateType
object APPUser {

  /**
   * Loads a comma-separated user/app click log, attaches a 1-based row id,
   * converts the `statis_date` string column to a DATE, and registers the
   * result as temp view "user_app". Also demonstrates creating an empty
   * DataFrame that shares the same schema.
   *
   * @param args optional; args(0) overrides the input path. Defaults to
   *             "./data/user_app.txt", preserving the original behavior.
   */
  def main(args: Array[String]): Unit = {
    // Silence noisy framework logging so only application output shows.
    Logger.getLogger("org.apache.hadoop").setLevel(Level.ERROR)
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    val spark = SparkSession
      .builder()
      .appName("SparkSessionAppUser")
      .master("local[2]")
      .getOrCreate()

    // Input path is now overridable from the command line (previously
    // hard-coded); omitting the argument keeps the original default.
    val inputPath = args.headOption.getOrElse("./data/user_app.txt")

    // Single partition so monotonically_increasing_id produces a contiguous
    // sequence — the function only guarantees monotonic ids per partition.
    val linesRDD = spark
      .sparkContext.textFile(inputPath)
      .repartition(1)

    // Expected line format:
    //   serv_number,unf_app_code,click_cnt,access_time,statis_date
    // NOTE(review): malformed lines (short rows, non-numeric fields) will
    // throw at action time; input is assumed clean — confirm with the data.
    val rowsRDD = linesRDD
      .map { row => row.split(",") }
      .map { cols =>
        Row(cols(0), cols(1), cols(2).trim.toInt, cols(3).trim.toDouble, cols(4))
      }

    val schema = StructType(List(
      StructField("serv_number", StringType, nullable = false),
      StructField("unf_app_code", StringType, nullable = false),
      StructField("click_cnt", IntegerType, nullable = false),
      StructField("access_time", DoubleType, nullable = false),
      StructField("statis_date", StringType, nullable = false)))

    val user_appDF = spark.createDataFrame(rowsRDD, schema)

    // Attach a 1-based surrogate id column and expose the frame as SQL view.
    val user_app_indexDF = user_appDF
      .withColumn("id", monotonically_increasing_id + 1)
    user_app_indexDF.createOrReplaceTempView("user_app")

    // Convert statis_date (yyyyMMdd string) to DATE and re-register the view
    // under the same name, replacing the previous definition.
    // NOTE(review): the alias "static_date" differs from the source column
    // "statis_date" — looks like a typo, but downstream consumers may rely
    // on it, so it is preserved; verify before renaming.
    val user_app_index_dateDF = spark.sql("select id,serv_number,unf_app_code,click_cnt,access_time," +
      "to_date(statis_date,'yyyyMMdd') as static_date from user_app")
    user_app_index_dateDF.createOrReplaceTempView("user_app")

    // The subject of this example: an empty DataFrame built from an empty
    // RDD[Row] plus the schema above (zero rows, full column metadata).
    val df = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)
    //spark.sql("select current_timestamp,date_format('2016-04-08', 'y'), date_add('2016-07-30', 1),date_sub('2016-07-30', 1),datediff('2009-07-31', '2009-07-30')").show()
    spark.stop()
  }
}
标签:val,创建,app,dataframe,SparkSQL,apache,org,spark,user 来源: https://www.cnblogs.com/songyuejie/p/15673435.html