Exercise: Flink sink to ElasticSearch
Author: Internet
ElasticSearch
package test;

import bean.Stu;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.*;

public class SinkToElasticsearch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Custom source: periodically emits random Stu records (name, subject, score)
        DataStreamSource<Stu> source = env.addSource(new SourceFunction<Stu>() {
            private boolean running = true;

            @Override
            public void run(SourceContext<Stu> sourceContext) throws Exception {
                while (running) {
                    for (int i = 0; i < 10; i++) {
                        ArrayList<String> subs = new ArrayList<>(Arrays.asList("语文", "数学", "英语", "化学", "物理", "生物"));
                        List<String> names = Arrays.asList("张三", "李四", "王五", "赵六", "田七");
                        int next = new Random().nextInt(15);
                        int random = new Random().nextInt(101);
                        sourceContext.collect(new Stu(names.get(next * random % 5), subs.get(next * random % 6), random));
                        Thread.sleep(1000);
                    }
                    Thread.sleep(20000);
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        });

        // Build the list of Elasticsearch hosts
        ArrayList<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("hadoop106", 9200));

        // ElasticsearchSinkFunction: turn each Stu record into an IndexRequest.
        // Note: mapping types were removed in Elasticsearch 7, so no .type(...) is set here.
        ElasticsearchSinkFunction<Stu> elasticsearchSinkFunction = new ElasticsearchSinkFunction<Stu>() {
            @Override
            public void process(Stu stu, RuntimeContext ctx, RequestIndexer indexer) {
                HashMap<String, String> map = new HashMap<>();
                map.put(stu.getName(), stu.getSub() + " " + stu.getScore());
                IndexRequest request = Requests.indexRequest()
                        .index("clicks")
                        .source(map);
                indexer.add(request);
            }
        };

        // Write to Elasticsearch
        source.addSink(new ElasticsearchSink.Builder<>(httpHosts, elasticsearchSinkFunction).build());

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
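The job above imports bean.Stu, but the post does not show that class. Below is a minimal sketch of what it might look like, assuming a plain POJO whose fields match the getters used in SinkToElasticsearch (getName, getSub, getScore); the field names and types are inferred, and the real class in the original project may differ.

package bean;

// Minimal sketch of the Stu bean assumed by SinkToElasticsearch.
// Fields are inferred from the getters used above; the original class may differ.
public class Stu {
    private String name;  // student name, e.g. "张三"
    private String sub;   // subject, e.g. "数学"
    private int score;    // score in the range 0..100

    public Stu() {
    }

    public Stu(String name, String sub, int score) {
        this.name = name;
        this.sub = sub;
        this.score = score;
    }

    public String getName() {
        return name;
    }

    public String getSub() {
        return sub;
    }

    public int getScore() {
        return score;
    }

    @Override
    public String toString() {
        return "Stu{name='" + name + "', sub='" + sub + "', score=" + score + "}";
    }
}

When testing, keep in mind that the Elasticsearch sink buffers requests and flushes them in bulk; calling setBulkFlushMaxActions(1) on the ElasticsearchSink.Builder makes every record flush immediately, so documents show up in the clicks index right away (for example via GET clicks/_search in Kibana).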
Tags: Flink, streaming, ElasticSearch, sink  Source: https://www.cnblogs.com/chang09/p/16435950.html