
Learning Flink from Scratch (2): What Data Sources Does Flink Support?


1. Reading from a file

        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment
                .getExecutionEnvironment();
        // read a text file as the data source
        DataStreamSource<String> st = executionEnvironment.readTextFile("C:\\2.txt");
        st.print();
        executionEnvironment.execute();

2. fromElements: reading individual elements

        // Fails: the elements do not share a single type
        // DataStreamSource<Object> st1 = executionEnvironment.fromElements("eeee", 1, 34, "sss");

        // All elements must be of the same type
        DataStreamSource<String> st1 = executionEnvironment.fromElements("eeee", "123", "34", "sss");
        st1.print();
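
If you do need to mix types, wrap the values in a common container such as Tuple2 (org.apache.flink.api.java.tuple.Tuple2). A minimal sketch; the variable name st4 and the sample values are just for illustration:

        // Tuple2 gives the mixed values one common element type
        DataStreamSource<Tuple2<String, Integer>> st4 = executionEnvironment.fromElements(
                Tuple2.of("eeee", 1),
                Tuple2.of("sss", 34));
        st4.print();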

3. Reading from a collection

        List<String> list = new ArrayList<>();
        list.add("qwee");
        list.add("123");
        DataStreamSource<String> st2 = executionEnvironment.fromCollection(list);
        st2.print();

4. Reading from Kafka

        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment
                .getExecutionEnvironment();
        // Kafka consumer configuration (FlinkKafkaConsumer requires the flink-connector-kafka dependency)
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "student-group2");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "latest");
        DataStreamSource<String> student = executionEnvironment.addSource(new FlinkKafkaConsumer<>(
                "TOPIC",
                new SimpleStringSchema(),
                props)).setParallelism(1);
        student.print();
        executionEnvironment.execute();

5. Custom source

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;

public class MySource implements SourceFunction<String> {

    // volatile so that cancel(), called from another thread, is visible to run()
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
        while (running) {
            Thread.sleep(1000);
            // emit one random number per second, as a string
            sourceContext.collect(String.valueOf(new Random().nextInt(99)));
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

This implements a SourceFunction that emits a random number (as a string) every second; sourceContext.collect(...) is what sends each record downstream.

        DataStreamSource<String> st3 = executionEnvironment.addSource(new MySource());
        st3.print();
        executionEnvironment.execute();

6. Reading from RocketMQ

https://github.com/apache/rocketmq-externals/tree/master/rocketmq-flink
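
Flink itself does not bundle a RocketMQ connector; the repository above provides one. A minimal sketch of how such a source is typically wired up. The class names (RocketMQSource, RocketMQConfig, SimpleKeyValueDeserializationSchema, under org.apache.rocketmq.flink), the property keys, and the nameserver address, group, topic, and field names are taken from that connector's README and may differ between versions, so treat them as assumptions:

        // connection settings for the RocketMQ consumer (values are placeholders)
        Properties consumerProps = new Properties();
        consumerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, "localhost:9876");
        consumerProps.setProperty(RocketMQConfig.CONSUMER_GROUP, "flink-group");
        consumerProps.setProperty(RocketMQConfig.CONSUMER_TOPIC, "flink-topic");

        // SimpleKeyValueDeserializationSchema turns each message into a key/value map
        executionEnvironment
                .addSource(new RocketMQSource<>(
                        new SimpleKeyValueDeserializationSchema("id", "value"), consumerProps))
                .print();
        executionEnvironment.execute();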

7. Reading and writing Hive tables

Flink CDC (covered in a dedicated later post)
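
For a plain Hive read (without CDC), the usual route is the Table API with a HiveCatalog. A minimal sketch, assuming the flink-connector-hive and Hive client dependencies are on the classpath; the catalog name, hive-conf directory, and table name are placeholders:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HiveReadDemo {
    public static void main(String[] args) {
        // batch mode is enough for a one-off read of a Hive table
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // "myhive", "default" and the hive-conf directory are placeholders
        HiveCatalog hiveCatalog = new HiveCatalog("myhive", "default", "/opt/hive/conf");
        tableEnv.registerCatalog("myhive", hiveCatalog);
        tableEnv.useCatalog("myhive");

        // query an existing Hive table ("some_table" is a placeholder)
        tableEnv.executeSql("SELECT * FROM some_table").print();
    }
}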
