数据库
首页 > 数据库> > sparksql结果快速到mysql(scala代码、airflow调度)

sparksql结果快速到mysql(scala代码、airflow调度)

作者:互联网

 

经常会有这样的需求:在现有数仓表的基础上,写一些sql,然后生成hive表并同步到mysql。

次数多了,就像写一个工具完成这个工作

一:背景、功能、流程介绍

1.背景:
    1.数仓使用hive存储,datax导数据、airflow调度
    2.不知道怎么利用hive解析sql,拿到对应的schema,但是spark知道 
      spark.sql(sql).schema.toList所以就用了scala
2.功能
    就是通过配置完成hive,mysql的建表,airflow调度任务的生成
3.流程
    1.配置mysql链接
    2.根据输入sparksql,生成对应的hive,mysql表结构,建表
    3.生成airflow调度任务(插入hive数据,调用datax同步数据到mysql)

二:代码

1.配置文件介绍:

MysqlToHive.properties

        jdbcalias:ptx_read    #mysql别名要和同步的数据库的别名保持一致
        table:be_product      #要同步的表名 
        owner=owner              ##airflow任务的owner
        lifecycle=180                  ##hive表的生命周期,数据数据产品删除数据

        airflowpath=/airflow/dags/ods/    ##生成airflow任务文件的路径

        jdbc1alias : hive                 ##可以写多个mysql链接,不用一个来回改
        jdbc1host : 127.0.0.1
        jdbc1port : 3306
        jdbc1user : root
        jdbc1passwd : **
        jdbc1db_name : test

        jdbc2alias:read
        jdbc2host : 127.0.0.1
        jdbc2port : 3306
        jdbc2user : root
        jdbc2passwd :**
        jdbc2db_name :test
2.基本代码:

