编程语言
首页 > 编程语言> > java – spring kafka thorws设置并发后的InstanceAlreadyExistsException异常> 1

java – spring kafka thorws设置并发后的InstanceAlreadyExistsException异常> 1

作者:互联网

我正在使用spring-kafka,如果我没有设置ConcurrentKafkaListenerContainerFactory的并发性,一切正常,当我将它设置为大于1的数字时,我得到一个异常:

javax.management.InstanceAlreadyExistsException:
kafka.consumer:type=app-info,id=client-3

我的配置:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> 
    kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new 
    ConcurrentKafkaListenerContainerFactory<String, String>();

    factory.setConcurrency(kafkaConfig.getConcurrency());

    factory.getContainerProperties().setAckMode(AckMode.MANUAL);
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

特性:

kafka.enable-auto-commit=false

kafka.client-id=client-1

kafka.concurrency=2

解决方法:

不要添加自己的答案只是为了添加更多信息 – 而是编辑您的问题.

我已经打开了issue for this on github.目前不支持为每个线程设置不同的client.id.

作为解决方法,您可以为每个启动一个单独的KafkaMessageListenerContainer(这是ConcurrentMessageListenerContainer在内部执行的操作).

编辑

虽然不理想,但您可以省略client.id,kafka客户端将为每个生成一个(consumer-1,consumer-2等)

标签:java,apache-kafka,spring-kafka,stomp
来源: https://codeday.me/bug/20190608/1197180.html