java – Kafka Streams: how to change a record's timestamp (0.11.0)?
I am using FluentD (v12 stable) to send messages to Kafka. However, FluentD uses an old KafkaProducer, so the record timestamp is always set to -1.
Therefore I have to use the WallclockTimestampExtractor to set the record's timestamp to the point in time when the message arrives in Kafka.
The timestamp I am really interested in is the one that fluentd sends inside the message itself:
"timestamp":"1507885936","host":"V.X.Y.Z."
The record in Kafka currently looks like this:
offset = 0, timestamp = -1, key = null, value = {"timestamp":"1507885936","host":"V.X.Y.Z."}
What I would like to have in Kafka is this:
offset = 0, timestamp = 1507885936, key = null, value = {"timestamp":"1507885936","host":"V.X.Y.Z."}
My workaround would look like this:
– write a consumer that extracts the timestamp from the message (https://kafka.apache.org/0110/javadoc/org/apache/kafka/streams/processor/TimestampExtractor.html)
– write a producer that creates a new record with the timestamp set, using ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) (a rough sketch of this step is shown right after this list)
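A minimal sketch of that producer step could look like the following. The broker address, the topic name "output-topic", and the hard-coded timestamp are assumptions purely for illustration; in the real workaround the timestamp would come from the consumer/extractor step:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TimestampRewritingProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            String value = "{\"timestamp\":\"1507885936\",\"host\":\"V.X.Y.Z.\"}";
            long extractedTimestamp = 1507885936L; // would be parsed from the consumed record
            // ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value)
            producer.send(new ProducerRecord<String, String>(
                    "output-topic", null, extractedTimestamp, null, value));
        }
    }
}

Note that Kafka interprets record timestamps as epoch milliseconds, so the epoch-seconds value coming from fluentd may need to be multiplied by 1000, depending on what downstream consumers expect.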
I would prefer a Kafka Streams solution, if there is one.
Solution:
You can write a very simple Kafka Streams application like:
KStreamBuilder builder = new KStreamBuilder();
builder.stream("input-topic").to("output-topic");
and configure the application with a custom TimestampExtractor that extracts the timestamp from the record's value and returns it.
When writing the records back to Kafka, Kafka Streams will use the returned timestamp.
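As a rough sketch (not part of the original answer), such an extractor and its wiring could look like this against the 0.11.0 API. The application id, broker address, topic names, the regex-based JSON parsing, and returning the payload's epoch-seconds value unchanged are all assumptions for illustration; a real implementation would use a proper JSON library, and you may want to convert to milliseconds, since Kafka record timestamps are interpreted as epoch milliseconds:

import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class PayloadTimestampExtractor implements TimestampExtractor {
    private static final Pattern TS = Pattern.compile("\"timestamp\"\\s*:\\s*\"(\\d+)\"");

    @Override
    public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
        // the value is the JSON payload produced by fluentd
        Object value = record.value();
        String json = value instanceof byte[]
                ? new String((byte[]) value, StandardCharsets.UTF_8)
                : String.valueOf(value);
        Matcher m = TS.matcher(json);
        if (m.find()) {
            // the payload carries epoch seconds; multiply by 1000 if milliseconds are wanted
            return Long.parseLong(m.group(1));
        }
        return previousTimestamp; // fall back if the field is missing
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "timestamp-rewriter"); // assumed app id
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // assumed broker
        config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // use the extractor above instead of the embedded -1 timestamps
        config.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, PayloadTimestampExtractor.class);

        KStreamBuilder builder = new KStreamBuilder();
        builder.stream("input-topic").to("output-topic");

        new KafkaStreams(builder, config).start();
    }
}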
Note: if you have out-of-order data, i.e., timestamps that are not strictly ordered, the result will contain out-of-order timestamps, too. Kafka Streams uses the returned timestamps when writing back to Kafka (i.e., whatever the extractor returns is used as the record metadata timestamp). Note that on write, the timestamp of the currently processed input record is used for all generated output records; this holds for version 1.0 but might change in future releases.
Tags: java, kafka-consumer-api, kafka-producer-api, apache-kafka-streams Source: https://codeday.me/bug/20190527/1163166.html