数据库
首页 > 数据库> > 国产开源优秀新一代MPP数据库StarRocks入门之旅-数仓新利器(下)

国产开源优秀新一代MPP数据库StarRocks入门之旅-数仓新利器(下)

作者:互联网

数据导入

总览

概述

数据导入功能是将原始数据按照相应的模型进行清洗转换并加载到StarRocks中,方便查询使用。StarRocks提供了多种导入方式,用户可以根据数据量大小、导入频率等要求选择最适合自己业务需求的导入方式。

image-20220514105453558

基本原理

导入执行流程,一个导入作业主要分为5个阶段:

image-20220514105725886

同步和异步

使用场景

Stream load

image-20220514111645827

准备数据插入前面已创建好的user_info表

curl --location-trusted -u root -T /home/commons/data/user01.csv  -H "column_separator:," http://192.168.5.52:8030/api/test_db/user_info/_stream_load
# 也可以通过-H "label:labelname"设置标签名称

image-20220514120734052

image-20220514120813306

Broker Load

首先需要先部署好Broker,如果有特殊的 hdfs 配置,复制线上的 hdfs-site.xml 到 Broker的conf 目录下,将hdfs-site.xml分发到三台Broker节点的conf 目录, 重新启动Broker,这样就完成了前置环境条件

在HIve中创建broker_test,并且导入数据

CREATE EXTERNAL TABLE broker_test(
  `url` string,
  `event` int,
  `time` bigint) 
PARTITIONED by (
  `dt` string, 
  `hour` string) 
ROW FORMAT DELIMITED
STORED AS PARQUET 
TBLPROPERTIES('parquet.compression'='SNAPPY');

insert into broker_test PARTITION(dt='2022-05-18',hour='16')
values ('/goods',1,1652862952),
('/goods',2,1652862952),
('/orders',1,1652862953),
('/orders',2,1652862953);

执行查看结果,hive表已有数据

image-20220518173810759

在StarRocks中test_db数据库中也创建表broker_test

CREATE TABLE IF NOT EXISTS broker_test (
    url DATETIME ,
    event INT ,
    time BIGINT,
    dt varchar(20),
    hour varchar(20)
)
DUPLICATE KEY(url)
DISTRIBUTED BY HASH(url) BUCKETS 8;

StarRocks中test_db数据库也创建broker_test_label1

LOAD LABEL broker_test_label1
(
    DATA INFILE("hdfs://nameservice1/user/hive/warehouse/test.db/broker_test/dt=2022-05-18/hour=16/*")
    INTO TABLE broker_test
    FORMAT AS "parquet"
    (url,EVENT,TIME)
    SET
    (
        url=url,
        EVENT=EVENT,
        TIME=TIME,
        dt='2022-05-18',
        HOUR='16'
    )
)
WITH BROKER 'broker1'
PROPERTIES
(
    "timeout" = "3600"
);

