其他分享
首页 > 其他分享> > RocketMQ配置与使用

RocketMQ配置与使用

作者:互联网

RocketMQ配置与使用

1、MQ介绍

1.1 mq介绍

Rocketmq是一款分布式消息中间件,拥有高可靠性,高性能的特点。最初是由阿里巴巴中间件团队研发并大规模使用于生产系统,满足线上海量堆积的需求。Rocketmq默认采用长轮询的拉取模式,单机支持千万级别的消息堆积,可以非常好的应用在海量消息系统中。

mq是一种先进先出的队列模式:
在这里插入图片描述

1.2 mq的应用场景

1)应用解耦

系统的耦合性越高,容错性就会越低。以订单支付系统来说,系统耦合调用库存系统,支付系统、物流系统,其中任何一个系统出现故障都会导致整个系统瘫痪,影响用户的使用。
在这里插入图片描述
通过rocketmq中间解耦之后,当库存系统出现故障时,可以将需要库存系统处理的信息先储存到MQ中,其他系统进行正常的下单、支付等操作,等库存系统恢复之后,从MQ中拉取需要处理的信息进行处理,而用户不会感知到库存系统出现故障。
在这里插入图片描述

2)数据分发

在这里插入图片描述
消息生产者将数据发送到MQ消息队列,消息队列让数据在多个系统之间流通。数据的生产者只需要将数据发送到MQ的消息队列中,而不用关心由谁来使用这个数据,数据的使用者也只需要直接从Mq的消息队列中拉取需要的数据,而不需要和数据的生产者打交道。
在这里插入图片描述

3)流量削峰

当应用系统遇到流量瞬间猛增时,可能会导致系统压力过大,将整个系统压崩。而有了MQ之后,系统可以将大量的请求暂且堆积到MQ的消息队列中,通过轮询的方式进行分批处理,从而减轻系统的压力。
在这里插入图片描述

在这里插入图片描述

1.3 MQ架构

在这里插入图片描述
在这里插入图片描述

1.4 MQ储存模型

在这里插入图片描述

2、RocketMq安装入门

2.1准备工作

2.1.1 下载Rocketmq

这里选择的 RocketMQ 的版本:4.6.0

下载地址:http://rocketmq.apache.org/dowloading/releases/
官方文档:http://rocketmq.apache.org/docs/quick-start/

2.1.2 环境要求

2.2 开始安装

2.2.1安装步骤

以二进制包方式进行安装:

2.2.2 安装目录介绍

2.3 启动Rocketmq

1.RocketMq默认使用的虚拟机内存较大,启动Broker和NameServer有可能会因为内存不足而启动失败,所以需要在启动前先修改启动脚本中的JVM参数

#编辑 runbroker.sh 和 runserver.sh 修改默认 JVM 大小
$ vi bin/runbroker.sh
	# 参考设置
	JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"

$ vi bin/runserver.sh
	# 参考设置
	JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

2.启动NameServer

# 1.启动NameServer
nohup sh bin/mqnamesrv &
# 2.查看启动日志
tail -f ~/logs/rocketmqlogs/namesrv.log

3.启动Broker

# 1.启动Broker
nohup sh bin/mqbroker -n localhost:9876 &
# 2.查看启动日志
tail -f ~/logs/rocketmqlogs/broker.log 
bin/mqbroker 的一些可选参数:

-c:指定配置文件路径
-n:NameServer 的地址

2.4 测试RocketMq

2.4.1 消息发送

# 1.设置环境变量
export NAMESRV_ADDR=localhost:9876
# 2.使用安装包的Demo发送消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

2.4.2 接收消息

# 1.设置环境变量
export NAMESRV_ADDR=localhost:9876
# 2.接收消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

2.5 关闭Rocketmq

# 1.关闭NameServer
sh bin/mqshutdown namesrv
# 2.关闭Broker
sh bin/mqshutdown broker

2.6 Broker配置文件讲解

broker 默认的配置文件位置在:conf/broker.conf。也可以自定义配置文件名称和位置,在启动Broker时用 -c 参数指定自定义的配置文件。

#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

2.7 Rocketmq可视化界面安装

RocketMQ 有一个对其扩展的开源项目 incubator-rocketmq-externals,这个项目中有一个子模块叫 rocketmq-console,这个便是管理控制台项目了,先将 incubator-rocketmq-externals 拉到本地,因为我们需要自己对 rocketmq-console 进行编译打包运行。
在这里插入图片描述

2.7.1 下载并编译项目

1.克隆项目

git clone https://github.com/apache/rocketmq-externals

2.在 rocketmq-console 中配置 namesrv 集群地址:

$ cd rocketmq-console
$ vim src/main/resources/application.properties
	rocketmq.config.namesrvAddr=10.129.33.32:9876

