编程语言
首页 > 编程语言> > Java-Kafka崩溃时,Kafka使用者挂起轮询

Java-Kafka崩溃时,Kafka使用者挂起轮询

作者:互联网

我一直在研究Zookeeper和Kafka的基本设置,以学习如何使用它,但是我在与消费者打交道时遇到了麻烦.当Kafka不可用时,对poll()方法的调用将挂起,直到它重新联机.

卡夫卡版本:0.10.1.0

我的代码如下所示:

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(topics);

while (!stopped) {
    // If by any reason Kafka is not available this call will hang
    // until Kafka is back online.
    records = consumer.poll(timeout);

    for (ConsumerRecord<String, byte[]> record : records) {
        process(record);
    }

    Thread.sleep(sleepTime);
}

我已经读过,当我调用poll()时,消费者将尝试无限期地连接到Kafka,直到它重新联机或直到Consumer.wakeup()被调用为止.

当Kafka不在线时,我希望代码的行为有所不同.从不存在的卡夫卡中轮询时,是否有任何方法可以限制用户重试或使其失败?

解决方法:

不幸的是,这仍然是一个问题.许多消费者方法可以挂在各种情况下.

正在进行中的Kafka改进提案KIP-266,用于向Consumer方法添加超时以避免挂起.

据我所知,从另一个线程调用wakeup()是最好的解决方法

编辑:从Kafka 2.0.0开始,所有消费者呼叫都可以接受超时.这样可以在经纪人破产的情况下恢复控制.

标签:kafka-consumer-api,apache-kafka,java
来源: https://codeday.me/bug/20191025/1926671.html