其他分享
首页 > 其他分享> > kafka消费者全部数据

kafka消费者全部数据

作者:互联网

1.目的

  每次消费数据时从最开始消费,主要是做数据预览。

2.设置offset.reset

Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "demo");
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class.getName());
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class.getName());

return new KafkaConsumer<>(properties);

如果只设置AUTO_OFFSET_RESET_CONFIG是无效的,必须加上AUTO_OFFSET_RESET_CONFIG

3.每次消费完重新seek offset到开始位置

  由于seek是针对offset,所以必须是消费过数据后才能执行seek。

  consumer.beginningOffsets(consumer.listTopics().get("demo").stream().map(o->new TopicPartition(o.topic(),o.partition())).collect(Collectors.toList()));

标签:ConsumerConfig,RESET,消费者,AUTO,kafka,全部,CONFIG,properties
来源: https://www.cnblogs.com/yangyang12138/p/13796103.html