其他分享
首页 > 其他分享> > MQ系列5:RocketMQ消息的发送模式

MQ系列5:RocketMQ消息的发送模式

作者:互联网

MQ系列1:消息中间件执行原理
MQ系列2:消息中间件的技术选型
MQ系列3:RocketMQ 架构分析
MQ系列4:NameServer 原理解析

在之前的篇章中,我们学习了RocketMQ的原理,以及RocketMQ中 命名服务 ServiceName 的运行流程,本篇从消息的生产、消费来理解一条消息的生命周期。

1 消息生产

在RocketMQ中,消息生产指的是 消息生产者往消息队列中写入数据的过程。因为业务场景的复杂性,RocketMQ架构设计了多种不同的发送策略。下面先讨论几种常见的场景:
-** 同步发送:** 整个过程业务是阻塞等待的,消息发送之后等待 Broker 响应,得到响应结果之后再传递给业务线程。

1.1 消息发送步骤

一般情况下,我们发送消息,会使用默认的DefaultMQProducer类,经过以下几个步骤实现:

1.2 消息发生返回状态

消息发送之后,会相应的拿到回执。返回对象中的状态(SendResult.SendStatus)有4种,如下:

1.3 发送同步消息

实时同步消息是一种对可靠性、实时性要求比较高的场景,使用的也比较广泛,比如:

public class SyncProducerApplication {
    public static void main(String[] args) throws Exception {
        // 1、创建生产者producer,并指定生产者组名为 testSyncGroup
        DefaultMQProducer producer = new DefaultMQProducer("testSyncGroup");
        // 2、指定NameServer的地址,以获取Broker路由地址
        producer.setNamesrvAddr("192.168.139.1:9876");
        // 3、启动producer
        producer.start();
        // 4、创建消息,并指定Topic,Tag和消息体
        Message msg = new Message("testTopic","sync", "测试同步消息".getBytes("UTF-8"));
        // 5、发送消息到一个Broker
        SendResult sendResult = producer.send(msg);
        // 6、通过sendResult返回消息是否成功送达
        System.out.printf("%s%n", sendResult);
        // 7、如果不再发送消息,关闭生产者Producer
        producer.shutdown();
    }
}

image

1.4 发送异步消息

我们知道,异步主要用于那些对实时响应不敏感的业务,可以容忍一定时间的等待,只要能达到最终一致性即可。
有时候为了在流量高峰期进行削峰和分流,缓解压力,我们经常采用异步消息的发送模式。这种业务场景也很常见,比如:

public class AsyncProducerApplication {
    public static void main(String[] args) throws Exception {
        // 1、创建生产者producer,并指定生产者组名为 testAsyncGroup
        DefaultMQProducer producer = new DefaultMQProducer("testAsyncGroup");
        // 2、指定NameServer的地址,以获取Broker路由地址
        producer.setNamesrvAddr("192.168.139.1:9876");
        // 3、启动producer
        producer.start();
        // 4、创建消息,并指定Topic,Tag和消息体
        Message msg = new Message("testTopic","async", "测试异步消息".getBytes("UTF-8"));
        // 5、发送消息到一个Broker
        SendResult sendResult = producer.send(msg);
        // 6. 发送异步消息,SendCallback是处理异步回调的方法
        producer.send(msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {  // 成功回调
                System.out.println("success: " + sendResult);
            }
            @Override
            public void onException(Throwable throwable) {  // 失败回调
                System.out.println("fail: " + throwable);
            }
        });
        // 7、如果不再发送消息,关闭生产者Producer
        producer.shutdown();
    }
}

image

1.5 单向发送消息

OneWay的模式主要用在Care发送结果的场景,只要消息发送出去即完成任务,不需要对发送的状态、结果负责。常见的使用场景如

