其他分享
首页 > 其他分享> > 如何保证mq不丢消息

如何保证mq不丢消息

作者:互联网

1.消息的发送流程

一条消息从生产到被消费,将会经历3个阶段
image

2.生产阶段

Prodducer 通过网络发送消息给Broker,当Broker收到之后,将会返回确认响应信息给Producer,所以生产者只有接收到返回的确认响应,就代表消息在生产阶段未丢失

同步发送伪代码

DefaultMQProducer mqProducer=new DefaultMQProducer("test");
// 设置 nameSpace 地址
mqProducer.setNamesrvAddr("namesrvAddr");
mqProducer.start();
Message msg = new Message("test_topic" /* Topic */,
        "Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 发送消息到一个Broker
try {
    SendResult sendResult = mqProducer.send(msg);
} catch (RemotingException e) {
    e.printStackTrace();
} catch (MQBrokerException e) {
    e.printStackTrace();
} catch (InterruptedException e) {
    e.printStackTrace();
}

send 方法是同步操作,只要这个方法不抛出异常,就代表消息已经发送成功
消息发送成功仅代表消息已经到了Broker端,Broker在不同配置下,可能返回不同的状态

异步发送伪代码

DefaultMQProducer mqProducer = new DefaultMQProducer("test");
// 设置 nameSpace 地址
mqProducer.setNamesrvAddr("127.0.0.1:9876");
mqProducer.setRetryTimesWhenSendFailed(5);
mqProducer.start();
Message msg = new Message("test_topic" /* Topic */,
        "Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);

try {
    // 异步发送消息到,主线程不会被阻塞,立刻会返回
    mqProducer.send(msg, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            // 消息发送成功,
        }

        @Override
        public void onException(Throwable e) {
            // 消息发送失败,可以持久化这条数据,后续进行补偿处理
        }
    });
} catch (RemotingException e) {
    e.printStackTrace();
} catch (InterruptedException e) {
    e.printStackTrace();
}

异步发送,要重写回调方法,在回调方法中检查发送结果
不管同步还是异步,都会碰到网络问题导致发送失败的请求,针对这种情况,我们可以设置合理的重试次数,当出现网络问题,可以自动重试,设置方式如下

// 同步发送消息重试次数,默认为 2
mqProducer.setRetryTimesWhenSendFailed(3);
// 异步发送消息重试次数,默认为 2
mqProducer.setRetryTimesWhenSendAsyncFailed(3);

3.存储阶段

消息到了Broker端,将会优先保存到内存里,然后立刻返回确认响应ack给生产者。随后Broker 定期批量的将一组消息从内存中异步刷到磁盘中
定期异步刷数据到盘的操作,减少了IO次数,可以有更好的性能,但是如果发生几起掉电,戎机的情况,消息还未及时刷到磁盘,就会出现丢失消息的情况。
如果要保证Broker端不丢消息,需要将消息的保存机制改为同步刷盘方式,来一个消息,刷一下到磁盘中,再返回响应。
master配置修改

flushDiskType = SYNC_FLUSH

当Broker未在同步时间内(默认5秒)完成刷盘,将会返回SendStatus.FLUSH_DISK_TIMEOUT状态给生产者

高可靠
broker通常采用集群部署,一主多从架构,为了保证消息不丢,消息还会复制到slave节点
默认情况下,消息写入master成功,就可以返回确认响应ack给生产者,接着消息异步复制到slave节点

此时若Master突然戎机不可恢复,那么还未恢复到slave的消息将会丢失
为了进一步提高消息可靠性,可以采用同步的复制方式,master节点将会同步等待slave节点复制完成,才会返回确认响应。
异步复制与同步复制的区别如下:
image

Broker master节点同步复制配置如下

brokerRole = SYNC_MASTER

如果slave 节点未在指定时间内同步返回响应,生产者将会受到SendStatus.FLUSH_SLAVE_TIMEOUT返回状态


如果 要严格保证消息不丢,broker需要如下配置:

## master 节点配置
flushDiskType = SYNC_FLUSH
brokerRole = SYNC_MASTER

## slave 节点配置
brokerRole = slave
flushDiskType = SYNC_FLUSH

生产者要配合,判断返回状态是否SendStatus.SEND_OK,若是其他状态,需要考虑补偿重试。上述配置会提高消息的可靠性,但是会降低性能,生产实践中需要综合选择。不是完全固化的配置

标签:同步,Broker,发送,保证,消息,mqProducer,mq,刷盘
来源: https://www.cnblogs.com/PythonOrg/p/14837074.html