其他分享
首页 > 其他分享> > kafka工具类

kafka工具类

作者:互联网

背景

在做数据开发时,经常预见要查看kafka 元数据的一些信息,比如有多少个topic、某个topic中有多少分区、创建topic、删除topic等等。

代码

废话不多,直接撸。
我们的maven版本为:

 <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.13</artifactId>
      <version>2.7.0</version>
  </dependency>

首先我们可以定义一个接口

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;

import java.util.List;

public interface KafkaClientImp {

    /**
     * 创建kafka
     *
     * @param topicName   topic名称
     * @param partition   分区数 建议小于12个
     * @param replication 副本数量
     * @return 是否创建成功
     */
    boolean createTopic(String topicName, Integer partition, short replication);

    /**
     * topic 是否存在
     *
     * @param topicName topic名称
     * @return true 存在  false不存在
     */
    boolean isTopic(String topicName);

    /**
     * 获取topic列表
     *
     * @return
     */
    List<String> getTopicList();

    /**
     * 获取topic 的分区数
     *
     * @param consumer kafka消费者
     * @return
     */
    List<PartitionInfo> getPartitionNum(KafkaConsumer<String, String> consumer, Object topicName);


    /**
     * 删除topic
     *
     * @param topicName
     * @return
     */
    boolean deleteTopic(String topicName);


}

然后我们写一个KafkaUtils类来实现接口。

import com.meituan.kafka.metadata.KafkaClientImp;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;

import java.util.*;
import java.util.concurrent.TimeUnit;


/**
 * kafka工具类
 */
public class KafkaUtils implements KafkaClientImp {

    private AdminClient adminClient;

    public KafkaUtils(String filePath) {
        HashMap<String, Object> map = new HashMap<>();
        MyProperties myProperties = new MyProperties();
        String bootstrapServers = myProperties.GetValueByKey(filePath, "bootstrapServers");
        map.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        adminClient = KafkaAdminClient.create(map);
    }

    /**
     * 创建topic
     *
     * @param topicName   topic名称
     * @param partition   分区数 建议小于12个
     * @param replication 副本数量
     * @return
     */
    @Override
    public boolean createTopic(String topicName, Integer partition, short replication) {

        boolean flag = false;
        try {
            NewTopic newTopic = new NewTopic(topicName, /*topic名称,不建议过长*/
                    partition,/*分区数量*/
                    replication/*副本数量*/
            );
            adminClient.createTopics(Collections.singleton(newTopic), new CreateTopicsOptions().timeoutMs(10000))
                    .all()
                    .get();
            flag = true;
        } catch (Exception e) {
            if (e.getMessage().startsWith("org.apache.kafka.common.errors.TopicExistsException")) {
                System.out.println("topic is exist !! " + e.getMessage());
            } else {
                e.printStackTrace();
            }
        }
        return flag;
    }

    /**
     * 判断topic是否存在
     *
     * @param topicName topic名称
     * @return
     */
    @Override
    public boolean isTopic(String topicName) {
        return getTopicList().contains(topicName);
    }

    /**
     * 获取topic list
     *
     * @return
     */
    @Override
    public List<String> getTopicList() {
        Collection<TopicListing> topicListings = null;
        ArrayList<String> list = new ArrayList<>();
        try {
            //获取topiclist,1分钟超时后报异常
            topicListings = adminClient.listTopics().listings().get(1, TimeUnit.MINUTES);
            topicListings.forEach(topicListing -> {
                list.add(topicListing.name());
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
        return list;
    }

    /**
     * 获取partition信息
     *
     * @param consumer  kafka消费者
     * @param topicName
     * @return
     */
    @Override
    public List<PartitionInfo> getPartitionNum(KafkaConsumer consumer, Object topicName) {

        return (List<PartitionInfo>) consumer.listTopics().get(topicName);

    }

    /**
     * 删除topic
     *
     * @param topicName
     * @return
     */
    @Override
    public boolean deleteTopic(String topicName) {
        boolean flag = false;

        try {
            adminClient.deleteTopics(Collections.singleton(topicName))
                    .all()
                    .get();
            flag = true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return flag;
    }

    public static void main(String[] args) {
        KafkaUtils utils = new KafkaUtils(MyPropertiesFilePaths.KAFKA_DEV);
        // List<String> topicLists = utils.getTopicList();
        // topicLists.forEach(topiclist->{
        //     System.out.println(topiclist);
        // });
        // System.out.println(utils.isTopic("test"));
    }

我们这里边有使用到自定义Properties,有需要可以参考。

标签:return,param,kafka,topic,import,工具,topicName
来源: https://blog.csdn.net/weixin_43947279/article/details/115081544