java – 从Kafka多次读同一条消息
作者:互联网
我使用Spring Kafka API来实现手动偏移管理的Kafka消费者:
@KafkaListener(topics = "some_topic")
public void onMessage(@Payload Message message, Acknowledgment acknowledgment) {
if (someCondition) {
acknowledgment.acknowledge();
}
}
在这里,我希望消费者只有在someCondition成立时才提交偏移量.否则,消费者应该睡一段时间并再次阅读相同的消息.
卡夫卡配置:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfig());
factory.getContainerProperties().setAckMode(MANUAL);
return factory;
}
private Map<String, Object> consumerConfig() {
Map<String, Object> props = new HashMap<>();
...
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
...
return props;
}
使用当前配置,如果someCondition == false,则consumer不提交偏移量,但仍会读取下一条消息.如果没有执行Kafka确认,是否有办法让消费者重读消息?
解决方法:
您可以停止并重新启动容器,它将被重新发送.
随着即将发布的1.1版本,你可以seek to the required offset,它将被重新发布.
但是如果它们已被检索,你仍然会先看到后面的消息,所以你也必须丢弃这些消息.
second milestone具有该功能,我们预计它将在下周发布.
标签:kafka-consumer-api,java,spring,apache-kafka,spring-kafka 来源: https://codeday.me/bug/20190722/1502335.html