编程语言
首页 > 编程语言> > java – Consumer.endOffsets如何在Kafka中运行?

java – Consumer.endOffsets如何在Kafka中运行?

作者:互联网

假设我有一个无限期运行的计时器任务,它迭代kafka集群中的所有使用者组,并为每个组的所有分区输出滞后,提交的偏移量和结束偏移量.与Kafka控制台消费者组脚本的工作方式类似,但适用于所有组.

就像是

单个消费者 – 不工作 – 不返回某些提供的主题分区的偏移量(例如,提供10个 – 返回5个偏移量)

Consumer consumer;

static {
  consumer = createConsumer();
}

run() { 
  List<String> groupIds = getConsumerGroups();
  for(String groupId: groupIds) {
       List<TopicParition> topicParitions =  getTopicParitions(groupId);
       consumer.endOffsets(topicParitions); -- Not working - missing offsets for some partitions for some groups (in 10 - out 5)
   }
}

多个消费者 – 工作

run() { 
   List<String> groupIds = getConsumerGroups();
   for(String groupId: groupIds) {
        List<TopicParition> topicParitions =  getTopicParitions(groupId);
        Consumer consumer = createConsumer();
        consumer.endOffsets(topicParitions); This works!!!
   }
 }

版本:Kafka-Client 2.0.0

我是否错误地使用了消费者API?理想情况下,我想使用单个消费者.

如果您需要更多详细信息,请告诉我们.

解决方法:

我想你差不多了.首先收集您感兴趣的所有主题分区,然后发出consumer.endOffsets命令.

请记住,我没有尝试过运行它,但这样的事情应该有效:

run() { 
   Consumer consumer = createConsumer();
   List<String> groupIds = getConsumerGroups();
   List<TopicPartition> topicPartitions = new ArrayList<>();

   for (String groupId: groupIds) {
        topicPartitions.addAll(getTopicPartitions(groupId));
   }

   consumer.endOffsets(topicPartitions); 
}

标签:java,apache-kafka,kafka-consumer-api,spring-kafka
来源: https://codeday.me/bug/20190701/1346205.html