Flink Example: Consuming Data from Kafka and Writing It Back to Kafka
This post walks through an example in which Flink reads data from one Kafka topic and then writes the processed data back to another Kafka topic.
Example code
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

/**
 * Read data from one topic, then write it back to another topic.
 */
public class SinkToKafka0824 {
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use parallelism 1 for testing; in production, set it to the topic's partition count
        env.setParallelism(1);

        // 2. Read data
        // 2.1 Kafka configuration
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop103:9092");
        properties.setProperty("group.id", "consumer.group");

        // 2.2 Read data from the "lhc" topic
        DataStreamSource<String> lhcStream = env.addSource(
                new FlinkKafkaConsumer<String>("lhc", new SimpleStringSchema(), properties));

        // 2.3 Wrap each raw line into an Event and serialize it back to a String
        SingleOutputStreamOperator<String> map = lhcStream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                String[] split = value.split(",");
                return new Event(split[0].trim(), split[1].trim(), Long.valueOf(split[2].trim())).toString();
            }
        });

        // 3. Write the results back to the "tbg" topic
        map.addSink(new FlinkKafkaProducer<String>("hadoop103:9092", "tbg", new SimpleStringSchema()));

        // Print for testing
        map.print();

        // Execute the job
        env.execute();
    }
}
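The Event class used in the map function is not shown in the original post. A minimal sketch that is consistent with the comma-separated input (user,url,timestamp) and with the consumer output shown below might look like this; the field names and the java.sql.Timestamp rendering in toString() are inferred from that output, not confirmed by the source:

import java.sql.Timestamp;

// Hypothetical POJO matching the job above. toString() formats the epoch-millisecond
// field through java.sql.Timestamp, which is what produces strings such as
// "1970-01-01 08:00:01.0" (rendered in the host's local time zone).
public class Event {
    public String user;
    public String url;
    public Long timestamp;

    public Event() {
    }

    public Event(String user, String url, Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "Event{" +
                "user='" + user + '\'' +
                ", url='" + url + '\'' +
                ", timestamp=" + new Timestamp(timestamp) +
                '}';
    }
}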
Testing
Start a console producer and send a few messages in the user,url,timestamp format:
[hui@hadoop103 ~]$ kafka-console-producer.sh --bootstrap-server hadoop103:9092 --topic lhc
>令狐冲,./home,1000
>任盈盈,./pro?id=1001,1000
>令狐冲,./cart,8000
Start a consumer on the output topic to observe the results:

[hui@hadoop103 ~]$ kafka-console-consumer.sh --bootstrap-server hadoop103:9092 --topic tbg
Event{user='令狐冲', url='./home', timestamp=1970-01-01 08:00:01.0}
Event{user='任盈盈', url='./pro?id=1001', timestamp=1970-01-01 08:00:01.0}
Event{user='令狐冲', url='./cart', timestamp=1970-01-01 08:00:08.0}

The epoch-millisecond values from the input (1000 and 8000) are rendered in the host's local time zone (UTC+8 here), which is why they appear as 1970-01-01 08:00:01.0 and 1970-01-01 08:00:08.0.
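As a side note, FlinkKafkaConsumer and FlinkKafkaProducer are deprecated in newer Flink releases in favor of KafkaSource and KafkaSink. A rough, untested sketch of the same read-map-write pipeline with the newer connector API (same assumed broker and topics, reusing the Event sketch above) could look like this:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SinkToKafkaNewApi {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Source: read plain strings from the "lhc" topic
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("hadoop103:9092")
                .setTopics("lhc")
                .setGroupId("consumer.group")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> stream = env
                .fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
                .map(value -> {
                    String[] split = value.split(",");
                    return new Event(split[0].trim(), split[1].trim(), Long.valueOf(split[2].trim())).toString();
                });

        // Sink: write the serialized events to the "tbg" topic
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("hadoop103:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("tbg")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .build();

        stream.sinkTo(sink);
        env.execute();
    }
}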
Source: https://www.cnblogs.com/wdh01/p/16620337.html