rocketmq消息积压监控java代码实现
作者:互联网
最近在做彩信下发,需要下发的内容是以消息的形式存放在rocektMQ,遇上彩信消息未下发的情况,需要实时去查各topic的消息积压量
1、启动时装配监控客户端的bean
@Component
public class MQAdminExtConfig {
private static final Logger log = LoggerFactory.getLogger(MQAdminExtConfig.class);
@Value("${rocketmq.name-server}")
private String nameServer;
public static DefaultMQAdminExt defaultMQAdminExt;
/**
* 启动监控客户端
*/
@PostConstruct
public void initMqAdminExtConfig(){
//初始化一个生产者,用于初始化参数
log.info("init rocketMQ monitoer client,nameServer:{}....",nameServer);
try {
DefaultMQProducer producer = new DefaultMQProducer("GRP_P_MSG_PRIORITY_HIGH_BeiJing_8000");
producer.setNamesrvAddr(nameServer);
producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
try {
defaultMQAdminExt = new DefaultMQAdminExt();
defaultMQAdminExt.setNamesrvAddr(nameServer);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
defaultMQAdminExt.start();
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
*
* @param consumerGroup 消费者组
* @param topic topic
* @return 当前topic的积压量
*/
private static long getBackLogMsg(String consumerGroup,String topic){
long diff=0;
log.info("BacklogMonitorUtil--getBackLogMsg param:consumerGroup:{},topic:{} ",consumerGroup,topic);
try {
ConsumeStats consumeStats = MQAdminExtConfig.defaultMQAdminExt.examineConsumeStats(consumerGroup);
List<MessageQueue> mqList = new LinkedList();
mqList.addAll(consumeStats.getOffsetTable().keySet());
Collections.sort(mqList);
for(MessageQueue queue :mqList){
if(topic.equals(queue.getTopic())){
OffsetWrapper offsetWrapper = (OffsetWrapper)consumeStats.getOffsetTable().get(queue);
log.info("getBrokerOffset----------------{}",offsetWrapper.getBrokerOffset());
log.info("getConsumerOffset-----------------{}",offsetWrapper.getConsumerOffset());
diff = offsetWrapper.getBrokerOffset() - offsetWrapper.getConsumerOffset();
}
}
} catch (Exception e) {
//当消费者未消费时此除会报错
diff=0;
log.error("get offset error -----------------{}",e);
}
return diff;
}
这里本来想探究一下为什么当消费者不消费时会报错,先把错误贴出来
上面报的是一个topic路由找不到的错误,且topic是%RETRY%开头的,但是通过查看源码发现defaultMQAdminExt.examineConsumeStats的实现类,查询的topic直接就是
关于%RETRY%开头的topic,
consumer 消费失败,会把消息重新发往 %RETRY% + consumerGroup,这个 retry 消息会在一定时间后,真实送到 retry topic。
但是这里为什么会直接去查 %RETRY% + consumerGroup,没有搞明白,后续再继续跟踪~,有知道的老哥可以在评论区写下答案,感谢
标签:积压,log,consumerGroup,offsetWrapper,topic,defaultMQAdminExt,java,nameServer,rocke 来源: https://blog.csdn.net/qq_41700030/article/details/120946785