其他分享
首页 > 其他分享> > RocketMQ从基础到应用(延迟消息队列)

RocketMQ从基础到应用(延迟消息队列)

作者:互联网

文章目录


RocketMQ

官方文档:https://rocketmq.apache.org/docs/quick-start/

  1. 解耦 中间件AB

  2. 异步 下单→>成功验证短信、邮件、仓储调度

  3. 消息驱动/事件驱动型

1.开启rocketmq

  1. 添加环境变量
ROCKETMQ_HOME="D:\rocketmq"
NAMESRV_ADDR="localhost:9876"
  1. 在rocketmq的bin目录下开启cmd命令行

输入一下两个命令

mqnamesrv.cmd

启动第一个命令后被阻塞,重新开一个命令行继续输入

mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true

2 应用

  1. 引入依赖
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.7.0</version>
</dependency>
  1. 添加配置
rocketmq:
  topic: PlaceOrder
  namesrv-addr: 127.0.0.1:9876
  producer:
    producer-group: SleeveProducerGroup
  consumer:
    consumer-group: SleeveConsumerGroup

3 延迟消息队列

预定消息与普通消息的不同之处在于,它们直到指定的时间之后才会被传递。

生产者

@Component
public class ProducerSchedule {

    @Value("${rocketmq.producer.producer-group}")
    private String producerGroup;

    @Value("${rocketmq.namesrv-addr}")
    private String namesrvAddr;
    // rocketmq 的生产者
    private DefaultMQProducer defaultMQProducer;

//    执行顺序: A Construct--> B @Autowired --> @PostConstruct
    @PostConstruct
    public void setDefaultMQProducer() {
        if (defaultMQProducer == null) {
            //在类实例化后,创建defaultMQProducer对象
            this.defaultMQProducer = new DefaultMQProducer(producerGroup);
            // 设置rockemq的地址
            defaultMQProducer.setNamesrvAddr(namesrvAddr);
        }
        try {
            // 开启生成者
            defaultMQProducer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }


    public String send(String topic, String body) throws Exception {
        Message message = new Message(topic, body.getBytes());
        // messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
        // 设置延时级别
        message.setDelayTimeLevel(3);
        // 发送消息
        SendResult sendResult = defaultMQProducer.send(message);

        System.out.println(sendResult.getMsgId());
        System.out.println(sendResult.getSendStatus());
        return sendResult.getMsgId();
    }
}

消费者


/**
 * 实现CommandLineRunner接口的方法 该类由SpringBoot调用
 * 在程序初始化时,只执行一次
 */
@Component
public class ConsumerSchedule implements CommandLineRunner {

    @Value("${rocketmq.consumer.consumer-group}")
    private String consumerGroup;

    @Value("${rocketmq.namesrv-addr}")
    private String namesrvAddr;

    @Value("${rocketmq.topic}")
    private String topic;

    @Autowired
    private OrderCancelService orderCancelService;
    @Autowired
    private CouponBackService couponBackService;

    public void getMessage() throws MQClientException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
        // 设置rockemq的地址
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 监听test主题下的所有消息
        consumer.subscribe(topic, "*");
        // 并发消费
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg : list) {
                    String orderMessage = new String(msg.getBody());
                    System.out.println(orderMessage);
                    if (StringUtils.isEmpty(orderMessage)) break;
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }

    @Override
    public void run(String... args) throws Exception {
        this.getMessage();
    }
}

4 RocketMQ消费位置

public enum ConsumeFromWhere {
    /**
     * 一个新的订阅组第一次启动从队列的最后位置开始消费<br>
     * 后续再启动接着上次消费的进度开始消费
     */
    CONSUME_FROM_LAST_OFFSET,

    @Deprecated
    CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST,
    @Deprecated
    CONSUME_FROM_MIN_OFFSET,
    @Deprecated
    CONSUME_FROM_MAX_OFFSET,
    /**
     * 一个新的订阅组第一次启动从队列的最前位置开始消费<br>
     * 后续再启动接着上次消费的进度开始消费
     */
    CONSUME_FROM_FIRST_OFFSET,
    /**
     * 一个新的订阅组第一次启动从指定时间点开始消费<br>
     * 后续再启动接着上次消费的进度开始消费<br>
     * 时间点设置参见DefaultMQPushConsumer.consumeTimestamp参数
     */
    CONSUME_FROM_TIMESTAMP,
}

4 rocketmq的有序消费模式和并发消费模式的区别

rocketmq消费者注册监听有两种模式,有序消费MessageListenerOrderly和并发消费MessageListenerConcurrently,这两种模式返回值不同。

MessageListenerOrderly正确消费返回ConsumeOrderlyStatus.SUCCESS,

稍后消费返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT

MessageListenerConcurrently正确消费返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS

稍后消费返回ConsumeConcurrentlyStatus.RECONSUME_LATER

标签:消费,String,队列,RocketMQ,CONSUME,consumer,public,rocketmq,延迟
来源: https://blog.csdn.net/m0_44966138/article/details/117534688