Reading data from Presto with Spark SQL and storing it in ClickHouse
Overall structure
Config
package com.fuwei.bigdata.profile.conf
import org.slf4j.LoggerFactory
import scopt.OptionParser
case class Config(
env:String = "",
username:String = "",
password:String = "",
url:String = "",
cluster:String = "",
startDate:String = "",
endDate:String = "",
proxyUser:String = "",
topK:Int = 25
)
object Config{
private val logger = LoggerFactory.getLogger("Config")
/**
* Wrap the command-line args into a Config object
*/
def parseConfig(obj:Object,args:Array[String]):Config = {
//1. Derive the program name from the class name
val programName: String = obj.getClass.getSimpleName.replaceAll("\\$", "")
//2. Build a parser; the parser turns the args into a Config
val parser = new OptionParser[Config]("spark sql "+programName) {
//2.1 Add the usage header
head(programName,"v1.0") //the heading of the usage text
//2.2 Populate the env field
//each option can be passed as -<short> or --<long>; text() is its description
opt[String]('e',"env").required().action((x,config) => config.copy(env = x)).text("dev or prod")
opt[String]('x',"proxyUser").required().action((x,config) => config.copy(proxyUser = x)).text("proxy username")
programName match {
case "LabelGenerator" => {
logger.info("LabelGenerator")
opt[String]('n', "username").required().action((x, config) => config.copy(username = x)).text("username")
opt[String]('p', "password").required().action((x, config) => config.copy(password = x)).text("password")
opt[String]('u', "url").required().action((x, config) => config.copy(url = x)).text("url")
opt[String]('c', "cluster").required().action((x, config) => config.copy(cluster = x)).text("cluster")
}
case _ =>
}
}
parser.parse(args,Config()) match { //parse the args against the defaults in Config(); returns Some(config) when they are valid
case Some(conf) => conf
case None => {
logger.error("can not parse args")
System.exit(-1)
null
}
}
}
}
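For reference, here is a minimal sketch of how these flags map onto a Config instance. The object name ConfigDemo and the argument values are hypothetical (they mirror the spark-submit script at the end of this post):
import com.fuwei.bigdata.profile.conf.Config
import com.fuwei.bigdata.profile.LabelGenerator

object ConfigDemo { // hypothetical entry point, for illustration only
  def main(args: Array[String]): Unit = {
    val demoArgs = Array(
      "-e", "prod",
      "-x", "root",
      "-n", "fw-insert",
      "-p", "fw-001",
      "-u", "jdbc:clickhouse://10.206.0.4:8321",
      "-c", "1")
    val conf = Config.parseConfig(LabelGenerator, demoArgs)
    // prints: Config(prod,fw-insert,fw-001,jdbc:clickhouse://10.206.0.4:8321,1,,,root,25)
    println(conf)
  }
}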
LabelGenerator
package com.fuwei.bigdata.profile
import com.fuwei.bigdata.profile.conf.Config
import com.fuwei.bigdata.profile.utils.{SparkUtils, TableUtils}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.slf4j.LoggerFactory
/**
* Generates the basic user-profile labels
*/
object LabelGenerator {
private val logger = LoggerFactory.getLogger(LabelGenerator.getClass.getSimpleName)
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.WARN)
//1. Parse the arguments
val params: Config = Config.parseConfig(LabelGenerator, args)
//2. Get a SparkSession
val spark: SparkSession = SparkUtils.getSparkSession(params.env, LabelGenerator.getClass.getSimpleName)
//val spark: SparkSession = SparkUtils.getSparkSession("dev", "test")
import spark.implicits._
//3. Read the phone-number attribution (location/carrier) data
val df: DataFrame = spark.read.option("sep", "\t").csv("src/main/resources/phoneinfo.txt").toDF("prefix", "phone", "province", "city", "isp", "post_code", "city_code", "area_code", "types")
df.createOrReplaceTempView("phone_info") //register a temp view
//4.1 userSql
val userSql =
"""
|select
|t1.distinct_id as uid,
|t1.gender,
|t1.age,
|case when length(t1.mobile) >= 11 then substring(t1.mobile,-11,length(t1.mobile)) else '' end as mobile,
|case when size(split(t1.email,'@')) = 2 then split(t1.email,'@')[1] else '' end email_suffix,
|t2.model
|from ods_news.user_ro as t1 left join dwb_news.user_base_info as t2
|on t1.distinct_id = t2.uid
|""".stripMargin
val userDF: DataFrame = spark.sql(userSql)
userDF.createOrReplaceTempView("user_info")
//4.2 baseFeatureSql
val baseFeatureSql =
"""
|select
|t1.uid,
|t1.gender,
|t1.age,
|t1.email_suffix,
|t1.model,
|concat(ifnull(t2.province,''),ifnull(t2.city,'')) as region
|from user_info as t1 left join phone_info as t2
|on
|t2.phone = substring(t1.mobile,0,7)
|""".stripMargin
//4.3 Create the Hive result table
spark.sql(
"""
|create table if not exists dws_news.user_profile_base(
|uid string,
|gender string,
|age string,
|email_suffix string,
|model string,
|region string
|)
|stored as parquet
|""".stripMargin)
//4.4 Run the query to produce the DataFrame
val baseFeaturedDF: DataFrame = spark.sql(baseFeatureSql)
baseFeaturedDF.cache() //cache the query result in memory; unpersist it when finished
//write the query result into the Hive table (i.e. the DataFrame lands on HDFS)
baseFeaturedDF.write.mode(SaveMode.Overwrite).saveAsTable("dws_news.user_profile_base")
//5. Prepare the ClickHouse side: meta = (column name list, placeholder list)
val meta = TableUtils.getClickHouseUserProfileBaseTable(baseFeaturedDF,params)
//6. Insert the data into ClickHouse
//6.1 The insert SQL
val insertCHSql =
s"""
|insert into ${TableUtils.USER_PROFILE_CLICKHOUSE_DATABASE}.${TableUtils.USER_PROFILE_CLICKHOUSE_TABLE}(${meta._1}) values(${meta._2})
|""".stripMargin
logger.warn(insertCHSql)
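//with the six columns above this renders roughly as: insert into app_news.user_profile_base(uid,gender,age,email_suffix,model,region) values(?,?,?,?,?,?)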
//6.2 Insert into the ClickHouse table partition by partition
baseFeaturedDF.foreachPartition(partition => {
TableUtils.insertBaseFeaturedTable(partition,insertCHSql,params)
})
baseFeaturedDF.unpersist() //release the cache
//7. Release resources
spark.stop()
logger.info("job has finished successfully")
}
}
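To see what the cleansing expressions in userSql actually do, here is a small self-contained sketch. The object name UserSqlExprDemo and the sample rows are hypothetical; it only exercises the mobile/email expressions against an in-memory DataFrame, not the production tables:
import org.apache.spark.sql.SparkSession

object UserSqlExprDemo { // hypothetical helper, not part of the job
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("expr-demo").getOrCreate()
    import spark.implicits._
    Seq(("u1", "008613812345678", "a@qq.com"), ("u2", "139", "bad-mail"))
      .toDF("distinct_id", "mobile", "email")
      .createOrReplaceTempView("user_ro_demo")
    spark.sql(
      """
        |select
        |distinct_id as uid,
        |case when length(mobile) >= 11 then substring(mobile,-11,length(mobile)) else '' end as mobile,
        |case when size(split(email,'@')) = 2 then split(email,'@')[1] else '' end as email_suffix
        |from user_ro_demo
        |""".stripMargin).show(false)
    // u1 -> mobile=13812345678, email_suffix=qq.com ; u2 -> empty strings
    spark.stop()
  }
}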
ClickHouseUtils
package com.fuwei.bigdata.profile.utils
import ru.yandex.clickhouse.ClickHouseDataSource
import ru.yandex.clickhouse.settings.ClickHouseProperties
object ClickHouseUtils {
/**
* Connect to ClickHouse and return a data source
* @param username
* @param password
* @param url
* @return
*/
def getDataSource(username: String, password: String, url: String): ClickHouseDataSource = {
Class.forName("ru.yandex.clickhouse.ClickHouseDriver")
val properties = new ClickHouseProperties()
properties.setUser(username)
properties.setPassword(password)
val dataSource = new ClickHouseDataSource(url, properties)
dataSource
}
/**
* Convert the "name type" list and return it in ClickHouse form, e.g. age String, gender String
*/
def df2TypeName2CH(dfCol: String): String = {
dfCol.split(",").map(line => {
val fields: Array[String] = line.split(" ")
val fName: String = fields(0)
val fType: String = dfType2chType(fields(1)) //map the Spark type to the ClickHouse type
fName +" "+ fType //the final form is e.g. age String, gender String
}).mkString(",")
}
/**
* Map a DataFrame field type to the corresponding ClickHouse type
*
* @param fieldType
* @return
*/
def dfType2chType(fieldType: String):String = {
fieldType.toLowerCase() match {
case "string" => "String"
case "integer" => "Int32"
case "long" => "Int64"
case "bigint" => "Int64"
case "double" => "Float64"
case "float" => "Float32"
case "timestamp" => "Datetime"
case _ => "String"
}
}
}
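A quick sketch of what the mapping produces for a hypothetical column list (the object name ClickHouseTypeDemo is illustrative):
import com.fuwei.bigdata.profile.utils.ClickHouseUtils

object ClickHouseTypeDemo { // hypothetical, for illustration only
  def main(args: Array[String]): Unit = {
    // "name type" pairs in the form TableUtils builds from the DataFrame schema
    val fieldTypes = "uid string,age string,score double,ts timestamp"
    println(ClickHouseUtils.df2TypeName2CH(fieldTypes))
    // uid String,age String,score Float64,ts Datetime
  }
}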
SparkUtils (this helper can be reused by other jobs)
package com.fuwei.bigdata.profile.utils
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory
object SparkUtils {
private val logger = LoggerFactory.getLogger(SparkUtils.getClass.getSimpleName)
def getSparkSession(env:String,appName:String):SparkSession = {
val conf = new SparkConf()
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.sql.hive.metastore.version", "1.2.1")
.set("spark.sql.cbo.enabled", "true")
.set("spark.hadoop.dfs.client.block.write.replace-datanode-on-failure.enable", "true")
.set("spark.hadoop.dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER")
env match {
case "prod" => {
conf.setAppName(appName+"_prod")
SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
}
case "dev" => {
conf.setMaster("local[*]").setAppName(appName+"_dev").set("spark.sql.hive.metastore.jars","maven")
SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
}
case _ => {
logger.error("not match env")
System.exit(-1)
null
}
}
}
}
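Usage is a one-liner. A minimal sketch follows; the object name SparkUtilsDemo is hypothetical, and dev mode assumes a Hive metastore is reachable from your machine:
import com.fuwei.bigdata.profile.utils.SparkUtils

object SparkUtilsDemo { // hypothetical test entry point
  def main(args: Array[String]): Unit = {
    val spark = SparkUtils.getSparkSession("dev", "profile-test") // local[*] with Hive support
    spark.sql("show databases").show()
    spark.stop()
  }
}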
TableUtils
package com.fuwei.bigdata.profile.utils
import com.fuwei.bigdata.profile.conf.Config
import org.apache.spark.sql.types.{IntegerType, LongType, StringType}
import org.apache.spark.sql.{DataFrame, Row}
import org.slf4j.LoggerFactory
import ru.yandex.clickhouse.{ClickHouseConnection, ClickHouseDataSource}
import java.sql.PreparedStatement
/**
* Writes the data read from Hive into ClickHouse
* @author lifuwei
* @since 2022-01-07
*/
object TableUtils {
/**
* Insert one partition of data into ClickHouse
* @param partition
* @param insertCHSql
* @param params
*/
def insertBaseFeaturedTable(partition: Iterator[Row], insertCHSql: String, params: Config): Unit = {
//1. Get the ClickHouse data source
val dataSource: ClickHouseDataSource = ClickHouseUtils.getDataSource(params.username, params.password, params.url)
val connection: ClickHouseConnection = dataSource.getConnection
val ps: PreparedStatement = connection.prepareStatement(insertCHSql) //prepared statement for the insert
var batchCount = 0
val batchSize = 2000
var lastBatchTime = System.currentTimeMillis()
//2. Fill in the values for the placeholders
partition.foreach(row => {
var index = 1 //JDBC parameter index (1-based)
row.schema.fields.foreach(field => {
field.dataType match {
case StringType => ps.setString(index,row.getAs[String](field.name))
case LongType => ps.setLong(index,row.getAs[Long](field.name))
case IntegerType => ps.setInt(index,row.getAs[Int](field.name))
case _ => logger.error(s"unsupported field type: ${field.dataType}")
}
index +=1
})
//3. Add the row to the batch
ps.addBatch()
batchCount += 1
//4. Flush when the batch is full or 3 seconds have passed since the last flush
var currentTime = System.currentTimeMillis()
if (batchCount >= batchSize || lastBatchTime < currentTime - 3000){
ps.executeBatch() //execute one batch
logger.warn(s"send data to clickhouse, batchNum:${batchCount},batchTime:${(currentTime - lastBatchTime)/1000} s")
batchCount = 0
lastBatchTime = currentTime
}
})
//5. After the loop, flush whatever is still pending in the batch
ps.executeBatch()
logger.warn(s"send data to clickhouse, batchNum:${batchCount},batchTime:${(System.currentTimeMillis() - lastBatchTime)/1000} s")
//6. Release resources
ps.close()
connection.close()
}
private val logger = LoggerFactory.getLogger(TableUtils.getClass.getSimpleName)
/**
* Create the ClickHouse table that matches the DataFrame
* @param baseFeaturedDF : the DataFrame
* @param params : the job configuration
* @return the DataFrame's column-name list and the matching placeholder list
*/
/*
* Schema of baseFeaturedDF:
* fieldName: uid,gender,age,region,model,email_suffix
* fieldType: string,string,string,string,string,string
*
* The insert we need has the form
* insert into user_profile_base(...) values(?,?,?,?,?,?)
*
* So from the schema we need three things: the column names, the column types, and the value placeholders
* */
val USER_PROFILE_CLICKHOUSE_DATABASE = "app_news" //the database to create
val USER_PROFILE_CLICKHOUSE_TABLE = "user_profile_base" //the table to create
def getClickHouseUserProfileBaseTable(baseFeaturedDF: DataFrame, params: Config ):(String,String)= {
//schema exposes all of the table's metadata (the three things listed above)
//foldLeft folds the fields into a single accumulated value
/*
* baseFeaturedDF.schema : the full schema of the DataFrame
* baseFeaturedDF.schema.fields : the schema as an array of fields
* baseFeaturedDF.schema.fields.foldLeft : fold over that array
* ("","","") : the initial value of the accumulator
* */
val (fieldName,fieldType,pl) = baseFeaturedDF.schema.fields.foldLeft(("","",""))(
(z,f) => {
//the accumulator looks like: ("age,gender", "age string,gender string", "?,?")
if (z._1.nonEmpty && z._2.nonEmpty && z._3.nonEmpty){
//not the first field, so prepend a separator before appending
(z._1 + "," + f.name, z._2+","+f.name+" "+f.dataType.simpleString, z._3 + ",?")
}else{
(f.name,f.name+" "+ f.dataType.simpleString,"?")
}
}
)
/*
* 4. Convert the Spark type expressions to ClickHouse type expressions
* e.g. "string" in Spark becomes "String" in ClickHouse
* the final result looks like: age String,gender String ...
* */
val chCol = ClickHouseUtils.df2TypeName2CH(fieldType)
//5. The ClickHouse cluster to connect to
val cluster:String = params.cluster
//6. SQL to create the database
val createCHDataBaseSql =
s"""
|create database if not exists ${USER_PROFILE_CLICKHOUSE_DATABASE}
|""".stripMargin
//7. SQL to create the table
/*
* ENGINE = MergeTree(): every ClickHouse table needs an engine; here we use the MergeTree engine
* */
val createCHTableSql =
s"""
|create table ${USER_PROFILE_CLICKHOUSE_DATABASE}.${USER_PROFILE_CLICKHOUSE_TABLE}(${chCol})
|ENGINE = MergeTree()
|ORDER BY(uid)
|""".stripMargin
//8. SQL to drop the table
val dropCHTableSql =
s"""
|drop table if exists ${USER_PROFILE_CLICKHOUSE_DATABASE}.${USER_PROFILE_CLICKHOUSE_TABLE}
|""".stripMargin
//9. Connect to ClickHouse and execute the statements
val dataSource:ClickHouseDataSource = ClickHouseUtils.getDataSource(params.username,params.password,params.url)
val connection: ClickHouseConnection = dataSource.getConnection
logger.warn(createCHDataBaseSql)
var ps: PreparedStatement = connection.prepareStatement(createCHDataBaseSql) //create the database
ps.execute()
logger.warn(dropCHTableSql)
ps = connection.prepareStatement(dropCHTableSql) //drop the old table
ps.execute()
logger.warn(createCHTableSql)
ps = connection.prepareStatement(createCHTableSql) //create the table
ps.execute()
ps.close()
connection.close()
logger.info("success!!!!!!!!!")
(fileName,pl)
}
}
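To make the fold concrete, here is a sketch of what it yields for the six all-string columns of user_profile_base. The object name FoldLeftDemo is hypothetical; in the real job the schema comes from baseFeaturedDF at runtime:
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object FoldLeftDemo { // hypothetical, mirrors the fold in getClickHouseUserProfileBaseTable
  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq("uid", "gender", "age", "email_suffix", "model", "region")
      .map(StructField(_, StringType)))
    val (names, nameTypes, placeholders) = schema.fields.foldLeft(("", "", "")) { (z, f) =>
      if (z._1.nonEmpty) (z._1 + "," + f.name, z._2 + "," + f.name + " " + f.dataType.simpleString, z._3 + ",?")
      else (f.name, f.name + " " + f.dataType.simpleString, "?")
    }
    println(names)        // uid,gender,age,email_suffix,model,region
    println(nameTypes)    // uid string,gender string,... (df2TypeName2CH maps these to ClickHouse types)
    println(placeholders) // ?,?,?,?,?,?
  }
}
These are exactly the (fieldName, pl) pieces that LabelGenerator splices into its insert statement.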
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.fuwei.bigdata</groupId>
<artifactId>user-profile</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<scala.version>2.11.12</scala.version>
<play-json.version>2.3.9</play-json.version>
<maven-scala-plugin.version>2.10.1</maven-scala-plugin.version>
<scala-maven-plugin.version>3.2.0</scala-maven-plugin.version>
<maven-assembly-plugin.version>2.6</maven-assembly-plugin.version>
<spark.version>2.4.5</spark.version>
<scope.type>compile</scope.type>
<json.version>1.2.3</json.version>
<!--compile provided-->
</properties>
<dependencies>
<!--json 包-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${json.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
<scope>${scope.type}</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
<scope>${scope.type}</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>${spark.version}</version>
<scope>${scope.type}</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>${scope.type}</scope>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.6</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
<scope>${scope.type}</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>${scala.version}</version>
<scope>${scope.type}</scope>
</dependency>
<dependency>
<groupId>com.github.scopt</groupId>
<artifactId>scopt_2.11</artifactId>
<version>4.0.0-RC2</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-spark-bundle_2.11</artifactId>
<version>0.5.2-incubating</version>
<scope>${scope.type}</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>com.hankcs</groupId>
<artifactId>hanlp</artifactId>
<version>portable-1.7.8</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
<version>${spark.version}</version>
<scope>${scope.type}</scope>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>1.2.1</version>
<scope>${scope.type}</scope>
<exclusions>
<exclusion>
<groupId>javax.mail</groupId>
<artifactId>mail</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty.aggregate</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.2.4</version>
</dependency>
</dependencies>
<repositories>
<repository>
<id>alimaven</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<releases>
<updatePolicy>never</updatePolicy>
</releases>
<snapshots>
<updatePolicy>never</updatePolicy>
</snapshots>
</repository>
</repositories>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>${maven-assembly-plugin.version}</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>${scala-maven-plugin.version}</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-archetype-plugin</artifactId>
<version>2.2</version>
</plugin>
</plugins>
</build>
</project>
Testing
##1. Copy core-site.xml, yarn-site.xml and hive-site.xml into the project's resources directory
##2. clean and package
##3. The Hive metastore service must be running
##4. YARN/HDFS must be running
##5. ClickHouse/chproxy must be running as well
##6. Write the spark-submit script for the jar
${SPARK_HOME}/bin/spark-submit \
--jars /data/apps/hive-1.2.1/auxlib/hudi-spark-bundle_2.11-0.5.2-incubating.jar \
--conf spark.sql.hive.convertMetastoreParquet=false \
--conf spark.executor.heartbeatInterval=120s \
--conf spark.network.timeout=600s \
--conf spark.sql.catalogImplementation=hive \
--conf spark.yarn.submit.waitAppCompletion=false \
--name log2hudi \
--conf spark.task.cpus=1 \
--conf spark.executor.cores=4 \
--conf spark.sql.shuffle.partitions=50 \
--master yarn \
--deploy-mode cluster \
--driver-memory 1G \
--executor-memory 3G \
--num-executors 1 \
--class com.fuwei.bigdata.profile.LabelGenerator \
/data/jar/user-profile.jar \
-e prod -u jdbc:clickhouse://10.206.0.4:8321 -n fw-insert -p fw-001 -x root -c 1
##7. Verify with clickhouse-client
clickhouse-client --host 10.206.0.4 --port 9999 --password qwert