发布订阅
作者:互联网
Publish.py
import pika
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.164.129',credentials=credentials))
channel = connection.channel()
# exchange
channel.exchange_declare(exchange='ex1',exchange_type='fanout')
# exchange
channel.basic_publish(exchange='ex1',
routing_key='',
body='landson')
connection.close()import pika
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.164.129',credentials=credentials))
channel = connection.channel()
# exchange
channel.exchange_declare(exchange='ex1',exchange_type='fanout')
# exchange
channel.basic_publish(exchange='ex1',
routing_key='',
body='landson')
connection.close()
Subscribe.py
Subscribe.py这个脚本运行多次,就会生成多个空队列,都会和exchange='ex1'绑定,
然后exchange='ex1'把消息发送到所有和它绑定的队列里面,每一个消费者从关联的队列中拿到消息
import pika
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.164.129',credentials=credentials))
channel = connection.channel()
# exchange='m1',exchange(秘书)的名称
# exchange_type='fanout' , 秘书工作方式将消息发送给所有的队列
channel.exchange_declare(exchange='ex1',exchange_type='fanout')
# 随机生成一个队列,exclusive=True表示:一旦消费者的连接关闭,就要删除与之关联的队列
result = channel.queue_declare(queue='',exclusive=True)
queue_name = result.method.queue
# print(queue_name)
# 让exchange和queque进行绑定.
channel.queue_bind(exchange='ex1',queue=queue_name)
def callback(ch, method, properties, body):
print("消费者接受到了任务: %r" % body)
channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True)
channel.start_consuming()
标签:订阅,pika,exchange,queue,发布,ex1,credentials,channel 来源: https://www.cnblogs.com/landson/p/15880210.html