定时任务实现(RabbitMQ 延迟队列)
作者:互联网
前言
其实rabbit 没有现成可用的延迟队列,但是可以利用其两个重要特性来实现之:1、Time To Live(TTL)消息超时机制;2、Dead Letter Exchanges(DLX)死信队列。
先理解一个概念:
rabbit 中一个消息是有死亡状态的,它会被发送到一个指定的队列中,这个队列是一个普通的队列,根据他的功能,我们叫他死信队列。
当发生下面的情况时,消息会被发送到死信队列:
- 消息被消费者接收,并且标记了reject或者nack,拒绝或者未消费成功。
- 队列设定了消息存活时间,超过存活时间未被消费,会自动发送到死信队列。
- 队列满了,再被分发到队列的消息,会被发送到死信队列。
延迟队列原理
RabbitMQ可以针对Queue设置x-expires 或者 针对Message设置 x-message-ttl,来控制消息的生存时间,如果超时(两者同时设置以最先到期的时间为准),则消息变为dead letter(死信)
RabbitMQ消息的过期时间有两种方法设置。
- 通过队列(Queue)的属性设置,队列中所有的消息都有相同的过期时间。(本次延迟队列采用的方案)
- 对消息单独设置,每条消息TTL可以不同。
如果同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就成为死信(dead letter)
死信队列
RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可选)两个参数,如果队列内出现了dead letter,则按照这两个参数重新路由转发到指定的队列。
- x-dead-letter-exchange:出现死信(dead letter)之后将dead letter重新发送到指定exchange
- x-dead-letter-routing-key:出现死信(dead letter)之后将dead letter重新按照指定的routing-key发送
队列中出现死信(dead letter)的情况有:
- 消息或者队列的TTL过期。(延迟队列利用的特性)
- 队列达到最大长度
- 消息被消费端拒绝(basic.reject or basic.nack)并且requeue=false
综合上面两个特性,将队列设置TTL规则,队列TTL过期后消息会变成死信,然后利用DLX特性将其转发到另外的交换机和队列就可以被重新消费,达到延迟消费效果。
如图:
理解了概念就知道是使用rabbit 的死信队列 做定时任务了。具体实现如下:
生产者
import pika import json import time credentials = pika.PlainCredentials('admin', 'admin') # mq用户名和密码 # 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。 connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672, credentials=credentials)) channel = connection.channel() # 声明消息队列,消息将在这个队列传递,如不存在,则创建 queue_name = "delay_queue_a" exchange = 'delay_exchange_a' routing_key = 'delay_routing_key_a' dead_letter_exchange = 'dead_exchange_a' # 'amq.direct'#'dead_exchange_a' dead_letter_routing_key = 'dead_letter_routing_key_a' # 'dead_queue_a'#'dead_letter_routing_key_a' arguments = { "x-message-ttl": 5000, 'x-dead-letter-exchange': dead_letter_exchange, 'x-dead-letter-routing-key': dead_letter_routing_key } channel.confirm_delivery() channel.exchange_declare(exchange=exchange, durable=True, exchange_type='direct') result = channel.queue_declare(queue=queue_name, durable=False, arguments=arguments) channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=routing_key) for i in range(10): message = json.dumps({'OrderId': i}) # 向队列插入数值 routing_key是队列名 channel.basic_publish(exchange=exchange, routing_key=routing_key, body=message, properties=pika.BasicProperties(delivery_mode=2)) print(message) time.sleep(1.5) connection.close()
消费者:
import pika import json credentials = pika.PlainCredentials('admin', 'admin') connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672, credentials=credentials)) channel = connection.channel() # 申明消息队列,消息在这个队列传递,如果不存在,则创建队列 queue_name = "dead_queue_a" # dead_letter_exchange = 'amq.direct'#'dead_exchange_a' dead_letter_exchange = 'dead_exchange_a' dead_letter_routing_key = 'dead_letter_routing_key_a' # queue_name = dead_letter_routing_key channel.exchange_declare(exchange=dead_letter_exchange, durable=False, exchange_type='direct') result = channel.queue_declare(queue=queue_name, durable=False) channel.queue_bind(exchange=dead_letter_exchange, queue=queue_name, routing_key=dead_letter_routing_key) # 定义一个回调函数来处理消息队列中的消息,这里是打印出来 def callback(ch, method, properties, body): data = json.loads(body.decode()) print(data) ch.basic_ack(delivery_tag=method.delivery_tag) # 告诉rabbitmq,用callback来接收消息 channel.basic_consume(queue_name, callback, auto_ack=False) print('开始监听') try: channel.start_consuming() except KeyboardInterrupt: channel.stop_consuming() connection.close() print('close')
看了上面还是模糊: 点击前往原著
标签:routing,key,exchange,队列,dead,RabbitMQ,letter,延迟 来源: https://www.cnblogs.com/TF511/p/16355811.html