数据库
首页 > 数据库 > 通过sparksql读取presto中的数据存到clickhouse

通过sparksql读取presto中的数据存到clickhouse

作者:互联网

整体结构

在这里插入图片描述

Config

package com.fuwei.bigdata.profile.conf

import org.slf4j.LoggerFactory
import scopt.OptionParser


/**
 * Command-line settings for the profile jobs.
 *
 * Every string field defaults to the empty string and `topK` defaults to 25, so `Config()`
 * is the blank value the option parser starts from.
 *
 * @param env       runtime environment selector ("dev or prod" per the parser help text)
 * @param username  ClickHouse user name
 * @param password  ClickHouse password
 * @param url       ClickHouse JDBC url
 * @param cluster   ClickHouse cluster identifier
 * @param startDate not read by the code shown alongside this class
 * @param endDate   not read by the code shown alongside this class
 * @param proxyUser proxy user name
 * @param topK      not read by the code shown alongside this class
 */
case class Config(
    env: String = "",
    username: String = "",
    password: String = "",
    url: String = "",
    cluster: String = "",
    startDate: String = "",
    endDate: String = "",
    proxyUser: String = "",
    topK: Int = 25
)

/**
 * Companion object holding the command-line parser for [[Config]].
 */
object Config {

    private val logger = LoggerFactory.getLogger("Config")

    /**
     * Parses the command-line arguments into a [[Config]].
     *
     * `-e/--env` and `-x/--proxyUser` are always required. When the calling program is
     * `LabelGenerator`, the ClickHouse options `-n/--username`, `-p/--password`, `-u/--url`
     * and `-c/--cluster` are required as well.
     *
     * If the arguments cannot be parsed, an error is logged and the JVM exits with status -1.
     *
     * @param obj  the calling program (normally a Scala `object`); its simple class name
     *             selects which extra options are registered
     * @param args raw command-line arguments
     * @return the populated configuration
     */
    def parseConfig(obj: Object, args: Array[String]): Config = {
        // 1. Derive the program name from the class name; the simple name of a Scala object's
        //    class ends in '$', which is stripped here.
        val programName: String = obj.getClass.getSimpleName.replaceAll("\\$", "")

        // 2. Build the parser that maps options onto Config fields.
        val parser = new OptionParser[Config]("spark sql " + programName) {
            // 2.1 Usage header.
            head(programName, "v1.0")
            // 2.2 Options shared by every program: -e / --env etc.; text() is the help message.
            opt[String]('e', "env").required().action((x, config) => config.copy(env = x)).text("dev or prod")
            // The short name is 'x' (not 'n') so that it does not collide with -n/--username below.
            opt[String]('x', "proxyUser").required().action((x, config) => config.copy(proxyUser = x)).text("proxy username")
            // 2.3 Options that only LabelGenerator needs (ClickHouse connection settings).
            programName match {
                case "LabelGenerator" =>
                    logger.info("LabelGenerator")
                    opt[String]('n', "username").required().action((x, config) => config.copy(username = x)).text("username")
                    opt[String]('p', "password").required().action((x, config) => config.copy(password = x)).text("password")
                    opt[String]('u', "url").required().action((x, config) => config.copy(url = x)).text("url")
                    opt[String]('c', "cluster").required().action((x, config) => config.copy(cluster = x)).text("cluster")
                case _ =>
            }
        }

        // 3. Parse starting from the blank Config; None means the arguments were invalid.
        parser.parse(args, Config()) match {
            case Some(conf) => conf
            case None =>
                logger.error("can not parse args")
                // sys.exit has type Nothing, so no dummy null result is needed.
                sys.exit(-1)
        }
    }
}

LabelGenerator

package com.fuwei.bigdata.profile

import com.qf.bigdata.profile.conf.Config
import com.qf.bigdata.profile.utils.{SparkUtils, TableUtils}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.slf4j.LoggerFactory

/**
 * Job that generates the basic user-profile labels.
 *
 * It joins the user tables with the phone-prefix lookup file, overwrites the table
 * `dws_news.user_profile_base` with the result, and then copies the same rows into ClickHouse.
 */
object LabelGenerator {

