编程语言
首页 > 编程语言> > java – Kafka突然重置消费者Offset

java – Kafka突然重置消费者Offset

作者:互联网

我正在和Kafka 0.8& zookeeper 3.3.5.实际上,我们有十几个主题,我们正在消费没有任何问题.

最近,我们开始提供并消费一个有奇怪行为的新主题.消耗的偏移突然重置.它尊重我们设置的auto.offset.reset策略(实际上是最小的),但我无法理解为什么该主题突然重置其偏移量.

我正在使用高级消费者.

以下是我发现的一些错误日志:
我们有一堆这个错误日志:

[2015-03-26 05:21:17,789] INFO Fetching metadata from broker id:1,host:172.16.23.1,port:9092 with correlation id 47 for 1 topic(s) Set(MyTopic) (kafka.cl
ient.ClientUtils$)
[2015-03-26 05:21:17,789] ERROR Producer connection to 172.16.23.1:9092 unsuccessful (kafka.producer.SyncProducer)
java.nio.channels.ClosedByInterruptException
        at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:681)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
        at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
        at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:88)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

每次发生此问题时,我都会看到WARN日志:

[2015-03-26 05:21:30,596] WARN Reconnect due to socket error: null (kafka.consumer.SimpleConsumer)

然后真正的问题发生了:

[2015-03-26 05:21:47,551] INFO Connected to 172.16.23.5:9092 for producing (kafka.producer.SyncProducer)
[2015-03-26 05:21:47,552] INFO Disconnecting from 172.16.23.5:9092 (kafka.producer.SyncProducer)
[2015-03-26 05:21:47,553] INFO [ConsumerFetcherManager-1427047649942] Added fetcher for partitions ArrayBuffer([[MyTopic,0], initOffset 45268422051 to br
oker id:5,host:172.16.23.5,port:9092] ) (kafka.consumer.ConsumerFetcherManager)
[2015-03-26 05:21:47,553] INFO [ConsumerFetcherThread-MyTopic_group-1427047649884-699191d4-0-5], Starting  (kafka.consumer.Cons
umerFetcherThread)
[2015-03-26 05:21:50,388] ERROR [ConsumerFetcherThread-MyTopic_group-1427047649884-699191d4-0-5], Current offset 45268422051 for partition [MyTopic,0] out of range; reset offset to 1948447612 (kafka.consumer.ConsumerFetcherThread)
[2015-03-26 05:21:50,490] ERROR [ConsumerFetcherThread-MyTopic_group-1427047649884-699191d4-0-5], Current offset 1948447612 for partition [MyTopic,0] out of range; reset offset to 1948447612 (kafka.consumer.ConsumerFetcherThread)
[2015-03-26 05:21:50,591] ERROR [ConsumerFetcherThread-MyTopic_group-1427047649884-699191d4-0-5], Current offset 1948447612 for partition [MyTopic,0] out of range; reset offset to 1948447612 (kafka.consumer.ConsumerFetcherThread)
[2015-03-26 05:21:50,692] ERROR [ConsumerFetcherThread-MyTopic_group-1427047649884-699191d4-0-5], Current offset 1948447612 for partition [MyTopic,0] out of range; reset offset to 1948447612 (kafka.consumer.ConsumerFetcherThread)

现在的问题是:
是否有人已经经历过这种行为?
当Kafka决定重置其偏移时,是否有人可以告诉我auto.offset.reset是最大还是最小?

谢谢.

解决方法:

发生了什么事情,你在一段时间内缓慢地解释你的话题.

Kafka的保留模型不是基于消费者是否获得数据,而是基于磁盘使用和/或周期.在某些时候,你来不及,你需要的下一条消息已经消失,由于kafka清理了数据,因此不再可用.因此,分区[MyTopic,0]的当前偏移量为45268422051超出范围;将偏移量重置为1948447612条消息.

然后,您的消费者再次应用您的重置策略来自我引导,在您的情况下最小.

当您拥有突发性工作流时,这是一个常见问题,有时会超出数据保留范围.它可能会消失,因为您提高了编译速度,或者增加了保留策略以便能够在突发中生存.

标签:java,apache-zookeeper,apache-kafka,kafka-consumer-api,producer-consumer
来源: https://codeday.me/bug/20190628/1317985.html