高性能分布式消息中间件—RocketMQ(一)
作者:互联网
引言:亲爱的读者大家好,本人刚刚成立的个人微信公众号,记录一下java当中实用的技术栈。也会分享一下工作当中遇到的问题和难题以及解决方案。我是一个技术宅,知识想单纯的记录一下自己在工作中和学习中的经验总结,如果你也喜欢研究技术,那么请关注我。相互学习,互相成长。
我是这样规划的,想从以下的技术栈记录:
1:分布式系统消息中间件。
2:分布式搜索引擎。
3:分布式JOB。
4:分布式缓存。
5:分库分表。
6:手把手教你打造一个第三方公众平台。
7:分布式系统性能监控技术。
8:Spring生态源码解读。
9:Spring生态实战技术。
10:并发技术汇总。
11:jvm调优。
12:netty。
13:架构实战。
14:java基础。
15:Mysql & TIDB。
废话不多说。作为本公众号开篇大作,我选择了Rocketmq。原因是,我觉得这款中间件,除了非常高的性能之外,同时也是作为一名java程序员来说一门必备的学科,无论面试,还是工作中,都离不开它。尤其是在电商的场景下,更能凸显出这款中间件的魅力。
简介:
RocketMQ作为一款纯java、是由阿里巴巴公司开发的分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。
由开源社区killme2008维护,开源社区非常活跃。https://github.com/killme2008/Metamorphosis。Metaq 2.x。于2012年10月份上线,在淘宝内部被广泛使用 。Metaq 3.0发布时,产品名称改为RocketMQ。基于公司内部开源共建原则,RocketMQ项目只维护核心功能,且去除了所有其他运行时的依赖,核心功能最简化。每个BU的个性化需求都在RocketMQ项目之上进行深度定制。RocketMQ向其他BU提供的仅仅是jar包,例如要定制一个Broker,那么只需要依赖rocketmq-broker这个jar包即可,可通过API进行交互,如果定制client,则依赖rocketmq-client这个jar包,对其提供的api进行再封装。如今的rocketmq孵化成了Apache的顶级开源项目。
特点:
- 支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型
- 在一个队列中可靠的先进先出(FIFO)和严格的顺序传递 (RocketMQ可以保证严格的消息顺序,而ActiveMQ无法保证)
- 支持拉(pull)和推(push)两种消息模式 (Push好理解,比如在消费者端设置Listener回调;而Pull,控制权在于应用,即应用需要主动的调用拉消息方法从Broker获取消息,这里面存在一个消费位置记录的问题(如果不记录,会导致消息重复消费))
- 单一队列百万消息的堆积能力 (RocketMQ提供亿级消息的堆积能力,这不是重点,重点是堆积了亿级的消息后,依然保持写入低延迟)
- 支持多种消息协议,如 JMS、MQTT 等
- 分布式高可用的部署架构,满足至少一次消息传递语义 (RocketMQ原生就是支持分布式的,而ActiveMQ原生存在单点性)
- 提供 docker 镜像用于隔离测试和云集群部署
- 提供配置、指标和监控等功能丰富的 Dashboard
在Metaq1.x/2.x的版本中,分布式协调采用的是Zookeeper,而RocketMQ自己实现了一个NameServer,更加轻量级,性能更好!
- 组(Group)有Producer/Consumer Group。 ActiveMQ中并没有Group这个概念,而在RocketMQ中理解Group的机制很重要。通过Group机制,让RocketMQ天然的支持消息负载均衡!比如某个Topic有9条消息,其中一个Consumer Group有3个实例(3个进程 OR 3台机器),那么每个实例将均摊3条消息!(注意RocketMQ只有一种模式,即发布订阅模式。)
- 消息失败重试机制、高效的订阅者水平扩展能力、强大的API、事务机制等等。
专业术语
Producer
消息生产者,生产者的作用就是将消息发送到 MQ,生产者本身既可以产生消息,如读取文本信息等。也可以对外提供接口,由外部应用来调用接口,再由生产者将收到的消息发送到 MQ。
Producer Group
生产者组,简单来说就是多个发送同一类消息的生产者称之为一个生产者组。在这里可以不用关心,只要知道有这么一个概念即可。
Consumer
消息消费者,简单来说,消费 MQ 上的消息的应用程序就是消费者,至于消息是否进行逻辑处理,还是直接存储到数据库等取决于业务需要。
Consumer Group
消费者组,和生产者类似,消费同一类消息的多个 consumer 实例组成一个消费者组。
Topic
Topic 是一种消息的逻辑分类,比如说你有订单类的消息,也有库存类的消息,那么就需要进行分类,一个是订单 Topic 存放订单相关的消息,一个是库存 Topic 存储库存相关的消息。
Message
Message 是消息的载体。一个 Message 必须指定 topic,相当于寄信的地址。Message 还有一个可选的 tag 设置,以便消费端可以基于 tag 进行过滤消息。也可以添加额外的键值对,例如你需要一个业务 key 来查找 broker 上的消息,方便在开发过程中诊断问题。
Tag
标签可以被认为是对 Topic 进一步细化。一般在相同业务模块中通过引入标签来标记不同用途的消息。
Broker
Broker 是 RocketMQ 系统的主要角色,其实就是前面一直说的 MQ。Broker 接收来自生产者的消息,储存以及为消费者拉取消息的请求做好准备。
Name Server
Name Server 为 producer 和 consumer 提供路由信息。
rocketmq基于Dledger构建高可用
构建集群需要至少3台服务器
本人的RocketMq集群是基于阿里云的centos7.9版本。网上很多教程写的基于虚拟机的本地环境。我认为要玩儿就玩儿的正式一点,真正可以接近生产环境的玩法。
构建安装DLedger
git clone https://github.com/openmessaging/openmessaging-storage-dledger.git
cd openmessaging-storage-dledger
mvn clean install -DskipTests
另外系统中还要安装一个git,用作拉取rocketmq控制台的源码工程。
下载RocketMQ
wget https://github.com/apache/rocketmq/releases
unzip rocketmq-all-4.7.1-source-release.zip
cd rocketmq-all-4.7.1/
mvn -Prelease-all -DskipTests clean install -U
cd distribution/target/rocketmq-4.7.1/rocketmq-4.7.1
这里注意,rocketmq只有4.5以上版本支持dledger集群构建
修改配置文件
服务器说明:(生产中应该将 NameServer 部署到其他服务器中,在这为了方便,与Broker部署在一起)
安装Maven
cd /opt
wget http://mirrors.cnnic.cn/apache/maven/maven-3/3.5.4/binaries/apache-maven-3.5.4-bin.tar.gz
tar -zxvf apache-maven-3.5.4-bin.tar.gz
# 修改环境变量
vim /etc/profile
# 最下面添加
export MAVEN_HOME=/opt/apache-maven-3.5.4
export PATH=$MAVEN_HOME/bin:$PATH
# 保存退出
source /etc/profile
# 建立软连接
ln -s /opt/apache-maven-3.5.4/bin/mvn /usr/bin/mvn
安装JDK
这里需要自己前往oracle官方下载
配置java环境变量
环境变量配置 | 配置 |
---|---|
"" | export JAVA_HOME=/opt/jdk1.8.0_261 |
vi /etc/profile | export CLASSPATH=${JAVA_HOME}/lib |
"" | export PATH=$PATH:${JAVA_HOME}/bin |
source /etc/profile 刷新配置
修改配置文件
服务器 | ip | 安装的服务 |
---|---|---|
服务器1-主 | 172.30.134.189 | DLedger,Broker,NameServer |
服务器2-从 | 172.30.134.190 | DLedger,Broker,NameServer |
服务器3-从 | 172.30.134.191 | DLedger,Broker,NameServer |
服务器1配置-Master
进入rocketmq安装目录找到
vim conf/dledger/broker-n0.conf
修改Broker配置
## 集群名
brokerClusterName = RaftCluster
### broker组名,同一个RaftClusterGroup内,brokerName名要一样
brokerName=RaftNode00
### 监听的端口
listenPort=10911
### 你设置的NameServer地址和端口
namesrvAddr=172.30.134.189:9876;172.30.134.190:9876;172.30.134.191:9876
storePathRootDir=/tmp/rmqstore/node00
storePathCommitLog=/tmp/rmqstore/node00/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
### n0 n1 n2 分别是broker1,broker2,broker3 的 dLegerSelfId
### 例如:dLegerPeers=n0-服务器1的IP:40911;n1-服务器2的IP:40912;n2-服务器3的IP:40913
dLegerPeers=n0-172.30.134.189:40911;n1-172.30.134.190:40912;n2-172.30.134.191:40913
### must be unique
autoCreateTopicEnable=true
### 这个值必须是在同一个RaftClusterGroup内唯一的
dLegerSelfId=n0
sendMessageThreadPoolNums=4
### 由于我的虚拟机配置了多个网卡,所以会绑定ip错误,因此我配置了这项,
brokerIP1=master主机的外网IP地址
服务器2配置-Slave
vim conf/dledger/broker-n1.conf
修改Broker配置
brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=10921
namesrvAddr=172.30.134.189:9876;172.30.134.190:9876;172.30.134.191:9876
storePathRootDir=/tmp/rmqstore/node01
storePathCommitLog=/tmp/rmqstore/node01/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-172.30.134.189:40911;n1-172.30.134.190:40912;n2-172.30.134.191:40913
## must be unique
dLegerSelfId=n1
autoCreateTopicEnable=true
sendMessageThreadPoolNums=4
brokerIP1=内网IP
这里有个坑需要注意一下,如果说,你的master主机的brokerIP1设置了外网的ip那么你的slave节点的brokerIP1就要设置为内网的IP
服务器3配置-Slave
vim conf/dledger/broker-n2.conf 修改Broker配置
brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=10931
namesrvAddr=172.30.134.189:9876;172.30.134.190:9876;172.30.134.191:9876
storePathRootDir=/tmp/rmqstore/node02
storePathCommitLog=/tmp/rmqstore/node02/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-172.30.134.189:40911;n1-172.30.134.190:40912;n2-172.30.134.191:40913
## must be unique
autoCreateTopicEnable=true
dLegerSelfId=n2
sendMessageThreadPoolNums=4
brokerIP1=172.30.134.191
启动集群
在服务器1 执行
nohup sh bin/mqnamesrv -n 172.30.134.189 & > nohubNameserv
nohup sh bin/mqbroker > nohubBroker -c conf/dledger/broker-n0.conf autoCreateTopicEnable=true &
启动集群
在服务器2 执行
nohup sh bin/mqnamesrv -n 172.30.134.190 & > nohubNameserv
nohup sh bin/mqbroker > nohubBroker -c conf/dledger/broker-n1.conf autoCreateTopicEnable=true &
启动集群
在服务器3 执行
nohup sh bin/mqnamesrv -n 172.30.134.191 & > nohubNameserv
nohup sh bin/mqbroker > nohubBroker -c conf/dledger/broker-n2.conf autoCreateTopicEnable=true &
查看集群情况
sh bin/mqadmin clusterList -n 127.0.0.1:9876
备注
Broker 配置
Broker 配置
参考文档:
参数名 | 默认值 | 说明 |
---|---|---|
listenPort | 10911 | 接受客户端连接的监听端口 |
namesrvAddr | null | nameServer 地址 |
brokerIP1 | 网卡的 InetAddress | 当前 broker 监听的 IP |
brokerIP2 | 跟 brokerIP1 一样 | 存在主从 broker 时,如果在 broker 主节点上配置了 brokerIP2 属性,broker 从节点会连接主节点配置的 brokerIP2 进行同步 |
brokerName | null | broker 的名称 |
brokerClusterName | DefaultCluster | 本 broker 所属的 Cluser 名称 |
brokerId | 0 | broker id, 0 表示 master, 其他的正整数表示 slave |
storePathCommitLog | $HOME/store/commitlog/ | 存储 commit log 的路径 |
storePathConsumerQueue | $HOME/store/consumequeue/ | 存储 consume queue 的路径 |
mappedFileSizeCommitLog | 1024 * 1024 * 1024(1G) | commit log 的映射文件大小 |
deleteWhen | 04 | 在每天的什么时间删除已经超过文件保留时间的 commit log |
fileReservedTime | 72 | 以小时计算的文件保留时间 |
brokerRole | ASYNC_MASTER | SYNC_MASTER/ASYNC_MASTER/SLAVE |
flushDiskType | ASYNC_FLUSH | SYNC_FLUSH/ASYNC_FLUSH SYNC_FLUSH 模式下的 broker 保证在收到确认生产者之前将消息刷盘。ASYNC_FLUSH 模式下的 broker 则利用刷盘一组消息的模式,可以取得更好的性能。 |
enableDLegerCommitLog | 是否启动 DLedger | true |
dLegerGroup | DLedger Raft Group的名字,建议和 brokerName 保持一致 | RaftNode00 |
dLegerPeers | DLedger Group 内各节点的端口信息,同一个 Group 内的各个节点配置必须要保证一致 | n0-172.30.134.189:40911;n1-172.30.134.190:40912;n2-172.30.134.191:40913 |
dLegerSelfId | 节点 id, 必须属于 dLegerPeers 中的一个;同 Group 内各个节点要唯一 | n0 |
sendMessageThreadPoolNums | 发送线程个数,建议配置成 Cpu 核数 | 16 |
启动内存不够
修改 bin/runbroker.sh 和 bin/runserver.sh 中的
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m"
报连接超时
查看防火墙服务状态 systemctl status firewalld
将防火墙关闭 systemctl stop firewalld
安装控制台
1.下载文件(/usr/local目录下)
wget https://github.com/apache/rocketmq-externals 地址下载并从本地上传到服务器
2.解压(/usr/local目录下)
yum install -y unzip zip 前提是:unzip解压文件无法使用 unzip rocketmq-externals-master.zip 解压文件
3.修改配置文件(usr/local/rocketmq-externals-master/目录下)
find -name application.properties 可以查看到两个文件都在rocketmq-console文件目录下
vi application.properties
rocketmq.config.namesrvAddr=namesrv服务地址(ip1:port;ip2:port)
4.编译(usr/local/rocketmq-externals-master/rocketmq-console/目录下)
mvn clean package -Dmaven.test.skip=true 如果失败多编译几次--可能是网络问题
编译成功后,在rocketmq-console目录下会生成一个目录:target目录,该目录下有启动rocketmq界面的jar文件
5.启动web(usr/local/rocketmq-externals-master/rocketmq-console/target目录下)
java -jar rocketmq-console-ng-1.0.0.jar 启动 ---当终端断了该服务就会停止 nohup java -jar rocketmq-console-ng-1.0.0.jar >>/usr/logs/log.out 2>&1 & 后台启动 --当终端断了也不会停止服务
6.添加端口并在阿里云服务器上开通端口
firewall-cmd --zone=public --add-port=8080/tcp --permanen 永久添加端口并重启防火墙
在阿里云服务器添加出入规则。
查看web监控:
控制台请求地址:http:master主机IP:端口号。
消费者组:
本篇文章就介绍到这里,读者们首先要学会安装rocketmq、以及rocketmq中的基础概念。后续的文章会对rocketmq做具体的介绍以及使用,以及如何集成Springboot和spring cloud stream。
本文由博客群发一文多发等运营工具平台 OpenWrite 发布
标签:RocketMQ,bin,broker,消息,消息中间件,172.30,rocketmq,分布式 来源: https://blog.csdn.net/zhanghui5211314/article/details/113360619