    private val logger = LoggerFactory.getLogger(LabelGenerator.getClass.getSimpleName)

    /**
     * Runs the job end to end.
     *
     * The SparkSession is always stopped and the cached DataFrame is always unpersisted,
     * even when one of the steps throws; the exception then propagates to the caller.
     *
     * @param args command-line arguments, parsed by `Config.parseConfig`
     */
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org").setLevel(Level.WARN)

        // 1. Parse the arguments.
        val params: Config = Config.parseConfig(LabelGenerator, args)

        // 2. Obtain the SparkSession.
        val spark: SparkSession = SparkUtils.getSparkSession(params.env, LabelGenerator.getClass.getSimpleName)
        import spark.implicits._

        try {
            // 3. Load the phone-prefix lookup data and expose it as a temp view.
            // NOTE(review): this is a relative path; confirm it resolves when the job runs on a cluster.
            val df: DataFrame = spark.read.option("sep", "\t").csv("src/main/resources/phoneinfo.txt").toDF("prefix", "phone", "province", "city", "isp", "post_code", "city_code", "area_code", "types")
            df.createOrReplaceTempView("phone_info")

            // 4.1 Per-user attributes: the last 11 characters of the mobile number and the e-mail domain.
            val userSql =
                """
                  |select
                  |t1.distinct_id as uid,
                  |t1.gender,
                  |t1.age,
                  |case when length(t1.mobile) >= 11 then substring(t1.mobile,-11,length(t1.mobile)) else '' end as mobile,
                  |case when size(split(t1.email,'@')) = 2 then split(t1.email,'@')[1] else '' end email_suffix,
                  |t2.model
                  |from ods_news.user_ro as t1 left join dwb_news.user_base_info as t2
                  |on t1.distinct_id = t2.uid
                  |""".stripMargin

            val userDF: DataFrame = spark.sql(userSql)
            userDF.createOrReplaceTempView("user_info")

            // 4.2 Base features: attach province + city by matching the first 7 characters of the mobile number.
            val baseFeatureSql =
                """
                  |select
                  |t1.uid,
                  |t1.gender,
                  |t1.age,
                  |t1.email_suffix,
                  |t1.model,
                  |concat(ifnull(t2.province,''),ifnull(t2.city,'')) as region
                  |from user_info as t1 left join phone_info as t2
                  |on
                  |t2.phone = substring(t1.mobile,0,7)
                  |""".stripMargin

            // 4.3 Create the target table if it does not exist yet.
            spark.sql(
                """
                  |create table if not exists dws_news.user_profile_base(
                  |uid string,
                  |gender string,
                  |age string,
                  |email_suffix string,
                  |model string,
                  |region string
                  |)
                  |stored as parquet
                  |""".stripMargin)

            // 4.4 Run the query; the result is cached because it is consumed twice below
            //     (table write and ClickHouse insert).
            val baseFeaturedDF: DataFrame = spark.sql(baseFeatureSql)
            baseFeaturedDF.cache()

            try {
                // Overwrite the table with the query result.
                baseFeaturedDF.write.mode(SaveMode.Overwrite).saveAsTable("dws_news.user_profile_base")

                // 5. Prepare the ClickHouse table; meta = (comma-separated column names, placeholders).
                val meta = TableUtils.getClickHouseUserProfileBaseTable(baseFeaturedDF, params)

                // 6.1 Parameterised insert statement for ClickHouse.
                val insertCHSql =
                    s"""
                       |insert into ${TableUtils.USER_PROFILE_CLICKHOUSE_DATABASE}.${TableUtils.USER_PROFILE_CLICKHOUSE_TABLE}(${meta._1}) values(${meta._2})
                       |""".stripMargin

                logger.warn(insertCHSql)

                // 6.2 Insert into ClickHouse partition by partition.
                baseFeaturedDF.foreachPartition(partition => {
                    TableUtils.insertBaseFeaturedTable(partition, insertCHSql, params)
                })
            } finally {
                // Release the cache whether or not the writes succeeded.
                baseFeaturedDF.unpersist()
            }
        } finally {
            // 7. Always release Spark resources.
            spark.stop()
        }
        logger.info("job has success")

    }
}

ClickHouseUtils

