Kafka学习(十四) api讲解篇(转载)
作者:互联网
python 发送kafka
使用 Python(kafka-python 库)向 Kafka 发送消息,大体有三种方式:
1 发送并忘记(不关注是否正常到达,不对返回结果做处理)
import pickle
import time

from kafka import KafkaProducer

# Keys and values are pickled before being handed to the producer.
# NOTE(review): pickle is only safe when every consumer trusts the producer;
# never unpickle messages coming from an untrusted source.
producer = KafkaProducer(
    bootstrap_servers=['ip:9092'],
    key_serializer=pickle.dumps,
    value_serializer=pickle.dumps,
)

start_time = time.time()
try:
    for i in range(10000):
        print('------{}---------'.format(i))
        # Fire and forget: the future returned by send() is deliberately
        # ignored, so delivery failures go unnoticed.
        producer.send('test_topic', key='num', value=i, partition=0)

    # Push every message still held in the client-side buffer to the broker.
    producer.flush()
finally:
    # Release the producer even if sending raised.
    producer.close()

end_time = time.time()
time_counts = end_time - start_time
print(time_counts)
2 同步发送(通过get方法等待Kafka的响应,判断消息是否发送成功)
import pickle
import time

from kafka import KafkaProducer
# KafkaError is the exception base class. `kafka.errors.kafka_errors` is a
# dict of error classes and cannot be used in an `except` clause.
from kafka.errors import KafkaError

# Keys and values are pickled before being handed to the producer.
# NOTE(review): pickle is only safe when every consumer trusts the producer.
producer = KafkaProducer(
    bootstrap_servers=['ip:9092'],
    key_serializer=pickle.dumps,
    value_serializer=pickle.dumps,
)

start_time = time.time()
try:
    for i in range(10000):
        print('------{}---------'.format(i))
        future = producer.send(topic="test_topic", key="num", value=i)
        # Block on get() until this send completes (or 10 s elapse) before
        # sending the next message, so messages are sent strictly in order.
        try:
            # record_metadata exposes .topic, .partition and .offset.
            record_metadata = future.get(timeout=10)
        except KafkaError as e:
            print(str(e))
finally:
    # Release the producer even if sending raised.
    producer.close()

end_time = time.time()
time_counts = end_time - start_time
print(time_counts)
3 异步发送+回调函数(消息以异步的方式发送,通过回调函数返回消息发送成功/失败)
import pickle
import time

from kafka import KafkaProducer

# Keys and values are pickled before being handed to the producer.
# NOTE(review): pickle is only safe when every consumer trusts the producer.
producer = KafkaProducer(
    bootstrap_servers=['ip:9092'],
    key_serializer=pickle.dumps,
    value_serializer=pickle.dumps,
)


def on_send_success(*args, **kwargs):
    """Callback registered via add_callback() for successful sends.

    :param args: positional arguments supplied by the future; on success
        this is the record metadata.
    :param kwargs: keyword arguments supplied by the future (unused).
    :return: ``args`` unchanged.
    """
    return args


def on_send_error(*args, **kwargs):
    """Callback registered via add_errback() for failed sends.

    Prints every error it receives so a failed send is visible instead of
    being silently dropped.

    :param args: positional arguments supplied by the future; on failure
        this is the exception.
    :param kwargs: keyword arguments supplied by the future (unused).
    :return: ``args`` unchanged.
    """
    for error in args:
        print(str(error))
    return args


start_time = time.time()
try:
    for i in range(10000):
        print('------{}---------'.format(i))
        # On success the callback receives record_metadata; on failure the
        # errback receives the exception.
        future = producer.send(topic="test_topic", key="num", value=i)
        future.add_callback(on_send_success).add_errback(on_send_error)

    # Push every message still held in the client-side buffer to the broker.
    producer.flush()
finally:
    # Release the producer even if sending raised.
    producer.close()

end_time = time.time()
time_counts = end_time - start_time
print(time_counts)
除此之外,还可以通过自定义序列化函数发送 gzip 压缩后的数据:
import gzip
import io

# Brokers used when the caller does not pass its own list.
BOOTSTRAP_SERVERS = [
    "fdw8.fengjr.inc:9092",
    "fdw9.fengjr.inc:9092",
    "fdw10.fengjr.inc:9092",
]


def gzip_compress(msg_str):
    """Gzip-compress a message so it can be used as a Kafka serializer.

    :param msg_str: payload to compress; ``str`` is encoded as UTF-8 first,
        bytes-like objects are compressed as-is.
    :return: the compressed bytes, or ``None`` if compression failed (the
        error is printed).
    """
    try:
        if isinstance(msg_str, str):
            msg_str = msg_str.encode("utf-8")
        buf = io.BytesIO()
        with gzip.GzipFile(mode='wb', fileobj=buf) as f:
            f.write(msg_str)
        # Read the buffer only after the GzipFile is closed, otherwise the
        # gzip trailer is missing from the result.
        return buf.getvalue()
    except Exception as e:
        print("Gzip压缩错误" + str(e))
        return None


def gzip_uncompress(c_data):
    """Decompress gzip data produced by :func:`gzip_compress`.

    :param c_data: gzip-compressed bytes.
    :return: the decompressed bytes, or ``None`` if decompression failed
        (the error is printed).
    """
    try:
        with gzip.GzipFile(mode='rb', fileobj=io.BytesIO(c_data)) as f:
            return f.read()
    except Exception as e:
        print("Gzip解压错误" + str(e))
        return None


def send_kafka(topic_name, msg, key=None, bootstrap_servers=None):
    """Send one gzip-compressed message to Kafka.

    NOTE: a new producer is created and closed on every call; reuse a single
    producer when sending many messages.

    :param topic_name: topic to publish to.
    :param msg: message value; compressed with :func:`gzip_compress`.
    :param key: optional message key; compressed as well when given.
    :param bootstrap_servers: optional broker list; defaults to
        ``BOOTSTRAP_SERVERS``.
    :return: the future returned by ``KafkaProducer.send``.
    """
    # Imported here so the gzip helpers stay usable without kafka-python.
    from kafka import KafkaProducer

    producer_config = {
        "bootstrap_servers": bootstrap_servers or BOOTSTRAP_SERVERS,
        "value_serializer": gzip_compress,
    }
    # Only install a key serializer when there is a key to serialize.
    if key is not None:
        producer_config["key_serializer"] = gzip_compress

    producer = KafkaProducer(**producer_config)
    try:
        return producer.send(topic_name, value=msg, key=key)
    finally:
        # close() is given up to 5 seconds; it runs even if send() raised.
        producer.close(timeout=5)
标签:producer,讲解,send,Kafka,api,key,time,print,import 来源: https://www.cnblogs.com/yinging/p/16512870.html