kafka工具类
作者:互联网
背景
在做数据开发时,经常预见要查看kafka 元数据的一些信息,比如有多少个topic、某个topic中有多少分区、创建topic、删除topic等等。
代码
废话不多,直接撸。
我们的maven版本为:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.13</artifactId>
<version>2.7.0</version>
</dependency>
首先我们可以定义一个接口
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import java.util.List;
public interface KafkaClientImp {
/**
* 创建kafka
*
* @param topicName topic名称
* @param partition 分区数 建议小于12个
* @param replication 副本数量
* @return 是否创建成功
*/
boolean createTopic(String topicName, Integer partition, short replication);
/**
* topic 是否存在
*
* @param topicName topic名称
* @return true 存在 false不存在
*/
boolean isTopic(String topicName);
/**
* 获取topic列表
*
* @return
*/
List<String> getTopicList();
/**
* 获取topic 的分区数
*
* @param consumer kafka消费者
* @return
*/
List<PartitionInfo> getPartitionNum(KafkaConsumer<String, String> consumer, Object topicName);
/**
* 删除topic
*
* @param topicName
* @return
*/
boolean deleteTopic(String topicName);
}
然后我们写一个KafkaUtils类来实现接口。
import com.meituan.kafka.metadata.KafkaClientImp;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
* kafka工具类
*/
public class KafkaUtils implements KafkaClientImp {
private AdminClient adminClient;
public KafkaUtils(String filePath) {
HashMap<String, Object> map = new HashMap<>();
MyProperties myProperties = new MyProperties();
String bootstrapServers = myProperties.GetValueByKey(filePath, "bootstrapServers");
map.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
adminClient = KafkaAdminClient.create(map);
}
/**
* 创建topic
*
* @param topicName topic名称
* @param partition 分区数 建议小于12个
* @param replication 副本数量
* @return
*/
@Override
public boolean createTopic(String topicName, Integer partition, short replication) {
boolean flag = false;
try {
NewTopic newTopic = new NewTopic(topicName, /*topic名称,不建议过长*/
partition,/*分区数量*/
replication/*副本数量*/
);
adminClient.createTopics(Collections.singleton(newTopic), new CreateTopicsOptions().timeoutMs(10000))
.all()
.get();
flag = true;
} catch (Exception e) {
if (e.getMessage().startsWith("org.apache.kafka.common.errors.TopicExistsException")) {
System.out.println("topic is exist !! " + e.getMessage());
} else {
e.printStackTrace();
}
}
return flag;
}
/**
* 判断topic是否存在
*
* @param topicName topic名称
* @return
*/
@Override
public boolean isTopic(String topicName) {
return getTopicList().contains(topicName);
}
/**
* 获取topic list
*
* @return
*/
@Override
public List<String> getTopicList() {
Collection<TopicListing> topicListings = null;
ArrayList<String> list = new ArrayList<>();
try {
//获取topiclist,1分钟超时后报异常
topicListings = adminClient.listTopics().listings().get(1, TimeUnit.MINUTES);
topicListings.forEach(topicListing -> {
list.add(topicListing.name());
});
} catch (Exception e) {
e.printStackTrace();
}
return list;
}
/**
* 获取partition信息
*
* @param consumer kafka消费者
* @param topicName
* @return
*/
@Override
public List<PartitionInfo> getPartitionNum(KafkaConsumer consumer, Object topicName) {
return (List<PartitionInfo>) consumer.listTopics().get(topicName);
}
/**
* 删除topic
*
* @param topicName
* @return
*/
@Override
public boolean deleteTopic(String topicName) {
boolean flag = false;
try {
adminClient.deleteTopics(Collections.singleton(topicName))
.all()
.get();
flag = true;
} catch (Exception e) {
e.printStackTrace();
}
return flag;
}
public static void main(String[] args) {
KafkaUtils utils = new KafkaUtils(MyPropertiesFilePaths.KAFKA_DEV);
// List<String> topicLists = utils.getTopicList();
// topicLists.forEach(topiclist->{
// System.out.println(topiclist);
// });
// System.out.println(utils.isTopic("test"));
}
我们这里边有使用到自定义Properties,有需要可以参考。
标签:return,param,kafka,topic,import,工具,topicName 来源: https://blog.csdn.net/weixin_43947279/article/details/115081544