其他分享
首页 > 其他分享> > RabbitMQ 学习笔记 - 中级

RabbitMQ 学习笔记 - 中级

作者:互联网

RabbiMQ

In Spring

在 SpringBoot 中, 交换机、队列、Binding 等配置信息都是通过配置类中对应的 bean 来操作的, 生产者只关注向交换机发送消息, 消费者只关注从队列中取出消息

RabbitMQ-Spring

持久化

交换机持久化

在新建交换机时, 设置 durable=true, 可以保证交换机的配置信息不丢失 (如: Bindings)

队列持久化

在新建队列时, 设置 durable=true, 只能保证本身的元数据不丢失

消息持久化

在发送消息时, 设置消息属性的 delivery_mode=2 (1是非持久化), 保证消息在队列中不丢失

注意

  1. 将 exchange、queue 和 message 都进行持久化操作后,也不能保证消息一定不会丢失,消息存入RabbitMQ 之后,还需要一段时间才能存入硬盘。RabbitMQ 并不会为每条消息都进行同步存盘,如果在这段时间,服务器宕机或者重启,消息还没来得及保存到磁盘当中,就会丢失。对于这种情况,可以引入 RabiitMQ 镜像队列机制。
  2. 是否开启持久化机制需要在消息可靠性和系统吞吐量之间做权衡。

消息可靠性

消息丢失的情况

  1. 生产者的原因: 生产者未将消息成功送达 Exchange, 如: 消息在半路丢了 或 指定的交换机根本不存在

  2. Exchange 的原因: 发送到交换机的消息未成功投递到指定队列中, 如: 路由不到目标队列

  3. Broker 的原因: RabbitMQ 服务宕机了, 导致 Broker 中的数据丢失

  4. 消费者的原因: 消费者取出消息后在执行业务之前宕机了

保证可靠性的方案

  1. 开启发布确认机制, ConfirmCallback() 方法会回调交换机是否成功接收了消息, ReturnsCallback() 会回调交换机无法投递消息到队列的情况。
  2. 开启持久化机制 (持久化机制也不能保证消息 100% 不丢失, 因为 RabbitMQ 并不能立即将每条信息都写入磁盘)
  3. 关闭消息应答自动应答, 改为手动应答, 并在业务处理失败时设置合适的处理机制

消息手动应答

如果消费者在断开连接时存在未应答的消息, 则未应答的消息会重新返回到队列中。

使用消息应答处理失败的消费

  1. 业务处理失败后, Nack 消息且不将消息重回队列 (否则会出现消息的循环重试), 同时记录日志、通知人工干预处理失败消息。
  2. 业务处理失败后尝试重新执行业务, 并在重试次数达到指定值后, Nack 销毁消息且通知人工干预处理失败消息。
  3. 队列负载均衡模式下, 在 Redis 或消息头中写入失败次数, 然后 Nack 并让消息重回队列, 让其它消费者尝试消费。
  4. 使用 SpringBoot 中的 retry 配置, 默认策略为: RejectAndDontRequeueRecoverer

消息的重复消费

出现消息重复消费的情况

  1. 生产时产生重复消息

    在同步确认机制下, Broker 发送给生产者的确认信息由于网络波动而丢失, 导致生产者没有收到确认, 而生产者重新发送一遍这条消息, 就出现了消息的重复生产。

  2. 消费时消息重复分发

    在手动应答模式下, 消费者发送给 Broker 的 Ack 信息由于网络波动而丢失, 导致未应答消息重回队列, 从而被再次投递给消费者, 出现消息的重复消费。

保证消息的幂等性

消息的重复消费是无法避免的, 因此我们要尽可能的保证消息在出现重复消费的情况下的结果是呈幂等性的。

  1. 对于向 mysql 插入数据, 消费者可以先查该消息是否已存在, 如果存在则不再插入, 直接应答, 但是这样效率较低
  2. 同样对于 mysql 插入数据的情况, 可以给每个消息消息设置一个唯一的 ID, 并在 mysql 中将对应字段设置为主键或唯一约束
  3. 让每个消息携带一个全局唯一的 ID, 并在消费之前查询之间的消费记录 (如 Redis、BloomFilter), 对于已消费的消息直接应答即可 (布隆过滤器会出现误判情况, 一旦出现误判则消息就会丢失, 但是误判的概率很低)

高级队列

Arguments 参数

队列的附加属性,有如下可选项

TTL

Time To Live

死信队列

DLX (Dead Letter Exchange)

相关配置:

  1. x-dead-letter-exchange
  2. x-dead-letter-routing-key

概念

死信的来源

  1. 队列达到最大长度而被迫删除的消息
  2. 队列中 TTL 到期的消息
  3. 被消费者拒绝应答且没有重回队列的消息

延迟队列

概念

延迟队列是基于 TTL + 死信队列来实现的一种队列, 为一个上游队列设置 TTL, 那么信息过期后就会转到对应的死信队列中, 从而实现了下游队列中的消息延迟一定时间后到达队列

出现的问题

  1. 消息丢失如何处理 --> 生产者发布确认机制、交换机推送失败回调、消费者手动应答消息
  2. 消息重复消费如何处理 --> 使用全局唯一的消息 ID, 记录消费列表, 并在消费消息之前查看历史记录
  3. 如何保证消息的有序性 --> 为队列只设置一个消费者, 多消费者采用其他机制, 如: 为消息设置递增编号, 并全局注册已消费编号, 然后按顺序执行 (感觉还不如使用单一消费者)

推荐文章

  1. RabbitMQ--客户端常用API详解
  2. AMQP 0-9-1 快速参考指南

标签:中级,队列,应答,笔记,死信,交换机,RabbitMQ,设置,消息
来源: https://www.cnblogs.com/xtyuns/p/15578835.html