编程语言
首页 > 编程语言> > 我可以从Python调用Bluemix消息中心服务吗?

我可以从Python调用Bluemix消息中心服务吗?

作者:互联网

kafka-python客户端支持Kafka 0.9,但显然不包括新的身份验证和加密功能,因此我猜测它只能与开放式服务器一起使用(与以前的版本一样).无论如何,甚至Java客户端都需要一个特殊的消息中心登录模块来连接(或者从示例中可以看出),这表明除非有类似的模块可用于Python,否则任何东西都将无法工作.

我的特定情况是,我想使用同样由Bluemix托管的Jupyter笔记本中的消息中心服务(Apache Spark服务).

解决方法:

我能够使用kafka-python库进行连接:

$pip install --user kafka-python

然后 …

from kafka import KafkaProducer
from kafka.errors import KafkaError
import ssl

############################################
# Service credentials from Bluemix UI:
############################################
bootstrap_servers =   # kafka_brokers_sasl
sasl_plain_username = # user
sasl_plain_password = # password
############################################

sasl_mechanism = 'PLAIN'
security_protocol = 'SASL_SSL'

# Create a new context using system defaults, disable all but TLS1.2
context = ssl.create_default_context()
context.options &= ssl.OP_NO_TLSv1
context.options &= ssl.OP_NO_TLSv1_1

producer = KafkaProducer(bootstrap_servers = bootstrap_servers,
                         sasl_plain_username = sasl_plain_username,
                         sasl_plain_password = sasl_plain_password,
                         security_protocol = security_protocol,
                         ssl_context = context,
                         sasl_mechanism = sasl_mechanism,
                         api_version=(0,10))

# Asynchronous by default
future = producer.send('my-topic', b'raw_bytes')

# Block for 'synchronous' sends
try:
    record_metadata = future.get(timeout=10)
except KafkaError:
    # Decide what to do if produce request failed...
    log.exception()
    pass

# Successful result returns assigned partition and offset
print (record_metadata.topic)
print (record_metadata.partition)
print (record_metadata.offset)

从Bluemix spark到jupyter笔记本,这对我来说都是有效的,但是请注意,这种方法没有使用spark.该代码仅在驱动程序主机上运行.

标签:ibm-cloud,python,apache-kafka,message-hub
来源: https://codeday.me/bug/20191012/1902910.html