java – 为什么kafka流会重新处理代理重启后生成的消息
作者:互联网
我有一个单节点kafka代理和简单的流应用程序.我创建了2个主题(topic1和topic2).
在topic1上生成 – 处理过的消息 – 写入topic2
注意:对于生成的每条消息,只有一条消息写入目标主题
我制作了一条消息.在写入topic2之后,我停止了kafka经纪人.一段时间后,我重新启动了代理并在topic1上生成了另一条消息.现在流应用程序处理了该消息3次.现在,在不停止代理的情况下,我向topic1发出了消息,并等待流应用程序在再次生成之前写入topic2.
Streams应用程序表现得很奇怪.有时,对于一条生成的消息,有2条消息写入目标主题,有时甚至是3.我不明白为什么会发生这种情况.我的意思是即使重新启动代理重启后产生的消息也是如此.
更新1:
我使用的是Kafka版本1.0.0和Kafka-Streams版本1.1.0
下面是代码.
Main.java
String credentials = env.get("CREDENTIALS");
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "activity-collection");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.RECONNECT_BACKOFF_MS_CONFIG, 100000);
props.put(StreamsConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, 200000);
props.put(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
props.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, 60000);
props.put(StreamsConfig.producerPrefix(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG), true);
props.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> activityStream = builder.stream("activity_contenturl");
KStream<String, String> activityResultStream = AppUtil.hitContentUrls(credentials , activityStream);
activityResultStream.to("o365_user_activity");
AppUtil.java
public static KStream<String, String> hitContentUrls(String credentials, KStream<String, String> activityStream) {
KStream<String, String> activityResultStream = activityStream
.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(String value) {
ArrayList<String> log = new ArrayList<String>();
JSONObject received = new JSONObject(value);
String url = received.get("url").toString();
String accessToken = ServiceUtil.getAccessToken(credentials);
JSONObject activityLog = ServiceUtil.getActivityLogs(url, accessToken);
log.add(activityLog.toString());
}
return log;
}
});
return activityResultStream;
}
更新2:
在具有上述配置的单个代理和单个分区环境中,我启动了Kafka代理和流应用程序.在源主题上生成6条消息,当我在目标主题上启动消费者时,有36条消息并且正在计数.他们继续前进.
所以我跑来看看消费者群体:
kafka_2.11-1.1.0/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list
输出:
streams-collection-app-0
接下来我跑了这个:
kafka_2.11-1.1.0/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group streams-collection-app-0
输出:
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
o365_activity_contenturl 0 1 1 0 streams-collection-app-0-244b6f55-b6be-40c4-9160-00ea45bba645-StreamThread-1-consumer-3a2940c2-47ab-49a0-ba72-4e49d341daee /127.0.0.1 streams-collection-app-0-244b6f55-b6be-40c4-9160-00ea45bba645-StreamThread-1-consumer
过了一会儿输出显示:
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
o365_activity_contenturl 0 1 6 5 - - -
然后:
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
o365_activity_contenturl 0 1 7 6 - - -
解决方法:
似乎你面临着已知的限制.默认情况下,Kafka主题将消息存储至少7天,但保存的偏移量存储1天(默认配置值offsets.retention.minutes = 1440).因此,如果在超过1天内未对您的源主题生成任何消息,则在应用程序重新启动后,将再次重新处理来自主题的所有消息(实际上多次,取决于重新启动的次数,每个此类主题每天最多1次,以及罕见的传入消息).
你可以找到过期提交抵消How does an offset expire for consumer group的描述.
在kafka版本2.0中,承诺抵消的保留期增加了KIP-186: Increase offsets retention default to 7 days.
为了防止重新处理,您可以添加消费者属性auto.offset.reset:latest(默认值是最早的).
最新版本存在一个小风险:如果当天没有人在源主题中产生消息,那么在重新启动应用程序后,您可能会丢失一些消息(只有在重启期间准确到达的消息).
标签:apache-kafka-streams,java,apache-kafka 来源: https://codeday.me/bug/20190910/1799260.html