其他分享
首页 > 其他分享> > 基于spark2的dataFrame和dataSet

基于spark2的dataFrame和dataSet

作者:互联网


文章目录


dataFrame

package sql2

import org.apache.avro.generic.GenericData.StringType
import org.apache.spark.sql.types.{LongType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession, types}

object Spark2DateFrame {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder()
      .appName("SQLTest01")
      .master("local[*]")
      .getOrCreate()
//    创建RDD
    val lines = session.sparkContext.textFile("")
//    将数据进行整理
    val rowRDD = lines.map(line => {
      val fields = line.split(",")
      val id = fields(0).toLong
      val name = fields(1)
      Row(id, name)
    })
//    结果类型,其实就是表头,用于描述DataFrame,true表示是否为空
    val sch = StructType(List(
      StructField("id", LongType, true),
      StructField("name", StringType, true)
    ))

    //创建DataFrame
    val df = session.createDataFrame(rowRDD,sch)

    import session.implicits._
    val df2 = df.where($"id">8).orderBy($"id" desc,$"age" asc)

//    df.show()
    //写入成csv文件.json,parquet,jdbc等
    df2.write.parquet("")
    session.stop()

  }
}

dataFrame wordCount

package sql2

import org.apache.spark.sql.SparkSession

object Spark2WoldCount {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .appName("Spark2WoldCount")
      .master("local[*]")
      .getOrCreate()
    val lines = spark.read.textFile("")

    //整理数据,压平.导入隐式转换
    import spark.implicits._
    val words = lines.flatMap(_.split(" "))

    //注册视图
    words.createTempView("v_wc")

    //执行sql
    val result = spark.sql("select value,COUNT(*) counts from v_wc GROUP BY value ORDER BY counts DESC")
    result.show()
    spark.stop()
  }
}

基于dataSet的wordCount

package sql2

import org.apache.avro.generic.GenericData.StringType
import org.apache.spark.sql.types.{LongType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession, types}

object Spark2DateFrame {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder()
      .appName("SQLTest01")
      .master("local[*]")
      .getOrCreate()
//    创建RDD
    val lines = session.sparkContext.textFile("")
//    将数据进行整理
    val rowRDD = lines.map(line => {
      val fields = line.split(",")
      val id = fields(0).toLong
      val name = fields(1)
      Row(id, name)
    })
//    结果类型,其实就是表头,用于描述DataFrame,true表示是否为空
    val sch = StructType(List(
      StructField("id", LongType, true),
      StructField("name", StringType, true)
    ))

    //创建DataFrame
    val df = session.createDataFrame(rowRDD,sch)

    import session.implicits._
    val df2 = df.where($"id">8).orderBy($"id" desc,$"age" asc)

//    df.show()
    //写入成csv文件.json,parquet,jdbc等
    df2.write.parquet("")
    session.stop()

  }
}

               

标签:val,spark2,dataFrame,dataSet,session,apache,import,spark,id
来源: https://blog.51cto.com/u_13985831/2836508