编程语言
首页 > 编程语言> > Python RabbitMQ基础知识

Python RabbitMQ基础知识

作者:互联网

rabbitmq

  1. 概念

    消息队列是一种异步的服务间通信方式,是分布式系统中重要的组件,在很多生产环境中需要控制并发量的场景下用到。消息队列可为这些分布式应用程序提供通信和协调。当前使用较多的消息队列有RabbitMQ、RocketMQ、ActivateMQ、Kafka等。

    • Broker:简单来说就是消息队列服务器实体
    • Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列
    • Queue:消息队列载体,每个消息都会被投入到一个或多个队列
    • Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来
    • Routing Key:路由关键字,exchange根据这个关键字进行消息投递
    • vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离
    • producer:消息生产者,就是投递消息的程序
    • consumer:消息消费者,就是接受消息的程序
    • channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务

image

image

  1. 应用场景

    • 应用解耦:多用用间通过消息队列对同一个消息处理,避免调用接口失败导致整个过程失败。
    • 异步处理:多应用对消息队列中同一消息进行处理,应用间并发处理消息,相比串行处理,减少处理时间
    • 限流削峰:双十一,618等抢购活动。
    • 消息驱动系统:业务体量不断扩大,采用微服务设计思想,分布式的部署方式。

    成本:

    • 应用复杂度:需要对消息队列进行管理
    • 暂时不一致性

    使用消息队列满足条件:

    • 生产者不需要立刻从消费者出获得反馈
    • 容许短暂的不一致性
    • 起到解耦,提速,广播,削峰等作用

    消息队列的使用场景是怎样的?

Docker安装

rabbitmq是在portain平台上安装rabbitmq-management。

  1. 访问dockerhub,搜索rabbitmq,点击进去rabbitmq获取rabbitmq-management的dockerfile链接。

image


FROM rabbitmq:3.9

RUN set eux; \
	rabbitmq-plugins enable --offline rabbitmq_management; \
# make sure the metrics collector is re-enabled (disabled in the base image for Prometheus-style metrics by default)
	rm -f /etc/rabbitmq/conf.d/management_agent.disable_metrics_collector.conf; \
