数据库
首页 > 数据库 > 一键同步mysql到数仓(airflow调度)

一键同步mysql到数仓(airflow调度)

作者:互联网

经常会接到产品的需求:同步***表到hive,做分析。(做多了感觉很烦,就写一个工具)

一:背景、功能、流程介绍

1.背景:
    1.数仓使用hive存储,datax导数据、airflow调度
    2.虽然数据产品同学对datax进行了封装,可以点点点完成mysql表的同步,但是过程太复杂了
        还需要自己手动建表,还不支持修改。就萌生了自己写一个工具的想法
2.功能
    就是通过mysql配置完成hive的一般建表,airflow调度任务的生成
3.流程
    1.配置mysql连接
    2.根据mysql数据类型,生成对应的hive表结构,建表
    3.生成airflow调度任务(读取mysql表,调用datax,修复分区)

二:代码

1.配置文件介绍:

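MysqlToHive.properties(注意:java.util.Properties 不支持行尾注释,下面每行 # 之后的说明仅用于讲解,原样保留会被当作属性值的一部分;实际配置文件中请删除这些说明,或将其单独写成以 # 开头的注释行)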

        jdbcalias:ptx_read    #mysql别名要和同步的数据库的别名保持一致
        table:be_product      #要同步的表名 
        hive_prefix:ods.ods_product_   ##生成hive表的前缀
        hive_suffix:_dd                ##增量表还是全量表
        owner=xiaolong.wu              ##airflow任务的owner
        lifecycle=180                  ##hive表的生命周期,数据产品据此删除数据

        airflowpath=/airflow/dags/ods/    ##生成airflow任务文件的路径
        s3path=s3://path                  ##datax写hive表需要的基本路径


        jdbc1alias : hive                 ##可以写多个mysql链接,不用一个来回改
        jdbc1host : 127.0.0.1
        jdbc1port : 3306
        jdbc1user : root
        jdbc1passwd : **
        jdbc1db_name : test

        jdbc2alias:read
        jdbc2host : 127.0.0.1
        jdbc2port : 3306
        jdbc2user : root
        jdbc2passwd :**
        jdbc2db_name :test
2.基本代码:

MysqlToHive.java

        import java.io.*;
        import java.sql.Connection;
        import java.sql.DriverManager;
        import java.sql.ResultSet;
        import java.sql.Statement;
        import java.util.ArrayList;
        import java.util.List;
        import java.util.Properties;

        class Database {//mysql配置工具类,非重点
            private String alias;
            private String host;
            private int port;
            private String user;
            private String passwd;
            private String db_name;

            public String getAlias() {return alias;}
            public void setAlias(String alias) {this.alias = alias;}
            public String getHost() {return host;}
            public void setHost(String host) {this.host = host;}
            public int getPort() {return port;}
            public void setPort(int port) {this.port = port;}
            public String getUser() {return user;}
            public void setUser(String user) {this.user = user;}
            public String getPasswd() {return passwd;}
            public void setPasswd(String passwd) {this.passwd = passwd;}
            public String getDb_name() {return db_name;}
            public void setDb_name(String db_name) {this.db_name = db_name;}

            @Override
            public String toString() {
                return "Database{" +"alias='" + alias + '\'' +", host='" + host + '\'' +", port=" + port +
                        ", user='" + user + '\'' +", passwd='" + passwd + '\'' +", db_name='" + db_name + '\'' +'}';
            }
        }

        public class MysqlToHive {
            public static String jdbcalias;
            public static String table;
            public static String hive_prefix;
            public static String hive_suffix;
            public static String owner;
            public static String lifecycle;
            public static String airflowpath;
            public static String s3path;


            public static Database database = new Database();
            public static List<Database> databasesList = new ArrayList<Database>();
            public static List<String> mysqlTableColumn = new ArrayList<String>();

            public static void parseProperties(Properties pp){
                jdbcalias = pp.getProperty("jdbcalias");
                table = pp.getProperty("table");
                hive_prefix = pp.getProperty("hive_prefix");
                owner = pp.getProperty("owner");
                lifecycle = pp.getProperty("lifecycle");
                airflowpath = pp.getProperty("airflowpath");
                s3path = pp.getProperty("s3path");
                hive_suffix = pp.getProperty("hive_suffix");

                int dbindex = 1;//根据mysql链接的别名,找到对应的mysql配置
                while(pp.getProperty("jdbc"+dbindex+"alias")!= null){
                    Database databaseItem = new Database();
                    databaseItem.setDb_name(pp.getProperty("jdbc"+dbindex+"db_name"));
                    databaseItem.setHost(pp.getProperty("jdbc"+dbindex+"host"));
                    databaseItem.setAlias(pp.getProperty("jdbc"+dbindex+"alias"));
                    databaseItem.setPasswd(pp.getProperty("jdbc"+dbindex+"passwd"));
                    databaseItem.setPort(Integer.parseInt(pp.getProperty("jdbc"+dbindex+"port")));
                    databaseItem.setUser(pp.getProperty("jdbc"+dbindex+"user"));
                    System.out.println(databaseItem.toString());
                    databasesList.add(databaseItem);

                    if(databaseItem.getAlias().equals(jdbcalias)){
                        database =databasesList.get(dbindex-1);
                    }
                    dbindex++;
                }
            }
            //读取配置文件
            public static  void readDbPropertiesFile (String fileName){
                Properties pp = new Properties();
                try {
                    InputStream fps = Thread.currentThread().getContextClassLoader().getResourceAsStream(fileName);
                    pp.load(fps);
                    parseProperties(pp);
                    fps.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            //链接mysql,拿到对应的表结构,为后续生成hive表做准备
            public static  void readTableFormatted () throws Exception {
                String sql="show full fields from " + table;

                Connection con=null;
                Statement st=null;
                ResultSet rs=null;
                Class.forName("com.mysql.cj.jdbc.Driver");
                System.out.println("jdbc:mysql://"+database.getHost()+":"+database.getPort()+"/"+database.getDb_name()+"?serverTimezone=UTC");
                con= DriverManager.getConnection("jdbc:mysql://"+database.getHost()+":"+database.getPort()+"/"+database.getDb_name()+"?serverTimezone=UTC", database.getUser(),database.getPasswd());
                st=con.createStatement();
                rs=st.executeQuery(sql);
                while(rs.next())
                {
        //            System.out.println(rs.getString("Field").toLowerCase()+"|"+rs.getString("Type").toLowerCase() +"|"+ rs.getString("Comment"));
                    mysqlTableColumn.add(rs.getString("Field").toLowerCase()+"|"+rs.getString("Type").toLowerCase() +"|"+ rs.getString("Comment"));
                }
            }
            public static  int getmysqlLength(String type) {
                return Integer.parseInt(type.substring(type.indexOf("(")+1,type.indexOf(")")));
            }
            //根据mysql类型,生成对应的hive类型,并建hive表
            public static  void buildExecuteHiveSql () throws IOException, InterruptedException {
                StringBuilder hiveSqlBuilder = new StringBuilder();

                hiveSqlBuilder.append("CREATE TABLE "+hive_prefix+table+hive_suffix+" ( \n");
                for (int i = 0; i < mysqlTableColumn.size(); i++) {
                    String []fieldAndType= mysqlTableColumn.get(i).split("\\|");

                    hiveSqlBuilder.append(fieldAndType[0] + " ");
                    if(fieldAndType[1].contains("bigint") || fieldAndType[1].contains("int") || fieldAndType[1].contains("smallint") || fieldAndType[1].contains("tinyint")){
                        hiveSqlBuilder.append("bigint");
                    }
                    else if(fieldAndType[1].contains("binary") || fieldAndType[1].contains("varbinary") ){
                        hiveSqlBuilder.append("binary");
                    }
                    else if(fieldAndType[1].contains("date") ){
                        hiveSqlBuilder.append("date");
                    }
                    else if(fieldAndType[1].contains("double") || fieldAndType[1].contains("float") || fieldAndType[1].contains("decimal")){
                        hiveSqlBuilder.append("double");
                    }
                    else if(fieldAndType[1].contains("char") || fieldAndType[1].contains("varchar") || fieldAndType[1].contains("mediumtext")
                            || fieldAndType[1].contains("datetime") || fieldAndType[1].contains("time") || fieldAndType[1].contains("timestamp")){
                        hiveSqlBuilder.append("string");
                    }
                    String comment = "";
                    if(fieldAndType.length==3){comment = fieldAndType[2];};
                    hiveSqlBuilder.append(" comment '"+comment+ "' ,");
                    hiveSqlBuilder.append("\n");
                }
                hiveSqlBuilder.deleteCharAt(hiveSqlBuilder.length()-2); //去除最后的回车和,
                hiveSqlBuilder.append(") PARTITIONED BY ( dt string COMMENT '(一级分区)' ) \n");
                hiveSqlBuilder.append("ROW FORMAT DELIMITED STORED AS PARQUET \n");
                hiveSqlBuilder.append("TBLPROPERTIES ('lifecycle'='"+lifecycle+"','owner'='"+owner+"','parquet.compression'='snappy');");
                System.out.println(hiveSqlBuilder.toString());

                Process process = new ProcessBuilder("hive","-e","\""+hiveSqlBuilder.toString()+"\"").redirectErrorStream(true).start();;
                BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()));
                String line;
                while ((line = br.readLine()) != null) {
                    System.out.println(line);
                }
                process.waitFor();
            }
            //拼接mysql查询语句,用于airflow调度中查询mysql的sql语句
            public static  void printAirflowJobGetSelectSql (StringBuilder mysqlSelectBuilder,StringBuilder hiveTypeBuilder){
                mysqlSelectBuilder.append("select ");
                for (int i = 0; i < mysqlTableColumn.size(); i++) {
                    String []fieldAndType= mysqlTableColumn.get(i).split("\\|");
                    mysqlSelectBuilder.append(fieldAndType[0] + ",");
                    hiveTypeBuilder.append("{\"name\":\""+fieldAndType[0]+"\",\"type\":\"");
                    if(fieldAndType[1].contains("bigint") || fieldAndType[1].contains("int") || fieldAndType[1].contains("smallint") || fieldAndType[1].contains("tinyint")){
                        hiveTypeBuilder.append("bigint");
                    }
                    else if(fieldAndType[1].contains("binary") || fieldAndType[1].contains("varbinary") ){
                        hiveTypeBuilder.append("binary");
                    }
                    else if(fieldAndType[1].contains("date") ){
                        hiveTypeBuilder.append("date");
                    }
                    else if(fieldAndType[1].contains("double") || fieldAndType[1].contains("float") || fieldAndType[1].contains("decimal")){
                        hiveTypeBuilder.append("double");
                    }
                    else if(fieldAndType[1].contains("char") || fieldAndType[1].contains("varchar") || fieldAndType[1].contains("mediumtext")
                            || fieldAndType[1].contains("datetime") || fieldAndType[1].contains("time") || fieldAndType[1].contains("timestamp")){
                        hiveTypeBuilder.append("string");
                    }
                    hiveTypeBuilder.append("\"},");
                }
                hiveTypeBuilder.deleteCharAt(hiveTypeBuilder.length()-1);
                mysqlSelectBuilder.deleteCharAt(mysqlSelectBuilder.length()-1);
                mysqlSelectBuilder.append(" from " + table);
            }
            //在固定路径下生成airflow文件,就是生成调度
            //该部分涉及到公司封装的代码太多了,就不保留了
            public static  void printAirflowJob () throws FileNotFoundException {

                String db = hive_prefix.substring(0,hive_prefix.indexOf("."));
                String odsTableName = hive_prefix.substring(hive_prefix.indexOf(".")+1);
                if(new File(odsTableName).exists()){
                    System.out.println("folder exist,please delete the folder "+airflowpath+odsTableName+table+hive_suffix);
                }
                else{
                    StringBuilder mysqlSelectBuilder = new StringBuilder();
                    StringBuilder hiveTypeBuilder = new StringBuilder();
                    printAirflowJobGetSelectSql(mysqlSelectBuilder,hiveTypeBuilder);

                    File dir = new File(airflowpath+odsTableName+table+hive_suffix);
                    dir.mkdirs();
                    PrintWriter pw = new PrintWriter(airflowpath+odsTableName+table+hive_suffix+"/"+odsTableName+table+hive_suffix+"_dag.py");
                    pw.println("import airflow");

                    pw.println("job_name='"+hive_prefix+table+hive_suffix+"'");
                    pw.println("job_owner='"+owner+"'");
                    pw.println("default_job_retries=1");
                    pw.println("default_job_retry_delay=timedelta(minutes=5)");
                    pw.println("default_start_date=airflow.utils.dates.days_ago(1)");
                    pw.println("dag_schedule_interval='12 1 * * *'");
                    pw.println("");
                    pw.println("");
                    pw.println("hive_table_name = job_name");
                    pw.println("");
                    pw.println("default_args = {");
                    pw.println("    'owner': job_owner,");
                    pw.println("    'depends_on_past': False,");
                    pw.println("    'start_date': default_start_date,");
                    pw.println("    'email': default_email_to,");
                    pw.println("    'email_on_failure': False,");
                    pw.println("    'email_on_retry': False,");
                    pw.println("    'retries': default_job_retries,");
                    pw.println("    'retry_delay': default_job_retry_delay,");
                    pw.println("    'pool': default_pool,");
                    pw.println("    'priority_weight': 10");
                    pw.println("}");
                    pw.println("");

                }

            }
            public static void main(String[] args) throws Exception {
                readDbPropertiesFile("MysqlToHive.properties");
                readTableFormatted();
                buildExecuteHiveSql();
                printAirflowJob();
            }

        }

三:执行样例

1.mysql样例:
        -- Sample source table: one column for each MySQL type named in the converter's
        -- type mapping, except tinyint.
        CREATE TABLE mysql_column_test (
            bigint_test bigint(10),
            int_test int(10),
            smallint_test smallint(10),
            binary_test binary(20),
            varbinary_test varbinary(20),
            decimal_test decimal(30,5),
            double_test double(30,5),
            float_test float(30,5),
            char_test char(40),
            varchar_test varchar(40),
            mediumtext_test mediumtext,
            date_test date,
            datetime_test datetime,
            time_test time,
            timestamp_test timestamp DEFAULT CURRENT_TIMESTAMP
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

        -- One sample row. timestamp_test is left out of the column list so it takes its
        -- CURRENT_TIMESTAMP default; time_test is given CURRENT_TIMESTAMP explicitly.
        insert into mysql_column_test(bigint_test,int_test,smallint_test,binary_test,varbinary_test,decimal_test,double_test,float_test,char_test,varchar_test,mediumtext_test,
 date_test,datetime_test,time_test) values(1,2,3,UNHEX('4'),UNHEX('5'),6.1,7.1,8.1,9.1,'10','11','2022-01-01','2020-09-14 23:18:17',CURRENT_TIMESTAMP);
2.代码执行:直接复制代码过去,新建文件,执行
        javac -cp mysql-connector-java-8.0.18.jar MysqlToHive.java
        java -classpath mysql-connector-java-8.0.18.jar: MysqlToHive

标签:数仓,airflow,String,contains,hive,mysql,test,fieldAndType,public
来源: https://www.cnblogs.com/wuxiaolong4/p/16462217.html