编程语言
首页 > 编程语言> > java – 从Kafka多次读同一条消息

java – 从Kafka多次读同一条消息

作者:互联网

我使用Spring Kafka API来实现手动偏移管理的Kafka消费者:

@KafkaListener(topics = "some_topic")
public void onMessage(@Payload Message message, Acknowledgment acknowledgment) {
    if (someCondition) {
        acknowledgment.acknowledge();
    }
}

在这里,我希望消费者只有在someCondition成立时才提交偏移量.否则,消费者应该睡一段时间并再次阅读相同的消息.

卡夫卡配置:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfig());
    factory.getContainerProperties().setAckMode(MANUAL);
    return factory;
}

private Map<String, Object> consumerConfig() {
    Map<String, Object> props = new HashMap<>();
    ...
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    ...
    return props;
}

使用当前配置,如果someCondition == false,则consumer不提交偏移量,但仍会读取下一条消息.如果没有执行Kafka确认,是否有办法让消费者重读消息?

解决方法:

您可以停止并重新启动容器,它将被重新发送.

随着即将发布的1.1版本,你可以seek to the required offset,它将被重新发布.

但是如果它们已被检索,你仍然会先看到后面的消息,所以你也必须丢弃这些消息.

second milestone具有该功能,我们预计它将在下周发布.

标签:kafka-consumer-api,java,spring,apache-kafka,spring-kafka
来源: https://codeday.me/bug/20190722/1502335.html