其他分享
首页 > 其他分享> > Kafka--Consumer消费者

Kafka--Consumer消费者

作者:互联网

转:https://www.cnblogs.com/dingwpmz/p/12185196.html

温馨提示:整个 Kafka 专栏基于 kafka-2.2.1 版本。

1、KafkaConsumer 概述

根据 KafkaConsumer 类上的注释上来看 KafkaConsumer 具有如下特征:

2、KafkaConsume 使用示例

2.1 自动提交消费进度

public static void testConsumer1() {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092,localhost:9082,localhost:9072");
    props.setProperty("group.id", "C_ODS_ORDERCONSUME_01");
    props.setProperty("enable.auto.commit", "true");
    props.setProperty("auto.commit.interval.ms", "1000");
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("TOPIC_ORDER"));
    while (true) {
        ConsumerRecords<String, String>  records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println("消息消费中");
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}

2.2 手动提交消费进度

public static void testConsumer2() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "test");
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("foo", "bar"));
        final int minBatchSize = 200;
        List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                buffer.add(record);
            }
            if (buffer.size() >= minBatchSize) {
                // insertIntoDb(buffer);
                // 省略处理逻辑
                consumer.commitSync();
                buffer.clear();
            }
        }
    }

3、认识 Consumer 接口

要认识 Kafka 的消费者,个人认为最好的办法就是从它的类图着手,下面给出 Consumer 接口的类图。
在这里插入图片描述
接下来对起重点方法进行一个初步的介绍,从下篇文章开始将对其进行详细设计。

4、初始 KafkaConsumer

在这里插入图片描述
接下来笔者根据其构造函数,对一一介绍其核心属性的含义,为接下来讲解其核心方法打下基础。

标签:消费,消费者,TopicPartition,void,偏移量,Kafka,props,poll,Consumer
来源: https://www.cnblogs.com/jvStarBlog/p/12186763.html