Learning Flink from Scratch (2): What Data Sources Can Flink Use?
Author: Internet
1. A file as the data source
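import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment
        .getExecutionEnvironment();
// Use a text file as the data source: each line becomes one String record
DataStreamSource<String> st = executionEnvironment.readTextFile("C:\\2.txt");
st.print();
// Nothing runs until execute() is called
executionEnvironment.execute();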
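readTextFile emits the file line by line: each line, without its line terminator, becomes one String record, which print() then writes out. As a rough, Flink-free illustration (an assumption made for teaching purposes; it ignores the parallel file splitting Flink performs and differs from Flink on a lone `\r`), the hypothetical `TextFileModel` class below turns a file into the list of records such a stream would contain.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

// Hypothetical, Flink-free model of readTextFile: one String record per line.
class TextFileModel {

    // Returns the records a line-oriented text source would emit for this file
    static List<String> records(Path file) {
        try {
            // readAllLines splits on \n, \r\n (and a lone \r) and drops the terminators
            return Files.readAllLines(file, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("flink-demo", ".txt");
        Files.write(file, "hello flink\nsecond line\n".getBytes(StandardCharsets.UTF_8));
        // Each element corresponds to one record that st.print() would output
        for (String record : records(file)) {
            System.out.println(record);
        }
        Files.delete(file);
    }
}
```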
2. Reading elements with fromElements
// Raises an error: the elements must all be of the same type
// DataStreamSource<Object> st1 = executionEnvironment.fromElements("eeee", 1, 34, "sss");
DataStreamSource<String> st1 = executionEnvironment.fromElements("eeee", "123", "34", "sss");
st1.print();
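Why does the mixed call fail? Roughly, fromElements derives the stream's element type from the first element and then checks that every element is an instance of that class, so with a String first, the Integer `1` is rejected. The hypothetical `SameTypeCheck` class below is a simplified, Flink-free model of that check (not Flink's actual code), written only to make the rule concrete.

```java
// Hypothetical, simplified model of the type check behind fromElements:
// the element type is taken from the first element, and every other
// element must be an instance of that class. Nulls are rejected too.
class SameTypeCheck {

    static void checkSameType(Object... data) {
        if (data.length == 0) {
            throw new IllegalArgumentException("fromElements needs at least one element");
        }
        Class<?> expected = null;
        for (Object element : data) {
            if (element == null) {
                throw new IllegalArgumentException("null elements are not allowed");
            }
            if (expected == null) {
                expected = element.getClass(); // type inferred from the first element
            } else if (!expected.isAssignableFrom(element.getClass())) {
                throw new IllegalArgumentException("Element " + element + " of type "
                        + element.getClass().getName() + " is not a subclass of " + expected.getName());
            }
        }
    }

    // true if the elements would pass the check, false otherwise
    static boolean accepts(Object... data) {
        try {
            checkSameType(data);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(accepts("eeee", "123", "34", "sss")); // true
        System.out.println(accepts("eeee", 1, 34, "sss"));       // false
    }
}
```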
3. Reading a collection
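import java.util.ArrayList;
import java.util.List;

List<String> list = new ArrayList<>();
list.add("qwee");
list.add("123");
// Each element of the collection becomes one record of the stream
DataStreamSource<String> st2 = executionEnvironment.fromCollection(list);
st2.print();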
4. Reading from Kafka
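import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment
        .getExecutionEnvironment();
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "student-group2");
// No key/value deserializer settings needed: FlinkKafkaConsumer overrides them
// and decodes each record value with the SimpleStringSchema passed below.
// Start from the latest offset when the group has no committed offset
props.put("auto.offset.reset", "latest");
DataStreamSource<String> student = executionEnvironment.addSource(new FlinkKafkaConsumer<>(
        "TOPIC",
        new SimpleStringSchema(),
        props)).setParallelism(1);
student.print();
executionEnvironment.execute();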
5. A custom source
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Random;

public class MySource implements SourceFunction<String> {
    // volatile: cancel() is called from a different thread than run()
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
        Random random = new Random();
        while (running) {
            Thread.sleep(1000);
            // Emit one random number in [0, 99) as a String
            sourceContext.collect(String.valueOf(random.nextInt(99)));
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
Implement a SourceFunction that emits a random number, as a String, once per second; each record is sent with sourceContext.collect(record).
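The contract behind MySource: Flink calls run() on one thread and, when the job is cancelled, calls cancel() from another, so the loop flag must be visible across threads, which is why the sketch below declares it volatile. This Flink-free sketch models that contract under stated assumptions: `Collector` is a hypothetical stand-in for SourceContext (only collect() is modelled), `RandomNumberSource` and `runFor` are illustrative names, and the interval is configurable so the demo finishes quickly. It illustrates the pattern, not Flink's runtime.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical stand-in for Flink's SourceContext: only collect() is modelled.
interface Collector<T> {
    void collect(T record);
}

// Same loop shape as MySource, with a configurable interval.
class RandomNumberSource {
    private volatile boolean running = true; // written by cancel(), read by run()
    private final long intervalMillis;

    RandomNumberSource(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    void run(Collector<String> ctx) throws InterruptedException {
        Random random = new Random();
        while (running) {
            Thread.sleep(intervalMillis);
            ctx.collect(String.valueOf(random.nextInt(99))); // value in [0, 99)
        }
    }

    void cancel() {
        running = false;
    }

    // Runs the source on its own thread, cancels it after runMillis, returns what it emitted.
    static List<String> runFor(long intervalMillis, long runMillis) throws InterruptedException {
        RandomNumberSource source = new RandomNumberSource(intervalMillis);
        List<String> out = Collections.synchronizedList(new ArrayList<>());
        Thread worker = new Thread(() -> {
            try {
                source.run(out::add);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        Thread.sleep(runMillis);
        source.cancel(); // called from a different thread than run()
        worker.join();   // run() returns once it sees running == false
        return new ArrayList<>(out);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runFor(10, 100));
    }
}
```

Without volatile, the worker thread is not guaranteed to ever see the update made by cancel(), and the loop could keep running after cancellation.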
DataStreamSource<String> st3=executionEnvironment.addSource(new MySource());
st3.print();
6. Reading from a message queue (MQ)
https://github.com/apache/rocketmq-externals/tree/master/rocketmq-flink
7. Reading and writing Hive tables
Flink CDC (to be covered in a dedicated post later)
Source: https://blog.csdn.net/L253503945/article/details/118521216