消息队列:RabbitMQ安装和快速入门
作者:互联网
文章目录
一、消息队列
在计算机中,消息是一个程序发送给另一个程序的数据,而队列(一种数据结构)就是一个容器,存放在里面的东西,都是先放进来的先被拿出去。所以,消息队列就是在消息的传输过程中暂时保存消息的容器。使用队列,能够避免在收、发消息需要同步进行的弊端,可以让发送方直接将消息放如队列,避免因为等待接收方而阻塞。
1.1 作用
-
应用解耦:
把一个大系统拆分为多个小系统,互相之间用消息队列来做数据的交互或函数的调用。
-
流量削峰:
在流量特别大的应用场景内,比如秒杀活动。这种情况下,服务端很容易崩溃。但是我直接拒绝用户,对用户来说是一种极为不好的用户体验。所以,我们可以控制处理请求的速度,将暂时处理不了的请求放入消息队列,让用户稍等一会儿,这比直接拒绝好很多。
-
消息分发:
就是将数据发送给需要接收数据的其他主机、程序等
-
异步处理:
比如,在用户成功购买某一件商品后,同时向他发送订单的手机短信和电子邮件,然后跳转到成功页面:
- 没有消息队列:先将订单数据交给短信模块发送手机短信,然后交给邮件模块发送电子邮件,完成后跳转到成功页面。
- 使用消息队列:直接跳转到成功页面,至于手机短信和电子邮件,可以把订单数据放入消息队列中,短信模块和邮件模块在后台异步读取数据并发送。
1.2 主流消息队列比较
Kafka | RocketMQ | RabbitMQ | |
---|---|---|---|
单机吞吐量 | 十万级 | 十万级 | 万级 |
消息延迟 | 毫秒级 | 毫秒级 | 微秒级 |
可用性 | 非常高(分布式) | 非常高(分布式) | 高(主从) |
消息丢失 | 理论上不会丢失 | 理论上不会丢失 | 低 |
社区活跃度 | 高 | 中 | 高 |
二、RabbitMQ的安装
2.1 安装
2.1.1 Docker 方式
# for RabbitMQ 3.9, the latest series
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.9-management
# for RabbitMQ 3.8,
# 3.8.x support timeline: https://www.rabbitmq.com/versions.html
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.8-management
2.1.2 原生方式(Ubuntu 20.04)
-
将下面的代码,保存到
rabbitMQ_install.sh
文件中:#!/usr/bin/sh sudo apt-get install curl gnupg apt-transport-https -y ## Team RabbitMQ's main signing key curl -1sLf "https://keys.openpgp.org/vks/v1/by-fingerprint/0A9AF2115F4687BD29803A206B73A36E6026DFCA" | sudo gpg --dearmor | sudo tee /usr/share/keyrings/com.rabbitmq.team.gpg > /dev/null ## Cloudsmith: modern Erlang repository curl -1sLf https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/gpg.E495BB49CC4BBE5B.key | sudo gpg --dearmor | sudo tee /usr/share/keyrings/io.cloudsmith.rabbitmq.E495BB49CC4BBE5B.gpg > /dev/null ## Cloudsmith: RabbitMQ repository curl -1sLf https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/gpg.9F4587F226208342.key | sudo gpg --dearmor | sudo tee /usr/share/keyrings/io.cloudsmith.rabbitmq.9F4587F226208342.gpg > /dev/null ## Add apt repositories maintained by Team RabbitMQ sudo tee /etc/apt/sources.list.d/rabbitmq.list <<EOF ## Provides modern Erlang/OTP releases ## deb [signed-by=/usr/share/keyrings/io.cloudsmith.rabbitmq.E495BB49CC4BBE5B.gpg] https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/deb/ubuntu bionic main deb-src [signed-by=/usr/share/keyrings/io.cloudsmith.rabbitmq.E495BB49CC4BBE5B.gpg] https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/deb/ubuntu bionic main ## Provides RabbitMQ ## deb [signed-by=/usr/share/keyrings/io.cloudsmith.rabbitmq.9F4587F226208342.gpg] https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/deb/ubuntu bionic main deb-src [signed-by=/usr/share/keyrings/io.cloudsmith.rabbitmq.9F4587F226208342.gpg] https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/deb/ubuntu bionic main EOF ## Update package indices sudo apt-get update -y ## Install Erlang packages sudo apt-get install -y erlang-base \ erlang-asn1 erlang-crypto erlang-eldap erlang-ftp erlang-inets \ erlang-mnesia erlang-os-mon erlang-parsetools erlang-public-key \ erlang-runtime-tools erlang-snmp erlang-ssl \ erlang-syntax-tools erlang-tftp erlang-tools erlang-xmerl ## Install rabbitmq-server and its dependencies sudo apt-get install rabbitmq-server -y --fix-missing
-
给文件添加执行权限:
sudo chmod +x ./rabbitMQ_install.sh
-
执行脚本:
sudo bash ./rabbitMQ_install.sh
2.2 管理插件的用法
-
启动 RabbitMQ服务:
sudo systemctl start rabbitmq-server
-
创建 RabbitMQ用户:
sudo rabbitmqctl add_user '用户名' '密码'
其实,RabbitMQ自带了一个来宾用户,用户名和密码都是
guest
。 -
授予管理员权限:
sudo rabbitmqctl set_user_tags 用户名 administrator
-
向虚拟主机中的用户授予所有权限:
sudo rabbitmqctl set_permissions -p / 用户名 '.*' '.*' '.*'
-
启用管理插件:
sudo rabbitmq-plugins enable rabbitmq_management
-
访问管理页面:http://localhost:15672/,输入前面创建的用户名和密码即可。
三、RabbitMQ快速入门
3.1 名词介绍
-
生产(Producing):即发送消息,发送消息的程序被称为“生产者( producer)”。
-
消费(Consuming):即接收消息,接收消息的程序被称为“消费者(consumer)”
-
队列(Queue):就是存放消息的地方。队列只受主机的内存和磁盘限制,它本质上是一个大的消息缓冲区。
可以有多个生产者将消息发送到一个队列,也可以有多个消费者尝试从一个队列接收数据。
注意:生产者、消费者、消息队列可以放在不同的机器上;生产者和消费者也可以是同一个程序。
3.2 Hello World!
接下来,我们用 python 代码编写两个小程序,生产者发送消息(将消息放入队列),消费者接收消息(从队列取出消息),消息内容为“Hello World!”。这是一个最精简的 RabbitMQ 的使用过程。
-
安装 RabbitMQ 官方推荐的 python 客户端:
pip install pika
-
发送消息,新建 send.py 文件:
#!/usr/bin/env python import pika # 连接到本地的消息队列 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() # 创建队列,队列名称为 hello channel.queue_declare(queue='hello') # 发送消息 channel.basic_publish(exchange='', routing_key='hello', # 队列名称 body='Hello World!') # 消息内容 print(" [x] 发送 'Hello World!'") # 关闭连接 connection.close() # 会自动刷新网络缓存区,确保消息被发送
-
接收消息,新建 receive.py 文件:
#!/usr/bin/env python import pika, sys, os def main(): # 连接到本地的消息队列 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() # 创建队列,队列名称为 hello channel.queue_declare(queue='hello') # 回调函数,每当接收到消息时被调用 def callback(ch, method, properties, body): print(" [x] 接收到 %r" % body) channel.basic_consume(queue='hello', # 指定接受消息的队列 auto_ack=True, # 是否自动调用回调函数 on_message_callback=callback) # 指定回调函数 print(' [*] 等待消息. 按下 CTRL+C 退出') # 进入一个循环,等待消息 channel.start_consuming() if __name__ == '__main__': try: main() except KeyboardInterrupt: print('Interrupted') try: sys.exit(0) except SystemExit: os._exit(0)
注意:消费者和生产者都进行了同一个队列的创建,目的是无论哪一方先被启动,都能确保队列存在。
-
启动消费者:
python receive.py # [*] 等待消息. 按下 CTRL+C 退出 # [x] 接收到 b'Hello World!'
-
启动生产者:
python send.py # [x] 发送 'Hello World!'
3.3 任务队列(work queue)
任务队列(也叫工作队列)是存储了一些资源密集型任务的队列,这些任务被封装成一个个的消息。目的是为了避免资源密集型任务对当前进程的阻塞,在后台中让专门的任务处理进程(worker)从任务队列中取出任务(就是消息,这里叫任务更加贴切)去执行。而且,当有多个任务处理进程一起工作时,便于任务在它们之间共享。
-
修改 send.py 中的代码,保存为 new_task.py ,要修改的部分如下:
import sys # 要处理的任务 message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='hello', body=message) # 改为任务
-
修改 receive.py 中的代码,保存为 worker.py:
import time def callback(ch, method, properties, body): print(" [x] 接收到 %r" % body.decode()) # 消息体中有几个“.”就睡几秒钟,假装任务处理很耗时 time.sleep(body.count(b'.')) print(" [x] 完成")
3.3.1 循环调度
使用任务队列的优点之一是能够轻松地扩大规模。如果我们的任务被大量积压,来不及处理,我们可以很轻松地增加更多的任务处理进程(以下都将简称为 worker)。
-
启动两个终端,运行2个 worker.py:
python worker.py # [*] 等待消息. 按下 CTRL+C 退出
-
再启动一个终端,运行 new_task.py:
python new_task.py First message. python new_task.py Second message.. python new_task.py Third message... python new_task.py Fourth message.... python new_task.py Fifth message.....
-
看看 worker 收到了什么:
# 第一个 worker: [x] 接收到 'First message.' [x] 完成 [x] 接收到 'Third message...' [x] 完成 [x] 接收到 'Fifth message.....' [x] 完成 # 第二个 worker: [x] 接收到 'Second message..' [x] 完成 [x] 接收到 'Fourth message....' [x] 完成
默认情况下,RabbitMQ 会按顺序将每条消息发送给下一个 worker。一般来说,每个 worker 都会收到相同数量的任务。这种分发任务的方式称为轮询。
3.3.2 消息确认
在上面的代码中,如果一个 worker 在执行任务的过程中被终止了,那么任务会丢失。为了避免任务丢失,RabbitMQ 提供了消息确认机制:
- 如果 worker 回复一个 ack(acknowledgement),那么 RabbitMQ 就认为该任务被成功处理,然后从任务队列删除该任务。
- 如果一个 worker 没有回复 ack 就终止运行了,那么 RabbitMQ 就认为该任务执行失败,会将该任务重新排队,分配给其他运行中的 worker。
消息确认有一个超时时间,默认为 30分钟。它能帮助我们检测异常的 worker。
默认情况下,手动消息确认是打开的。在前面的例子中,我们通过auto_ack=True
关闭了它们。
-
继续修改代码,移除关闭消息确认的参数:
def callback(ch, method, properties, body): print(" [x] 接收到 %r" % body.decode()) time.sleep(body.count(b'.')) print(" [x] 完成") ch.basic_ack(delivery_tag = method.delivery_tag) # 发送 ack channel.basic_consume(queue='hello', on_message_callback=callback)
使用这段代码后,即使你用 CTRL+C 终止一个正在处理任务的 worker,也不会有任何损失。
3.3.3 消息持久化
我们已经学会了如何确保即使 worker 意外终止,任务也不会丢失。但是如果RabbitMQ 服务器停止,我们的任务仍然会丢失。要确保任务不会丢失,需要做两件事:将队列和任务都标记为持久。
-
标记队列为持久,以便 RabbitMQ 重启后队列能够存活:
# 因为 hello 队列已经存在,所以我们换了个新队列 task_queue channel.queue_declare(queue='task_queue', durable=True)
注意!持久化标记要在生产者和消费者上都标记上。
-
标记任务为持久:
channel.basic_publish(exchange='', routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode = pika.spec.PERSISTENT_DELIVERY_MODE ))
将消息标记为持久化并不能完全保证消息不会丢失。虽然它告诉 RabbitMQ 将消息保存到磁盘,但是当 RabbitMQ 接收到消息并且还没有保存消息时,仍然会有一个很短的空档。
此外,RabbitMQ 不会对每条消息做
fsync(2)
处理——它可能只是被保存在缓存中,而不是真正写入磁盘。持久性保证不是强的,但对于我们的简单任务队列来说已经足够了。如果你需要一个更强的保证,那么你可以使用 publisher confirm。
3.3.4 公平调度
RabbitMQ 默认均匀地分配任务,即使有的任务特别耗时,使得分配到该任务地 worker 任务已经堆积如山,RabbitMQ 还是在均匀分配。
这是因为 RabbitMQ 在任务进入队列时只进行分配,不查看 worker 未确认消息的数量,盲目地将第 n 个任务发送给第 n 个 worker。不过,我们可以通过设置,改变这种行为。
-
在 worker 中配置公平调度:
channel.basic_qos(prefetch_count=1)
上面地数字是1,也就是说在 worker 处理并确认前一个任务完成之前,不要向它分配新任务,而是将任务发送给下一个没有工作中的 worker。
如果所有的 worker 都很忙,任务队列就会排满。你需要注意这一点,并尽可能添加更多的工作人员,或者使用消息 TTL。
3.4 发布/订阅(Publish/Subscribe)
发布者(即生产者)发送一条消息,每一个订阅者(即消费者)都能接收到,这种模式称为“发布/订阅”。
3.4.1 交换器(Exchanges)
之前我们说:生产者发送的消息被保存到队列中。这只是为了好理解而这么说的。实际上,生产者从不直接向队列发送任何消息,而是发送给交换器(exchange)。由交换器决定消息下一步去往何处,具体的规则由交换器类型定义。
交换类型有 direct、topic、header 和 fanout。创建一个 fanout 类型的交换器,取名为logs
:
channel.exchange_declare(exchange='logs',
exchange_type='fanout')
fanout 交换器十分简单,它只是单纯地把消息发送给它所知道的队列。
将消息发送到创建好的交换器:
channel.basic_publish(exchange='logs', # 通过名字指定交换器
routing_key='',
body=message)
3.4.2 临时队列
之前我们使用的都是命名的队列,像hello、task_queue。如果我们创建队列时不传入名称会怎样呢?答案是 RabbitMQ 会创建一个临时队列,该队列的名称会像这样:“mq.gen-JzTY20BRgKO-HjmUJj0wLg”。
我们还可以传入一个exclusive=True
实现独占效果:当消费者关闭连接后,该队列也会随之消失。
创建一个独占的临时队列:
result = channel.queue_declare(queue='', exclusive=True)
3.4.3 绑定
在接收者中绑定交换器和队列,也就是订阅:
channel.queue_bind(exchange='logs',
queue=result.method.queue)
之后,logs 交换器将向我们的临时队列添加消息。只要接收者都绑定同一个交换器和队列,就都能收到消息。
我们还可以通过以下命令查看绑定信息(前提是接收者运行中):
sudo rabbitmqctl list_bindings
3.5 路由(Routing)
目前,我们的发布/订阅模型中,订阅者会毫无选择地接收发布者的所有消息。不过我们可以在绑定时,通过routing_key
参数进行过滤,让订阅者选择想要的消息接收。
routing_key
的含义取决于交换器的类型,之前使用的 fanout 类型会直接忽视这个参数。所以,需要使用其他类型的交换器。比如,Direct 交换器。
3.5.1 direct 交换器
它背后的算法很简单:
- 先看
basic_publish()
的routing_key
是否和queue_bind()
的routing_key
匹配; - 匹配就放入队列,不匹配就去匹配其他队列;
- 没有匹配的就丢弃消息。
# 发布消息
channel.basic_publish(exchange='direct_logs',
routing_key='black',
body=message)
# 绑定
channel.queue_bind(exchange=exchange_name,
queue=queue_name,
routing_key='black')
RabbitMQ 允许使用同一个routing_key
绑定多个队列。交换器会将消息发送给多个匹配的队列。
3.5.2 topic 交换器
topic 交换器的routing_key
必须是用.
分割的多个词语,如orange.rabbit.lazy
,最长为255个字节。发布时的routing_key
为可以按照.
分割。如果二者之间匹配,则将消息发送给队列,如果不匹配则丢弃。这和 direct 交换器是一样的。但topic 交换器真正的用处在于两个特殊符号:
*
:代表匹配任意一个单词。比如,*.orange.*
可以匹配到中间是orange
的单词数量为3个的任意routing_key
。#
:代表匹配任意多个单词。比如,rabbit.#
可以匹配到rabbit
开头的任意routing_key
,且单词数量在合法范围内没有限制。
标签:交换器,入门,队列,worker,RabbitMQ,queue,消息 来源: https://blog.csdn.net/qq_39330486/article/details/122769088