# grab "rabbitmqadmin" from inside the "rabbitmq_management-X.Y.Z" plugin folder
# see https://github.com/docker-library/rabbitmq/issues/207
	cp /plugins/rabbitmq_management-*/priv/www/cli/rabbitmqadmin /usr/local/bin/rabbitmqadmin; \
	[ -s /usr/local/bin/rabbitmqadmin ]; \
	chmod +x /usr/local/bin/rabbitmqadmin; \
	apt-get update; \
	apt-get install -y --no-install-recommends python3; \
	rm -rf /var/lib/apt/lists/*; \
	rabbitmqadmin --version

EXPOSE 15671 15672

  1. 在Protainer上创建镜像。

image

image

  1. 运行镜像,主要是将容器的15672(management端口)和5672(amqp端口映射出来)。

  2. 最后访问http://[服务器ip]:15672即可到rabbitmq管理界面,输入默认账号密码guest/guest即可访问。

  3. 关于rabbitmq管理界面
    image

    image

    image

    image

点击任意一个Exchange:

image

image

点击任意一个queue:

image

用一个邮局的例子来说明各自的作用。首先邮局表示一个队列,邮筒就是一个channel。channel的作用是建立会话任务。每个地方建立一个邮局很“贵”类似每次建立TCP/IP链接非常“贵”且耗时,用户也无需每次跑到邮局,只需要把信放在邮筒即可。邮局收到用户的信后,根据信封上的地址(exchange)投递给收信方。

python实现

  1. 简单消费者生产者模式

import pika
import json

credentials = pika.PlainCredentials(user, user)  # mq用户名和密码
# 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。
connection = pika.BlockingConnection(pika.ConnectionParameters(host = host,port = 5672,virtual_host = '/',credentials = credentials))
channel=connection.channel()
# 声明消息队列,消息将在这个队列传递,如不存在,则创建
result = channel.queue_declare(queue = 'python-test')

for i in range(10):
    message=json.dumps({'OrderId':"1000%s"%i})
# 向队列插入数值 routing_key是队列名
    channel.basic_publish(exchange = '',routing_key = 'python-test',body = message)
    print('send:'+message)
connection.close()


credentials = pika.PlainCredentials('guest', 'guest')
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '192.168.3.130',port = 5672,virtual_host = '/',credentials = credentials))
channel = connection.channel()
# 申明消息队列,消息在这个队列传递,如果不存在,则创建队列
channel.queue_declare(queue = 'python-test', durable = False)
# 定义一个回调函数来处理消息队列中的消息,这里是打印出来
def callback(ch, method, properties, body):
    ch.basic_ack(delivery_tag = method.delivery_tag)
    print('receive:'+body.decode())

# 告诉rabbitmq,用callback来接收消息
channel.basic_consume('python-test',callback)
# 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理
channel.start_consuming()

把消费的代码注释掉,我们在rabbitmq management看看


import pika
import json

credentials = pika.PlainCredentials(user, user)  # mq用户名和密码
# 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。
connection = pika.BlockingConnection(pika.ConnectionParameters(host = host,port = 5672,virtual_host = '/',credentials = credentials))
channel=connection.channel()
# 声明消息队列,消息将在这个队列传递,如不存在,则创建
result = channel.queue_declare(queue = 'python-test')

for i in range(10):
    message=json.dumps({'OrderId':"1000%s"%i})
# 向队列插入数值 routing_key是队列名
    channel.basic_publish(exchange = '',routing_key = 'python-test',body = message)
    print('send:'+message)
connection.close()


# credentials = pika.PlainCredentials('guest', 'guest')
# connection = pika.BlockingConnection(pika.ConnectionParameters(host = '192.168.3.130',port = 5672,virtual_host = '/',credentials = credentials))
# channel = connection.channel()
# # 申明消息队列,消息在这个队列传递,如果不存在,则创建队列
# channel.queue_declare(queue = 'python-test', durable = False)
# # 定义一个回调函数来处理消息队列中的消息,这里是打印出来
# def callback(ch, method, properties, body):
#     ch.basic_ack(delivery_tag = method.delivery_tag)
#     print('receive:'+body.decode())

# # 告诉rabbitmq,用callback来接收消息
# channel.basic_consume('python-test',callback)
# # 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理
# channel.start_consuming()

image

image

Ack mode选择Automatic Ack模式后,消费消息自动确认

image

  1. 工作模式

image

一对多模式,一个生产者,多个消费者,一个队列,每个消费者从队列中获取唯一的消息。有两种消息分发机制,轮询分发和公平分发:轮询分发的特点是将消息轮流发送给每个消费者,在实际情况中,多个消费者,难免有的处理得快,有的处理得慢,如果都要等到一个消费者处理完,才把消息发送给下一个消费者,效率就大大降低了。而公平分发的特点是,只要有消费者处理完,就会把消息发送给目前空闲的消费者,这样就提高消费效率了。


# producer

import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=message,
    properties=pika.BasicProperties(
        delivery_mode=2,  # make message persistent
    ))
print(" [x] Sent %r" % message)
connection.close()

# consumer

import pika
import time

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)


# 公平分发
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)

channel.start_consuming()

image

image

  1. Publish/Subscribe模式

image

publish.py


import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Sent %r" % message)
connection.close()

Subscribe.py


import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs', queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

  1. 路由模式

image

producer.py


import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
    exchange='direct_logs', routing_key=severity, body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

consumer.py


import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(
        exchange='direct_logs', queue=queue_name, routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))


channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

  1. Topic模式

image

import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
    exchange='topic_logs', routing_key=routing_key, body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(
        exchange='topic_logs', queue=queue_name, routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))


channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()
  1. 路由模式

image


import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)

def on_request(ch, method, props, body):
    n = int(body)

    print(" [.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id = \
                                                         props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)

print(" [x] Awaiting RPC requests")
channel.start_consuming()


import pika
import uuid

class FibonacciRpcClient(object):

    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost'))

        self.channel = self.connection.channel()

        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)


fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)

参考

RabbbitMq官网

消息队列rabbitmq

python实现rabbitmq六种模式

pika-Api文档

标签:pika,exchange,Python,RabbitMQ,基础知识,queue,队列,connection,channel
来源: https://blog.csdn.net/u012655441/article/details/120373963