其他分享
首页 > 其他分享> > kafka(五):消费组信息提取

kafka(五):消费组信息提取

作者:互联网

文章目录


2021-09-17

说明

分享

环境

实现

maven

<dependency>  
    <groupId>org.apache.kafka</groupId>  
    <artifactId>kafka-clients</artifactId>  
    <version>2.3.0</version>  
</dependency>  

代码

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.TimerTask;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.ListConsumerGroupsResult;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.kafka.customer.util.CollectionUtil;
import com.kafka.customer.util.ConfigUtil;
import com.kafka.customer.util.FileUtil;
import com.kafka.customer.util.TimeUtil;


public class KafkaLogTimer extends TimerTask {
	
	private final Logger log= LoggerFactory.getLogger(KafkaLogTimer.class);
	
	/**
	 * admin 客户端
	 */
	private AdminClient adminClient;
	
	/**
	 * consumer客户端
	 */
	private KafkaConsumer<String, String> consumer;
	
	/**
	 * 时间
	 */
	private long time;
	
	/**
	 * 输出文件名
	 */
	private String fileName;
	
	private String intervalChar="|";
	
	/**
	 * 执行状态 0  待执行  1 执行完毕
	 */
	private int state;
	
	
	@Override
	public void run() {
		String timeStr=TimeUtil.getFileTime();
		time=TimeUtil.getUtcMinuteTime();
		fileName=timeStr+ConfigUtil.txt;
		
		log.info("{} Start To Run",timeStr);
		state=0;
		
		KafkaLogThread kafkaLogThread=new KafkaLogThread();
		kafkaLogThread.start();
		long createTime=System.currentTimeMillis();
		
		while(true&&state==0) {
			//如果传入线程没有执行完
            if ((System.currentTimeMillis() - createTime) >= ConfigUtil.taskTime) {
                log.error("Time {} Deal Over Time Will To Stop",timeStr);
                kafkaLogThread.interrupt();
                break;
            }

            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                log.error("Wanting Thread Run Error");
            }
		}
		log.info("{}  End To Finish",timeStr);
	}
	
	/**
	 * 
	 * @author wangzonghui
	 * @date 2021-08-27 10:13:47
	 * @Description kafka信息获取线程
	 */
	class KafkaLogThread extends Thread{
		
		@Override
		public void run() {
			
			try {
				//连接
				init();
				//获取日志信息
				List<String> logdataList =createLog();
				//关闭连接
				close();
				
				//存储文件
				if(CollectionUtil.isNotEmptyCollection(logdataList)) {
					String outputFile=ConfigUtil.outputPath+fileName;
					FileUtil.createDataFile(logdataList, outputFile);
				}else {
					log.info("{} Get Log Is Null",TimeUtil.getTime());
				}

			} catch (Exception e) {
				log.info(fileName+" Create Error",e);
			}
			
			
			state=1;
		}
		
		/**
		 * 初始化环境变量
		 */
		public void init() {
			Properties props = new Properties();
		    props.put("bootstrap.servers", "localhost:9092");
		    
		    //kerbores安全认证
		    if(ConfigUtil.kerberos==0){
		    	props.put("security.protocol", "SASL_PLAINTEXT");
		    	props.put("sasl.mechanism", "GSSAPI");
		    	props.put("sasl.kerberos.service.name", "kafka");
		    }
		    
		    adminClient= AdminClient.create(props);

		    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		    consumer = new KafkaConsumer<>(props);
		}
		
		/**
		 * 生成日志主类
		 */
		public List<String> createLog() {
			//消费组信息获取
			ListConsumerGroupsResult list=adminClient.listConsumerGroups();
			KafkaFuture<Collection<ConsumerGroupListing>> data=list.all();
			List<String> logdataList=new ArrayList<>();
			
			try {
				if(data!=null&&data.get()!=null&&CollectionUtil.isNotEmptyCollection(data.get())) {
					Collection<ConsumerGroupListing>resultlist=data.get();
					Map<TopicPartition, OffsetAndMetadata> resultOffset;
					String context;
					StringBuffer buf;
					
					TopicPartition topicPartition;
					long endOffset,consumerOffset,lag;
					
					for(ConsumerGroupListing consumerGroupListing:resultlist) {
						
						resultOffset=adminClient.listConsumerGroupOffsets(consumerGroupListing.groupId()).partitionsToOffsetAndMetadata().get();
						if(resultOffset!=null&&CollectionUtil.isNotEmptyMap(resultOffset)) {
							for(Entry<TopicPartition, OffsetAndMetadata> mapItem :resultOffset.entrySet()) {
								context="";
								buf=new StringBuffer();
								buf.append(time).append(intervalChar);
								
								topicPartition=mapItem.getKey();
								endOffset=getLogEndOffset(topicPartition);
								consumerOffset=mapItem.getValue().offset();
								lag=endOffset-consumerOffset;
								
								buf.append(consumerGroupListing.groupId()).append(intervalChar).append(topicPartition.topic()).append(intervalChar).append(endOffset).append(intervalChar).append(consumerOffset).append(intervalChar).append(lag);
								context=buf.toString();

								logdataList.add(context);
							}
						}else {
							log.error("Get Consumer Group {} Of Null",consumerGroupListing.groupId());
						}
						
					}
				}else {
					log.info("No Found Consumer Gourp");
				}
			} catch (Exception e) {
				log.error("Get Kafka Consumer Error:"+e.toString(),e);
			}
			return logdataList;
		}
		
		/**
		 * 获取topic endoffset
		 * @param topicPartition
		 * @return
		 */
		private long getLogEndOffset(TopicPartition topicPartition) {
			
			consumer.assign(Arrays.asList(topicPartition));
			consumer.seekToEnd(Arrays.asList(topicPartition));
			return   consumer.position(topicPartition);
		}
		
		/**
		 * 关闭连接
		 */
		public void close() {
			if(adminClient!=null) {
				adminClient.close();
			}
			
			if(consumer!=null) {
				consumer.close();
			}
		}
	}
}

总结

标签:消费,信息提取,kafka,util,org,apache,import,append
来源: https://blog.csdn.net/qq_22973811/article/details/120346821