其他分享
首页 > 其他分享 > flink-doris-connector flink1.13.1

flink-doris-connector flink1.13.1

作者:互联网

 Doris 官方文档: https://doris.apache.org/ecosystem/flink-doris-connector.html#how-to-use

  依赖

     <!-- flink-doris-connector: keep exactly one artifactId active.
          The commented-out artifactIds differ only in the 1.14 / 1.12 / 1.11 part of the name;
          presumably that part tracks the Flink version (this post targets Flink 1.13.1) and
          the _2.12 suffix the Scala version - confirm against the connector documentation. -->
        <dependency>
            <groupId>org.apache.doris</groupId>
            <!--<artifactId>flink-doris-connector-1.14_2.12</artifactId>-->
            <artifactId>flink-doris-connector-1.13_2.12</artifactId>
            <!--<artifactId>flink-doris-connector-1.12_2.12</artifactId>-->
            <!--<artifactId>flink-doris-connector-1.11_2.12</artifactId>-->
            <version>1.0.3</version>
        </dependency>

 

source: API

    /**
     * Reads rows from the Doris table {@code test.top} through a {@code DorisSourceFunction}
     * and prints each row (delivered as a {@code List<?>}) to standard output.
     *
     * <p>The FE address, credentials and table identifier are hard-coded demo values; replace
     * them before running against a real cluster.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the Flink job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Use setProperty rather than the inherited Hashtable#put: it restricts keys and
        // values to String, which is what Properties consumers expect.
        Properties properties = new Properties();
        properties.setProperty("fenodes", "192.168.18.51:8030");
        properties.setProperty("username", "root");
        properties.setProperty("password", "root");
        properties.setProperty("table.identifier", "test.top");

        DataStreamSource<List<?>> listDataStreamSource = env.addSource(
                new DorisSourceFunction(
                        new DorisStreamOptions(properties),
                        new SimpleListDeserializationSchema()));
        listDataStreamSource.print();
        env.execute();
    }

SQL:

       // Streaming environment with parallelism 1, wrapped by a table environment.
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       env.setParallelism(1);
       StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

       // DDL registering the table flink_doris with the 'doris' connector options.
       final String createTableDdl =
               "CREATE TABLE flink_doris (\n" +
               "    siteid INT,\n" +
               "    citycode SMALLINT,\n" +
               "    username STRING,\n" +
               "    pv BIGINT\n" +
               "    ) \n" +
               "    WITH (\n" +
               "      'connector' = 'doris',\n" +
               "      'fenodes' = 'hadoop1:8030',\n" +
               "      'table.identifier' = 'test_db.table1',\n" +
               "      'username' = 'test',\n" +
               "      'password' = 'test'\n" +
               ")\n";
       tableEnv.executeSql(createTableDdl);

       // Select everything from the registered table and print the result.
       tableEnv.executeSql("select * from flink_doris").print();

 

sink:

API: JsonSink

 

    /**
     * Sends a single JSON record to the Doris table {@code test_db.table1} through
     * {@code DorisSink}, passing {@code format=json} and {@code strip_outer_array=true}
     * as stream-load properties.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the Flink job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Properties forwarded to the sink via setStreamLoadProp.
        Properties streamLoadProps = new Properties();
        streamLoadProps.setProperty("format", "json");
        streamLoadProps.setProperty("strip_outer_array", "true");

        // Build the three option objects up front, in the order the sink expects them.
        DorisReadOptions readOptions = DorisReadOptions.builder().build();
        DorisExecutionOptions executionOptions = DorisExecutionOptions.builder()
                .setBatchSize(3)
                .setBatchIntervalMs(0L)
                .setMaxRetries(3)
                .setStreamLoadProp(streamLoadProps)
                .build();
        DorisOptions dorisOptions = DorisOptions.builder()
                .setFenodes("hadoop1:8030")
                .setTableIdentifier("test_db.table1")
                .setUsername("test")
                .setPassword("test")
                .build();

        String jsonRecord =
                "{\"siteid\": \"66\", \"citycode\": \"6\", \"username\": \"pengyuyan\",\"pv\": \"6\"}";

        // The original snippet also showed, commented out, a shorter overload that takes
        // only the connection options: DorisSink.sink(dorisOptions).
        env.fromElements(jsonRecord)
                .addSink(DorisSink.sink(readOptions, executionOptions, dorisOptions));
        env.execute();
    }

 

RowData

 

 

/**
 * Demo job that writes one {@code RowData} record to the Doris table
 * {@code test_db.table1} using the field-name/logical-type overload of {@code DorisSink.sink}.
 *
 * <p>Connection settings (FE node, user, password, table) are hard-coded demo values.
 */
public class DataStreamRowDataSinkDemo {

    /**
     * Builds and runs the job: one placeholder element is mapped to a four-field row
     * ({@code siteid}, {@code citycode}, {@code username}, {@code pv}) and handed to the sink.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the Flink job fails to build or execute
     */
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // The empty string is only a trigger: the map function ignores its input and
        // emits one fixed row per element.
        DataStream<RowData> source = env.fromElements("")
                .map(new MapFunction<String, RowData>() {
                    @Override
                    public RowData map(String value) throws Exception {
                        GenericRowData genericRowData = new GenericRowData(4);
                        genericRowData.setField(0, 88);
                        // (short) 8 autoboxes to Short; replaces the deprecated
                        // new Short("8") constructor with the same value.
                        genericRowData.setField(1, (short) 8);
                        genericRowData.setField(2, StringData.fromString("flink-stream"));
                        genericRowData.setField(3, 8L);
                        return genericRowData;
                    }
                });

        // Field names and logical types are listed in the same order as the row's fields.
        LogicalType[] types = {new IntType(), new SmallIntType(), new VarCharType(32), new BigIntType()};
        String[] fields = {"siteid", "citycode", "username", "pv"};
        source.addSink(
                DorisSink.sink(
                        fields,
                        types,
                        DorisReadOptions.builder().build(),
                        DorisExecutionOptions.builder()
                                .setBatchSize(3)
                                .setBatchIntervalMs(0L)
                                .setMaxRetries(3)
                                .build(),
                        DorisOptions.builder()
                                .setFenodes("hadoop1:8030")
                                .setTableIdentifier("test_db.table1")
                                .setUsername("test")
                                .setPassword("test").build()
                ));
        env.execute();
    }
}

 

标签:StreamExecutionEnvironment,flink,connector,doris,env,test,new,flink1.13
来源: https://www.cnblogs.com/lshan/p/16379350.html