kafka记录上次偏移量
作者:互联网
#!/usr/bin/env python # coding=utf-8 from kafka import * from kafka import KafkaConsumer import datetime,time import json def get_kafka_reviews(bootstrap_servers,topics): # print type(self.bootstrap_servers) consumer = KafkaConsumer(bootstrap_servers=[bootstrap_servers], group_id='yyjk01', auto_offset_reset='latest', enable_auto_commit=True) consumer.subscribe(topics=(topics)) #订阅要消费的主题 # print consumer.topics() # print "+++++++",consumer.position(TopicPartition(topic=u'ctripapi_duplicateddata_review', partition=1)) #获取当前主题的最新偏移量 review_list =[] for message in consumer: print message # str_time = datetime.datetime.fromtimestamp(message.timestamp / 1000) # print message.timestamp # print type(message.timestamp) # #print message.topic ,message.timestamp,message.value # # # print message.topic, str_time, message.value # print type(message.value) # dict1 = json.loads(message.value) # print dict1 # print type(dict1) # print '-------------------' # for key in dict1: # # print str_time,key,dict1[key] # print '-------------------' # #print '====%s:%d:%d:key-%s value=%s=='%(message.topic,message.partition,message.offset,message.key,message.value) # #review_list.append(message.value) #return review_list print get_kafka_reviews('192.168.137.2:9092','test_topic')
标签:dict1,value,kafka,topic,偏移量,key,print,message,上次 来源: https://blog.csdn.net/zhaoyangjian724/article/details/120565522