Broker Load导入是异步的,用户可以在SHOW LOAD命令中指定Label来查询对应导入作业的执行状态,查询结果Job已经完成

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1sfUd01g-1652972070699)(http://www.itxiaoshen.com:3001/assets/1652866964727KZ3cc7Bz.png)]

查看StarRocks的test_db数据库的broker_test,4条数据已经完整导入了

image-20220518174322010

Routine Load

从一个本地 Kafka 集群导入 CSV 数据,创建ROUTINE LOAD的任务

CREATE TABLE IF NOT EXISTS site_test (
    site_id INT,
    event_type INT
)
DUPLICATE KEY(site_id)
DISTRIBUTED BY HASH(site_id) BUCKETS 8;

CREATE ROUTINE LOAD routine_load_site_test ON site_test
COLUMNS TERMINATED BY ",",
COLUMNS (site_id, event_type)
PROPERTIES
(
  "desired_concurrent_number" = "3",
  "max_batch_interval" = "5000",
  "max_error_number" = "1000"
)
FROM KAFKA
(
  "kafka_broker_list" = "192.168.12.27:9092,192.168.12.28:9092,192.168.12.29:9092",
  "kafka_topic" = "site_test",
  "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

image-20220514233143962

使用kafka生产者或者程序往site_test的Topic发送数据,也支持Json格式,配置属性增加json配置。

CREATE ROUTINE LOAD routine_load_site_test ON site_test
COLUMNS TERMINATED BY ",",
COLUMNS (site_id, event_type)
PROPERTIES (  
    "format"="json",  
    "json_root"="$.data",
    "desired_concurrent_number"="1",  
    "strip_outer_array" ="true",    
    "max_error_number"="1000" 
) 
FROM KAFKA (     
    "kafka_broker_list"= "192.168.12.27:9092,192.168.12.28:9092,192.168.12.29:9092",     
    "kafka_topic" = "site_test" 
);

StarRocks 提供 flink-connector-starrocks,导入数据至 StarRocks,相比于 Flink 官方提供的 flink-connector-jdbc,导入性能更佳。 flink-connector-starrocks 的内部实现是通过缓存并批量由 stream load 导入。

我们使用上一篇创建好的article表来插入测试,创建flink-demo工程,Pom文件引入

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.itxs</groupId>
    <artifactId>flink-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.14.4</flink.version>
        <scala.version>2.12</scala.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>com.starrocks</groupId>
            <artifactId>flink-connector-starrocks</artifactId>
            <version>1.2.1_flink-1.14_2.12</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
            <version>1.13.6</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table</artifactId>
            <version>${flink.version}</version>
            <type>pom</type>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.9.0</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
        </dependency>
    </dependencies>
</project>

创建DataStream API示例StarRocksDataStreamDemo.java

package starrocks;

import com.google.gson.Gson;
import com.starrocks.connector.flink.StarRocksSink;
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

public class StarRocksDataStreamDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StarRocksDataStreamDemo starRocksDataStreamDemo = new StarRocksDataStreamDemo();
        starRocksDataStreamDemo.useStream(env);
        env.execute();
    }

    public void useStream(StreamExecutionEnvironment env){
        Gson gson = new Gson();
        DataStreamSource<String> ds = env.fromCollection(Arrays.asList(gson.toJson(new Article(1, 1, 1, "flink")),
                gson.toJson(new Article(2, 2, 2, "java")),
                gson.toJson(new Article(3, 3, 3, "starrocks"))));
        ds.addSink(
                StarRocksSink.sink(
                        // the sink options
                        StarRocksSinkOptions.builder()
                                .withProperty("jdbc-url", "jdbc:mysql://192.168.5.52:9030")
                                .withProperty("load-url", "192.168.5.52:8030")
                                .withProperty("username", "root")
                                .withProperty("password", "")
                                .withProperty("table-name", "article")
                                .withProperty("database-name", "test_db")
                                .withProperty("sink.properties.format", "json")
                                .withProperty("sink.properties.strip_outer_array", "true")
                                .build()
                )
        );
    }
}

本地运行main方法前在启动配置中勾上Include dependencies with "Provided" scope

image-20220519142819722

查看三条数据已成功导入article表里

image-20220519133532174

StarRocks中先创建表article_table

CREATE TABLE IF NOT EXISTS article_table (
    id INT,
    type INT,
    level INT,
    title VARCHAR(100)
)
DUPLICATE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 8;

image-20220519142054797

创建Article pojo类

package starrocks;

public class Article {
    private int id;
    private int type;
    private int level;
    private String title;

    public Article(int id, int type, int level, String title) {
        this.id = id;
        this.type = type;
        this.level = level;
        this.title = title;
    }
}

创建StarRocksTableDemo.java

package starrocks;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamStatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class StarRocksTableDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(env);

        StarRocksTableDemo starRocksDataStreamDemo = new StarRocksTableDemo();
        starRocksDataStreamDemo.useSql(streamTableEnvironment);
    }

    public void useSql(StreamTableEnvironment env){
        env.executeSql(
                "CREATE TABLE IF NOT EXISTS article_table(" +
                        "id INT," +
                        "type INT," +
                        "level INT," +
                        "title VARCHAR" +
                        ") WITH ( " +
                        "'connector' = 'starrocks'," +
                        "'jdbc-url'='jdbc:mysql://192.168.5.52:9030'," +
                        "'load-url'='192.168.5.52:8030'," +
                        "'database-name' = 'test_db'," +
                        "'table-name' = 'article_table'," +
                        "'username' = 'root'," +
                        "'password' = ''," +
                        "'sink.buffer-flush.max-rows' = '1000000'," +
                        "'sink.buffer-flush.max-bytes' = '300000000'," +
                        "'sink.buffer-flush.interval-ms' = '2000'," +
                        "'sink.properties.format' = 'json'," +
                        "'sink.properties.strip_outer_array' = 'true'" +
                        ")"
        );
        StreamStatementSet statementSet = env.createStatementSet();
        statementSet.addInsertSql("insert into article_table values(4,4,4,'python'),"+
                "(5,5,5,'ruby')");
        statementSet.execute();
    }
}

