首页 > TAG信息列表 > ConsumeQueue

08第三章:02_消息的存储

一、消息的存储 RocketMQ 中的消息存储在本地文件系统中,这些相关文件默认在当前用户主目录下的 store 目录中。 abort:该文件在 Broker 启动后会自动创建,正常关闭 Broker,该文件会自动消失。若在没有启动 Broker 的情况下,发现这个文件是存在的,则说明之前 Broker 的关闭是非正常关

浅谈rocketmq

rocketmq主要由4部分组成:Producer、Consumer、Broker、NameServer。 NameServer:整个集群的注册中心和配置中心,管理集群的元数据。包括 Topic 信息和路由信息、Producer 和 Consumer 的客户端注册信息、Broker 的注册信息。 Broker:负责接收消息的生产和消费请求,并进行消息的持久

全站最硬核 百万字强肝RocketMq源码 火热更新中~(九十二)延时队列

this.dispatcherList = new LinkedList<>(); this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue()); this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex()); doDispatch()会遍历CommitLogDispatcher,调用它们的dispatch()方法。其中专门用来

ConsumeQueue构建过程分析

1. 前言 理论上来说,RocketMQ只要有CommitLog文件就可以正常运行了,那为何还要维护ConsumeQueue文件呢? ​ ConsumeQueue是消费队列,引入它的目的是为了提高消费者的消费速度。毕竟RocketMQ是基于Topic主题订阅模式的,消费者往往只关心自己订阅的消息,如果每次消费都从CommitLog文

RocketMQ-broker存储机制-ConsumeQueue

   前面讲解到对于consumequeue的文件恢复和过期文件删除,和flush的过程这边就不再重点阐述,实际上consumequeue中的存储单元是一个20个字节的数据,前8个字节存储消息在commitlog上的物理点位,接着是4字节的消息size,最后是8字节的tag的hash值。可以看做消息在consumequeue上存储的是

RocketMQ 消息存储

引言 前面我们已经简单地介绍了 RocketMQ 的整体设计思路,本文着重其中消息存储部分的整体实现思路。 消息存储 通过前面的知识,我们已经知道了topic是如何分配到Broker的,以及消息发送方是如何决定把消息发送给哪个Broker的,接下来我们看一看Broker介绍到消息后,是怎么存储消息的

RocketMQ 源码分析 —— Message 拉取与消费(上)

摘要: 原创出处 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-first/ 「芋道源码」欢迎转载,保留摘要,谢谢!1、概述2、ConsumeQueue 结构3、ConsumeQueue 存储4、Broker 提供[拉取消息]接口5、Broker 提供[更新消费进度]接口6、Broker 提供[发回消息]接口7、结尾阅读源码

RocketMQ源码解析之broker文件清理

原创不易,转载请注明出处 文章目录 1. broker 清理文件介绍1.1 哪些文件需要清理1.2 RocketMQ文件清理的机制 2.源码解析2.1 清理commitlog2.2 ConsumeQueue 与indexFile 清理 总结 1. broker 清理文件介绍 1.1 哪些文件需要清理 首先我们需要介绍下在RocketMQ中哪些

RocketMQ 消息偏移量 Offset 和 CommitLog

消息偏移量 Offset 概念 message queue 是无限长的数组,一条消息进来下标就会涨1,下标就是 offset,消息在某个 MessageQueue 里的位置,通过 offset 的值可以定位到这条消息,或者指示 Consumer 从这条消息开始向后处理。 message queue 中的 maxOffset 表示消息的最大 offset,maxOffset

consumeQueue 和 indexFile 文件

broker 把消息写入 commitLog 后,还需要把消息的索引写入 consumeQueue 文件 和 indexFile 文件 // org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService@Overridepublic void run() { DefaultMessageStore.log.info(this.getServiceName() + " service starte