编程语言
首页 > 编程语言> > java-Spring Kafka在一个使用者中使用多种消息类型

java-Spring Kafka在一个使用者中使用多种消息类型

作者:互联网

我有多个生产者,可以将多种类型的事件发送到一个kafka主题.

我有一个必须使用所有类型消息的使用者.对每种消息使用不同的逻辑.

    @KafkaListener(topics = "test", containerFactory = "kafkaListenerContainerFactory")
public void handleEvent(Message<EventOne> event) {
    logger.info("event={}", event);
}

但是在这种情况下,不仅EventOne,所有消息都传给该方法

如果我实现了两种方法(对于每种消息类型),那么所有消息都只会变成一种方法.

如果我这样实现监听器:

    @KafkaListener(topics = "test", containerFactory = "kafkaListenerContainerFactory")
public void handleEvent(Message<?> event) {
    logger.info("event={}", event);
}

然后我得到异常:
org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-listener-1]错误org.springframework.kafka.listener.LoggingErrorHandler-处理时出错:ConsumerRecord java.lang.IllegalArgumentException:无法识别类型:[null]

请告诉我如何实现多种类型的使用者?

解决方法:

我发现了灵魂,这很简单.
因此,从这个问题尚不清楚,但是我使用jsonMessageConverter
像这样:

    @Bean
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setMessageConverter(new StringJsonMessageConverter());
    return factory;
}

默认情况下,Jackson库不添加JSON的类信息,因此无法猜测我要反序列化的类型.
解决方案是注释

@JsonTypeInfo(use= JsonTypeInfo.Id.CLASS, include= JsonTypeInfo.As.PROPERTY, property="class")

将classinformation添加到json字符串.
在消费者方面,我只是编写事件的基类

    @KafkaListener(topics = "test", containerFactory = "kafkaListenerContainerFactory")
public void handleEvent(BasicEvent event) {

    if (event instanceof EventOne) {
        logger.info("type={EventOne}, event={}", event);
    } else {
        logger.info("type={EventTwo}, event={}", event);
    }
}

希望此信息对某人有帮助.

标签:apache-kafka,spring,java
来源: https://codeday.me/bug/20191118/2026898.html