编程语言
首页 > 编程语言> > java – Kafka模式订阅.新主题没有触发重新平衡

java – Kafka模式订阅.新主题没有触发重新平衡

作者:互联网

根据kafka javadocs的文件,如果我:

>订阅模式
>创建与模式匹配的主题

应该发生重新平衡,这使消费者从该新主题中读取.但那并没有发生.

如果我停止并启动消费者,它确实会选择新主题.所以我知道新主题与模式匹配.这个问题可能在https://stackoverflow.com/questions/37120537/whitelist-filter-in-kafka-doesnt-pick-up-new-topics中有重复,但这个问题无处可去.

我看到kafka日志并没有错误,它只是不会触发重新平衡.当消费者加入或死亡时触发重新平衡,但是在创建新主题时不会触发(即使分区被添加到现有主题,但这是另一个主题).

我正在使用kafka 0.10.0.0和“新消费者API”的官方Java客户端,意思是代理GroupCoordinator而不是胖客户端zookeeper.

这是示例消费者的代码:

public class SampleConsumer {
public static void main(String[] args) throws IOException {
    KafkaConsumer<String, String> consumer;
    try (InputStream props = Resources.getResource("consumer.props").openStream()) {
        Properties properties = new Properties();
        properties.load(props);
        properties.setProperty("group.id", "my-group");

        System.out.println(properties.get("group.id"));
        consumer = new KafkaConsumer<>(properties);
    }
    Pattern pattern = Pattern.compile("mytopic.+");
    consumer.subscribe(pattern, new SampleRebalanceListener());
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("%s %s\n", record.topic(), record.value());
        }
    }
}

}

在制作人中,我正在向名为mytopic1,mytopic2等的主题发送消息.

如果不触发重新平衡,模式几乎没用.

你知道为什么没有发生重新平衡吗?

解决方法:

文档提到“模式匹配将定期针对检查时存在的主题进行.”事实证明,“周期性”对应于metadata.max.age.ms属性.通过将该属性(在我的代码示例中的“consumer.props”内)设置为5000,我可以看到它每5秒检测一次新主题和分区.

根据这个jira票https://issues.apache.org/jira/browse/KAFKA-3854,这是按照设计的:

The final note on the JIRA stating that a later created topic that matches a consumer’s subscription pattern would not be assigned to the consumer upon creation seems to be as designed. A repeat subscribe() to the same pattern would be needed to handle that case.

刷新元数据轮询执行故障单中提到的“重复订阅()”.

这是令人困惑的来自Kafka 0.8,其中真正的触发基于zookeper手表,而不是轮询.对于这种情况,IMO 0.9更多地是降级,而不是“及时”重新平衡,这变成了高频率轮询与开销,或者在对新主题/分区作出反应之前长时间进行低频率轮询.

标签:java,apache-kafka,kafka-consumer-api
来源: https://codeday.me/bug/20190519/1134701.html