数据库
首页 > 数据库> > FlinkSQL实践 -- 时态表/版本表

FlinkSQL实践 -- 时态表/版本表

作者:互联网

1. 背景

在FlinkSQL关联时,必然会涉及到维表,维表又可能是不断变化的(aka 时态表 或 版本表)。

版本表: 如果时态表中的记录可以追踪和并访问它的历史版本,这种表我们称之为版本表,来自数据库的 changelog 可以定义成版本表。
普通表: 如果时态表中的记录仅仅可以追踪并和它的最新版本,这种表我们称之为普通表,来自数据库 或 HBase 的表可以定义成普通表。

2. FlinkSQL中时态表类型

2.1 Debezium

未实战测试

2.2 a compacted Kafka topic

将某一个主题设置为compacted topic

bin/kafka-topics.sh --alter --topic my_topic_name --zookeeper my_zookeeper:2181 --config cleanup.policy=compact
        tableEnv.executeSql(
                "CREATE TABLE dim_source (" +
                        "               id STRING," +
                        "               name STRING," +
                        "               update_time TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL, " +
                        "               WATERMARK FOR update_time AS update_time, " +
                        "               PRIMARY KEY (id) NOT ENFORCED" +
                        ") WITH (" +
                        " 'connector' = 'upsert-kafka'," +
                        " 'topic' = 'flinksqldim'," +
                        " 'properties.bootstrap.servers' = 'ip:port'," +
                        " 'properties.group.id' = 'flinksqlDim'," +
                        " 'key.format' = 'json'," +
                        " 'value.format' = 'json')"
        );

对应的kafka topic生产者代码

        // 创建消息
        // DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.nnnnnnnnn");
        for (int i = 1; i < 5; i++) {
            JSONObject json1 = new JSONObject();
            json1.put("key", i+"");
            //json.put("update_time", dtf.format(LocalDateTime.now()));
            JSONObject json = new JSONObject();
            json.put("id", i+"");
            json.put("name", "name222"+i);
            ProducerRecord<String, String> record = new ProducerRecord<String, String>(
                    "flinksqldim",
                    json1.toJSONString(),
                    json.toJSONString()
            );
            // 发送消息
            Future<RecordMetadata> future = producer.send(record);
        }

3. 遇到的问题

3.1 kafka支持的METADATA

kafka支持的METADATA类型

3.2 key.format

必须指定key.format,可以不指定'scan.startup.mode' = 'earliest-offset'

4. 引用

Flink-1.12 - 之Flink SQL 与 kafka connector实践
Flink动态表和时态表总结
基于 FLINK SQL 的实时数据打宽的三种方式
将某一个主题设置为compacted topic
kafka支持的METADATA类型

标签:时态,format,--,FlinkSQL,kafka,topic,json
来源: https://www.cnblogs.com/route/p/15840409.html