Flink Basics (139): DataStream and Table Conversion (5): Handling of (Insert-Only) Streams (4): toDataStream
Author: Internet
The following code shows how to use toDataStream for different scenarios.
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;
import java.time.Instant;

// POJO with mutable fields
// since no fully assigning constructor is defined, the field order
// is alphabetical [event_time, name, score]
public static class User {

    public String name;

    public Integer score;

    public Instant event_time;
}

tableEnv.executeSql(
    "CREATE TABLE GeneratedTable "
    + "("
    + "  name STRING,"
    + "  score INT,"
    + "  event_time TIMESTAMP_LTZ(3),"
    + "  WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND"
    + ")"
    + "WITH ('connector'='datagen')");

Table table = tableEnv.from("GeneratedTable");


// === EXAMPLE 1 ===

// use the default conversion to instances of Row

// since `event_time` is a single rowtime attribute, it is inserted into the DataStream
// metadata and watermarks are propagated

DataStream<Row> dataStream = tableEnv.toDataStream(table);


// === EXAMPLE 2 ===

// a data type is extracted from class `User`,
// the planner reorders fields and inserts implicit casts where possible to convert internal
// data structures to the desired structured type

// since `event_time` is a single rowtime attribute, it is inserted into the DataStream
// metadata and watermarks are propagated

DataStream<User> dataStream = tableEnv.toDataStream(table, User.class);

// data types can be extracted reflectively as above or explicitly defined

DataStream<User> dataStream =
    tableEnv.toDataStream(
        table,
        DataTypes.STRUCTURED(
            User.class,
            DataTypes.FIELD("name", DataTypes.STRING()),
            DataTypes.FIELD("score", DataTypes.INT()),
            DataTypes.FIELD("event_time", DataTypes.TIMESTAMP_LTZ(3))));
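The comment on the User POJO says its field order is alphabetical because no fully assigning constructor is defined. The sketch below is a plain-Java illustration of that ordering only. It does not use Flink and is not Flink's type extraction logic; the class FieldOrderSketch and the helper alphabeticalFieldNames are hypothetical names introduced for this example.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class FieldOrderSketch {

    // Same shape as the User POJO in the example: public mutable fields,
    // no constructor that assigns all fields.
    public static class User {
        public String name;
        public Integer score;
        public Instant event_time;
    }

    // Hypothetical helper (not a Flink API): collects the names of the public
    // instance fields and sorts them alphabetically.
    static List<String> alphabeticalFieldNames(Class<?> type) {
        List<String> names = new ArrayList<>();
        for (Field field : type.getFields()) {
            if (!Modifier.isStatic(field.getModifiers())) {
                names.add(field.getName());
            }
        }
        Collections.sort(names);
        return names;
    }

    public static void main(String[] args) {
        // prints [event_time, name, score]
        System.out.println(alphabeticalFieldNames(User.class));
    }
}
```

The declaration order in the class is name, score, event_time, while the sorted order matches the [event_time, name, score] order stated in the comment. This is why the planner has to reorder the table's columns when converting to User.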
Note that only non-updating tables are supported by toDataStream. Usually, time-based operations such as windows, interval joins, or the MATCH_RECOGNIZE clause are a good fit for insert-only pipelines next to simple operations like projections and filters.
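To picture the difference between an insert-only and an updating table, the following toy model in plain Java may help. It has no Flink dependency and is only a sketch: the Kind values reuse the names of Flink's RowKind enum, while ChangelogSketch, Change, filterMinScore, countPerName, and isInsertOnly are hypothetical names made up for this illustration. A filter emits each row once and never revises it, whereas a running count per key has to retract its previous result whenever a new row arrives for that key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ChangelogSketch {

    // Change kinds, named after the values of Flink's RowKind enum.
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Hypothetical stand-in for one changelog entry: a kind plus a (name, value) payload.
    static final class Change {
        final Kind kind;
        final String name;
        final int value;

        Change(Kind kind, String name, int value) {
            this.kind = kind;
            this.name = name;
            this.value = value;
        }
    }

    // Filter over an insert-only input: matching rows pass through unchanged.
    static List<Change> filterMinScore(List<Change> input, int minScore) {
        List<Change> out = new ArrayList<>();
        for (Change c : input) {
            if (c.value >= minScore) {
                out.add(c);
            }
        }
        return out;
    }

    // Running count per name: the first row for a name yields an INSERT, every
    // later row retracts the old count and emits the new one.
    static List<Change> countPerName(List<Change> input) {
        Map<String, Integer> counts = new HashMap<>();
        List<Change> out = new ArrayList<>();
        for (Change c : input) {
            Integer previous = counts.get(c.name);
            int next = (previous == null) ? 1 : previous + 1;
            if (previous == null) {
                out.add(new Change(Kind.INSERT, c.name, next));
            } else {
                out.add(new Change(Kind.UPDATE_BEFORE, c.name, previous));
                out.add(new Change(Kind.UPDATE_AFTER, c.name, next));
            }
            counts.put(c.name, next);
        }
        return out;
    }

    // A changelog is insert-only if it contains nothing but INSERT entries.
    static boolean isInsertOnly(List<Change> changelog) {
        return changelog.stream().allMatch(c -> c.kind == Kind.INSERT);
    }

    public static void main(String[] args) {
        List<Change> input = Arrays.asList(
            new Change(Kind.INSERT, "alice", 12),
            new Change(Kind.INSERT, "bob", 7),
            new Change(Kind.INSERT, "alice", 3));

        System.out.println(isInsertOnly(filterMinScore(input, 5))); // prints true
        System.out.println(isInsertOnly(countPerName(input)));      // prints false
    }
}
```

In this model only the first result could be handed to toDataStream. A table that produces updates like the second one is instead converted with toChangelogStream in Flink.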
Source: https://www.cnblogs.com/qiu-hua/p/15204075.html