Kafka手动提交offset到自定义存储介质代码实现
作者:互联网
package com.aura.consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.io.*;
import java.util.*;
/**
* @author panghu
* @description 手动保存offset到自定义存储介质
* @create 2021-02-27-16:52
*/
public class ConsumerOffset {
// 高速缓存,记录分区offset
private static Map<TopicPartition, Long> offsetMap = new HashMap<TopicPartition, Long>();
// 自定义介质,存储高速缓存中的信息
private static File offsetFile = new File("D:\\data\\offset");
public static void main(String[] args) throws IOException {
// 创建KafkaConsumer实例
Properties prop = new Properties();
prop.load(ConsumerOffset.class.getClassLoader().
getResourceAsStream("consumer1.properties"));
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);
// 订阅消息
consumer.subscribe(
// topic
Collections.singleton("first"),
new ConsumerRebalanceListener() {
// 分区分配之前做的事情
@Override
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
// 提交旧分区offset信息到自定义介质中
commit();
}
// 分区分配之后做的事情
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitons) {
// 将自定义介质中的分区offset信息加载到缓存
readOffset(partitons);
for (TopicPartition topicPartition : partitons) {
// 获取分区offset信息,判断是否是首次消费
Long offset = offsetMap.get(topicPartition);
if (offset == null) {
// 当前分区consumer消费位置
consumer.seek(topicPartition, 0);
} else {
consumer.seek(topicPartition, offset);
}
}
}
});
// 拉取数据
while (true) {
ConsumerRecords<String, String> consumerRecords = consumer.poll(2000);
// 需要原子绑定的位置
{
// 消费数据
for (ConsumerRecord<String, String> record : consumerRecords) {
System.out.println(record);
// 记录消费后的offset到缓存中
offsetMap.put(new TopicPartition(record.topic(), record.partition()),
record.offset());
}
// 提交分区offset信息
commit();
}
}
// 关闭连接
}
/**
* 将自定义介质中的offset信息读取到缓存中
*/
private static void readOffset(Collection<TopicPartition> partitions) {
// 从自定义介质中读取offset信息
ObjectInputStream ois = null;
Map<TopicPartition, Long> temp;
try {
ois = new ObjectInputStream(new FileInputStream(offsetFile));
temp = (Map<TopicPartition, Long>) ois.readObject();
} catch (Exception e) {
temp = new HashMap<TopicPartition, Long>();
} finally {
try {
if (ois != null) {
ois.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
// 从全部的分区offset信息中,取出当前分区的offset信息,加载到缓存中
for (TopicPartition partition : partitions) {
offsetMap.put(partition, temp.get(partition));
}
}
/**
* 将缓存中的分区offset信息提交到自定义介质中
*/
private static void commit() {
// 1.先从文件中读取所有的旧的offset信息
ObjectInputStream ois = null;
Map<TopicPartition, Long> temp;
try {
ois = new ObjectInputStream(new FileInputStream(offsetFile));
temp = (Map<TopicPartition, Long>) ois.readObject();
} catch (Exception e) {
temp = new HashMap<TopicPartition, Long>();
} finally {
try {
if (ois != null) {
ois.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
// 2.合并offset
temp.putAll(offsetMap);
// 3.写出新的offset
ObjectOutputStream oos = null;
try {
oos = new ObjectOutputStream(new FileOutputStream(offsetFile));
oos.writeObject(temp);
} catch (IOException e) {
e.printStackTrace();
} finally {
if (oos != null) {
try {
oos.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
标签:temp,自定义,ois,存储介质,Kafka,offset,new,consumer 来源: https://blog.csdn.net/FlatTiger/article/details/114190199