编程语言
首页 > 编程语言> > java – Kafka Streams错误 – 分区上的偏移提交失败,请求超时

java – Kafka Streams错误 – 分区上的偏移提交失败,请求超时

作者:互联网

我们使用Kafka Streams来消费,处理和生成消息,而在PROD环境中,我们遇到了多个主题的错误:

ERROR org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=app-xxx-StreamThread-3-consumer, groupId=app] 
Offset commit failed on partition xxx-1 at offset 13920: 
The request timed out.[]

对于负载较小的主题,这些错误很少发生,但对于具有高负载(和峰值)的主题,每个主题每天会发生数十次错误.主题有多个分区(例如10个).似乎这个问题不影响数据的处理(尽管性能),因为抛出异常(甚至可能是同一偏移的多个错误),消费者以后重新读取消息并成功处理它.

由于PR,我看到此错误消息出现在kafka-clients版本1.0.0中,但在以前的kafka-clients版本中,对于相同的用例(消费者的Errors.REQUEST_TIMED_OUT)类似的消息(组{}的偏移提交失败:{用调试级别记录了).
对我来说,将日志级别更新为警告这种用例更合乎逻辑.

如何解决这个问题?可能是根本原因?也许改变消费者属性或分区设置可能有助于摆脱这样的问题.

我们使用以下实现来创建Kafka Streams:

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.<String, String>stream(topicName);
stream.foreach((key, value) -> processMessage(key, value));
Topology topology = builder.build();
StreamsConfig streamsConfig = new StreamsConfig(consumerSettings);
new KafkaStreams(streamsTopology, streamsConfig);

我们的Kafka消费者设置:

bootstrap.servers: xxx1:9092,xxx2:9092,...,xxx5:9092
application.id: app
state.dir: /tmp/kafka-streams/xxx
commit.interval.ms: 5000       # also I tried default value 30000
key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
timestamp.extractor: org.apache.kafka.streams.processor.WallclockTimestampExtractor

kafka经纪人版本:kafka_2.11-0.11.0.2.
两个版本的Kafka Streams都发生错误:1.0.1和1.1.0.

解决方法:

看起来您遇到了Kafka集群的问题,并且Kafka消费者在尝试提交偏移时会超时.
您可以尝试为Kafka消费者增加与连接相关的配置

> request.timeout.ms(默认为305000ms)

The configuration controls the maximum amount of time the client will
wait for the response of a request

> connections.max.idle.ms(默认为540000ms)

Close idle connections after the number of milliseconds specified by
this config.

标签:apache-kafka-streams,java,apache-kafka
来源: https://codeday.me/bug/20191008/1870987.html