python - Kafka in a box: unable to send messages from the host

I've created a Vagrant/Ansible playbook to build a single-node Kafka VM.

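The idea is to provide some flexibility when prototyping: if we want a quick and dirty Kafka message queue, we can simply git clone [my 'kafka in a box' repo], cd into it, and vagrant up.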

Here's what I've done so far:

Vagrantfile:

VAGRANTFILE_API_VERSION = "2"

Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|

  config.vm.box = "hashicorp/precise64"

  config.vm.network "forwarded_port", guest: 9092, host: 9092

  config.vm.provider "virtualbox" do |vb|
    vb.customize ["modifyvm", :id, "--memory", "2048"]
  end

  config.vm.provision "ansible" do |ansible|
    ansible.playbook = "kafkaPlaybook.yml"
  end

end

…and the Ansible kafkaPlaybook.yml file:

---
- hosts: all
  user: vagrant
  sudo: True

  tasks:

    - name: install linux packages
      action: apt update_cache=yes pkg={{item}} state=installed
      with_items:
        - vim
        - openjdk-7-jdk

    - name: make /usr/local/kafka directory
      shell: "mkdir /usr/local/kafka"

    - name: download kafka (the link is from an apache mirror)
      get_url: url=http://apache.spinellicreations.com/kafka/0.8.1.1/kafka-0.8.1.1-src.tgz dest=/usr/local/kafka/kafka-0.8.1.1-src.tgz mode=0440

    - name: untar file
      shell: "tar -xvf /usr/local/kafka/kafka-0.8.1.1-src.tgz -C /usr/local/kafka"

    - name: build kafka with gradle
      shell: "cd /usr/local/kafka/kafka-0.8.1.1-src && ./gradlew jar"

When I vagrant up, the box gets provisioned. I'm able to vagrant ssh and run a basic producer/consumer test locally, e.g.

cd /usr/local/kafka/kafka-0.8.1.1-src
bin/zookeeper-server-start.sh config/zookeeper.properties                               #start zookeeper
bin/kafka-server-start.sh config/server.properties                                      #start kafka
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test                 #start a producer
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning  #start a consumer

When I type messages in the producer window, they appear in the consumer window. Great.

I then tried to connect to Kafka from the host using the kafka-python package:

>>> from kafka import KafkaClient, SimpleProducer
>>> kafka = KafkaClient("127.0.0.1:9092", timeout=120)
>>> kafka.ensure_topic_exists('turkey')
No handlers could be found for logger "kafka"
>>> kafka.ensure_topic_exists('turkey')
>>> producer = SimpleProducer(kafka)
>>> producer.send_messages("turkey", "gobble gobble")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/awoolford/anaconda/lib/python2.7/site-packages/kafka/producer.py", line 261, in send_messages
    return super(SimpleProducer, self).send_messages(topic, partition, *msg)
  File "/Users/awoolford/anaconda/lib/python2.7/site-packages/kafka/producer.py", line 188, in send_messages
    timeout=self.ack_timeout)
  File "/Users/awoolford/anaconda/lib/python2.7/site-packages/kafka/client.py", line 312, in send_produce_request
    resps = self._send_broker_aware_request(payloads, encoder, decoder)
  File "/Users/awoolford/anaconda/lib/python2.7/site-packages/kafka/client.py", line 148, in _send_broker_aware_request
    conn = self._get_conn(broker.host, broker.port)
  File "/Users/awoolford/anaconda/lib/python2.7/site-packages/kafka/client.py", line 55, in _get_conn
    timeout=self.timeout
  File "/Users/awoolford/anaconda/lib/python2.7/site-packages/kafka/conn.py", line 60, in __init__
    self.reinit()
  File "/Users/awoolford/anaconda/lib/python2.7/site-packages/kafka/conn.py", line 195, in reinit
    self._raise_connection_error()
  File "/Users/awoolford/anaconda/lib/python2.7/site-packages/kafka/conn.py", line 75, in _raise_connection_error
    raise ConnectionError("Kafka @ {0}:{1} went away".format(self.host, self.port))
kafka.common.ConnectionError: Kafka @ precise64:9092 went away

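kafka.ensure_topic_exists was called twice. The first call returns a warning and then creates the topic, so I can see that Python is talking to Kafka over port 9092. However, I'm unable to send messages to the queue.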
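
The session above involves two separate connections: the bootstrap connection to 127.0.0.1:9092, which succeeds, and a later connection to precise64:9092, the address named in the last line of the traceback. A minimal sketch for probing each address from the host follows. The helper name can_connect is hypothetical and not part of kafka-python; it only tests whether a TCP connection can be opened and says nothing about the Kafka protocol itself.

```python
import socket


def can_connect(host, port, timeout=2.0):
    """Return True if a TCP connection to (host, port) can be opened."""
    try:
        sock = socket.create_connection((host, port), timeout)
    except socket.error:
        # Covers refused connections, timeouts, and unresolvable hostnames.
        return False
    sock.close()
    return True


# Example calls, to be run on the host while the VM is up:
#   can_connect("127.0.0.1", 9092)   # the forwarded bootstrap port
#   can_connect("precise64", 9092)   # the address named in the traceback
```

If the first call succeeds and the second fails, the forwarded port is fine and the problem lies with the address the client is told to use after bootstrapping.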

Can you see what I'm doing wrong?

Solution:

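advertised.host.name and advertised.port need to be set in config/server.properties. Without them the broker advertises its own hostname (precise64, as seen in the traceback), which the host machine cannot reach. I added the following two tasks to the playbook: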

- name: uncomment and set advertised.host.name
  lineinfile: dest=/usr/local/kafka/kafka-0.8.1.1-src/config/server.properties 
              regexp='^#advertised.host.name=<hostname routable by clients>'
              insertafter='^#advertised.host.name=<hostname routable by clients>'
              line='advertised.host.name=localhost'
              state=present

- name: uncomment and set advertised.port line
  lineinfile: dest=/usr/local/kafka/kafka-0.8.1.1-src/config/server.properties 
              regexp='^#advertised.port=<port accessible by clients>'
              insertafter='^#advertised.port=<port accessible by clients>'
              line='advertised.port=9092'
              state=present
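
For readers who don't use Ansible, the effect of these two tasks on server.properties can be approximated in plain Python. This is only a sketch: set_property is a hypothetical helper, and its matching rule (any commented or uncommented line for the key) is looser than the exact regexp used in the tasks above.

```python
import re


def set_property(text, key, value):
    """Set key=value in Java-properties text.

    Replaces the first commented-out or existing line for the key,
    or appends a new line if the key is not present at all.
    """
    pattern = re.compile(
        r"^#?[ \t]*" + re.escape(key) + r"[ \t]*=.*$", re.MULTILINE
    )
    new_line = "%s=%s" % (key, value)
    if pattern.search(text):
        # A function is used as the replacement so that backslashes in
        # the value are never interpreted as regex escapes.
        return pattern.sub(lambda match: new_line, text, count=1)
    if text and not text.endswith("\n"):
        text += "\n"
    return text + new_line + "\n"
```

Applied to the file contents, set_property(text, "advertised.host.name", "localhost") followed by set_property(text, "advertised.port", "9092") yields the same two settings that the playbook tasks write.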

…and now the single-node Kafka cluster can be provisioned with:

git clone https://github.com/alexwoolford/vagrantKafkaBox
cd vagrantKafkaBox
vagrant up

If I were starting this again, I would probably use Wirbelsturm to provision a lab Kafka.

Tags: python, vagrant, apache-zookeeper, apache-kafka
Source: https://codeday.me/bug/20190528/1172679.html