Wrapping JDBC Query Results into a Spark DataFrame
Overview
Spark 2.2.0 does not support reading Hive data directly over JDBC. Making that path work requires patching part of the Spark source, which is method two in an earlier post:
https://blog.csdn.net/qq_42213403/article/details/117557610?spm=1001.2014.3001.5501
There is another approach that needs no source changes: fetch the data over plain JDBC first, then wrap the result into a DataFrame with Spark.
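For context, the sketch below shows what the direct route would look like with Spark's built-in JDBC source; on an unpatched 2.2.0 it does not work against HiveServer2 (Spark ships no Hive JDBC dialect), which is what motivates the manual approach in this post. The URL and credentials are placeholders, and spark is a SparkSession built as in the run section below.

// The direct spark.read route (sketch only - fails on stock Spark 2.2.0
// against HiveServer2 without the source patch from the linked post)
val direct = spark.read
  .format("jdbc")
  .option("url", "jdbc:hive2://xxx:10000/")
  .option("dbtable", "test.user_info")
  .option("user", "hive")
  .option("driver", "org.apache.hive.jdbc.HiveDriver")
  .load()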
Implementation
First, fetch the Hive table data with a JDBC query:
def getResult(): ResultSet = {
  val properties = new Properties
  properties.setProperty("url", "jdbc:hive2://192.168.5.61:10000/")
  properties.setProperty("user", "hive")
  properties.setProperty("password", "")
  properties.setProperty("driver", "org.apache.hive.jdbc.HiveDriver")
  val connection = getConnection(properties)
  val statement = connection.createStatement
  statement.executeQuery("select * from test.user_info")
}

def getConnection(prop: Properties): Connection = try {
  Class.forName(prop.getProperty("driver"))
  DriverManager.getConnection(prop.getProperty("url"), prop.getProperty("user"), prop.getProperty("password"))
} catch {
  case e: Exception =>
    e.printStackTrace()
    null
}
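Note that getResult never closes the Statement or the Connection, and it cannot: the ResultSet must stay open until the conversion below has consumed it. One way to keep the cleanup in one place is a small loan-pattern helper; this is a sketch, and runQuery is a hypothetical name rather than part of the original code:

// Run a query, hand the open ResultSet to a callback, then close the
// Statement and Connection once the callback is done with it
def runQuery[T](sql: String, prop: Properties)(handle: ResultSet => T): T = {
  val connection = getConnection(prop)
  try {
    val statement = connection.createStatement
    try handle(statement.executeQuery(sql))
    finally statement.close()
  } finally connection.close()
}

With this helper, the whole pipeline becomes runQuery("select * from test.user_info", properties)(rs => createResultSetToDF(rs, spark)).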
Next, convert the ResultSet into a DataFrame. Start with a helper that maps JDBC column class names to Spark SQL types:
def createStructField(name: String, colType: String): StructField = {
  colType match {
    case "java.lang.String"     => StructField(name, StringType, true)
    case "java.lang.Integer"    => StructField(name, IntegerType, true)
    case "java.lang.Long"       => StructField(name, LongType, true)
    case "java.lang.Boolean"    => StructField(name, BooleanType, true)
    case "java.lang.Double"     => StructField(name, DoubleType, true)
    case "java.lang.Float"      => StructField(name, FloatType, true)
    case "java.sql.Date"        => StructField(name, DateType, true)
    case "java.sql.Time"        => StructField(name, TimestampType, true)
    case "java.sql.Timestamp"   => StructField(name, TimestampType, true)
    case "java.math.BigDecimal" => StructField(name, DecimalType(10, 0), true)
    // fall back to string rather than fail with a MatchError on an unmapped type
    case _                      => StructField(name, StringType, true)
  }
}
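A quick way to sanity-check the mapping is to assemble a schema by hand and print it; the column names here are made up for illustration:

// Build a small schema from the mapping above and inspect it
val schema = StructType(Seq(
  createStructField("id", "java.lang.Long"),
  createStructField("name", "java.lang.String"),
  createStructField("amount", "java.math.BigDecimal")))
schema.printTreeString() // id: long, name: string, amount: decimal(10,0)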
/**
 * Convert the ResultSet into a DataFrame
 */
def createResultSetToDF(rs: ResultSet, sparkSession: SparkSession): DataFrame = {
  val rsmd = rs.getMetaData
  val columnTypeList = new util.ArrayList[String]
  val rowSchemaList = new ArrayBuffer[StructField]
  for (i <- 1 to rsmd.getColumnCount) {
    // simple class name ("String", "Long", ...) selects the ResultSet getter below
    var temp = rsmd.getColumnClassName(i)
    temp = temp.substring(temp.lastIndexOf(".") + 1)
    if ("Integer".equals(temp)) {
      temp = "Int" // java.lang.Integer is read through ResultSet.getInt
    }
    columnTypeList.add(temp)
    rowSchemaList += createStructField(rsmd.getColumnName(i), rsmd.getColumnClassName(i))
  }
  val rowSchema = StructType(rowSchemaList)
  // Class object of the ResultSet implementation, used to look up getters via reflection
  val rsClass = rs.getClass
  var count = 0
  val resultList = new util.ArrayList[Row]
  var totalDF = sparkSession.createDataFrame(new util.ArrayList[Row], rowSchema)
  while (rs.next()) {
    count = count + 1
    val buffer = new ArrayBuffer[Any]()
    for (i <- 0 until columnTypeList.size()) {
      // invoke getString/getInt/getLong/... by column name
      val method = rsClass.getMethod("get" + columnTypeList.get(i), classOf[String])
      buffer += method.invoke(rs, rsmd.getColumnName(i + 1))
    }
    resultList.add(Row(buffer: _*))
    // flush the buffered rows into the DataFrame every 100,000 rows
    if (count % 100000 == 0) {
      val tempDF = sparkSession.createDataFrame(resultList, rowSchema)
      totalDF = totalDF.union(tempDF)
      resultList.clear()
    }
  }
  val tempDF = sparkSession.createDataFrame(resultList, rowSchema)
  totalDF = totalDF.union(tempDF)
  totalDF
}
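The batching above keeps each createDataFrame call small, but every union lengthens the logical plan, and the rows all end up on the driver either way. If the result set fits in driver memory, a simpler variant builds every Row with ResultSet.getObject and calls createDataFrame exactly once. This is a sketch reusing createStructField from above, with a hypothetical name:

// Simpler variant: getObject instead of per-type reflection, and a single
// createDataFrame call instead of a chain of unions
def resultSetToDFSimple(rs: ResultSet, spark: SparkSession): DataFrame = {
  val md = rs.getMetaData
  val schema = StructType((1 to md.getColumnCount)
    .map(i => createStructField(md.getColumnName(i), md.getColumnClassName(i))))
  val rows = new util.ArrayList[Row]
  while (rs.next()) {
    // getObject returns boxed java.lang / java.sql values, which Spark
    // accepts for the matching Catalyst types in the schema
    rows.add(Row((1 to md.getColumnCount).map(i => rs.getObject(i)): _*))
  }
  spark.createDataFrame(rows, schema)
}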
Run the code
val spark = SparkSession.builder()
  .master("local[2]")
  .appName("test")
  .getOrCreate()
val df = createResultSetToDF(getResult(), spark)
df.show()
Result
(The original post shows a screenshot of the df.show() output here.)
Full code
import java.sql.{Connection, DriverManager, ResultSet}
import java.util
import java.util.Properties
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{BooleanType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, StringType, StructField, StructType, TimestampType}
import scala.collection.mutable.ArrayBuffer
/**
 * @Author: fcy
 * @Date: 2021/6/4 4:16 PM
 */
object SparkJDBCHiveDataFrame {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("test")
      .getOrCreate()
    val df = createResultSetToDF(getResult(), spark)
    df.show()
  }
  def getResult(): ResultSet = {
    val properties = new Properties
    properties.setProperty("url", "jdbc:hive2://xxx:10000/")
    properties.setProperty("user", "hive")
    properties.setProperty("password", "xxx")
    properties.setProperty("driver", "org.apache.hive.jdbc.HiveDriver")
    val connection = getConnection(properties)
    val statement = connection.createStatement
    statement.executeQuery("select * from test.user_info")
  }

  def getConnection(prop: Properties): Connection = try {
    Class.forName(prop.getProperty("driver"))
    DriverManager.getConnection(prop.getProperty("url"), prop.getProperty("user"), prop.getProperty("password"))
  } catch {
    case e: Exception =>
      e.printStackTrace()
      null
  }
  def createStructField(name: String, colType: String): StructField = {
    colType match {
      case "java.lang.String"     => StructField(name, StringType, true)
      case "java.lang.Integer"    => StructField(name, IntegerType, true)
      case "java.lang.Long"       => StructField(name, LongType, true)
      case "java.lang.Boolean"    => StructField(name, BooleanType, true)
      case "java.lang.Double"     => StructField(name, DoubleType, true)
      case "java.lang.Float"      => StructField(name, FloatType, true)
      case "java.sql.Date"        => StructField(name, DateType, true)
      case "java.sql.Time"        => StructField(name, TimestampType, true)
      case "java.sql.Timestamp"   => StructField(name, TimestampType, true)
      case "java.math.BigDecimal" => StructField(name, DecimalType(10, 0), true)
      // fall back to string rather than fail with a MatchError on an unmapped type
      case _                      => StructField(name, StringType, true)
    }
  }
  /**
   * Convert the ResultSet into a DataFrame
   */
  def createResultSetToDF(rs: ResultSet, sparkSession: SparkSession): DataFrame = {
    val rsmd = rs.getMetaData
    val columnTypeList = new util.ArrayList[String]
    val rowSchemaList = new ArrayBuffer[StructField]
    for (i <- 1 to rsmd.getColumnCount) {
      // simple class name ("String", "Long", ...) selects the ResultSet getter below
      var temp = rsmd.getColumnClassName(i)
      temp = temp.substring(temp.lastIndexOf(".") + 1)
      if ("Integer".equals(temp)) {
        temp = "Int" // java.lang.Integer is read through ResultSet.getInt
      }
      columnTypeList.add(temp)
      rowSchemaList += createStructField(rsmd.getColumnName(i), rsmd.getColumnClassName(i))
    }
    val rowSchema = StructType(rowSchemaList)
    // Class object of the ResultSet implementation, used to look up getters via reflection
    val rsClass = rs.getClass
    var count = 0
    val resultList = new util.ArrayList[Row]
    var totalDF = sparkSession.createDataFrame(new util.ArrayList[Row], rowSchema)
    while (rs.next()) {
      count = count + 1
      val buffer = new ArrayBuffer[Any]()
      for (i <- 0 until columnTypeList.size()) {
        // invoke getString/getInt/getLong/... by column name
        val method = rsClass.getMethod("get" + columnTypeList.get(i), classOf[String])
        buffer += method.invoke(rs, rsmd.getColumnName(i + 1))
      }
      resultList.add(Row(buffer: _*))
      // flush the buffered rows into the DataFrame every 100,000 rows
      if (count % 100000 == 0) {
        val tempDF = sparkSession.createDataFrame(resultList, rowSchema)
        totalDF = totalDF.union(tempDF)
        resultList.clear()
      }
    }
    val tempDF = sparkSession.createDataFrame(resultList, rowSchema)
    totalDF = totalDF.union(tempDF)
    totalDF
  }
}
Comments, discussion, and corrections are welcome.