消息中间件搬迁
作者:互联网
消息队列
原理
消息存储
Pull方式
Push方式
高可用
异常重试
生产者端的消息失败
消费者端的消失失败
顺序性
对比
消息队列
作用:异步,解藕,峰值处理,可恢复,顺序,扩展性
RocketMq源码部分主要可以分为
rocketmq-broker,rocketmq-client,rocketmq-common,rocketmq-filterSrv,rocketmq-namesrv和rocketmq-remoting等模块,
通信框架就封装在rocketmq-remoting模块中
原理
Broker
/ | \
/ | \
Produce - Namesrv - Consume
1,启动Namesrv,Namesrv起来后监听端口,等待Broker、Produer、Consumer连上来,相当于一个路由控制中心
(Namesrv压力不会太大,平时主要开销是在维持心跳和提供Topic-Broker的关系数据)
2,Broker启动,跟所有的Namesrv保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有topic信息。注册成功后,namesrv集群中就有Topic跟Broker的映射关系。
3,收发消息前,先创建topic,创建topic时需要指定该topic要存储在哪些Broker上。也可以在发送消息时自动创建Topic。
4,Producer发送消息,启动时先跟Namesrv集群中的其中一台建立长连接,并从Namesrv中获取当前发送的Topic存在哪些Broker上,然后跟对应的Broker建长连接,直接向Broker发消息。
RocketMQ消息队列集群中的几个角色
NameServer:在MQ集群中做的是做命名服务,更新和路由发现 broker服务;
NameServer相当于配置中心,维护Broker集群、Broker信息、Broker存活信息、主题与队列信息等。
NameServer是一个几乎无状态的节点,可集群部署,节点之间无任何信息同步,每个Broker与集群内所有的Broker保持长连接。
Broker-Master:broker 消息主机服务器;
Broker-Slave:broker 消息从机服务器;
Producer:消息生产者;
Consumer:消息消费者。
其中,RocketMQ集群的一部分通信如下:
Broker( 集群最核心模块,主要负责Topic消息存储、管理和分发等功能)
Broker启动后需要完成一次将自己注册至NameServer的操作;随后每隔30s时间定期向NameServer上报Topic路由信息;
消息生产者Producer作为客户端发送消息时候,需要根据Msg的Topic从本地缓存的TopicPublishInfoTable获取路由信息。
如果没有则更新路由信息会从NameServer上重新拉取;
消息生产者Producer根据所获取的路由信息选择一个队列(MessageQueue)进行消息发送;Broker作为消息的接收者接收消息并落盘存储。
RocketMQ的Broker集群部署模式还挺多的,比如单Master模式、多Master模式、多Master多Slave模式(异步复制)、多Master多Slave模式(同步双写)等。明确个概念,RocketMQ Slave不可以写,可以读,类似于MySQL的主从机制。
对于Producer端RocketMQ采用了轮询的方式保证了负载均衡,
Consumer端通常采用cluster集群方式消费消息,我们可以自己定义消息在消息端的分配方式。
send 方法会设置一个默认的 timeout, 3 秒。
默认使用 SYNC 模式,另外有 Async 和 OneWay 模式。
我们需要处理方法签名中的 Client 端的异常,网络异常,Broker 端的异常,线程中断异常。
https://www.jianshu.com/p/a2e4e45709c4
消息存储
https://www.jianshu.com/p/b73fdd893f98
Pull和Push
对于任何一款消息中间件而言,消费者客户端一般有两种方式从消息中间件获取消息并消费。
严格意义上来讲,RocketMQ并没有实现PUSH模式,而是对拉模式进行一层包装,名字虽然是 Push 开头,实际在实现时,使用 Pull 方式实现。
通过 Pull 不断不断不断轮询 Broker 获取消息。当不存在新消息时,Broker 会挂起请求,直到有新消息产生,取消挂起,返回新消息。这样,基本和 Broker 主动 Push 做到接近的实时性(当然,还是有相应的实时性损失)。原理类似 长轮询( Long-Polling )
Pull方式
由消费者客户端主动向消息中间件(MQ消息服务器代理)拉取消息;采用Pull方式,如何设置Pull消息的频率需要重点去考虑,举个例子来说,可能1分钟内连续来了1000条消息,然后2小时内没有新消息产生(概括起来说就是“消息延迟与忙等待”)。如果每次Pull的时间间隔比较久,会增加消息的延迟,即消息到达消费者的时间加长,MQ中消息的堆积量变大;若每次Pull的时间间隔较短,但是在一段时间内MQ中并没有任何消息可以消费,那么会产生很多无效的Pull请求的RPC开销,影响MQ整体的网络性能;
Push方式
由消息中间件(MQ消息服务器代理)主动地将消息推送给消费者;采用Push方式,可以尽可能实时地将消息发送给消费者进行消费。但是,在消费者的处理消息的能力较弱的时候(比如,消费者端的业务系统处理一条消息的流程比较复杂,其中的调用链路比较多导致消费时间比较久。概括起来地说就是“慢消费问题”),而MQ不断地向消费者Push消息,消费者端的缓冲区可能会溢出,导致异常;
通过研究源码可知,RocketMQ的消费方式都是基于拉模式拉取消息的,而在这其中有一种长轮询机制(对普通轮询的一种优化),来平衡上面Push/Pull模型的各自缺点。基本设计思路是:消费者如果第一次尝试Pull消息失败(比如:Broker端没有可以消费的消息),并不立即给消费者客户端返回Response的响应,而是先hold住并且挂起请求(将请求保存至pullRequestTable本地缓存变量中),然后Broker端的后台独立线程—PullRequestHoldService会从pullRequestTable本地缓存变量中不断地去取,具体的做法是查询待拉取消息的偏移量是否小于消费队列最大偏移量,如果条件成立则说明有新消息达到Broker端(这里,在RocketMQ的Broker端会有一个后台独立线程—ReputMessageService不停地构建ConsumeQueue/IndexFile数据,同时取出hold住的请求并进行二次处理),则通过重新调用一次业务处理器—PullMessageProcessor的处理请求方法—processRequest()来重新尝试拉取消息(此处,每隔5S重试一次,默认长轮询整体的时间设置为30s)。
RocketMQ消息Pull的长轮询机制的关键在于Broker端的PullRequestHoldService和ReputMessageService两个后台线程。对于RocketMQ的长轮询(LongPolling)消费模式后面会专门详细介绍。
在本文前面已经提到过了,从严格意义上说,RocketMQ并没有实现真正的消息消费的Push模式,而是对Pull模式进行了一定的优化,一方面在Consumer端开启后台独立的线程—PullMessageService不断地从阻塞队列—pullRequestQueue中获取PullRequest请求并通过网络通信模块发送Pull消息的RPC请求给Broker端。另外一方面,后台独立线程—rebalanceService根据Topic中消息队列个数和当前消费组内消费者个数进行负载均衡,将产生的对应PullRequest实例放入阻塞队列—pullRequestQueue中。这里算是比较典型的生产者-消费者模型,实现了准实时的自动消息拉取。然后,再根据业务反馈是否成功消费来推动消费进度。
在Broker端,PullMessageProcessor业务处理器收到Pull消息的RPC请求后,通过MessageStore实例从commitLog获取消息。如1.2节内容所述,如果第一次尝试Pull消息失败(比如Broker端没有可以消费的消息),则通过长轮询机制先hold住并且挂起该请求,然后通过Broker端的后台线程PullRequestHoldService重新尝试和后台线程ReputMessageService的二次处理。
https://blog.csdn.net/hxyascx/article/details/83900779
https://www.cnblogs.com/chenjunjie12321/p/7922362.html
https://www.jianshu.com/p/f071d5069059?utm_source=oschina-app
高可用
1⃣️单Master模式:
无需多言,一旦单个broker重启或宕机,一切都结束了!很显然,线上不可以使用。
2⃣️多Master模式:
全是Master,没有Slave。当然,一个broker宕机了,应用是无影响的,缺点在于宕机的Master上未被消费的消息在Master没有恢复之前不可以订阅。
3⃣️多Master多Slave模式(异步复制):
多对Master-Slave,高可用!采用异步复制的方式,主备之间短暂延迟,MS级别。Master宕机,消费者可以从Slave上进行消费,不受影响,但是Master的宕机,会导致丢失掉极少量的消息。
4⃣️多Master多Slave模式(同步双写):
和上面的区别点在于采用的是同步方式,也就是在Master/Slave都写成功的前提下,向应用返回成功,可见不论是数据,还是服务都没有单点,都非常可靠!缺点在于同步的性能比异步稍低。
多Master多Slave的好处在于,即便集群中某个broker挂了,也可以继续消费,保证了实时性的高可用,但是并不是说某个master挂了,slave就可以升级master,开源版本的rocketmq是不可以的。也就是说,在这种情况下,slave只能提供读的功能,将失去消息负载的能力。
异常重试
消息失败重试机制:
消息失败,无非涉及到2端:从生产者端发往MQ的失败;消费者端从MQ消费消息的失败;
生产者端的消息失败
比如网络抖动导致生产者发送消息到MQ失败,可以设置重试次数,setRetryTimesWhenSendFailed(3);
Producer的send方法本身支持内部重试,重试逻辑如下:
1.至多重试 3 次。
2.如果发送失败,则轮转到下一个 Broker。
3.这个方法的总耗时时间不超过 sendMsgTimeout设置的值,默认10s。所以,如果本身向broker发送消息产生超时异常,就不会再做重试。
以上策略仍然不能保证消息一定发送成功,为保证消息一定成功,建议应用这样做:
如果调用send同步方法发送失败,则尝试将消息存储到db,由后台线程定时重试,保证消息一定到达Broker。
消费者端的消失失败
分为2种情况,一个是timeout,一个是exception
timeout,比如由于网络原因导致消息压根就没有从MQ到消费者上,在RocketMQ内部会不断的尝试发送这条消息,直至发送成功为止!(比如集群中一个broker失败,就尝试另一个broker)
exception,消息正常的到了消费者,结果消费者发生异常,处理失败了。这里涉及到一些问题,需要我们思考下,比如,消费者消费消息的状态有哪些定义?如果失败,MQ将采取什么策略进行重试?假设一次性批量PUSH了10条,其中某条数据消费异常,那么消息重试是10条呢,还是1条呢?而且在重试的过程中,需要保证不重复消费吗?
消息消费的状态,有2种,一个是成功(CONSUME_SUCCESS),一个是失败&稍后重试(RECONSUME_LATER)
RocketMQ为我们提供了这么多次数的失败重试,但是在实际中也许我们并不需要这么多重试,比如重试3次,还没有成功,我们希望把这条消息存储起来并采用另一种方式处理,而且希望RocketMQ不要在重试呢,因为重试解决不了问题了!
消费过程要做到幂等(即消费端去重)
1.将消息的唯一键,可以是 msgId,也可以是消息内容中的唯一标识字段,例如订单Id等,消费之前判断是否在
Db或Tair(全局KV存储)中存在,如果不存在则插入入,并消费,否则跳过。(实际过程要考虑原子性问题,判断是否存在可以尝试插入,如果报主键冲突,则插入失败,直接跳过)
msgId一定是全句唯一标识符;
2.使用业务局面的状态机去重 。
顺序性
这是阿里云上对顺序消息的定义,把顺序消息拆分成了顺序发布和顺序消费
如何保证顺序
在MQ的模型中,顺序需要由3个阶段去保障:
消息被发送时保持顺序
消息被存储时保持和发送的顺序一致
消息被消费时保持和存储的顺序一致
>>> 顺序性,消费的消息发往同一个broker的同一个队列上!其次消费者端采用有序Listener即可。
注意在以前普通消费消息时设置的回调是MessageListenerConcurrently,而顺序消费的回调设置是MessageListenerOrderly。
RocketMQ底层是如何做到消息顺序消费的,看一看源码你就能大概了解到,至少来说,在多线程消费场景下,一个线程只去消费一个队列上的消息,那么自然就保证了消息消费的顺序性,同时也保证了多个线程之间的并发性。也就是说其实broker并不能完全保证消息的顺序消费,它仅仅能保证的消息的顺序发送而已!
https://blog.csdn.net/yzhou86/article/details/79156458
RabbitMQ
拆分多个 queue,每个 queue 一个 consumer,就是多一些 queue 而已,确实是麻烦点;或者就一个 queue 但是对应一个 consumer,然后这个 consumer 内部用内存队列做排队,然后分发给底层不同的 worker 来处理。
Kafka
一个 topic,一个 partition,一个 consumer,内部单线程消费,单线程吞吐量太低,一般不会用这个。
写 N 个内存 queue,具有相同 key 的数据都到同一个内存 queue;然后对于 N 个线程,每个线程分别消费一个内存 queue 即可,这样就能保证顺序性。
RocketMQ消费端有两种类型:MQPullConsumer和MQPushConsumer。
MQPullConsumer由用户控制线程,主动从服务端获取消息,每次获取到的是一个MessageQueue中的消息。PullResult中的List msgFoundList自然和存储顺序一致,用户需要再拿到这批消息后自己保证消费的顺序。
对于PushConsumer,由用户注册MessageListener来消费消息,在客户端中需要保证调用MessageListener时消息的顺序性。
RocketMQ不保证消息的顺序性,
RocketMQ(消息可靠传输)重试机制:网络问题:生产者会无限次重试 / 会是无限次的 业务逻辑:延迟重试
处理消息重复:保持幂等,消费者通过日志记录(消息ID),大事务 = 小事务 + 异步
RocketMQ第一阶段发送Prepared消息时,会拿到消息的地址,第二阶段执行本地事物,第三阶段通过第一阶段拿到的地址去访问消息,并修改状态。细心的你可能又发现问题了,如果确认消息发送失败了怎么办?RocketMQ会定期扫描消息集群中的事物消息,这时候发现了Prepared消息,它会向消息发送者确认,Bob的钱到底是减了还是没减呢?如果减了是回滚还是继续发送确认消息呢?RocketMQ会根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。
集群消费 & 广播消费
RocketMQ的消费方式有2种,在默认情况下,就是集群消费,也就是上面提及的消息的负载均衡消费。另一种消费模式,是广播消费。广播消费,类似于ActiveMQ中的发布订阅模式,消息会发给Consume Group中的每一个消费者进行消费。
默认的是使用集群消费
public class Consumer { public static void main(String[] args) throws MQClientException { //声明并初始化一个consumer //需要一个consumer group名字作为构造方法的参数,这里为consumer1 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1"); //同样也要设置NameServer地址 consumer.setNamesrvAddr("192.168.140.128:9876;192.168.140.129:9876"); * consumer.setMessageModel(MessageModel.BROADCASTING); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //设置consumer所订阅的Topic和Tag,*代表全部的Tag consumer.subscribe("TopicTest", "*"); //设置一个Listener,主要进行消息的逻辑处理 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt messageExt : msgs) { String messageBody = new String(messageExt.getBody()); System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format( new Date())+"消费响应:msgId : " + messageExt.getMsgId() + ", msgBody : " + messageBody);//输出消息内容 } //返回消费状态 //CONSUME_SUCCESS 消费成功 //RECONSUME_LATER 消费失败,需要稍后重新消费 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //调用start()方法启动consumer consumer.start(); System.out.println("Consumer Started."); } }
对比
>>> 消息队列和dubbo的差异:
在架构上,RPC和Message Queue的差异点是,Message Queue有一个中间结点Message Queue(broker),可以把消息存储。
ActiveMq / RabbitMq
【AMQP】高级消息队列协议,以字节为流
【JMS】相当于一个API
MQ选型
但凡选择就会受到主观和客观两个因素的影响。我们如何尽量客观的进行架构和框架选型,而避免先有结果而后找理由的文字游戏,下面我分享下我们做MQ选型的过程(这里不是说主观就是不好的,但作为工程师凡事做结构化和量化还是有必要的)。
3.1 关键需求
1) 集群支持:为了保证消息中间件的可靠性,需要提供完备的生产者、消费者、消息中间件集群方案;
2) 持久化的支持:为了避免消息丢失,需要支持消息保存到磁盘文件或其它格式存储;
3) 消息重试的支持:消息处理失败后的支持失败转存或重试,并提供消息至少投第一次或消息最多投递一次的配置;
4) 分布式事务的支持:为了保证业务的完整性,选择的中间件需要支持分布式;
5) 消息的按序消费:在有些场景下,需要消息的消费能够按照发送的同样顺序进行处理从而保证顺序执行;
6) 消息的延时支持:在2C业务处理或三方数据源对接中,会遇到消息延时投递要求,需要支持延投递;
7) 消息堆积和回溯功能:在消息中间件持久化保存大量消息时不会对性能有大的影响,支持消息查询、重发,或者按照时间点来重新消费消息,以应对某一段时间消息的重新消费场景。
3.2 其它需要考虑的因素
1) 产品与当前技术栈是否匹配,团队人员熟悉源代码更便于对消息中间件的原理理解和后续功能扩展;
2) 产品的使用广度特别是金融同业客户:同业因为业务同质化校对,场景需求相近,使用的人越多,说明关键场景支持较好,问题在之前暴露的越充分,当我们在使用时碰问题的时候,就比较容易找到对应的解决方案或解决人员;
3) 产品的高可用性:作为一个金融企业,需要服务的持续可用,作为提高企业弹性的基础消息平台,集群和高可用是一个必不可少的要求;
4) 产品的稳定性:产品可以持续、稳定的提供服务,不需要经常因为资源泄露或性能衰减等问题而重新启动。
5) 产品的活跃度:通过github统计数据能看出来这个产品是否经常有人维护,经常有人开发一些新的功能,经常fix一些bug。
3.3 选型要点及原则
l 搜寻满足关键需求的框架到候选清单;
l 从功能和非功能性需求等几个方面对候选框架进行筛选;
l 在帅选过程中要做好量化记录,避免先有倾向性的结果,后有筛选,这样选型就变成了一场数字游戏;
l 有时要换个角度思考,常用来做比较的可能就是最好的,如很多MQ框架都与Kafka做比较,那么Kafka有可能就是最通用的框架,如果做选型就要对其是否满足自己的需求做重点分析;
l 遵循第三眼美女原则,让理性引导感性;
l 适合的就是最好的,不要但纯追求高性能和功能全面。
标签:Pull,消费,搬迁,Broker,重试,消息,消息中间件,RocketMQ 来源: https://www.cnblogs.com/novalist/p/11621340.html