3.配置完成进行编译并打包

mvn clean package -Dmaven.test.skip=true

4.启动 rocketmq-console

java -jar rocketmq-console-ng-2.0.0.jar

启动成功后,我们就可以通过浏览器访问 http://IP地址:8080 进入控制台界面了,如下图:
在这里插入图片描述

5.消息存储

rocketmq 消息进行持久化存储,从而达到 分布式队列高可靠性的要求。
在这里插入图片描述

  1. 消息生产者发送消息到MQ
  2. MQ接收到消息后,进行持久化存储,在存储中新增一条记录
  3. 返回ACK给生产者
  4. MQ push消息给对应的消费者,并等待消费者返回ACK
  5. 如果在指定的时间内,消息消费者返回ACK给MQ,那么MQ则认为消费者消费消息成功,则执行第6步,在存储中将消息删除;如果在指定的时间内MQ没有接收到消费者返回的ACK,则会认为消息消费失败,MQ会尝试重新push该消息到指定的消费者,重新执行4、5、6步骤。

5.1 消息存储介质

RocketMq将消息数据刷盘到所部署的物理机的文件系统中来进行持久化(刷盘方式分为同步刷盘异步刷盘两种方式)。
消息刷盘为消息提供了一种高效率、高可靠性、高性能的持久化方式。

5.2 消息的储存结构

在这里插入图片描述
RocketMQ的消息存储是由ConsumerQueueCommitLog配合完成的,ConsumerQueue中值存储很少的数据,类似于数据库的索引文件,存储的是指向物理存储的地址,消息的读写都是通过物理存储CommitLog来进行读写的。
如果一条消息只在CommitLog中有数据,在ConsumerQueue中没有,则消费者无法消费这条消息。

5.3 刷盘机制

Rocketmq采用将消息刷盘的方式进行持久化,将消息数据写入到磁盘上,既保证了服务器断电重启后数据的不丢失,也可以存储超出内存限制的数据量。RocketMq为了保证性能,尽可能的保证磁盘的顺序写(目前的高性能磁盘,顺序写速度可以达到 600MB/s, 超过了一般网卡的传输速度。但是磁盘随机写的速度只有大概 100KB/s,和顺序写的性能相差 6000 倍!)。Producer发送消息到RocketMq进行持久化的时候,由两种刷盘方式,一种是同步刷盘、一种是异步刷盘
在这里插入图片描述

1)同步刷盘

RocketMq返回写成功的状态时,消息已经被写入本地磁盘。具体流程为,消息写入内存后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程刷盘完成之后唤醒等待的线程,返回消息写成功的状态。

2)异步刷盘

RocketMq返回写成功的状态时,消息只是可能被写入内存的pagecache,写操作的返回快,当内存中消息达到一定量的时候,统一出发写磁盘的动作,快速写入。

3)刷盘配置

同步刷盘和异步刷盘,都是通过broker配置文件中的flushDiskType参数设置的,把这个参数配置成SYNC_FLUSH(同步刷盘)或 ASYNC_FLUSH(异步刷盘)。

5.4 零拷贝技术

Linux操作系统分为用户态内核态,系统文件操作、网络操作会涉及这两种形态的切换,在切换时会产生数据的复制。
一台服务器将本地磁盘文件发送到客户端,一般需要经过两个阶段

在上述两个操作中,一共进行了4次数据复制,分别是:

  1. 从磁盘复制数据到内核态内存
  2. 从内核态内存复制到用户态内存
  3. 从用户态内存复制到网卡驱动内核态内存
  4. 从网卡驱动内核态内存复制到网卡中进行传输

在这里插入图片描述

RocketMQ使用mmap的方式,可以省去向用户态内存复制的过程,也就是所谓的“零拷贝”技术,提高消息存盘和网络发送的速率。这种机制在java中是通过MappedByteBuffer 技术实现的。

需要注意的是,使用MappedByteBuffer这种技术会有一些限制,其中的一个限制就是向用户态内存复制的数据一次不能超过1G,这也是为什么Commitlog文件大小默认设置为1G大小的原因。

6.RocketMQ心跳机制

Namesrv压力不会太大,平时主要开销是在维持心跳和提供Topic-Broker的关系数据。但有一点需要注意,Broker向Namesrv发心跳时,
会带上当前自己所负责的所有Topic信息,如果Topic个数太多(万级别),会导致一次心跳中,就Topic的数据就几十M,网络情况差的话,
网络传输失败,心跳失败,导致Namesrv误认为Broker心跳失败。

原文链接:https://blog.csdn.net/javahongxi/article/details/84931747

标签:存储,配置,Broker,rocketmq,MQ,消息,使用,RocketMQ,刷盘
来源: https://blog.csdn.net/qq_35198376/article/details/116641887