
Kafka: Manually Committing Offsets to a Custom Storage Medium (Code Implementation)

The example below caches each partition's offset in a HashMap, persists that map to a local file with Java serialization, and uses a ConsumerRebalanceListener to write the offsets out when partitions are revoked and to seek back to the stored positions when partitions are assigned.

package com.aura.consumer;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.io.*;
import java.util.*;

/**
 * @author panghu
 * @description Manually persist consumer offsets to a custom storage medium
 * @create 2021-02-27-16:52
 */
public class ConsumerOffset {
    // In-memory cache of the latest offset for each partition
    private static Map<TopicPartition, Long> offsetMap = new HashMap<TopicPartition, Long>();
    // Custom storage medium: a local file that persists the cached offsets
    private static File offsetFile = new File("D:\\data\\offset");

    public static void main(String[] args) throws IOException {
        // Create the KafkaConsumer instance from consumer1.properties on the classpath
        Properties prop = new Properties();
        prop.load(ConsumerOffset.class.getClassLoader().
                getResourceAsStream("consumer1.properties"));
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);
        // Subscribe to the topic and register a rebalance listener
        consumer.subscribe(
                // topic to consume
                Collections.singleton("first"),
                new ConsumerRebalanceListener() {
                    // Called before a rebalance revokes partitions from this consumer
                    @Override
                    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                        // Persist the offsets of the old assignment to the custom storage
                        commit();
                    }

                    // Called after a rebalance assigns partitions to this consumer
                    @Override
                    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                        // Load the stored offsets of the newly assigned partitions into the cache
                        readOffset(partitions);
                        for (TopicPartition topicPartition : partitions) {
                            // A null offset means this partition has not been consumed before
                            Long offset = offsetMap.get(topicPartition);
                            if (offset == null) {
                                // First consumption: start from the beginning of the partition
                                consumer.seek(topicPartition, 0);
                            } else {
                                // Resume from the stored position
                                consumer.seek(topicPartition, offset);
                            }
                        }
                    }
                });

        // Poll for records
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(2000);
            // Processing the records and committing their offsets below should be
            // bound atomically (e.g. in a single transaction) for exactly-once
            // semantics; this block marks that unit of work.
            {
                // Process the records
                for (ConsumerRecord<String, String> record : consumerRecords) {
                    System.out.println(record);
                    // Cache the offset of the next record to consume (offset + 1),
                    // so that seeking to the stored value does not re-read this record
                    offsetMap.put(new TopicPartition(record.topic(), record.partition()),
                            record.offset() + 1);
                }
                // Persist the cached offsets to the custom storage
                commit();
            }
        }

        // The consumer is never closed here because the poll loop runs forever
    }

    /**
     * Load the offsets stored in the custom storage medium into the cache
     */
    private static void readOffset(Collection<TopicPartition> partitions) {
        // Deserialize the full offset map from the file; if the file does not
        // exist yet or cannot be read, fall back to an empty map
        Map<TopicPartition, Long> temp;
        try (ObjectInputStream ois = new ObjectInputStream(new FileInputStream(offsetFile))) {
            temp = (Map<TopicPartition, Long>) ois.readObject();
        } catch (Exception e) {
            temp = new HashMap<TopicPartition, Long>();
        }

        // Copy only the offsets of the assigned partitions into the cache
        for (TopicPartition partition : partitions) {
            offsetMap.put(partition, temp.get(partition));
        }

    }

    /**
     * Persist the cached partition offsets to the custom storage medium
     */
    private static void commit() {
        // 1. Read the previously stored offsets from the file
        //    (fall back to an empty map if the file does not exist yet)
        Map<TopicPartition, Long> temp;
        try (ObjectInputStream ois = new ObjectInputStream(new FileInputStream(offsetFile))) {
            temp = (Map<TopicPartition, Long>) ois.readObject();
        } catch (Exception e) {
            temp = new HashMap<TopicPartition, Long>();
        }

        // 2. Merge in the offsets from the cache
        temp.putAll(offsetMap);

        // 3. Write the merged offsets back to the file
        try (ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(offsetFile))) {
            oos.writeObject(temp);
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
}
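
The listing reads its connection settings from a consumer1.properties file on the classpath, which is not shown above. Below is a minimal sketch of what that file might contain; the broker address and group id are assumptions to be adapted to your cluster, and enable.auto.commit is turned off so that recovery relies only on the offsets written to D:\data\offset by commit():

# consumer1.properties (assumed contents)
bootstrap.servers=localhost:9092
group.id=test-consumer-group
enable.auto.commit=false
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

With these settings, the consumer's position after a restart or rebalance is determined entirely by readOffset() and the seek() calls in onPartitionsAssigned().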

Source: https://blog.csdn.net/FlatTiger/article/details/114190199