package com.fuwei.bigdata.profile.utils

import ru.yandex.clickhouse.ClickHouseDataSource
import ru.yandex.clickhouse.settings.ClickHouseProperties

/**
 * Helpers for connecting to ClickHouse and for translating Spark column types
 * into ClickHouse column types.
 */
object ClickHouseUtils {

    /**
     * Creates a ClickHouse data source.
     *
     * @param username ClickHouse user
     * @param password ClickHouse password
     * @param url      JDBC url
     * @return a data source configured with the given credentials
     */
    def getDataSource(username: String, password: String, url: String): ClickHouseDataSource = {
        // Load the driver class explicitly before building the data source.
        Class.forName("ru.yandex.clickhouse.ClickHouseDriver")
        val properties = new ClickHouseProperties()
        properties.setUser(username)
        properties.setPassword(password)
        new ClickHouseDataSource(url, properties)
    }

    /**
     * Converts a comma-separated list of "name sparkType" pairs into the matching
     * "name ClickHouseType" list, e.g. "age string,gender string" becomes
     * "age String,gender String".
     *
     * Blank entries are skipped, so an empty input yields an empty string.
     * Types whose textual form itself contains a comma (such as "decimal(10,2)")
     * are not supported, because the input is split on commas.
     *
     * @param dfCol comma-separated "name type" pairs
     * @return the same columns with ClickHouse type names
     * @throws IllegalArgumentException if an entry has a name but no type
     */
    def df2TypeName2CH(dfCol: String): String = {
        dfCol.split(",")
            .map(_.trim)
            .filter(_.nonEmpty)
            .map { column =>
                // Split into at most two parts: the column name and everything after it.
                val fields: Array[String] = column.split("\\s+", 2)
                require(fields.length == 2, s"expected '<name> <type>' but got '$column'")
                val fName: String = fields(0)
                val fType: String = dfType2chType(fields(1).trim)
                fName + " " + fType
            }
            .mkString(",")
    }

    /**
     * Maps a Spark type name to a ClickHouse type name (case-insensitive).
     * Unknown types fall back to "String".
     *
     * @param fieldType Spark type name, e.g. "string", "int", "bigint"
     * @return the ClickHouse type name
     */
    def dfType2chType(fieldType: String): String = {
        fieldType.toLowerCase() match {
            case "string" => "String"
            // "int" is the simpleString form of Spark's IntegerType; "integer" is kept for compatibility.
            case "int" | "integer" => "Int32"
            case "long" | "bigint" => "Int64"
            case "double" => "Float64"
            case "float" => "Float32"
            case "timestamp" => "Datetime"
            case _ => "String"
        }
    }
}

SparkUtils(这个连接以后可以通用)

package com.fuwei.bigdata.profile.utils

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory

/**
 * Factory for Hive-enabled SparkSessions.
 */
object SparkUtils {
    private val logger = LoggerFactory.getLogger(SparkUtils.getClass.getSimpleName)

    /**
     * Builds (or reuses) a Hive-enabled SparkSession for the given environment.
     *
     * "prod" only sets the application name (suffix "_prod"); "dev" additionally runs with
     * master local[*] and sets spark.sql.hive.metastore.jars to "maven". Any other value
     * logs an error and exits the JVM with status -1.
     *
     * @param env     "prod" or "dev"
     * @param appName base application name; the environment is appended as a suffix
     * @return the session
     */
    def getSparkSession(env: String, appName: String): SparkSession = {
        // Settings common to every environment.
        val commonSettings = Seq(
            "spark.serializer" -> "org.apache.spark.serializer.KryoSerializer",
            "spark.sql.hive.metastore.version" -> "1.2.1",
            "spark.sql.cbo.enabled" -> "true",
            "spark.hadoop.dfs.client.block.write.replace-datanode-on-failure.enable" -> "true",
            "spark.hadoop.dfs.client.block.write.replace-datanode-on-failure.policy" -> "NEVER"
        )
        val sparkConf = commonSettings.foldLeft(new SparkConf()) {
            case (acc, (key, value)) => acc.set(key, value)
        }

        def hiveSession(): SparkSession =
            SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()

        if (env == "prod") {
            sparkConf.setAppName(appName + "_prod")
            hiveSession()
        } else if (env == "dev") {
            sparkConf.setMaster("local[*]")
            sparkConf.setAppName(appName + "_dev")
            sparkConf.set("spark.sql.hive.metastore.jars", "maven")
            hiveSession()
        } else {
            logger.error("not match env")
            System.exit(-1)
            null
        }
    }

}

