java – Consumer.endOffsets如何在Kafka中运行?
作者:互联网
假设我有一个无限期运行的计时器任务,它迭代kafka集群中的所有使用者组,并为每个组的所有分区输出滞后,提交的偏移量和结束偏移量.与Kafka控制台消费者组脚本的工作方式类似,但适用于所有组.
就像是
单个消费者 – 不工作 – 不返回某些提供的主题分区的偏移量(例如,提供10个 – 返回5个偏移量)
Consumer consumer;
static {
consumer = createConsumer();
}
run() {
List<String> groupIds = getConsumerGroups();
for(String groupId: groupIds) {
List<TopicParition> topicParitions = getTopicParitions(groupId);
consumer.endOffsets(topicParitions); -- Not working - missing offsets for some partitions for some groups (in 10 - out 5)
}
}
多个消费者 – 工作
run() {
List<String> groupIds = getConsumerGroups();
for(String groupId: groupIds) {
List<TopicParition> topicParitions = getTopicParitions(groupId);
Consumer consumer = createConsumer();
consumer.endOffsets(topicParitions); This works!!!
}
}
版本:Kafka-Client 2.0.0
我是否错误地使用了消费者API?理想情况下,我想使用单个消费者.
如果您需要更多详细信息,请告诉我们.
解决方法:
我想你差不多了.首先收集您感兴趣的所有主题分区,然后发出consumer.endOffsets命令.
请记住,我没有尝试过运行它,但这样的事情应该有效:
run() {
Consumer consumer = createConsumer();
List<String> groupIds = getConsumerGroups();
List<TopicPartition> topicPartitions = new ArrayList<>();
for (String groupId: groupIds) {
topicPartitions.addAll(getTopicPartitions(groupId));
}
consumer.endOffsets(topicPartitions);
}
标签:java,apache-kafka,kafka-consumer-api,spring-kafka 来源: https://codeday.me/bug/20190701/1346205.html