RocketMQ配置与使用
作者:互联网
RocketMQ配置与使用
- 1、MQ介绍
1、MQ介绍
1.1 mq介绍
Rocketmq是一款分布式消息中间件,拥有高可靠性,高性能的特点。最初是由阿里巴巴中间件团队研发并大规模使用于生产系统,满足线上海量堆积的需求。Rocketmq默认采用长轮询的拉取模式,单机支持千万级别的消息堆积,可以非常好的应用在海量消息系统中。
mq是一种先进先出的队列模式:
1.2 mq的应用场景
1)应用解耦
系统的耦合性越高,容错性就会越低。以订单支付系统来说,系统耦合调用库存系统,支付系统、物流系统,其中任何一个系统出现故障都会导致整个系统瘫痪,影响用户的使用。
通过rocketmq中间解耦之后,当库存系统出现故障时,可以将需要库存系统处理的信息先储存到MQ中,其他系统进行正常的下单、支付等操作,等库存系统恢复之后,从MQ中拉取需要处理的信息进行处理,而用户不会感知到库存系统出现故障。
2)数据分发
消息生产者将数据发送到MQ消息队列,消息队列让数据在多个系统之间流通。数据的生产者只需要将数据发送到MQ的消息队列中,而不用关心由谁来使用这个数据,数据的使用者也只需要直接从Mq的消息队列中拉取需要的数据,而不需要和数据的生产者打交道。
3)流量削峰
当应用系统遇到流量瞬间猛增时,可能会导致系统压力过大,将整个系统压崩。而有了MQ之后,系统可以将大量的请求暂且堆积到MQ的消息队列中,通过轮询的方式进行分批处理,从而减轻系统的压力。
1.3 MQ架构
- Producer:消息的发送者;举例:发件者
- Consumer:消息接收者;举例:收件人
- Consumer Group:消费组;每一个 consumer 实例都属于一个 consumer group,每一条消息只会被同一个consumer group 里的一个 consumer 实例消费。(不同consumer group可以同时消费同一条消息)
- Broker:暂存和传输消息;举例:快递公司
- NameServer:管理 Broker;举例:快递公司的管理机构
- Topic:区分消息的种类;一个发送者可以发送消息给一个或者多个 Topic;一个消息的接收者可以订阅一个或者多个 Topic 消息
1.4 MQ储存模型
2、RocketMq安装入门
2.1准备工作
2.1.1 下载Rocketmq
这里选择的 RocketMQ 的版本:4.6.0
下载地址:http://rocketmq.apache.org/dowloading/releases/
官方文档:http://rocketmq.apache.org/docs/quick-start/
2.1.2 环境要求
- Linux64位系统
- JDK1.8(64位)
2.2 开始安装
2.2.1安装步骤
以二进制包方式进行安装:
- 解压安装包
- 进入安装目录
2.2.2 安装目录介绍
- bin:启动脚本,包括 shell 脚本和 CMD 脚本
- conf:实例配置文件 ,包括 broker 配置文件、logback 配置文件等
- lib:依赖 jar 包,包括Netty、commons-lang、FastJSON等
2.3 启动Rocketmq
1.RocketMq默认使用的虚拟机内存较大,启动Broker和NameServer有可能会因为内存不足而启动失败,所以需要在启动前先修改启动脚本中的JVM参数
#编辑 runbroker.sh 和 runserver.sh 修改默认 JVM 大小
$ vi bin/runbroker.sh
# 参考设置
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
$ vi bin/runserver.sh
# 参考设置
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
2.启动NameServer
# 1.启动NameServer
nohup sh bin/mqnamesrv &
# 2.查看启动日志
tail -f ~/logs/rocketmqlogs/namesrv.log
3.启动Broker
# 1.启动Broker
nohup sh bin/mqbroker -n localhost:9876 &
# 2.查看启动日志
tail -f ~/logs/rocketmqlogs/broker.log
bin/mqbroker 的一些可选参数:
-c:指定配置文件路径
-n:NameServer 的地址
2.4 测试RocketMq
2.4.1 消息发送
# 1.设置环境变量
export NAMESRV_ADDR=localhost:9876
# 2.使用安装包的Demo发送消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
2.4.2 接收消息
# 1.设置环境变量
export NAMESRV_ADDR=localhost:9876
# 2.接收消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
2.5 关闭Rocketmq
# 1.关闭NameServer
sh bin/mqshutdown namesrv
# 2.关闭Broker
sh bin/mqshutdown broker
2.6 Broker配置文件讲解
broker 默认的配置文件位置在:conf/broker.conf
。也可以自定义配置文件名称和位置,在启动Broker时用 -c
参数指定自定义的配置文件。
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
2.7 Rocketmq可视化界面安装
RocketMQ 有一个对其扩展的开源项目 incubator-rocketmq-externals,这个项目中有一个子模块叫 rocketmq-console
,这个便是管理控制台项目了,先将 incubator-rocketmq-externals 拉到本地,因为我们需要自己对 rocketmq-console
进行编译打包运行。
2.7.1 下载并编译项目
1.克隆项目
git clone https://github.com/apache/rocketmq-externals
2.在 rocketmq-console
中配置 namesrv
集群地址:
$ cd rocketmq-console
$ vim src/main/resources/application.properties
rocketmq.config.namesrvAddr=10.129.33.32:9876
3.配置完成进行编译并打包
mvn clean package -Dmaven.test.skip=true
4.启动 rocketmq-console
java -jar rocketmq-console-ng-2.0.0.jar
启动成功后,我们就可以通过浏览器访问 http://IP地址:8080 进入控制台界面了,如下图:
5.消息存储
rocketmq 消息进行持久化存储,从而达到 分布式队列高可靠性的要求。
- 消息生产者发送消息到MQ
- MQ接收到消息后,进行持久化存储,在存储中新增一条记录
- 返回ACK给生产者
- MQ push消息给对应的消费者,并等待消费者返回ACK
- 如果在指定的时间内,消息消费者返回ACK给MQ,那么MQ则认为消费者消费消息成功,则执行第6步,在存储中将消息删除;如果在指定的时间内MQ没有接收到消费者返回的ACK,则会认为消息消费失败,MQ会尝试重新push该消息到指定的消费者,重新执行4、5、6步骤。
5.1 消息存储介质
RocketMq将消息数据刷盘
到所部署的物理机的文件系统
中来进行持久化(刷盘方式分为同步刷盘
和异步刷盘
两种方式)。
消息刷盘为消息提供了一种高效率、高可靠性、高性能的持久化方式。
5.2 消息的储存结构
RocketMQ的消息存储是由ConsumerQueue
和CommitLog
配合完成的,ConsumerQueue中值存储很少的数据,类似于数据库的索引文件
,存储的是指向物理存储的地址,消息的读写都是通过物理存储CommitLog
来进行读写的。
如果一条消息只在CommitLog中有数据,在ConsumerQueue中没有,则消费者无法消费这条消息。
- CommitLog:是消息的物理存储,是消息的主体,对应CommitLog建立一个 ConsumerQueue,每个ConsumerQueue对应一个MessageQueue,所以即使ConsumerQueue数据丢失,也可以通过CommitLog进行恢复。
- ConsumeQueue:是一个消息的逻辑队列,存储了这个Queue在CommitLog中的起始offset,log大小和MessageTag的hashCode。
5.3 刷盘机制
Rocketmq采用将消息刷盘
的方式进行持久化,将消息数据写入到磁盘上,既保证了服务器断电重启后数据的不丢失,也可以存储超出内存限制的数据量。RocketMq为了保证性能,尽可能的保证磁盘的顺序写(目前的高性能磁盘,顺序写速度可以达到 600MB/s
, 超过了一般网卡的传输速度。但是磁盘随机写的速度只有大概 100KB/s
,和顺序写的性能相差 6000
倍!)。Producer发送消息到RocketMq进行持久化的时候,由两种刷盘方式,一种是同步刷盘
、一种是异步刷盘
。
1)同步刷盘
RocketMq返回写成功的状态时,消息已经被写入本地磁盘。具体流程为,消息写入内存后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程刷盘完成之后唤醒等待的线程,返回消息写成功的状态。
2)异步刷盘
RocketMq返回写成功的状态时,消息只是可能被写入内存的pagecache
,写操作的返回快,当内存中消息达到一定量的时候,统一出发写磁盘的动作,快速写入。
3)刷盘配置
同步刷盘和异步刷盘,都是通过broker
配置文件中的flushDiskType
参数设置的,把这个参数配置成SYNC_FLUSH
(同步刷盘)或 ASYNC_FLUSH
(异步刷盘)。
5.4 零拷贝技术
Linux操作系统分为用户态
和内核态
,系统文件操作、网络操作会涉及这两种形态的切换
,在切换时会产生数据的复制。
一台服务器将本地磁盘文件发送到客户端,一般需要经过两个阶段
:
read
:系统读取本地磁盘的文件write
:将读取的文件通过网络发送出去
在上述两个操作中,一共进行了4次数据复制
,分别是:
- 从磁盘复制数据到内核态内存
- 从内核态内存复制到用户态内存
- 从用户态内存复制到网卡驱动内核态内存
- 从网卡驱动内核态内存复制到网卡中进行传输
RocketMQ使用mmap
的方式,可以省去向用户态内存复制的过程,也就是所谓的“零拷贝
”技术,提高消息存盘和网络发送的速率。这种机制在java中是通过MappedByteBuffer
技术实现的。
需要注意的是,使用
MappedByteBuffer
这种技术会有一些限制,其中的一个限制就是向用户态内存复制的数据一次不能超过1G
,这也是为什么Commitlog
文件大小默认设置为1G
大小的原因。
6.RocketMQ心跳机制
- 单个Broker跟所有的Namesrv保持心跳,心跳间隔为30秒,心跳请求中包含当前Broker中所有的Topic信息。Namesrv也会反查Broker信息,如果一个Broker在2分钟之内都没有心跳,则Name认为该Broker下线,会将Topic与Broker的对应关系重新分配。这个过程中Namesrv不会通知Producer和Consumer有Broker下线。
- Consumer与Broker是长连接,每30秒进行心跳检测。Broker每10秒向Consumer发送心跳检测,如果与Consumer在2分钟内都没有心跳信息,则断开与该Consumer的连接,并通知消费者集群的其他节点,触发消费者集群的重新负载均衡。
- 生产者Producer每30秒从Namesrv获取Broker和Topic信息,与所需要的Topic涉及的Broker建立长连接,向Broker每隔30秒发送心跳。Broker每隔10秒向Producer发送心跳检测,如果2分钟内没有心跳,则与该Producer断开连接。
Namesrv压力不会太大,平时主要开销是在维持心跳和提供Topic-Broker的关系数据。但有一点需要注意,Broker向Namesrv发心跳时,
会带上当前自己所负责的所有Topic信息,如果Topic个数太多(万级别),会导致一次心跳中,就Topic的数据就几十M,网络情况差的话,
网络传输失败,心跳失败,导致Namesrv误认为Broker心跳失败。原文链接:https://blog.csdn.net/javahongxi/article/details/84931747
标签:存储,配置,Broker,rocketmq,MQ,消息,使用,RocketMQ,刷盘 来源: https://blog.csdn.net/qq_35198376/article/details/116641887