消息对列基础
作者:互联网
消息对列基础
文章目录
- 消息对列基础
- 一.剖析高并发电商系统中订单系统的难点
- 二.消息队列原理以及落地
一.剖析高并发电商系统中订单系统的难点
1.订单系统功能概览
支付成功:
首先,用户购物车页面,选择商品,生成预订单页面,在预订单页面选择物流,地址,等信息,最后点击生成订单,后台去锁定库存,并生成真实订单;
生成订单后跳转支付页面,接入第三方支付系统,支付成功,返回成功状态给支付系统;
支付成功,减少库存(库存系统),发送优惠券给用户(营销系统),增加用户积分(积分系统),修改订单状态(订单系统),发送短信支付成功(短信系统),选择离目的地最近的公司仓库配送货物(仓储系统),通知第三方物流公司去公司仓库取件(物流系统);
**订单未支付:**未支付订单设置后台线程去扫描未支付状态订单,超过时间(一般24小时)取消订单,修改订单为“已关闭“状态,并释放预占库存;
**退款:**用户申请退款,商家确认后,回调订单系统,增加库存(库存系统),减积分(积分系统),收回优惠券(营销系统),修改订单状态(订单系统),通知第三方物流公司拦截商品(未发货取消)(物流系统),发送短信退款成功(短信系统)
秒杀模块:流量很大,QPS(系统每秒核心及非核心功能请求总和)较高,数据库压力巨大
大数据分析系统:采集用户行为信息,足迹,浏览时间占比,商品卖出信息,某个时间段APP使用频率,然后根据大数据统计和分析,精准营销商品;
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ZdaAQPTP-1620458138632)(C:\Users\tangj\AppData\Roaming\Typora\typora-user-images\image-20210302212808539.png)]
我们把非订单系统房间称为非核心业务,有仓储系统,物流系统,短信平台,营销系统,数据采集,支付系统,
核心业务:生成订单,库存系统,大促销模块
系统压力主要两方面:
- 日益增长的订单数据量,数据库压力巨大
- 大促销模块,流量峰值太高,每秒几万的订单数量
2.问题1:下单支付成功后,非核心业务繁杂,线性执行耗时,用户等待时间过长,怎么办?
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-nw9yaYAv-1620458138634)(C:\Users\tangj\AppData\Roaming\Typora\typora-user-images\image-20210302214543249.png)]
订单支付流程详图:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1Q1Z9B5N-1620458138635)(C:\Users\tangj\AppData\Roaming\Typora\typora-user-images\image-20210302215624882.png)]
假如非核心功能线性执行完需要2秒,那么在大促销,流量峰值高,,如果数据库负载也比较高,会导致数据库磁盘,IO,CPU都负载很高,那么SQL执行有可能下降,最终导致支付完成后,界面一直没有相应,等待几十秒才执行成功,用户体验极差;
解决:订单支付后,可以发送订单消息到MQ,由积分系统,营销系统,仓储系统,短信系统等订阅消费订单消息;
使用消息队列异步的特性,减少响应时间,降低订单系统压力;
3. 退款流程,如果退款不成功怎么办?
退款流程图:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-BCz2eYGU-1620458138637)(C:\Users\tangj\AppData\Roaming\Typora\typora-user-images\image-20210302215720974.png)]
退款流程需要做些什么?
第三方支付系统退款成功后:
- 给商品加库存
- 更新订单状态为“已完成”
- 减积分
- 收回优惠券,红包
- 发送Push(例如:短信,邮件)告诉你退款成功
- 通知仓储系统取消发货(收回用户邮寄商品)
如果退款失败,导致整个退款流程失败怎么办?
**方案:**当商品返回到仓库(商品到货退款),让第三方系统退款的时候同时发送消息到MQ让其他系统执行退款业务,扣减积分,收回优惠券等;
如果第三方支付系统退款失败,则由人工处理
4.有大量未支付订单堆积怎么办?
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FyfYFVdS-1620458138638)(C:\Users\tangj\AppData\Roaming\Typora\typora-user-images\image-20210302220442880.png)]
订单生成,会锁定库存,未支付订单我们可以设置后台线程去扫描未支付状态订单,超过时间(一般24小时)取消订单,修改订单为“已关闭“状态,并释放预占库存;
但是如果数据库堆积了几十万,上百万未支付订单(例如:双11,618),不停的扫描这些订单吗?这回导致数据库压力更大,应用服务器压力更大,性能降低;
5.订单系统与非核心系统耦合,导致系统整体性能差,不稳定,怎么办?
引入第三方系统后,下单流程图:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-3z4iHZO5-1620458138639)(C:\Users\tangj\AppData\Roaming\Typora\typora-user-images\image-20210302221349251.png)]
假如本来积分系统的接口有5个参数,但是公司积分系统同事升级积分系统后,变为7个参数怎么办?是不是意味着订单系统要修改调用积分系统那部分代码?是不是其他非核心系统参数修改,订单系统也要修改调用接口的代码呢?这就是系统耦合过高。
如果按照上图调用链,订单系统调用仓储系统,通知发货,仓储系统又调用第三方物流,取商品,成功后,返回给仓储系统,然后仓促系统通知订单系统,修改订单状态;但是如果我们调用 第三方物流系统,它性能差,耗时过久呢? 是不是间接影响了我们系统的响应;
所以,第三方系统不能完全相信;
6.大数据团队要订单数据,会不会导致数据库压力过大,怎么解决?
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0Klq4DV1-1620458138640)(C:\Users\tangj\AppData\Roaming\Typora\typora-user-images\image-20210302222649266.png)]
大数据主要分析用户在APP的行为轨迹,商品数据,把分析结果做成各种报表
如果获取订单数据的途径是直接在 订单数据库进行分析,那么巨大数据的计算压力就会落在订单系统,以及订单数据库上,响应订单系统性能;
7. 秒杀活动,流量过大,系统压力过大怎么办?
1.限流方案一
消息队列削峰填谷,限流,将下订单请求放入消息队列,订单系统慢慢消费这些请求,减轻数据库压力
方案一:将下单请求放入消息队列秒杀服务慢慢消费这些请求;(缺点,增加了调用链,系统复杂性提高)
2.限流方案二
方案二:使用令牌桶限流;(前提:我们能预估出秒杀服务的处理能力)
原理:单位时间内只发放固定数量的令牌到令牌桶中,规定服务在处理请求之前必须先从令牌桶中拿出一个令牌,如果令牌桶中没有令牌,则拒绝请求。这样就保证单位时间内,能处理的请求不超过发放令牌的数量,起到了流量控制的作用。
实现:不需要破坏原有的调用链,只要网关在处理 APP 请求时增加一个获取令牌的逻辑。
用一个固定容量的消息队列queue存储令牌,令牌发生器按照预估的处理能力,匀速生产令牌并放入令牌队列(如果队列满了则丢弃令牌),网关收到请求从令牌队列获取令牌,获取不到秒杀失败,获取到令牌则继续调用后端秒杀服务;
8.订单系统疑难问题脑图汇总
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-7uWygbPJ-1620458138643)(C:\Users\tangj\AppData\Roaming\Typora\typora-user-images\image-20210302223301804.png)]
二.消息队列原理以及落地
1.消息队列能解决什么问题,以及带来什么问题?
1.优点:
异步:下单支付成功以及退款流程复杂 ,往往耗时过久,用户体验差,使用消息队列,支付成功直接返回支付结果,发送消息到消息队列,非核心功能系统订阅这些消息消费;系统异步调用,降低核心业务订单系统负担,减少响应时间,改善用户体验;
解耦:订单系统和下游系统耦合,当订单系统发生改变后发送一条消息到消息队列的一个主题 Order 中,所有下游系统都订阅主题 Order,这样每个下游系统都可以获得一份实时完整的订单数据。
流量削峰:双十一有大量订单时,可以将大量生成订单请求交给消息队列,订单系统在后台慢慢获取订单,不至于数据库直接被压垮
作为发布 / 订阅系统实现一个微服务级系统间的观察者模式;
连接流计算任务和数据;
用于将消息广播给大量接收者。
2.缺点:
引入消息队列带来的延迟问题;
增加了系统的复杂度;
可能产生数据不一致的问题。
2.业界主流MQ比较
选择中间件的考量维度:可靠性,性能,功能,可运维行,可拓展性,是否开源及社区活跃度。
-
RabbitMQ:
优点:轻量级,部署简单迅捷,“开箱即用”的消息队列,Erlang 语言编写,但是支持多种语言;
特色功能是producer和queue之间增加了Exchange,支持灵活的路由配置;
缺点:对消息堆积的支持并不好,每秒钟可以处理几万到十几万条消息,相对RocketMQ和Kafka性能低;
Erlang 语言小众,二次开发困难;
-
RocketMQ:
优点:Java 语言开发,容易二次开发;响应时延低,响应快;每秒钟大概能处理几十万条消息,性能好;丰富的功能;
缺点:支持语言较少,社区相对没有那么活跃;
-
Kafka:
优点:Kafka 与周边生态系统的兼容性是最好的没有之一,尤其在大数据和流计算领域,几乎所有的相关开源软件系统都会优先支持 Kafka。
使用 Scala 和 Java 语言开发,设计上大量使用了批量和异步的思想,这种设计使得 Kafka 能做到超高的性能。
缺点:同步收发消息的响应时延比较高,因为当客户端发送一条消息的时候,Kafka 并不会立即发送出去,而是要等一会儿攒一批再发送Kafka 不太适合在线业务场景。**
总结:
如果说,消息队列并不是你将要构建系统的主角之一,对消息队列没有太高要求,用RabbitMQ;
如果说,消息队列主要场景是处理在线业务,要求低延时,高性能,那么使用RocketMQ;
如果说,需要处理海量的消息,像收集日志、监控信息或是前端的埋点这类数据,或是你的应用场景大量使用了大数据、流计算相关的开源产品,那 Kafka 是最适合你的消息队列
3.原理
1.生产级消息中间件架构图:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ailFKFH6-1620458138643)(C:\Users\tangj\AppData\Roaming\Typora\typora-user-images\image-20210303214838202.png)]
2. 高可用,高并发,高扩展,海量存储
高可用:Broker集群,slave挂了不影响,Master挂了基于Deldger技术自动选举Master;此外还有生产者消费者集群;
高并发:分布式,均摊请求;假如10万QPS写入,有5个Master Broker,那么 就把写入分给5个Master Broker,每个2万;
海量存储:分布式存储
高扩展:可随时增加或减少Slave Broker的数量
3.路由中心NameServer原理
**问题一:**broker把自己的地址信息,存储的topic信息注册到哪些NameServer?
所有Broker都会注册自己的信息到所有的NameServer,每个NameServer互不影响,互不协作,且注册有Broker的所有地址信息,以及topic信息;
**问题二:**系统如何从NameServer获取Broker信息?
生产者消费者 主动定时请求NameServer,获取broker信息
**问题三:**broker挂了,NameServer如何感知到?
心跳机制,broker每30s给所有NameServer发送自己的心跳(告诉自己还活着),而NameServer每隔10s检测当前时间与broker的上一次心跳时间间隔是否超过120s,超过120s则判定broker挂了,会更新Broker地址信息;
另外,Broker会以TCP长连接的形式与每一个NameServer发送心跳请求;
**问题四:**如果broker挂了,生产者,消费者还没来得及更新 可用broker地址信息怎么办?
容错机制,如果请求到挂掉的broker,就再次请求,重发消息,选择集群中其他的broker
**问题五:**如果NameServer集群突然挂了,怎么办?
消费者,生产者本地缓存有 NameServer集群挂之前的broker信息,而且有可能生产者消费者会定时保存broker信息到本地磁盘,所以消费者,生产者可以正常运行;
但是,如果新增broker,或者broker挂掉就会消费者,生产者就永远不会知道。
4.broker原理
问题一:Master Broker如何将信息同步给Slave Broker的?
Slave Broker 采用pull模式,定时向Master Broker发送同步请求,同步磁盘信息;
问题二:RocketMQ是不是读写分离的?
写入消息,是写入 Master Broker
读消息有可能从Master Broker,也有可能从Slave Broker;
消费者获取消息的时候 先发送请求到Master Broker,Master Broker会返回一批消息给消费者,同时在返回消息的同时根据Master Broker和Slave Broker的负载情况,告诉下一次获取消息时从哪个Broker获取;
例如:Master Broker负载很重,那么下次就可能从 Slave Broker获取消息;
Master Broker负载很重,但是Slave Broker同步较慢,只同步了9/10的消息,还有大量消息未同步,那么就只能从Master Broker获取消息;
问题三:Master Broker挂了,能直接从从Slave Broker选举一个作为Master Broker吗?
RocketMQ4.5之前不能自动切换主备,只能手动切换;
4.5之后引入了 Deldger,它基于Raft协议实现;可以让一个Master Broker有多个Slave Broker,且能实现主从自动切换(切换过程可能需要几秒钟);
5. MQ核心数据消息模型
1.topic基础概念
一:什么是topic?
它是一类数据的集合;比如订单消息就要建立一个订单topic,这里叫做topic_order_info,那么topic_order_info就是所有订单系统投递的 消息集合;如果仓储系统需要订单消息,订阅topic_order_info直接取就行了;
二:topic如何在broker中存储的?
假如,topic的消息有几千万,上十亿条,那么不能只存储在一组broker(通常一个集群)中,就要做分布式式存储了。
三:生产者怎么将消息发送给Broker的?消费者如何拉取消息的?
发送消息前,需要先有一个topic,然后根据Broker持有的topic信息,获取我们可以向哪些broker发送消息,负载均衡,选举出一个broker发送消息;
消费者也会订阅topic类型的消息,根据路由信息,获取可拉取消息的Broker服务器有哪些,然后拉取消息;
2.消息模型(队列和topic)
RabbitMQ队列模型(使用Exchage做路由转换,消息发送到Exchange,由它的路由规则,决定消息发送到那些对列)
同一份消息如果需要被多个消费者来消费,需要配置 Exchange 将消息发送到多个队列,每个队列中都存放一份完整的消息数据;
RocketMQ 和Kafka的消息模型:(发布/订阅模式)
1.topic中队列的意义
topic中队列是可配置的,多个队列相当于是负载均衡,一个topic中的每个队列消息不一样,多个队列组成了一份完整的消息(即一条消息只会在一个topic中的某个队列);
topic中不能保证消息有序性(如第一张图有可能先消费1,再消费0),队列中消息有序
2.消费者如何消费消息;
-
消费者组中同一时刻只能有一个消费者 消费一个队列的消息,当然可以多个消费者并行消费多个队列的消息;
-
消费者组用Consumer Offset消费偏移量来记录消费者组在队列中的位置;
-
多个消费者组之间互不影响,一个topic可以被多个消费者组同时消费;
-
一个消费者组消费完一个topic消息时,消息不会删除;只有当所有消费者组消费完消息时,消息才会被删除;
3、如何保证读取消息的有序性(一个消费组在一个主题下的多个队列并发消费就无法保证消息的顺序性)?
后台将消息发送到同一队列中(fe:按照订单ID或者用户ID,用一致性哈希算法,计算出队列ID,指定队列ID发 送,这样可以保证相同的订单/用户的消息总被发送到同一个队列上,就可以确保严格顺序了。)
4.消息发送确认是否会影响效率?
RocketMQ是批量确认消息的。
4.利用事务消息实现分布式事务
1.事务消息
场景:创建订单,然后发送订单创建的消息到消息队列,供下游系统去消费;创建订单和发送消息应该为原子操作,要么都成功,要么都失败;
方案一(本地事务内发送消息):
- 开启本地事务
- 创建订单,失败抛异常,结束流程,成功走第3步
- 根据订单结果选择是否发送消息,如果失败多次重试,成功,走第四部
- 提交本地事务
弊端:如果消息发送成功,本地事务提交失败呢?
方案二(先执行本地事务,后发送消息):
- 执行本地事务,如果失败,不发送消息,否则走第2步
- 发送消息,失败重试(一直失败导致数据不一致,为保证最终一致性可手动发送操作)或者回滚本地事务,成功流程结束
问题:
- 如果失败回滚本地事务,实际订单创建是涉及到很多系统的 交易(管单子的)、金融(管钱的)、库存(管商品库存的)这些东西回滚的代价是很大的;
- 如果执行本地事务成功,发送消息时断电了(导致数据不一致),怎么办?
方案三(使用消息队列的“事务消息”功能):
- 订单系统在消息队列开启“事务消息”,发送“半消息”到Broker
- 成功走第3步,否则重试
- 执行并提交本地事务
- 根据结果,失败回滚“事务消息”,成功提交“事务消息”
- 回滚或提交事务消息失败导致MQ未收到确认信息,RocketMQ利用事务反查机制,Broker定时Producer反查本地事务的状态(Kafka 的解决方案比较简单粗暴,直接抛出异常)
- 检查本地事务状态(当前案例)可以通过订单ID查询数据库是否有该订单
- 根据反查结果再次提交或回滚“事务消息”
注意:
- 半消息,也是有完整的消息数据,但是它堆消费者是不可见的,所以无法被消费
- 反查本地事务的实现,并不依赖消息的发送方,也就是订单服务的某个实例节点上的任何数据。即使是发送事务消息的那个订单服务节点宕机了,RocketMQ 依然可以通过其他订单服务的节点来执行反查,确保事务的完整性。
2.RocketMQ 的这种事务消息是不是完整地实现了事务的 ACID 四个特性?
A(原子性):本地事物的操作1,与往消息队列中生产消息的操作2,是两个分离的操作,不符合对原子性的定义;
C(一致性):由于操作消息队列属于异步操作,在数据一致性上,只能保证数据的最终一致性。若对于时效性要求很高的系统来说,事物消息不是数据一致的;但对于时效性要求不高的系统来说,他就是数据一致的。
I(隔离性):由于事物消息是分两步操作的,本地事物提交后,别的事物消息就已经可以看到提交的消息了。所以,不符合隔离性的定义;
D(持久性):RocketMq支持事物的反查机制,若“半消息”存储在磁盘中,那就支持持久性,即使事物消息提交后,发生服务突然宕机也不受影响;若存储在内存中,则无法保证持久性。
3.实现订单下单场景
- 首先通过producer.sendMessageInTransaction()方法发送一个半消息给MQ.
- 此时会在TransactionListener中的executeLocalTransaction()方法阻塞,然后在这个方法里面进行订单创建并提交本地事务,如果commit成功,则返回COMMIT状态,否则是ROLLBACK状态,如果正常返回COMMIT或者ROLLBACK的话,不会存在第3步的反查情况。
- 如果上面的本地事务提交成功以后,此节点突然断电,那么checkLocalTransaction()反查方法就会在某个时候被MQ调用,此方法会根据消息中的订单号去数据库确认订单是否存在,存在就返回COMMIT状态,否则是ROLLBACK状态。
5.如何确保消息不会丢失?
1.哪些地方会出现消息丢失?
- 生产阶段: 在这个阶段,从消息在 Producer 创建出来,经过网络传输发送到 Broker 端。
- 存储阶段: 在这个阶段,消息在 Broker 端存储,如果是集群,消息会在这个阶段被复制到其他的副本上。
- 消费阶段: 在这个阶段,Consumer 从 Broker 上拉取消息,经过网络传输发送到 Consumer 上。
2.怎么防止消息丢失?
消息队列通过最常用的请求确认机制,来保证消息的可靠传递。
-
在编写发送消息代码时,需要注意,正确处理返回值或者捕获异常,就可以保证这个阶段的消息不会丢失。
以Kafka为例:
同步发送消息时注意捕获异常;
try { RecordMetadata metadata = producer.send(record).get(); System.out.println("消息发送成功。"); } catch (Throwable e) { System.out.println("消息发送失败!"); System.out.println(e); }
异步发送消息时注意在回调函数里判断结果;
producer.send(record, (metadata, exception) -> { if (metadata != null) { System.out.println("消息发送成功。"); } else { System.out.println("消息发送失败!"); System.out.println(exception); } });
-
存储阶段:
如果对消息的可靠性要求非常高,可以通过配置 Broker 参数来避免因为宕机丢消息。
单个节点的Broker,在 RocketMQ 中,需要将刷盘方式 flushDiskType 配置为 SYNC_FLUSH 同步刷盘。
集群的Broker 可以设置至少将消息发送到 2 个以上的节点,才发送确认;
-
消费阶段:
注意:不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认。
3.如何检测消息是否丢失?
方案一:
一般大公司都有分布式链路追踪系统,会追踪到每一条信息,获取每一条消息的状态,判断消息是够丢失;
方案二(通过递增的序号检测消息是否丢失):
大多数消息队列的客户端都支持拦截器机制,你可以利用这个拦截器机制,在 Producer 发送消息之前的拦截器中将序号注入到消息中,在 Consumer 收到消息的拦截器中检测序号的连续性。
-
单机模式(单个生产者和消费者):很多中小型公司没有时间精力去完善一个分布式链路系统,因此我们可以在Producer端发送消息的时候给消息家还是那个序号,序号递增,Consumer端消费消息的时候检测序号是否连续,来判断是否有消息丢失;
-
分布式情况下:
RocketMQ和Kafka,只能保证分区或者队列的有序性,不能保证Topic的有序性,所以,每个生产者要对应一个队列或者分区,要求在发送消息的时候保证同一个生产者的消息在同一个分区或队列,在Consumer端通过Producer分别检测消息有序性;
给每个生产者添加一个唯一可递增的标识,producer的IP+topic+递增数字,例如:127.17.0.0.1,orderInfoTopic,1
6.如何处理消费过程中的重复消息?
1.MQTT 协议中,给出了三种传递消息时能够提供的服务质量标准
At most once:至多传递一次,消息被送达至多一次,即允许丢失消息,适合消息可靠性不太高的监控场景使用,比如每分钟上报一次机房温度数据,可以接受数据少量丢失。
At least once:至少传递一次,消息至少送达一次,不允许消息丢失,允许重复消费
Exactly once:恰好传递一次,不允许消息丢失,不允许重复消费
RocketMQ、RabbitMQ 和 Kafka都默认使用 At least once,所以都不能保证重复消费;
2.为什么消息队列默认At least once,而不是Exactly once?
-
如果是Exactly once,当消息消费成功但是ACK失败了,那么消息队列补偿机制仍然会再次消费该消息
-
如果是Exactly once,那么每次消费者消费消息时,都应该检测消息是否已经被消费,检测会降低消费数据,当数据量大时会导致消息堆积;检测消息是否被消费可以配合业务端去做,类似“事务消息”的回查机制;
3.如果消息一直重试失败,怎么办?
有的消息队列会有一个特殊的队列来保存这些总是消费失败的“坏消息”,然后继续消费之后的消息,避免坏消息卡死队列。这种坏消息一般不会是因为网络原因或者消费者死掉导致的,大多都是消息数据本身有问题,消费者的业务逻辑处理不了导致的。
4.幂等性解决消费重复消息
幂等(Idempotence) 本来是一个数学上的概念,它是这样定义的:
如果一个函数 f(x) 满足:f(f(x)) = f(x),则函数 f(x) 满足幂等性。
在计算机领域中表示一个方法,操作或者服务,一次执行和多次执行的结果是一样的;
从对系统的影响结果来说:At least once + 幂等消费 = Exactly once。
实现幂等性的方法:
例子:在分布式系统中,给账户A的余额加100元
-
利用数据库唯一约束实现幂等性
我们可以额外增加一个转账流水表,由转账单ID,账户ID,变更金额三个字段组成,其中账单ID和账户ID组成联主键,利用数据库主键唯一判断消息是否已经消费;
-
为更新的数据设置前置条件
思路:设置一个前置条件,如果满足前置条件,更新数据并更新数据库中前置条件,否则不更新
我们可以设置前置条件:“当账户A余额为500元,才给余额加100元”,发送消息时直接把账户A当前余额500元加入消息;
还有什么前置条件?
给数据加一个版本号version,每更新一次版本+1;
给数据加一个时间戳,每更新一次更新一下时间戳;
-
记录并检查
通用性最强,适用范围最广的实现幂等性方法:记录并检查操作,也称为“Token 机制或者 GUID(全局唯一 ID)机制”。
实现方式:在发送消息时,给每条消息指定一个全局唯一的 ID,消费时,先根据这个 ID 检查这条消息是否有被消费过,如果没有消费过,才更新数据,然后将消费状态置为已消费。
问题:
- 分布式环境下生成全局唯一ID会消耗性能;
- 检查消费状态+消费消息(执行消息内容)+更新消息状态 三者必须为原子性;
7.消息积压了怎么办?
1.优化性能避免消息挤压
对于消息队列的性能优化,我们更关注的是,在消息的收发两端,我们的业务代码怎么和消息队列配合,达到一个最佳的性能。
-
发送端优化
- 发消息之前的业务逻辑耗时太多,优先优化业务代码
- 批量发送消息:适合要求吞吐量大,时延高的离线分析系统;例如发送数据是数据库数据,那么可以从数据库批量取数据,然后批量发送消息
- 提高并量:适合要求响应时延低的场景;如果消息发送端是 微服务,主要接收RPC请求处理在线业务,那么直接发送消息就可以了,因为所有 RPC 框架都是多线程支持多并发的,自然也就实现了并行发送消息。
-
消费端优化
一定要保证消费端的消费性能要高于生产端的发送性能,这样的系统才能健康的持续运行。
我们可以扩容队列或分区的数量和消费者的数量
注意:在扩容 Consumer 的实例数量的同时,必须同步扩容主题中的分区(也叫队列)数量,确保 Consumer 的实例数和分区数量是相等的。因为每个分区上实际上只能支持单线程消费;
2.处理消息积压
能导致积压突然增加,最粗粒度的原因,只有两种:要么是发送变快了,要么是消费变慢了。
比如说是赶上大促或者抢购,短时间内不太可能优化消费端的代码来提升消费性能,唯一的方法是通过(扩容消费端的实例数)来提升总体的消费能力。
如果短时间内没有足够的服务器资源进行扩容,没办法的办法是,将系统降级,通过关闭一些不重要的业务,减少发送方发送的数据量,最低限度让系统还能正常运转,服务一些重要业务。
通过监控发现,无论是发送消息的速度还是消费消息的速度和原来都没什么变化,这时候你需要检查一下你的消费端,是不是消费失败导致的一条消息反复消费这种情况比较多,这种情况也会拖慢整个系统的消费速度。
消费变慢了,你需要检查你的消费实例,分析一下是什么原因导致消费变慢。优先检查一下日志是否有大量的消费错误,如果没有错误的话,可以通过打印堆栈信息,看一下你的消费线程是不是卡在什么地方不动了,比如触发了死锁或者卡在等待某些资源上了。
1、消息队列内置监控,查看发送端发送消息与消费端消费消息的速度变化
2、查看日志是否有大量的消费错误
3、打印堆栈信息,查看消费线程卡点信息
8.使用消息队列限流API网关如何获取秒杀结果?
1.消息队列限流如何获取秒杀结果?
消息队列削峰填谷,限流,将下订单请求放入消息队列,订单系统慢慢消费这些请求,减轻数据库压力
方案:将下单请求放入消息队列秒杀服务慢慢消费这些请求;(缺点,增加了调用链,系统复杂性提高)
思考:当一个秒杀请求被消费,它的秒杀结果是如何返回给网关的?网关又是如何来给 APP 返回响应的呢?
//秒杀请求处理类
public class RequestHandler {
// ID生成器
@Inject
private IdGenerator idGenerator;
// 消息队列生产者
@Inject
private Producer producer;
// 保存秒杀结果的Map
@Inject
private Map<Long, Result> results;
// 保存mutex的Map
private Map<Long, Object> mutexes = new ConcurrentHashMap<>();
// 这个网关实例的ID
@Inject
private long myId;
@Inject
private long timeout;
// 在这里处理APP的秒杀请求
public Response onRequest(Request request) {
// 获取一个进程内唯一的UUID作为请求id
Long uuid = idGenerator.next();
try {
Message msg = composeMsg(request, uuid, myId);
// 生成一个mutex,用于等待和通知
Object mutex = new Object();
mutexes.put(uuid, mutex)
// 发消息
producer.send(msg);
// 等待后端处理
synchronized(mutex) {
mutex.wait(timeout);
}
// 查询秒杀结果
Result result = results.remove(uuid);
// 检查秒杀结果并返回响应
if(null != result && result.success()){
return Response.success();
}
} catch (Throwable ignored) {}
finally {
mutexes.remove(uuid);
}
// 返回秒杀失败
return Response.fail();
}
// 在这里处理后端服务返回的秒杀结果
public void onResult(Result result) {
Object mutex = mutexes.get(result.uuid());
if(null != mutex) { // 如果查询不到,说明已经超时了,丢弃result即可。
// 登记秒杀结果
results.put(result.uuid(), result);
// 唤醒处理APP请求的线程
synchronized(mutex) {
mutex.notify();
}
}
}
}
网关也是RPC服务端,和配置中心保持长连接,比如nacos,将其路由和配置信息定时的发送给配置中心,配置中心对其进行管理,定时的清除宕机的网关路由信息,如超过一定时间没有接收到网关的心跳包;
过程:
-
APP–发送秒杀请求到网关
-
在网关中将APP请求封装,增加网关ID,APP请求在网关内唯一请求ID,为了防止消息丢失(秒杀其实可以允许消息丢失,提升性能),同步发送是捕获异常,异步发送在回调函数内判读,发送失败返回秒杀失败,成功的话同步等待秒杀结果(这里要维护一个ConcurrentHashMap,添加一个键值对,唯一的请求ID为key,新建一个对象mutex为value)
-
秒杀服务端(消费者)从消息中获取网关ID和请求ID,通过网关ID在本地存储的路由信息表中查找网关实例信息(找不到可以从配置中心中再次更新路由表,再找不到直接放弃,网关等待timeout自然返回秒杀失败),获取网关实例,消费消息后,调用OnResult()返回秒杀结果
-
网关中接收秒杀结果,并响应给APP
2.单个队列如何实现并行消费?
消费端批量读取消息,多线程异步消费,消费5,6,7三个消息,消费成功,consumerOffset 消费标志位移动到8;
如果5消费失败,把5放入 另一个特殊的重试队列,consumerOffset 移到8 ,下次消费的时候优先消费重试队列中的消息;
标签:队列,系统,基础,Broker,发送,订单,消息,对列 来源: https://blog.csdn.net/qq_41201565/article/details/116529068