如何监视Kafka消息的应用程序处理以进行负载测试
作者:互联网
有一个应用程序(不是我的)从Kafka读取消息,对它们进行一些处理,并将记录存储在数据库中.我已经用Java编写了一个程序,它以给定的速率将消息写入队列.现在,它通过在测试运行结束时查询数据库来确定性能的简单度量,以确保记录在=记录中.但是,我想扩展它以定期检查队列,以查看应用程序尚未处理的待处理消息数量,以查看它是否已备份.
我想我可以在Zookeeper中检查应用程序组ID的偏移量.我查看了Kafka documentation,但它只给出了基本的消费者示例,API文档充其量只是稀疏,所以我不确定如何查找这些信息.
我需要调用哪些API才能找到应用程序当前所在队列中的位置,以及该位置后面队列中有多少消息?
我正在使用Kafka 2.10-0.8.2.1和一个Zookeeper实例以及三个Kafka实例,负载测试器正在使用0.8.2.1 Java API.有问题的主题有三个分区(每个Kafka服务器上有一个分区),但是为了测试的目的,只有一个消费者.
解决方法:
我建议查看Kafka中已经提供的工具(如果需要直接调用API,代码在src中可用).特别是,
$bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group consumer-group1 --zkconnect zkhost:zkport --topic topic1
会告诉你偏移和滞后:
consumer-group1,topic1,0-0 (Group,Topic,BrokerId-PartitionId)
Owner = consumer-group1-consumer1
Consumer offset = 70121994703
= 70,121,994,703 (65.31G)
Log size = 70122018287
= 70,122,018,287 (65.31G)
Consumer lag = 23584
= 23,584 (0.00G)
参考文献:
标签:java,apache-kafka,apache-zookeeper 来源: https://codeday.me/bug/20190824/1709110.html