数据库
首页 > 数据库 > Flink SQL Hbase Demo

Flink SQL Hbase Demo

作者:互联网

  1. Maven 依赖(pom.xml)

           <!-- Flink HBase 2.2 connector artifacts (Scala 2.11 builds, Flink 1.12.1).
                The entries are wrapped in <dependencies> so the fragment is well-formed
                when pasted into a pom.xml next to the <build> section.
                NOTE(review): both the plain connector and the flink-sql-connector artifact
                are declared; confirm that both are really needed on the classpath. -->
           <dependencies>
               <dependency>
                   <groupId>org.apache.flink</groupId>
                   <artifactId>flink-connector-hbase-2.2_2.11</artifactId>
                   <version>1.12.1</version>
               </dependency>
               <dependency>
                   <groupId>org.apache.flink</groupId>
                   <artifactId>flink-sql-connector-hbase-2.2_2.11</artifactId>
                   <version>1.12.1</version>
               </dependency>
           </dependencies>
     <!-- Build section: produces a shaded (fat) jar during the package phase and
          compiles the sources for Java 1.8. -->
     <build>
         <plugins>
                <!-- maven-shade-plugin: runs the "shade" goal in the "package" phase. -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>3.2.2</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <!-- Artifacts left out of the shaded jar: jsr305 and the
                                     slf4j / log4j logging artifacts. -->
                                <artifactSet>
                                    <excludes>
                                        <exclude>com.google.code.findbugs:jsr305</exclude>
                                        <exclude>org.slf4j:*</exclude>
                                        <exclude>log4j:*</exclude>
                                    </excludes>
                                </artifactSet>
                                <filters>
                                    <filter>
                                        <!-- Do not copy the signatures in the META-INF folder.
                                        Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                        <artifact>*:*</artifact>
                                        <excludes>
                                            <exclude>module-info.class</exclude>
                                            <exclude>META-INF/*.SF</exclude>
                                            <exclude>META-INF/*.DSA</exclude>
                                            <exclude>META-INF/*.RSA</exclude>
                                        </excludes>
                                    </filter>
                                </filters>
                                <!-- This part is important: the child transformers are combined in append mode. -->
                                <transformers combine.children="append">
                                    <!-- Sets the mainClass entry point via the manifest transformer. -->
                                    <transformer
                                      implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                        <mainClass>com.juneyaoair.dataplatform.service.RealTimeLableApplication</mainClass>
                                    </transformer>
                                    <!-- The service transformer is needed to merge META-INF/services files -->
                                    <transformer
                                            implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                </transformers>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
                <!-- maven-compiler-plugin: source and target level are both 1.8.
                     NOTE(review): no <version> is given here; presumably it is inherited from a
                     parent POM or pluginManagement (confirm). -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
            </plugins>
    
        </build>
    
  2. Flink SQL Source 连接Kafka

    -- Kafka-backed source table for JSON change records of TBL_MEMBER_INFO.
    -- Every record carries metadata (table_name, op_type, op_ts, current_ts, pos)
    -- plus a `before` and an `after` row image that share one schema.
    -- NOTE(review): the envelope looks like GoldenGate-style JSON output; confirm with the producer.
    CREATE TABLE KAFKA_SOURCE_TBL_MEMBER_INFO_HBASE (
      table_name STRING NOT NULL,
      op_type STRING,
      op_ts STRING,
      -- Event-time column parsed from op_ts. The precision is spelled out as TIMESTAMP(3)
      -- because that is the type Flink 1.12 requires for a WATERMARK column; a bare
      -- TIMESTAMP means TIMESTAMP(6).
      op_ts_time AS CAST(op_ts AS TIMESTAMP(3)),
      current_ts STRING,
      pos STRING,
      -- BEFORE and AFTER are listed among Flink SQL's reserved keywords, so both
      -- column names are back-quoted. The JSON field names are unchanged.
      `before` ROW<
        `ADDRESS_STATUS` STRING,
        `AUDITOR_ID` STRING,
        `BENEFIC_TYPE` STRING,
        `CARD_STATUS_CODE` STRING,
        `CHILD_SIGN` STRING,
        `CLASS_PREFER` STRING,
        `COMMENTS` STRING,
        `DEGRADE_SIGN` STRING,
        `EFFECTIVE_DATE` STRING,
        `EXPIRE_DATE` STRING,
        `ID` STRING,
        `INVITER_CARD_NO` STRING,
        `IS_ACCOUNT_CLOSED` STRING,
        `IS_LEVEL_EXPIRE` STRING,
        `IS_MILEAGE_EXPIRE` STRING,
        `IS_SMALL_EXEMPT_PWD` STRING,
        `IS_SMOKING` STRING,
        `IS_TEST_MEMBER` STRING,
        `MAIL_ADDRESS_TYPE` STRING,
        `MAIL_LANGUAGE_CODE` STRING,
        `MEAL_PREFER` STRING,
        `MEMBER_LEVEL_CODE` STRING,
        `MEMBER_STATUS_CODE` STRING,
        `MEMBER_STATUS_DATE` STRING,
        `MULTIPLIER_MILES_SIGN` STRING,
        `OPERATE_DATE` STRING,
        `OPERATE_USER_ID` STRING,
        `QUALIFICATION_REASON_CODE` STRING,
        `REDEEM_SIGN` STRING,
        `REGIST_DATE` STRING,
        `REGIST_SOURCE` STRING,
        `SEAT_PREFER` STRING,
        `SMS_STATUS` STRING,
        `SPECIAL_ASSISTANCE` STRING,
        `STATEMENT_SEND_SIGN` STRING,
        `STATEMENT_SEND_TYPE` STRING,
        `SUBMIT_DATE` STRING,
        `UNITED_CARD_SIGN` STRING,
        `UPDATE_DATE` STRING,
        `UPDATE_STATUS_SIGN` STRING,
        `UPDATE_USER_ID` STRING,
        `UPGRADE_SIGN` STRING
      >,
      `after` ROW<
        `ADDRESS_STATUS` STRING,
        `AUDITOR_ID` STRING,
        `BENEFIC_TYPE` STRING,
        `CARD_STATUS_CODE` STRING,
        `CHILD_SIGN` STRING,
        `CLASS_PREFER` STRING,
        `COMMENTS` STRING,
        `DEGRADE_SIGN` STRING,
        `EFFECTIVE_DATE` STRING,
        `EXPIRE_DATE` STRING,
        `ID` STRING,
        `INVITER_CARD_NO` STRING,
        `IS_ACCOUNT_CLOSED` STRING,
        `IS_LEVEL_EXPIRE` STRING,
        `IS_MILEAGE_EXPIRE` STRING,
        `IS_SMALL_EXEMPT_PWD` STRING,
        `IS_SMOKING` STRING,
        `IS_TEST_MEMBER` STRING,
        `MAIL_ADDRESS_TYPE` STRING,
        `MAIL_LANGUAGE_CODE` STRING,
        `MEAL_PREFER` STRING,
        `MEMBER_LEVEL_CODE` STRING,
        `MEMBER_STATUS_CODE` STRING,
        `MEMBER_STATUS_DATE` STRING,
        `MULTIPLIER_MILES_SIGN` STRING,
        `OPERATE_DATE` STRING,
        `OPERATE_USER_ID` STRING,
        `QUALIFICATION_REASON_CODE` STRING,
        `REDEEM_SIGN` STRING,
        `REGIST_DATE` STRING,
        `REGIST_SOURCE` STRING,
        `SEAT_PREFER` STRING,
        `SMS_STATUS` STRING,
        `SPECIAL_ASSISTANCE` STRING,
        `STATEMENT_SEND_SIGN` STRING,
        `STATEMENT_SEND_TYPE` STRING,
        `SUBMIT_DATE` STRING,
        `UNITED_CARD_SIGN` STRING,
        `UPDATE_DATE` STRING,
        `UPDATE_STATUS_SIGN` STRING,
        `UPDATE_USER_ID` STRING,
        `UPGRADE_SIGN` STRING
      >,
      -- Row image to forward downstream: `before` when op_type = 'D', otherwise `after`.
      data_row AS CASE WHEN op_type = 'D' THEN `before` ELSE `after` END,
      -- Watermark trails op_ts_time by 5 seconds.
      WATERMARK FOR op_ts_time AS op_ts_time - INTERVAL '5' SECOND
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'SEM.FFPTEST.HOFFPDEV.TBL_MEMBER_INFO',
      'properties.bootstrap.servers' = '172.22.17.26:9092,172.22.17.27:9092,172.22.17.28:9092',
      'scan.startup.mode' = 'earliest-offset',
      'properties.group.id' = 'kafka-flink-sync-hbase-local',
      'properties.fetch.max.bytes' = '5242880',
      'format' = 'json'
    )
    
  3. Flink SQL Sink 写入 HBase(需要提前在 HBase 中建表)

    -- HBase sink table backed by 'ods:tbl_member_info' (hbase-2.2 connector).
    -- rowkey is declared as the (not enforced) primary key; every member attribute
    -- lives inside the single ROW-typed column `field`.
    -- NOTE(review): per the Flink HBase connector's table mapping, a ROW column is
    -- expected to correspond to an HBase column family of the same name (confirm the
    -- family `field` exists in the pre-created HBase table).
    CREATE TABLE SYNC_HBASE_SINK_TBL_MEMBER_INFO (
      rowkey STRING,
      field ROW<
        `ADDRESS_STATUS` STRING,
        `AUDITOR_ID` STRING,
        `BENEFIC_TYPE` STRING,
        `CARD_STATUS_CODE` STRING,
        `CHILD_SIGN` STRING,
        `CLASS_PREFER` STRING,
        `COMMENTS` STRING,
        `DEGRADE_SIGN` STRING,
        `EFFECTIVE_DATE` STRING,
        `EXPIRE_DATE` STRING,
        `ID` STRING,
        `INVITER_CARD_NO` STRING,
        `IS_ACCOUNT_CLOSED` STRING,
        `IS_LEVEL_EXPIRE` STRING,
        `IS_MILEAGE_EXPIRE` STRING,
        `IS_SMALL_EXEMPT_PWD` STRING,
        `IS_SMOKING` STRING,
        `IS_TEST_MEMBER` STRING,
        `MAIL_ADDRESS_TYPE` STRING,
        `MAIL_LANGUAGE_CODE` STRING,
        `MEAL_PREFER` STRING,
        `MEMBER_LEVEL_CODE` STRING,
        `MEMBER_STATUS_CODE` STRING,
        `MEMBER_STATUS_DATE` STRING,
        `MULTIPLIER_MILES_SIGN` STRING,
        `OPERATE_DATE` STRING,
        `OPERATE_USER_ID` STRING,
        `QUALIFICATION_REASON_CODE` STRING,
        `REDEEM_SIGN` STRING,
        `REGIST_DATE` STRING,
        `REGIST_SOURCE` STRING,
        `SEAT_PREFER` STRING,
        `SMS_STATUS` STRING,
        `SPECIAL_ASSISTANCE` STRING,
        `STATEMENT_SEND_SIGN` STRING,
        `STATEMENT_SEND_TYPE` STRING,
        `SUBMIT_DATE` STRING,
        `UNITED_CARD_SIGN` STRING,
        `UPDATE_DATE` STRING,
        `UPDATE_STATUS_SIGN` STRING,
        `UPDATE_USER_ID` STRING,
        `UPGRADE_SIGN` STRING
      >,
      PRIMARY KEY (rowkey) NOT ENFORCED
    ) WITH (
      'connector' = 'hbase-2.2',
      'table-name' = 'ods:tbl_member_info',
      'zookeeper.quorum' = '172.22.31.53:2181'
    )
    
  4. Flink SQL DML

    -- Copies the selected row image (data_row) of every Kafka change record into the
    -- HBase sink: ID becomes the rowkey and all 42 attributes are packed into one ROW.
    --
    -- The inner query first flattens data_row into plain columns so that ROW(...)
    -- only receives simple column names.
    -- NOTE(review): presumably this works around ROW(...) not accepting dotted field
    -- references in this Flink version; confirm before inlining the subquery.
    --
    -- Records whose ID is NULL are skipped: rowkey is the sink's primary key, and a
    -- NULL key would otherwise be rejected by the sink's NOT NULL enforcement
    -- (table.exec.sink.not-null-enforcer, default ERROR) and fail the job.
    INSERT INTO SYNC_HBASE_SINK_TBL_MEMBER_INFO
    SELECT
      ID AS rowkey,
      ROW(
        `ADDRESS_STATUS`,
        `AUDITOR_ID`,
        `BENEFIC_TYPE`,
        `CARD_STATUS_CODE`,
        `CHILD_SIGN`,
        `CLASS_PREFER`,
        `COMMENTS`,
        `DEGRADE_SIGN`,
        `EFFECTIVE_DATE`,
        `EXPIRE_DATE`,
        `ID`,
        `INVITER_CARD_NO`,
        `IS_ACCOUNT_CLOSED`,
        `IS_LEVEL_EXPIRE`,
        `IS_MILEAGE_EXPIRE`,
        `IS_SMALL_EXEMPT_PWD`,
        `IS_SMOKING`,
        `IS_TEST_MEMBER`,
        `MAIL_ADDRESS_TYPE`,
        `MAIL_LANGUAGE_CODE`,
        `MEAL_PREFER`,
        `MEMBER_LEVEL_CODE`,
        `MEMBER_STATUS_CODE`,
        `MEMBER_STATUS_DATE`,
        `MULTIPLIER_MILES_SIGN`,
        `OPERATE_DATE`,
        `OPERATE_USER_ID`,
        `QUALIFICATION_REASON_CODE`,
        `REDEEM_SIGN`,
        `REGIST_DATE`,
        `REGIST_SOURCE`,
        `SEAT_PREFER`,
        `SMS_STATUS`,
        `SPECIAL_ASSISTANCE`,
        `STATEMENT_SEND_SIGN`,
        `STATEMENT_SEND_TYPE`,
        `SUBMIT_DATE`,
        `UNITED_CARD_SIGN`,
        `UPDATE_DATE`,
        `UPDATE_STATUS_SIGN`,
        `UPDATE_USER_ID`,
        `UPGRADE_SIGN`
      )
    FROM (
      SELECT
        data_row.`ADDRESS_STATUS` AS ADDRESS_STATUS,
        data_row.`AUDITOR_ID` AS AUDITOR_ID,
        data_row.`BENEFIC_TYPE` AS BENEFIC_TYPE,
        data_row.`CARD_STATUS_CODE` AS CARD_STATUS_CODE,
        data_row.`CHILD_SIGN` AS CHILD_SIGN,
        data_row.`CLASS_PREFER` AS CLASS_PREFER,
        data_row.`COMMENTS` AS COMMENTS,
        data_row.`DEGRADE_SIGN` AS DEGRADE_SIGN,
        data_row.`EFFECTIVE_DATE` AS EFFECTIVE_DATE,
        data_row.`EXPIRE_DATE` AS EXPIRE_DATE,
        data_row.`ID` AS ID,
        data_row.`INVITER_CARD_NO` AS INVITER_CARD_NO,
        data_row.`IS_ACCOUNT_CLOSED` AS IS_ACCOUNT_CLOSED,
        data_row.`IS_LEVEL_EXPIRE` AS IS_LEVEL_EXPIRE,
        data_row.`IS_MILEAGE_EXPIRE` AS IS_MILEAGE_EXPIRE,
        data_row.`IS_SMALL_EXEMPT_PWD` AS IS_SMALL_EXEMPT_PWD,
        data_row.`IS_SMOKING` AS IS_SMOKING,
        data_row.`IS_TEST_MEMBER` AS IS_TEST_MEMBER,
        data_row.`MAIL_ADDRESS_TYPE` AS MAIL_ADDRESS_TYPE,
        data_row.`MAIL_LANGUAGE_CODE` AS MAIL_LANGUAGE_CODE,
        data_row.`MEAL_PREFER` AS MEAL_PREFER,
        data_row.`MEMBER_LEVEL_CODE` AS MEMBER_LEVEL_CODE,
        data_row.`MEMBER_STATUS_CODE` AS MEMBER_STATUS_CODE,
        data_row.`MEMBER_STATUS_DATE` AS MEMBER_STATUS_DATE,
        data_row.`MULTIPLIER_MILES_SIGN` AS MULTIPLIER_MILES_SIGN,
        data_row.`OPERATE_DATE` AS OPERATE_DATE,
        data_row.`OPERATE_USER_ID` AS OPERATE_USER_ID,
        data_row.`QUALIFICATION_REASON_CODE` AS QUALIFICATION_REASON_CODE,
        data_row.`REDEEM_SIGN` AS REDEEM_SIGN,
        data_row.`REGIST_DATE` AS REGIST_DATE,
        data_row.`REGIST_SOURCE` AS REGIST_SOURCE,
        data_row.`SEAT_PREFER` AS SEAT_PREFER,
        data_row.`SMS_STATUS` AS SMS_STATUS,
        data_row.`SPECIAL_ASSISTANCE` AS SPECIAL_ASSISTANCE,
        data_row.`STATEMENT_SEND_SIGN` AS STATEMENT_SEND_SIGN,
        data_row.`STATEMENT_SEND_TYPE` AS STATEMENT_SEND_TYPE,
        data_row.`SUBMIT_DATE` AS SUBMIT_DATE,
        data_row.`UNITED_CARD_SIGN` AS UNITED_CARD_SIGN,
        data_row.`UPDATE_DATE` AS UPDATE_DATE,
        data_row.`UPDATE_STATUS_SIGN` AS UPDATE_STATUS_SIGN,
        data_row.`UPDATE_USER_ID` AS UPDATE_USER_ID,
        data_row.`UPGRADE_SIGN` AS UPGRADE_SIGN
      FROM default_catalog.default_database.KAFKA_SOURCE_TBL_MEMBER_INFO_HBASE
    ) t
    WHERE ID IS NOT NULL
    

标签:STATUS,STRING,Demo,Flink,SIGN,DATE,Hbase,data,row
来源: https://www.cnblogs.com/qiangsky/p/16422151.html