记得和前面一样设置启动配置勾选然后本地运行main,正常执行完毕

image-20220519142315697

查看StarRocks的article_table表数据,数据已成功插入表中

image-20220519142422891

数组的使用

数组,作为数据库的一种扩展类型,在 PG、ClickHouse、Snowflake 等系统中都有相关特性支持,可以广泛的应用于 A/B Test 对比、用户标签分析、人群画像等场景。StarRocks 当前支持了 多维数组嵌套、数组切片、比较、过滤等特性。

数组列的定义形式为 ARRAY,其中 TYPE 是数组元素类型,默认 nullable,暂时不支持指定元素类型为 NOT NULL,但是也可以定义数组本身为 NOT NULL。数组类型有以下限制:

-- 一维数组
create table array_test(
  f0 INT,
  f1 ARRAY<INT>
)
duplicate key(f0)
distributed by hash(f0) buckets 3;  -- 以分3个桶为例。

-- 定义嵌套数组
create table nest_array_test(
  f0 INT,
  f1 ARRAY<ARRAY<VARCHAR(10)>>
)
duplicate key(f0)
distributed by hash(f0) buckets 3;

使用SELECT语句构造数组

# 插入一维数组类型数据和访问数组数据
INSERT INTO array_test VALUES(1, [1,2,3]);
select f1[3] from array_test;

image-20220519151428542

# 插入嵌套数组数组类型数据和访问数组数据
INSERT INTO nest_array_test VALUES(1, [[1,2,3],[4,5,6],[7,8,9]]);
select f1[2][2] from nest_array_test;

image-20220519151724051

Colocate Join

概述

Colocate Join 功能,属于分布式系统实现 Join 数据分布的策略之一。 能够减少数据分布在多个节点引起的 Join 时的数据移动和网络传输,从而提高查询性能。 Colocate Join 使用 Colocation Group(CG)管理一组表 ,同一 CG 内的表 Colocation Group Schema(CGS)相同,即表对应的分桶副本具有一致的分桶键、副本数量和副本放置方式 。这样可以保证同一 CG 内,表的数据分布在相同一组 BE 节点上。当 Join 列为分桶键时,计算节点只需做本地 Join,因而可以减少数据在节点间的传输耗时,提高查询性能。 因此,Colocation Join,相对于其他 Join,例如 Shuffle Join 和 Broadcast Join,可以避免数据网络传输开销,提高查询性能。

使用

建表时,可以在 PROPERTIES 中指定属性 "colocate_with" = "group_name",表示这个表是一个 Colocate Join 表,并且归属于一个指定的 Colocation Group

CREATE TABLE tbl (k1 int, v1 int sum)
DISTRIBUTED BY HASH(k1)
BUCKETS 8
PROPERTIES(
    "colocate_with" = "group1"
);

为了使得 Table 能够有相同的数据分布,同一 CG 内的 Table 必须保证下列约束:

  1. 同一 CG 内的 Table 的分桶键的类型、数量和顺序完全一致,并且桶数一致,这样才能保证多张表的数据分片能够一一对应地进行分布控制。分桶键,即在建表语句中 DISTRIBUTED BY HASH(col1, col2, ...) 中指定一组列。分桶键决定了一张表的数据通过哪些列的值进行 Hash 划分到不同的 Bucket Seq 中。同 CG 的 table 的分桶键的名字可以不相同,分桶列的定义在建表语句中的出现次序可以不一致,但是在 DISTRIBUTED BY HASH(col1, col2, ...) 的对应数据类型的顺序要完全一致。
  2. 同一个 CG 内所有表的所有分区(Partition)的副本数必须一致。如果不一致,可能出现某一个 Tablet 的某一个副本,在同一个 BE 上没有其他的表分片的副本对应。
  3. 同一个 CG 内所有表的分区键,分区数量可以不同。