TableUtils

package com.fuwei.bigdata.profile.utils

import com.qf.bigdata.profile.conf.Config
import org.apache.spark.sql.types.{IntegerType, LongType, StringType}
import org.apache.spark.sql.{DataFrame, Row}
import org.slf4j.LoggerFactory
import ru.yandex.clickhouse.{ClickHouseConnection, ClickHouseDataSource}

import java.sql.PreparedStatement

/**
 * Stores data read from Hive into ClickHouse.
 *
 * @author lifuwei
 * @since 2022-01-07
 */
object TableUtils {

    private val logger = LoggerFactory.getLogger(TableUtils.getClass.getSimpleName)

    /** ClickHouse database that holds the profile table. */
    val USER_PROFILE_CLICKHOUSE_DATABASE = "app_news"
    /** ClickHouse table that receives the base profile rows. */
    val USER_PROFILE_CLICKHOUSE_TABLE = "user_profile_base"

    /**
     * Inserts the rows of one partition into ClickHouse using JDBC batches.
     *
     * A batch is sent once 2000 rows are pending or more than 3000 ms have passed since the
     * previous send; whatever remains is sent after the last row. The statement and the
     * connection are closed even if an insert fails.
     *
     * Only string, long and int columns are bound. Any other column type is only logged as an
     * error and its placeholder is left unbound.
     *
     * @param partition   rows of the partition
     * @param insertCHSql insert statement with one `?` placeholder per column, in schema order
     * @param params      configuration providing the ClickHouse username, password and url
     */
    def insertBaseFeaturedTable(partition: Iterator[Row], insertCHSql: String, params: Config): Unit = {
        // 1. Open the ClickHouse connection.
        val dataSource: ClickHouseDataSource = ClickHouseUtils.getDataSource(params.username, params.password, params.url)
        val connection: ClickHouseConnection = dataSource.getConnection
        try {
            val ps: PreparedStatement = connection.prepareStatement(insertCHSql)
            try {
                val batchSize = 2000
                var batchCount = 0
                var lastBatchTime = System.currentTimeMillis()

                partition.foreach { row =>
                    // 2. Bind every column to its placeholder; JDBC parameter indexes start at 1.
                    row.schema.fields.zipWithIndex.foreach { case (field, position) =>
                        val index = position + 1
                        field.dataType match {
                            case StringType => ps.setString(index, row.getAs[String](field.name))
                            case LongType => ps.setLong(index, row.getAs[Long](field.name))
                            case IntegerType => ps.setInt(index, row.getAs[Int](field.name))
                            case _ => logger.error(s"type is err,${field.dataType}")
                        }
                    }

                    // 3. Queue the row.
                    ps.addBatch()
                    batchCount += 1

                    // 4. Flush when the batch is full or the previous flush is older than 3 seconds.
                    val currentTime = System.currentTimeMillis()
                    if (batchCount >= batchSize || lastBatchTime < currentTime - 3000) {
                        ps.executeBatch()
                        logger.warn(s"send data to clickhouse, batchNum:${batchCount},batchTime:${(currentTime - lastBatchTime)/1000} s")
                        batchCount = 0
                        lastBatchTime = currentTime
                    }
                }

                // 5. Send the rows that did not fill a whole batch.
                if (batchCount > 0) {
                    ps.executeBatch()
                    logger.warn(s"send data to clickhouse, batchNum:${batchCount},batchTime:${(System.currentTimeMillis() - lastBatchTime)/1000} s")
                }
            } finally {
                // 6. Release the statement even when an insert failed.
                ps.close()
            }
        } finally {
            connection.close()
        }
    }

