其他分享
首页 > 其他分享> > RocketMQ - body Compress

RocketMQ - body Compress

作者:互联网

在看书梳理代码的时候发现了一个变量:compressMsgBodyOverHowmuch (在DefaultMQProducer.java中)字面意思就是:消息体超过该值则启用压缩,
默认4K。


以前看过一点压缩算法的一些东西,想看看rocket mq中是如何进行压缩的。
在DefaultMQProducer中搜索发现没有使用该变量的地方,遂去DefaultMQProducerImpl中看看。


如下 图1

image.png

发现了一个 tryToCompressMessage 这个方法,代码不多。


这里就去那个静态方法里面看看如何压缩的:
如下图2
image.png
好吧,这里就仅仅用到了java库里面的zip算法去压缩的。但是这里奇怪的是为什么在334行捕获了IOException却不处理呢?打个日志也好啊。。。它楼上的解压方法都打了日志的
如下图3
image.png


好了,不纠结了回到 图1 中,继续搜一下看看是哪里调用了这个 tryToCompressMessage,发现 sendKernelImpl方法里面调用了。
如下图4
image.png
sendKernelImpl 是client客户端真正开始做发送消息操作的,所以在这里调用也不奇怪。


如图4所示,这里定义了一个msgBodyCompressed的这个标记是作何使用呢? 这里往下追
如下图5
image.png
追到了813行这里就开始判断。这里很奇怪的是msg.setBody 这里设置的是之前没有被压缩过的消息啊?
后面看注释和那个Fix bug才明白 :

这个bug很有意思。
所以这里的body还是为压缩之前的body,但是将第一次压缩后的message进行了拷贝,放到tmpMessage里面了,最后消息发送的时候也是发送tmpMessage。


总结:

1 rocketmq的消息body压缩阀值是4K
2 UtilAll的这个静态类可以拿去复用,里面有现成的压缩和解压方法
3 感觉这个消息重试的流程是不是不太好,如果第一次发送失败,重试的时候还需要再次进行压缩是不是太耗时了,这里是否有改进的空间?比如重试的时候直接将前一个message保存一下,然后直接将其发送给netty进行传递?(这里我还没有仔细梳理消息的重试流程,不知道是否是理解有误。)

标签:body,这里,压缩,Compress,发送,RocketMQ,阀值,消息
来源: https://www.cnblogs.com/junjiedeng/p/14611537.html