同一个CG中的所有表的副本放置满足:

  1. CG中所有 Table 的 Bucket Seq 和 BE 节点的映射关系和 Parent Table 一致。
  2. Parent Table 中所有 Partition 的 Bucket Seq 和 BE 节点的映射关系和第一个 Partition 一致。
  3. Parent Table 第一个 Partition 的 Bucket Seq 和 BE 节点的映射关系利用原生的 Round Robin 算法决定。

CG内表的一致的数据分布定义和子表副本映射,能够保证分桶键取值相同的数据行一定在相同BE上,因此当分桶键做join列时,只需本地join即可。

示例

简单示例我们创建相同结构的两张表ctbl1和ctbl2

CREATE TABLE `ctbl1` (
    `f1` date NOT NULL COMMENT "",
    `f2` int(11) NOT NULL COMMENT "",
    `f3` int(11) SUM NOT NULL COMMENT ""
) ENGINE=OLAP
AGGREGATE KEY(`f1`, `f2`)
DISTRIBUTED BY HASH(`f2`) BUCKETS 8
PROPERTIES (
    "colocate_with" = "group1"
);

CREATE TABLE `ctbl2` (
    `f1` date NOT NULL COMMENT "",
    `f2` int(11) NOT NULL COMMENT "",
    `f3` int(11) SUM NOT NULL COMMENT ""
) ENGINE=OLAP
AGGREGATE KEY(`f1`, `f2`)
DISTRIBUTED BY HASH(`f2`) BUCKETS 8
PROPERTIES (
    "colocate_with" = "group1"
);

image-20220519155500069

EXPLAIN SELECT * FROM ctbl1 INNER JOIN ctbl2 ON (ctbl1.f2 = ctbl2.f2);

执行查询查看分析计划,Colocation Join 生效 Hash Join 节点会显示 colocate: true

外部表

StarRocks 支持以外部表的形式,接入其他数据源。外部表指的是保存在其他数据源中的数据表,而 StartRocks 只保存表对应的元数据,并直接向外部表所在数据源发起查询。目前 StarRocks 已支持的第三方数据源包括 MySQL、ElasticSearch、Hive、StarRocks、Apache Iceberg 和 Apache Hudi。对于 StarRocks 数据源,现阶段只支持 Insert 写入,不支持读取,对于其他数据源,现阶段只支持读取,还不支持写入

CREATE EXTERNAL RESOURCE "hive0"
PROPERTIES (
  "type" = "hive",
  "hive.metastore.uris" = "thrift://192.168.12.29:9083"
);

-- 查看 StarRocks 中创建的资源
SHOW RESOURCES;

-- 删除名为 hive0 的资源
DROP RESOURCE "hive0";

image-20220519155950528

执行hive表的创建和插入一条测试数据

CREATE TABLE external_test(
  `url` string,
  `event` int,
  `time` bigint) 
ROW FORMAT DELIMITED
STORED AS PARQUET 
TBLPROPERTIES('parquet.compression'='SNAPPY');

insert into external_test values('/user',1,1652862952);

执行结果如下

image-20220519160725725

StarRocks中创建hive的外部表external_test

-- 例子:创建 hive0 资源对应的 Hive 集群中 test 数据库下的 broker_test 表的外表
CREATE EXTERNAL TABLE `external_test` (
  `url` varchar(200),
  `event` int,
  `time` bigint
) ENGINE=HIVE
PROPERTIES (
  "resource" = "hive0",
  "database" = "test",
  "table" = "external_test"
);
# 查询外表数据
select count(*) from external_test;

其他还有很多种外部表,MySQL 外部表、ElasticSearch 外部表、StarRocks 外部表、Apache Iceberg 外表、Apache Hudi 外表,后续有时间再补充

**本人博客网站 **IT小神 www.itxiaoshen.com

标签:数仓,StarRocks,flink,导入,MPP,test,table,数据
来源: https://www.cnblogs.com/itxiaoshen/p/16290641.html