Java-Kafka崩溃时,Kafka使用者挂起轮询
作者:互联网
我一直在研究Zookeeper和Kafka的基本设置,以学习如何使用它,但是我在与消费者打交道时遇到了麻烦.当Kafka不可用时,对poll()方法的调用将挂起,直到它重新联机.
卡夫卡版本:0.10.1.0
我的代码如下所示:
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(topics);
while (!stopped) {
// If by any reason Kafka is not available this call will hang
// until Kafka is back online.
records = consumer.poll(timeout);
for (ConsumerRecord<String, byte[]> record : records) {
process(record);
}
Thread.sleep(sleepTime);
}
我已经读过,当我调用poll()时,消费者将尝试无限期地连接到Kafka,直到它重新联机或直到Consumer.wakeup()被调用为止.
当Kafka不在线时,我希望代码的行为有所不同.从不存在的卡夫卡中轮询时,是否有任何方法可以限制用户重试或使其失败?
解决方法:
不幸的是,这仍然是一个问题.许多消费者方法可以挂在各种情况下.
正在进行中的Kafka改进提案KIP-266,用于向Consumer方法添加超时以避免挂起.
据我所知,从另一个线程调用wakeup()是最好的解决方法
编辑:从Kafka 2.0.0开始,所有消费者呼叫都可以接受超时.这样可以在经纪人破产的情况下恢复控制.
标签:kafka-consumer-api,apache-kafka,java 来源: https://codeday.me/bug/20191025/1926671.html