java – spring kafka thorws设置并发后的InstanceAlreadyExistsException异常> 1
作者:互联网
我正在使用spring-kafka,如果我没有设置ConcurrentKafkaListenerContainerFactory的并发性,一切正常,当我将它设置为大于1的数字时,我得到一个异常:
javax.management.InstanceAlreadyExistsException:
kafka.consumer:type=app-info,id=client-3
我的配置:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new
ConcurrentKafkaListenerContainerFactory<String, String>();
factory.setConcurrency(kafkaConfig.getConcurrency());
factory.getContainerProperties().setAckMode(AckMode.MANUAL);
factory.setConsumerFactory(consumerFactory());
return factory;
}
特性:
kafka.enable-auto-commit=false
kafka.client-id=client-1
kafka.concurrency=2
解决方法:
不要添加自己的答案只是为了添加更多信息 – 而是编辑您的问题.
我已经打开了issue for this on github.目前不支持为每个线程设置不同的client.id.
作为解决方法,您可以为每个启动一个单独的KafkaMessageListenerContainer(这是ConcurrentMessageListenerContainer在内部执行的操作).
编辑
虽然不理想,但您可以省略client.id,kafka客户端将为每个生成一个(consumer-1,consumer-2等)
标签:java,apache-kafka,spring-kafka,stomp 来源: https://codeday.me/bug/20190608/1197180.html