编程语言
首页 > 编程语言> > rocketmq消息积压监控java代码实现

rocketmq消息积压监控java代码实现

作者:互联网

最近在做彩信下发,需要下发的内容是以消息的形式存放在rocektMQ,遇上彩信消息未下发的情况,需要实时去查各topic的消息积压量

1、启动时装配监控客户端的bean

@Component
public class MQAdminExtConfig {

    private static final Logger log = LoggerFactory.getLogger(MQAdminExtConfig.class);

    @Value("${rocketmq.name-server}")
    private String nameServer;

    public static DefaultMQAdminExt  defaultMQAdminExt;


    /**
     * 启动监控客户端
     */
    @PostConstruct
    public void initMqAdminExtConfig(){
      //初始化一个生产者,用于初始化参数
        log.info("init rocketMQ monitoer client,nameServer:{}....",nameServer);
        try {
            DefaultMQProducer producer = new DefaultMQProducer("GRP_P_MSG_PRIORITY_HIGH_BeiJing_8000");
            producer.setNamesrvAddr(nameServer);
            producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }

         try {
             defaultMQAdminExt = new DefaultMQAdminExt();
             defaultMQAdminExt.setNamesrvAddr(nameServer);
             defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
             defaultMQAdminExt.start();
         } catch (Exception e) {
             e.printStackTrace();
         }

     }
}
  /**
     * 
     * @param consumerGroup    消费者组
     * @param topic           topic
     * @return      当前topic的积压量
     */
    private static long getBackLogMsg(String consumerGroup,String topic){
        long diff=0;
        log.info("BacklogMonitorUtil--getBackLogMsg param:consumerGroup:{},topic:{} ",consumerGroup,topic);
        try {
            ConsumeStats consumeStats = MQAdminExtConfig.defaultMQAdminExt.examineConsumeStats(consumerGroup);
            List<MessageQueue> mqList = new LinkedList();
            mqList.addAll(consumeStats.getOffsetTable().keySet());
            Collections.sort(mqList);
            for(MessageQueue queue :mqList){
                    if(topic.equals(queue.getTopic())){
                        OffsetWrapper offsetWrapper = (OffsetWrapper)consumeStats.getOffsetTable().get(queue);
                        log.info("getBrokerOffset----------------{}",offsetWrapper.getBrokerOffset());
                        log.info("getConsumerOffset-----------------{}",offsetWrapper.getConsumerOffset());
                         diff = offsetWrapper.getBrokerOffset() - offsetWrapper.getConsumerOffset();
                    }
            }
        } catch (Exception e) {
            //当消费者未消费时此除会报错
            diff=0;
            log.error("get offset error -----------------{}",e);
        }
        return diff;
    }

这里本来想探究一下为什么当消费者不消费时会报错,先把错误贴出来
在这里插入图片描述
上面报的是一个topic路由找不到的错误,且topic是%RETRY%开头的,但是通过查看源码发现defaultMQAdminExt.examineConsumeStats的实现类,查询的topic直接就是
在这里插入图片描述
关于%RETRY%开头的topic
consumer 消费失败,会把消息重新发往 %RETRY% + consumerGroup,这个 retry 消息会在一定时间后,真实送到 retry topic。
但是这里为什么会直接去查 %RETRY% + consumerGroup,没有搞明白,后续再继续跟踪~,有知道的老哥可以在评论区写下答案,感谢

标签:积压,log,consumerGroup,offsetWrapper,topic,defaultMQAdminExt,java,nameServer,rocke
来源: https://blog.csdn.net/qq_41700030/article/details/120946785