public class OneWayProducerApplication {
    public static void main(String[] args) throws Exception {
        // 1、创建生产者producer,并指定生产者组名为 testOneWayGroup
        DefaultMQProducer producer = new DefaultMQProducer("testOneWayGroup");
        // 2、指定NameServer的地址,以获取Broker路由地址
        producer.setNamesrvAddr("192.168.139.1:9876");
        // 3、启动producer
        producer.start();
        // 4、创建消息,并指定Topic,Tag和消息体
        Message msg = new Message("testTopic","oneway", "测试单向发送消息".getBytes("UTF-8"));
        // 5、发送消息到一个Broker
        producer.sendOneway(msg);
        // 6、如果不再发送消息,关闭生产者Producer
        producer.shutdown();
    }
}

image

1.6 发送延时消息

指定延迟的时间,在延迟时间到达之后再进行消息的发送。这种的使用场景也很多:

1.6.1 延时时间的使用限制

延时时间并不是随意指定的,Rocket源码中指定了18种等级,分别代表不同的时间时长,如下:

// org/apache/rocketmq/store/config/MessageStoreConfig.java 的第198行
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

1.6.2 发送延时消息具体实现

通过下面的代码,可以得到的结果是消费的时间点比信息记录的时间点延迟了1分钟,这是因为我们在send的时候做了delay。

public class DelayProducerApplication {
    public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException , UnsupportedEncodingException {
        // 1、创建生产者producer,并指定生产者组名为 testDelayGroup
        DefaultMQProducer producer = new DefaultMQProducer("testDelayGroup");
        // 2、指定NameServer的地址,以获取Broker路由地址
        producer.setNamesrvAddr("192.168.139.1:9876");
        // 3、启动producer
        producer.start();
        // 4、创建消息,并指定Topic,Tag和消息体
        Message msg = new Message("testTopic","delay", "测试延迟发送消息".getBytes("UTF-8"));
        // 5、设置延时等级4,对应1m,所以这个消息在一分钟之后发送
        msg.setDelayTimeLevel(4);
        // 6、发送消息到一个Broker
        SendResult sendResult = producer.send(msg);
        // 7、通过sendResult返回消息是否成功送达
        System.out.printf("%s%n", sendResult);
        // 8、如果不再发送消息,关闭生产者Producer
        producer.shutdown();
    }
}

image

1.7 发送批量消息

waitStoreMsgOK: 消息发送时是否等消息存储完成后再返回。

public class BatchProducerApplication {
    public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException, UnsupportedEncodingException {
        // 1、创建生产者producer,并指定生产者组名为 testBatchGroup
        DefaultMQProducer producer = new DefaultMQProducer("testBatchGroup");
        // 2、指定NameServer的地址,以获取Broker路由地址
        producer.setNamesrvAddr("192.168.139.1:9876");
        // 3、启动producer
        producer.start();
        // 4、创建消息列表,并指定Topic,Tag和消息体
        List<Message> messages = new ArrayList<>();
        String topic = "testTopic";
        messages.add(new Message(topic, "batch", "测试批量发送消息 0".getBytes("UTF-8")));
        messages.add(new Message(topic, "batch", "测试批量发送消息 1".getBytes("UTF-8")));
        messages.add(new Message(topic, "batch", "测试批量发送消息 2".getBytes("UTF-8")));

        // 5、发送消息到一个Broker
        SendResult sendResult = producer.send(messages);
        // 6、通过sendResult返回消息是否成功送达
        System.out.printf("%s%n", sendResult);
        // 7、如果不再发送消息,关闭生产者Producer
        producer.shutdown();
    }
}

1.8 如何提升消息生产的性能

消息的发送一般是经过 client发送、Broker服务器接收并处理、Broker服务器返回应答 三个步骤。
如果我们想要提高消息生产的效率,一般有如下方法:

根据阿里内部调优后的性能测试报告,消息的写入性能达到90万+的TPS,我们可以朝着这个指标进行优化。

2 总结

本篇介绍了RocketMQ 消息生产与发送的几种模式:

标签:producer,生产者,Broker,发送,MQ,消息,new,RocketMQ
来源: https://www.cnblogs.com/wzh2010/p/16629876.html