
Spark SQL Summary


I. Overview

1. History

2. Introduction

3. How it is used

4. Features

5. The SparkSQL vision

II. Key terms

III. Using Spark SQL from the shell
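
A minimal sketch of interactive use, assuming the spark-shell REPL is meant here (in Spark 2.x the shell predefines a SparkSession named `spark`); the JSON path below is hypothetical:

// Typed at the spark-shell prompt; `spark` is the predefined SparkSession
val df = spark.read.json("/path/to/data.json")   // hypothetical path
df.createTempView("people")
spark.sql("select * from people").show()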

IV. Using Spark SQL with DataFrames

1. Creating the project

First, create a Scala project from the following Maven archetype.
Archetype:

group: net.alchim31.maven
artifact: scala-archetype-simple
version: 1.7
repository: https://maven.aliyun.com/repository/central

2. Configuring the project

  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.version>2.11.11</scala.version>
    <scala.compat.version>2.11</scala.compat.version>
    <spec2.version>4.2.0</spec2.version>
  </properties>    

    <!-- Scala dependency -->
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>

    <!-- Spark Core dependency -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.compat.version}</artifactId>
      <version>2.3.2</version>
      <scope>provided</scope>
    </dependency>

    <!-- Spark SQL dependency -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.compat.version}</artifactId>
      <version>2.3.2</version>
      <scope>provided</scope>
    </dependency>

    <!-- Log4j 2 (log4j-core; org.apache.logging.log4j:log4j is only an aggregate POM) -->
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-core</artifactId>
      <version>2.14.1</version>
    </dependency>

3. Writing the code

3.1 DataFrames (1.6 style)

package com.antg.main

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object DataFrames1_6 {
  def main(args: Array[String]): Unit = {
    // Create the SparkConf
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("Test DF 1.6")
    // Create the SparkContext
    val sc = new SparkContext(conf)
    // Create the SQL context
    val sqlContext = new SQLContext(sc)
    // Read the data
    val df = sqlContext.read.json("C:\\Users\\Administrator\\Desktop\\data.json")
    // Show all rows
    df.show()
    // Stop the context
    sc.stop()
  }
}
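
As a complement to df.show(), here is a minimal sketch of the DataFrame DSL style of querying with the same 1.6-style API; the column names name and age are assumptions about what data.json contains:

package com.antg.main

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object DataFrames1_6_DSL {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("Test DF 1.6 DSL")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val df = sqlContext.read.json("C:\\Users\\Administrator\\Desktop\\data.json")
    // DSL-style queries instead of showing the whole table
    df.printSchema()                  // schema inferred from the JSON
    df.select("name").show()          // project a column (assumed column name)
    df.filter(df("age") > 20).show()  // filter with a column expression (assumed column name)
    sc.stop()
  }
}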

3.2 DataFrames (2.3 style)

package com.antg.main

import org.apache.spark.sql.SparkSession

object DataFrames2_3 {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession
    val sparkSession = SparkSession.builder()
      .master("local[*]")
      .appName("dataframes2.3")
      .getOrCreate()
    // Create the DataFrame
    val df = sparkSession.read.json("C:\\Users\\Administrator\\Desktop\\data.json")

    // Register a temporary view (createTempView returns Unit, so there is nothing to assign)
    df.createTempView("vrTable")

    sparkSession.sql("select * from vrTable").show()

    // Persist the data
    df.repartition(2).write.format("parquet").save("./data")

    // Stop the session
    sparkSession.stop()

  }
}

3.3 Converting an RDD to a DataFrame

package com.antg.main

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object RDD_DF {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder()
      .appName("test_rdd to df")
      .master("local[*]")
      .getOrCreate()

    // Define the schema explicitly: four string columns
    val schema = StructType(
      "stdno name classId className".split(" ").map(t => StructField(t, StringType, true))
    )
    // Read the text file and turn each tab-separated line into a Row
    val lineRDD = sparkSession.sparkContext.textFile("C:\\Users\\Administrator\\Desktop\\student_mysql.txt")
    val rowRDD = lineRDD.map(_.split("\t")).map(row => Row(row(0), row(1), row(2), row(3)))
    // Combine the RDD of Rows with the schema to get a DataFrame
    val df = sparkSession.createDataFrame(rowRDD, schema)
    df.show()
    df.printSchema()
    sparkSession.stop()
  }
}
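
For comparison, here is a minimal sketch of the reflection-based approach, which infers the schema from a case class instead of building a StructType by hand; the case class name StudentRow is hypothetical and the input path is reused from the example above:

package com.antg.main

import org.apache.spark.sql.SparkSession

// Hypothetical case class matching the four columns used above
case class StudentRow(stdno: String, name: String, classId: String, className: String)

object RDD_DF_Reflection {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder()
      .appName("test_rdd to df by reflection")
      .master("local[*]")
      .getOrCreate()
    // The implicits provide toDF() for RDDs of case classes
    import sparkSession.implicits._

    val lineRDD = sparkSession.sparkContext.textFile("C:\\Users\\Administrator\\Desktop\\student_mysql.txt")
    val df = lineRDD
      .map(_.split("\t"))
      .map(a => StudentRow(a(0), a(1), a(2), a(3)))
      .toDF()
    df.show()
    df.printSchema()
    sparkSession.stop()
  }
}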

V. The Parquet data format
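
A minimal sketch of reading Parquet back, assuming the files written to ./data by the example in 3.2; the schema is stored in the Parquet files themselves, so no StructType is needed:

package com.antg.main

import org.apache.spark.sql.SparkSession

object ReadParquet {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder()
      .appName("read parquet")
      .master("local[*]")
      .getOrCreate()
    // Parquet is Spark SQL's default data source
    val df = sparkSession.read.parquet("./data")   // directory written in section 3.2
    df.printSchema()   // schema recovered from the Parquet footer
    df.show()
    sparkSession.stop()
  }
}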

VI. Using Spark SQL with Datasets

package com.antg.main

import org.apache.spark.sql.SparkSession
case class Student(name:String,age:BigInt)
object TestDS {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession
    val sparkSession = SparkSession.builder()
      .appName("ds test")
      .master("local[*]")
      .getOrCreate()
    // Bring in the implicit encoders and conversions (toDS, toDF, ...)
    import sparkSession.implicits._

    // Create a Dataset from a basic type
    val a = Seq(1, 2, 3).toDS()

    // Use the Dataset
    a.map(_ + 1).collect.foreach(println)
    a.show()

    // Create a Dataset from a case class
    val b = Seq(Student("tom", 22)).toDS()
    b.show()

    // Create a Dataset from a file, using the case class to fix its type
    val path = "C:\\Users\\Administrator\\Desktop\\student_data.txt"
    val c = sparkSession.read.json(path).as[Student]
    c.show()
    // Because the Dataset is strongly typed, its elements can be used directly
    c.foreach(x => println(x.age))

    sparkSession.stop()
  }
}

student_data.txt

{"name":"张一","age":10,"address":"国际庄"}
{"name":"张二","age":20}
{"name":"张三","age":30}
{"name":"张四","age":40}

VII. Comparing the data abstractions
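
A rough sketch of how the three abstractions relate, reusing the Student case class from section VI (same package); it converts between a Dataset, a DataFrame and an RDD and shows how each is accessed:

package com.antg.main

import org.apache.spark.sql.SparkSession

object AbstractionCompare {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder()
      .appName("rdd vs df vs ds")
      .master("local[*]")
      .getOrCreate()
    import sparkSession.implicits._

    // Dataset: strongly typed, fields checked at compile time
    val ds = Seq(Student("tom", 22), Student("jerry", 23)).toDS()
    // DataFrame: a Dataset[Row], columns addressed by name
    val df = ds.toDF()
    // RDD: plain JVM objects, no schema
    val rdd = ds.rdd

    df.select("name").show()                       // column referenced by string name
    ds.map(_.name.toUpperCase).show()              // field referenced in a typed lambda
    println(rdd.map(_.name).collect().mkString(",")) // ordinary RDD transformation

    sparkSession.stop()
  }
}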

Source: https://blog.csdn.net/weixin_44745147/article/details/121462502