MysqlToHive.java

      object HiveToMysql {
        //mysql配置内部类
        case class Database(host: String,port: Int,user: String,passwd: String,db_name: String){}

        //读取配置文件
        def readDbPropertiesFile(fileName: String,spark:SparkSession,sql: String): Unit = {
          val pp = new Properties
          val fps = new FileInputStream("HiveToMysql.properties")
      //    val fps = Thread.currentThread.getContextClassLoader.getResourceAsStream(fileName)
          pp.load(fps)
          parseProperties(pp,spark,sql)
          fps.close()
        }
        //解析配置文件对应配置
        def parseProperties(pp: Properties,spark:SparkSession,sql: String): Unit = {
          val table = pp.getProperty("table")
          val owner = pp.getProperty("owner")
          val lifecycle = pp.getProperty("lifecycle")
          val jdbcalias = pp.getProperty("jdbcalias")
          val airflowpath = pp.getProperty("airflowpath")
          import scala.collection.mutable.ArrayBuffer
          var tableColumn: ArrayBuffer[String] = new ArrayBuffer[String]();

          var dbindex = 1
          while (pp.getProperty("jdbc" + dbindex + "alias") != null && !pp.getProperty("jdbc" + dbindex + "alias").equals(jdbcalias)) {
            dbindex += 1
          }

          var database = new Database(pp.getProperty("jdbc" + dbindex + "host"),pp.getProperty("jdbc" + dbindex + "port").toInt,
            pp.getProperty("jdbc" + dbindex + "user"),pp.getProperty("jdbc" + dbindex + "passwd"),pp.getProperty("jdbc" + dbindex + "db_name"))

          val mysqlSelectBuilder = new StringBuilder

          val schemaList = spark.sql(sql).schema.toList
          //sparksql  利用schema生成hive建表语句和mysql建表语句
          for ( i <- 0 until schemaList.length ) {
            println(schemaList.apply(i).name+"|"+schemaList.apply(i).dataType.typeName)
            tableColumn += (schemaList.apply(i).name+"|"+schemaList.apply(i).dataType.typeName)
            mysqlSelectBuilder.append(schemaList.apply(i).name+",")
          }
          mysqlSelectBuilder.deleteCharAt(mysqlSelectBuilder.length - 1)

          buildExecuteHiveSql(table,tableColumn,lifecycle,owner)
          buildExecuteMysql(table,tableColumn,database);
          printAirflowJob(airflowpath,table,owner,jdbcalias,mysqlSelectBuilder.toString(),sql: String)
        }

        //airflow封装太多了,就不写了
        def printAirflowJob(airflowpath:String,table:String,owner:String,jdbcalias:String,mysqlSelect:String,sql:String){

          val db = table.substring(0, table.indexOf("."));
          val tableNoDatabase = table.substring(table.indexOf(".") + 1);
          System.out.println(airflowpath +db+"/"+ tableNoDatabase)
          if (new File(airflowpath +db+"/"+ tableNoDatabase).exists())
            System.out.println("folder exist,please delete the folder " + airflowpath +db+"/"+ tableNoDatabase)
          else {
            val dir = new File(airflowpath +db+"/"+ tableNoDatabase);
            dir.mkdirs();
            val pw = new PrintWriter(airflowpath +db+"/"+ tableNoDatabase + "/" + tableNoDatabase + "_dag.py")

            pw.println("import airflow");
            pw.println("from airflow import DAG");
            pw.println(")");
            pw.println("");
            pw.println("");
            pw.flush()
            pw.close()
          }
        }

        @throws[IOException]
        def buildExecuteMysql(table:String,tableColumn:ArrayBuffer[String],database:Database): Unit = {

          val mysqlSqlBuilder = new StringBuilder
          mysqlSqlBuilder.append("CREATE TABLE " + table.substring(table.indexOf(".")+1)+ " ( \n")
          mysqlSqlBuilder.append("dt varchar(10) DEFAULT NULL,"+"\n")
          println("tableColumnCopy"+tableColumn.size)
          val tableColumnCopy = tableColumn.toArray[String];

          for (i <- 0 until tableColumnCopy.size) {
            val fieldAndType = tableColumnCopy.apply(i).split("\\|")

            mysqlSqlBuilder.append(fieldAndType(0)+ " ")
            if (fieldAndType(1).contains("integer") || fieldAndType(1).contains("long"))
              mysqlSqlBuilder.append(" bigint(10)")
            else if (fieldAndType(1).contains("float") || fieldAndType(1).contains("double") || fieldAndType(1).contains("decimal"))
              mysqlSqlBuilder.append(" decimal(36,6)")
            else if (fieldAndType(1).contains("string") )
              mysqlSqlBuilder.append(" varchar(36)")
            else if (fieldAndType(1).contains("boolean") )
              mysqlSqlBuilder.append(" boolean")
            else if (fieldAndType(1).contains("date") || fieldAndType(1).contains("timestamp") )
              mysqlSqlBuilder.append(" varchar(36)")


            mysqlSqlBuilder.append(" DEFAULT NULL," +"\n")
          }
          mysqlSqlBuilder.deleteCharAt(mysqlSqlBuilder.length - 2) //去除最后的回车和,

          mysqlSqlBuilder.append(") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4")
          System.out.println(mysqlSqlBuilder.toString)

          Class.forName("com.mysql.cj.jdbc.Driver")
          var con = DriverManager.getConnection("jdbc:mysql://" + database.host + ":" + database.port + "/" + database.db_name + "?serverTimezone=UTC", database.user, database.passwd)
          var st = con.createStatement
          st.execute(mysqlSqlBuilder.toString)
          st.close();con.close();

        }

        @throws[IOException]
        @throws[InterruptedException]
        def buildExecuteHiveSql(table:String,tableColumn:ArrayBuffer[String],lifecycle:String,owner:String): Unit = {
          val mysqlSqlBuilder = new StringBuilder
          mysqlSqlBuilder.append("CREATE TABLE " + table+ " ( \n")
          println("tableColumnCopy"+tableColumn.size)
          val tableColumnCopy = tableColumn.toArray[String];

          for (i <- 0 until tableColumnCopy.size) {
            val fieldAndType = tableColumnCopy.apply(i).split("\\|")

            if(fieldAndType.apply(1).contains("integer") || fieldAndType.apply(1).contains("long"))
              mysqlSqlBuilder.append(fieldAndType.apply(0)+" bigint,")
            else if(fieldAndType.apply(1).contains("float") || fieldAndType.apply(1).contains("double") || fieldAndType.apply(1).contains("decimal"))
              mysqlSqlBuilder.append(fieldAndType.apply(0)+" "+"double ,")
            else if(fieldAndType.apply(1).contains("string"))
              mysqlSqlBuilder.append(fieldAndType.apply(0)+" string,")
            else if(fieldAndType.apply(1).contains("boolean"))
              mysqlSqlBuilder.append(fieldAndType.apply(0)+" boolean,")
            else if(fieldAndType.apply(1).contains("date")||fieldAndType.apply(1).contains("timestamp"))
              mysqlSqlBuilder.append(fieldAndType.apply(0)+" string,")
            mysqlSqlBuilder.append("\n")
          }
          mysqlSqlBuilder.deleteCharAt(mysqlSqlBuilder.length - 2) //去除最后的回车和,

          mysqlSqlBuilder.append(") PARTITIONED BY ( dt string COMMENT '(一级分区)' ) \n")
          mysqlSqlBuilder.append("ROW FORMAT DELIMITED STORED AS PARQUET \n")
          mysqlSqlBuilder.append("TBLPROPERTIES ('lifecycle'='" + lifecycle + "','owner'='" + owner + "','parquet.compression'='snappy');")
          System.out.println(mysqlSqlBuilder.toString)
          val process = new ProcessBuilder("hive", "-e", "\"" + mysqlSqlBuilder.toString + "\"").redirectErrorStream(true).start

          val br = new BufferedReader(new InputStreamReader(process.getInputStream))
          var line = ""
          do {
            line = br.readLine()
            Thread.sleep(1000)
            println(line)

          }while(line!=null)
          process.waitFor
        }
        def main(args: Array[String]): Unit = {
      //        val sparkconf = new SparkConf().setAppName("test_Spark_sql").setMaster("local[2]")
      //        val spark = SparkSession.builder().config(sparkconf).config("spark.driver.host", "localhost").getOrCreate()

          val spark= SparkSession.builder.appName("HiveToMysql").enableHiveSupport().getOrCreate()
          readDbPropertiesFile("HiveToMysql.properties",spark,args(0))

        }
      }
3.脚本文件:HiveToMysql.sh
                #!/bin/bash

        mv bigData.jar .
        mv HiveToMysql.properties .

        sql=`cat  /sql`

        spark-submit \
                --class HiveToMysql \
                --master yarn \
                --deploy-mode client \
                --num-executors 1 \
                --executor-memory 4g \
                --executor-cores 1 \
                --driver-memory 1g \
                --name "HiveToMysql" \
                --conf spark.speculation=true \
                --conf spark.speculation.interval=30000 \
                --conf spark.speculation.quantile=0.8 \
                --conf spark.speculation.multiplier=1.5 \
                --conf spark.dynamicAllocation.enabled=false \
                --files HiveToMysql.properties \
                --jars fastjson-1.2.62.jar,mysql-connector-java-8.0.18.jar \
                bigData.jar "$sql"
4.可能的问题
        1.scala比较烂,代码比较难阅读
        2.调度的时间一样(可做需改)
        3.数据类型的处理,根据业务需求

标签:airflow,String,val,scala,mysqlSqlBuilder,sparksql,apply,fieldAndType,append
来源: https://www.cnblogs.com/wuxiaolong4/p/16462320.html