现网环境快速测试kafka接收消息--python操作kafka
作者:互联网
1、问题:目前现网环境中使用到的kafka服务器是别人的,我们无法登入,现在想查看某一个topic的消费信息
当前服务器没有安装kafka应用程序,所以也无法使用kafka-console-consumer.sh来连接,写一个java程序来上传包在运行过于复杂,可以考虑使用python脚本来连接测试消费数据
首先 ,默认linux环境自带了python,我们只需要安装一个python的kafka的第三方库即可
# 上传kafka-1.3.5.tar.gz
[root@k8s-fengfan opt]# tar -zxvf kafka-1.3.5.tar.gz
[root@k8s-fengfan opt]# cd kafka-1.3.5
[root@k8s-fengfan kafka-1.3.5]# python setup.py install
# 安装完毕后可以使用python连接kafka
2、编写python脚本
kafka-consumer.py
from kafka import KafkaConsumer
consumer = KafkaConsumer('epic-choppp-receive', bootstrap_servers=['k8s-fengfan:9092'])
for msg in consumer:
recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
print recv
- epic-chopper-receive:订阅的topic
- k8s-fengfan:9092:连接kafka的地址
3、启动kafka、运行python
- ./kafka-console-producer.sh --broker-list k8s-fengfan:9092 --topic epic-choppp-receive
- python consumer.py
这样就可以简单测试kafka消费结果了
TIP:有时候会启动失败
需要指定一下api_version
from kafka import KafkaConsumer
consumer = KafkaConsumer('epic-choppp-receive', bootstrap_servers=['k8s-fengfan:9092'],api_version=(0,10,1))
for msg in consumer:
recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
print recv
标签:现网,--,kafka,python,msg,k8s,consumer,fengfan 来源: https://blog.csdn.net/qdboi/article/details/111645969