    /**
     * (Re)creates the ClickHouse table that mirrors the DataFrame's schema.
     *
     * Creates the database if needed, drops the table if it exists and creates it again as a
     * MergeTree table ordered by `uid`. Column types are translated with
     * `ClickHouseUtils.dfType2chType`.
     *
     * For a schema uid,gender,age the result is ("uid,gender,age", "?,?,?"), which the caller
     * can splice into `insert into db.table(<names>) values(<placeholders>)`.
     *
     * `params.cluster` is not used here; the statements are issued without a cluster clause.
     *
     * @param baseFeaturedDF DataFrame whose schema defines the table columns
     * @param params         configuration providing the ClickHouse username, password and url
     * @return (comma-separated column names, comma-separated `?` placeholders)
     */
    def getClickHouseUserProfileBaseTable(baseFeaturedDF: DataFrame, params: Config): (String, String) = {
        val fields = baseFeaturedDF.schema.fields

        // Column names and one placeholder per column, both in schema order.
        val fieldNames = fields.map(_.name).mkString(",")
        val placeholders = fields.map(_ => "?").mkString(",")

        // Column definitions with Spark type names translated to ClickHouse type names,
        // e.g. "age string" becomes "age String".
        val chCol = fields
            .map(f => f.name + " " + ClickHouseUtils.dfType2chType(f.dataType.simpleString))
            .mkString(",")

        val createCHDataBaseSql =
            s"""
               |create database if not exists ${USER_PROFILE_CLICKHOUSE_DATABASE}
               |""".stripMargin

        // ClickHouse requires a table engine; MergeTree is used, ordered by uid.
        val createCHTableSql =
            s"""
               |create table ${USER_PROFILE_CLICKHOUSE_DATABASE}.${USER_PROFILE_CLICKHOUSE_TABLE}(${chCol})
               |ENGINE = MergeTree()
               |ORDER BY(uid)
               |""".stripMargin

        val dropCHTableSql =
            s"""
               |drop table if exists ${USER_PROFILE_CLICKHOUSE_DATABASE}.${USER_PROFILE_CLICKHOUSE_TABLE}
               |""".stripMargin

        val dataSource: ClickHouseDataSource = ClickHouseUtils.getDataSource(params.username, params.password, params.url)
        val connection: ClickHouseConnection = dataSource.getConnection

        // Logs and executes one statement, closing it afterwards.
        def runStatement(sql: String): Unit = {
            logger.warn(sql)
            val ps: PreparedStatement = connection.prepareStatement(sql)
            try ps.execute() finally ps.close()
        }

        try {
            runStatement(createCHDataBaseSql) // create database
            runStatement(dropCHTableSql)      // drop table
            runStatement(createCHTableSql)    // create table
        } finally {
            connection.close()
        }

        logger.info("success!!!!!!!!!")
        (fieldNames, placeholders)
    }
}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.fuwei.bigdata</groupId>
    <artifactId>user-profile</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- Version numbers and the shared dependency scope used throughout this POM. -->
    <properties>
        <scala.version>2.11.12</scala.version>
        <!-- play-json.version and maven-scala-plugin.version are not referenced elsewhere in this POM. -->
        <play-json.version>2.3.9</play-json.version>
        <maven-scala-plugin.version>2.10.1</maven-scala-plugin.version>
        <scala-maven-plugin.version>3.2.0</scala-maven-plugin.version>
        <maven-assembly-plugin.version>2.6</maven-assembly-plugin.version>
        <spark.version>2.4.5</spark.version>
        <!-- Scope applied to the dependencies that declare ${scope.type}. -->
        <scope.type>compile</scope.type>
        <json.version>1.2.3</json.version>
        <!-- Possible values for scope.type: compile or provided. -->
    </properties>

    <!-- Project dependencies; entries using ${scope.type} share the scope set in <properties>. -->
    <dependencies>

        <!-- JSON library -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${json.version}</version>
        </dependency>

        <!-- Spark core, SQL and Hive support (Scala 2.11 builds) -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>${scope.type}</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>${scope.type}</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>${scope.type}</scope>
        </dependency>

        <!-- MySQL JDBC driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>

        <!-- log4j 1.x logging -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>${scope.type}</scope>
        </dependency>

        <dependency>
            <groupId>commons-codec</groupId>
            <artifactId>commons-codec</artifactId>
            <version>1.6</version>
        </dependency>

        <!-- Scala runtime and reflection -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
            <scope>${scope.type}</scope>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>${scala.version}</version>
            <scope>${scope.type}</scope>
        </dependency>

        <!-- scopt command-line option parser -->
        <dependency>
            <groupId>com.github.scopt</groupId>
            <artifactId>scopt_2.11</artifactId>
            <version>4.0.0-RC2</version>
        </dependency>

        <!-- Hudi Spark bundle -->
        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-spark-bundle_2.11</artifactId>
            <version>0.5.2-incubating</version>
            <scope>${scope.type}</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-avro_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!-- HanLP (portable build) -->
        <dependency>
            <groupId>com.hankcs</groupId>
            <artifactId>hanlp</artifactId>
            <version>portable-1.7.8</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>${scope.type}</scope>
        </dependency>

        <!-- Hive JDBC, with javax.mail and the jetty aggregate artifacts excluded -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>1.2.1</version>
            <scope>${scope.type}</scope>
            <exclusions>
                <exclusion>
                    <groupId>javax.mail</groupId>
                    <artifactId>mail</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.eclipse.jetty.aggregate</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- ClickHouse JDBC driver (ru.yandex.clickhouse) -->
        <dependency>
            <groupId>ru.yandex.clickhouse</groupId>
            <artifactId>clickhouse-jdbc</artifactId>
            <version>0.2.4</version>
        </dependency>

    </dependencies>

    <!-- Aliyun mirror used to resolve dependencies. -->
    <repositories>

        <repository>
            <id>alimaven</id>
            <!-- https instead of http: Maven 3.8.1 and later block plain-HTTP repositories by default. -->
            <url>https://maven.aliyun.com/nexus/content/groups/public/</url>
            <releases>
                <updatePolicy>never</updatePolicy>
            </releases>
            <snapshots>
                <updatePolicy>never</updatePolicy>
            </snapshots>
        </repository>
    </repositories>

    <!-- Build configuration: Scala sources, fat-jar assembly and Scala compilation. -->
    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <!-- Builds the jar-with-dependencies artifact during the package phase. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>${maven-assembly-plugin.version}</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- Compiles the main and test Scala sources. -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>${scala-maven-plugin.version}</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <!-- Declared with a pinned version only; no executions are configured here. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-archetype-plugin</artifactId>
                <version>2.2</version>
            </plugin>
        </plugins>
    </build>
