从mysql读数据创建DataFrame
作者:互联网
第一种方式spark.read.jdbc()
/**
 * Way 1: read a MySQL table into a DataFrame via `spark.read.jdbc(url, table, props)`,
 * then filter with Spark SQL over a temp view.
 */
object _01_ReadJDBC {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkSession (local mode, all available cores).
    val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()

    // JDBC connection credentials only.
    // NOTE: do NOT set a "query" property here — spark.read.jdbc(url, table, ...)
    // already supplies "dbtable", and Spark rejects "dbtable" and "query" together.
    val properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "123456")

    // 2. Read the whole `stu` table over JDBC; schema is taken from MySQL metadata.
    val dataFrame = spark.read.jdbc(
      "jdbc:mysql://localhost:3306/sql_01?characterEncoding=utf8",
      "stu",
      properties)

    // 3. Apply the filter with Spark SQL instead of a connection option.
    dataFrame.createTempView("jdbc_01")
    val frame = spark.sql(
      """
        |select id,name
        |from
        |jdbc_01
        |where id >= 2
        |""".stripMargin) // no trailing ';' — spark.sql() does not accept statement terminators

    frame.show()
    spark.stop()
  }
}
第二种方式spark.read.format("jdbc").options(pro).load()
/**
 * Way 2: read from MySQL via `spark.read.format("jdbc").options(...).load()`,
 * pushing the filter down to the database with the "query" option.
 */
object _02_ReadJDBC {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkSession (local mode, all available cores).
    val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()

    // JDBC options as an immutable Map (DataFrameReader.options accepts
    // scala.collection.Map, so no mutable HashMap is needed).
    // "query" pushes the predicate down to MySQL, so only matching rows are
    // transferred; it is mutually exclusive with "dbtable".
    val options = Map(
      "url"      -> "jdbc:mysql://localhost:3306/sql_01?characterEncoding=utf8",
      "user"     -> "root",
      "password" -> "123456",
      "query"    -> "select * from users where userid >= 8 "
      // "dbtable" -> "users"  // alternative: read the whole table instead of a query
    )

    // 2. Load the DataFrame; its schema is inferred from the MySQL result metadata.
    val dataFrame: DataFrame = spark.read.format("jdbc").options(options).load()
    dataFrame.show()
    spark.stop()
  }
}
标签:jdbc,val,pro,DataFrame,01,读数据,mysql,put,spark 来源: https://blog.csdn.net/qq_44665283/article/details/118002183