</project>

测试

## 1. Copy core-site.xml, yarn-site.xml and hive-site.xml into the project's resources directory
## 2. Run clean and package
## 3. The Hive metastore service must be running
## 4. YARN and HDFS must be running
## 5. ClickHouse / chproxy must be running as well
## 6. spark-submit script that submits the jar
##    --class uses the package declared in the LabelGenerator source (com.fuwei.bigdata.profile).
##    NOTE(review): --name log2hudi looks carried over from another job; confirm the intended name.
${SPARK_HOME}/bin/spark-submit \
    --jars /data/apps/hive-1.2.1/auxlib/hudi-spark-bundle_2.11-0.5.2-incubating.jar \
    --conf spark.sql.hive.convertMetastoreParquet=false \
    --conf spark.executor.heartbeatInterval=120s \
    --conf spark.network.timeout=600s \
    --conf spark.sql.catalogImplementation=hive \
    --conf spark.yarn.submit.waitAppCompletion=false \
    --name log2hudi \
    --conf spark.task.cpus=1 \
    --conf spark.executor.cores=4 \
    --conf spark.sql.shuffle.partitions=50 \
    --master yarn \
    --deploy-mode cluster \
    --driver-memory 1G \
    --executor-memory 3G \
    --num-executors 1 \
    --class com.fuwei.bigdata.profile.LabelGenerator \
    /data/jar/user-profile.jar \
    -e prod -u jdbc:clickhouse://10.206.0.4:8321 -n fw-insert -p fw-001 -x root -c 1

## 7. Verify the result with clickhouse-client
clickhouse-client --host 10.206.0.4 --port 9999 --password qwert

标签:case,String,val,presto,sparksql,org,spark,clickhouse
来源: https://blog.csdn.net/li1579026891/article/details/122381593