RocketMQ4.3.x入门
作者:互联网
目录
12如何把消息发送到指定的队列(Message Queue)
1.RocketMQ整体介绍
1.1简介
RocketMQ 是一款分布式、队列模型的消息中间件,具有以下特点: 1.能够保证严格的消息顺序 2.提供丰富的消息拉取模式 3.高效的订阅者水平扩展能力 4.实时的消息订阅机制 5.亿级消息堆积能力
1.2选用理由
1. 阿里巴巴开源的分布式、队列模型的消息中间件,遵循JMS开发规范。 2. 4.3.x支持分布式事务 3. 官网地址:https://rocketmq.apache.org/ -- https://rocketmq.apache.org/release_notes/release-notes-4.3.0/ 4. 支持集群模型、负载均衡、水平扩展能力 5. 亿级别的消息堆积能力 6. 采用0拷贝的原理、顺序写盘、随机读 7. 丰富api使用 8. 代码优秀底层采用Netty NIO 框架 9. NameServer(更轻量级网络路由服务) 代替 Zookeper 10.消息失败重试机制、消息可查询 11.社区活跃、成熟度(经过双11考验)
2.概念模型
1.简述
producer:消息生产者。负责生产消息,一般由后台业务系统负责生产消息。 consumer:消息消费者,负责消费消息,一般是后台系统负责异步消费。 push consumer:consumer的一种,需要像consumer对象注册监听 pull consumer:consumer的一种,需要主动请求Broker拉去消息 prodicer group:生产者集合,一般用于发送一类消息 consumer group:消费着集合,一般用于接收一类消息进行消费 borker:mq消息服务,消息的存储生产消费和转发
2.主题与标签
主题 Tpoic:第一级消息类型,书的标题 标签 Tags:第二级消息类型,书的目录,可以基于 Tag 做消息过滤 例如: 主题: 订单交易 标签: 订单交易-创建 订单交易-付款 订单交易-完成
3.发送与订阅群组
rocketmq-broker:主要业务逻辑消息收发,主从同步,pagecache rocketmq-client:客户端接口生产消费者都要用 rocketmq-example:示例比如生产者消费者 rocketmq-common:公共数据结构 rocketmq-distribution:编译模块编译输出 rocketmq-filter:过滤broker过滤不感兴趣的消息传输,减小带宽压力 rocketmq-logging:日志相关的包 rocketmq-namesrv:用于服务协调 rocketmq-openmessaging:对外提供服务 rocketmq-remoting:远程调用接口,封装Nettty底层通信 rocketmq-srvutil:公用的工具方法,比如解析命令行参数 rocketmq-store:消息存储 rocketmq-tools:管理工具 mqadmin工具
3.源码包编译与结构说明
rocketmq-broker:主要业务逻辑消息收发,主从同步,pagecache rocketmq-client:客户端接口生产消费者都要用 rocketmq-example:示例比如生产者消费者 rocketmq-common:公共数据结构 rocketmq-distribution:编译模块编译输出 rocketmq-filter:过滤broker过滤不感兴趣的消息传输,减小带宽压力 rocketmq-logging:日志相关的包 rocketmq-namesrv:用于服务协调 rocketmq-openmessaging:对外提供服务 rocketmq-remoting:远程调用接口,封装Nettty底层通信 rocketmq-srvutil:公用的工具方法,比如解析命令行参数 rocketmq-store:消息存储 rocketmq-tools:管理工具 mqadmin工具
2.环境搭建
CentOS7 jdk1.8 4.3.x
2.1上传MQ安装包
链接:https://pan.baidu.com/s/1wkA8uH75nMD4haal0i5zUg 提取码:4545 #上传根目录 cd /home #创建目录 mq目录 mkdir rocket #配置host vi /etc/hosts # 主节点服务器 47.97.228.218 rocketmq-nameserver1 47.97.228.218 rocketmq-master1 # 从节点服务器 150.158.140.224 rocketmq-nameserver2 150.158.140.224 rocketmq-master1-slave
2.2解压
tar -zxvf apache-rocketmq.tar.gz -C /home/rocket
2.3创建存储路径
mkdir /home/rocket/store mkdir /home/rocket/store/commitlog mkdir /home/rocket/store/consumequeue mkdir /home/rocket/store/index
2.4修改broker配置文件
# Master vim /home/rocket/conf/2m-2s-async/broker-a.properties
brokerClusterName=rocketmq-cluster-1 #broker 名字,角色是Slave需与Master相同 brokerName=broker-a #0 表示 Master,>0 表示 Slave brokerId=0 #nameServer 地址,分号分割 namesrvAddr=rocketmq-nameserver1:20000;rocketmq-nameserver2:20000 #namesrvAddr=rocketmq-nameserver1:20000 brokerIP1=47.97.228.218 brokerIP2=47.97.228.218 #在发送消息时,自动创建服务器不存在的 topic,默认创建的队列数 defaultTopicQueueNums=4 #是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true #Broker 对外服务的监听端口 listenPort=20001 #删除文件时间点,默认凌晨 4 点 deleteWhen=04 #文件保留时间,默认 48 小时 fileReservedTime=120 #commitLog 每个文件的大小默认 1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整 mapedFileSizeConsumeQueue=300000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 #存储路径 storePathRootDir=/home/rocket/store #commitLog 存储路径 storePathCommitLog=/home/rocket/store/commitlog #消费队列存储路径存储路径 storePathConsumeQueue=/home/rocket/store/consumequeue #消息索引存储路径 storePathIndex=/home/rocket/store/index #checkpoint 文件存储路径 storeCheckpoint=/home/rocket/store/checkpoint #abort 文件存储路径 abortFile=/home/rocket/store/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 #Broker 的角色 #- ASYNC_MASTER 异步复制 Master #- SYNC_MASTER 同步双写 Master #- SLAVE brokerRole=ASYNC_MASTER #刷盘方式 #- ASYNC_FLUSH 异步刷盘 #- SYNC_FLUSH 同步刷盘 flushDiskType=ASYNC_FLUSH #checkTransactionMessageEnable=false #发消息线程池数量 #sendMessageThreadPoolNums=128 #拉消息线程池数量 #pullMessageThreadPoolNums=128
2.5修改日志配置文件
# 创建日志文件夹 mkdir -p /home/rocket/logs # 日志配置修改 cd /home/rocket/conf sed -i 's#${user.home}#/home/rocket#g' *.xml
2.6修改启动脚本参数
vim /home/rocket/bin/runbroker.sh #=========================================================================================== # JVM Configuration #=========================================================================================== JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn512m"
vim /home/rocket/bin/runserver.sh #=========================================================================================== # JVM Configuration #=========================================================================================== JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn512m -XX:PermSize=128m - XX:MaxPermSize=320m"
2.7启动mqnameser
2.7.1添加nameser.properties
vim /home/rocket/conf/nameser.properties # 文件内容 默认9876 listenPort=20000
2.7.2启动命令
nohup sh /home/rocket/bin/mqnamesrv -c /home/rocket/conf/nameser.properties &
2.8启动BrokerServer
nohup sh /home/rocket/bin/mqbroker -c /home/rocket/conf/2m-2s-async/broker-a.properties >/dev/null 2>&1 &
2.9查看日志
tail -f -n 500 /home/rocket/logs/rocketmqlogs/namesrv.log tail -f -n 500 /home/rocket/logs/rocketmqlogs/broker.log
2.10打开端口
2.10.1namesrv只使用了一个9876端口,修改了默认配置改为20000
firewall-cmd --zone=public --add-port=20000/tcp --permanent
2.10.2broker的服务端口号 修改broker的listenPort端口号为20001
firewall-cmd --zone=public --add-port=20001/tcp --permanent
2.10.3 haListenPort haListenPort是haService中使用
//默认值为:listenPort + 1 这个值是在BrokerStartup.java中设置的 /** * 影响haservice中AcceptSocketService服务的端口号 * 参考: * com.alibaba.rocketmq.store.ha.HAService 构造函数中 * this.acceptSocketService = new AcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort()); */ messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
firewall-cmd --zone=public --add-port=20002/tcp --permanent
2.10.4fastListenPort
/** *主要用于slave同master同步。 默认为:主要是fastRemotingServer服务使用 listenPort - 2 * listenPort - 2 默认 10909 */ fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2); this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
firewall-cmd --zone=public --add-port=19999/tcp --permanent
3.控制台部署与使用
3.1控制台github源码地址
链接:https://pan.baidu.com/s/1MnQZkLt_FoVaOFwtAAgarQ 提取码:4545
3.2修改源码配置文件
rocketmq.config.namesrvAdd
rocketmq.config.isVIPChannel
server.contextPath= server.port=8080 #spring.application.index=true spring.application.name=rocketmq-console spring.http.encoding.charset=UTF-8 spring.http.encoding.enabled=true spring.http.encoding.force=true logging.config=classpath:logback.xml #if this value is empty,use env value rocketmq.config.namesrvAddr NAMESRV_ADDR | now, you can set it in ops page.default localhost:9876 rocketmq.config.namesrvAddr=47.97.228.218:9876 #if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true rocketmq.config.isVIPChannel=false #rocketmq-console's data path:dashboard/monitor rocketmq.config.dataPath=/tmp/rocketmq-console/data #set it false if you don't want use dashboard.default true rocketmq.config.enableDashBoardCollect=true
3.3打开防火墙
firewall-cmd --zone=public --add-port=8080/tcp --permanent firewall-cmd --reload
4.QuickStart-生产者使用
4.1依赖地址
链接:https://pan.baidu.com/s/1dvTVXCccJXiCBma4WCTSXg 提取码:4545
4.2简单代码示例
package com.cnnct.test; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; /** * 消息生产者测试代码 * * @author busl * @version 1.0 */ public class MyRocketMqProducerTest { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("rent_quick_producer_name"); producer.setNamesrvAddr("47.97.228.218:9876"); producer.setSendMsgTimeout(60000); producer.start(); for (int i = 0; i < 5; i++) { try { { Message msg = new Message( //主题 "TopicTest", //标签 "TagA", /** * 例如: * 主题: * 订单交易 * 标签: * 订单交易-创建 * 订单交易-付款 * 订单交易-完成 */ //用户自定义的key主键标识 "OrderID18"+i, //消息体 bate[] ("Hello world"+i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } } catch (Exception e) { e.printStackTrace(); } } producer.shutdown(); } }
5.QuickStart-消费者使用
package com.cnnct.test; import java.util.List; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.common.RemotingHelper; /** * 消息消费者测试代码 * * @author busl * @version 1.0 */ public class MyRocketMqPushConsumerTest { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rent_quick_Consumer_name"); //设置地址 consumer.setNamesrvAddr("47.97.228.218:9876"); consumer.subscribe( //主题 "TopicTest", //消息过滤:模糊匹配 正则表达式或者* 全部 "TagA"); //枚举类型 当前是从最后端开始消费 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { MessageExt messageExt = msgs.get(0); try { // 主题 String topic = messageExt.getTopic(); // 标签 String tags = messageExt.getTags(); // 用户自定义的key主键标识 String keys = messageExt.getKeys(); if (keys.equals("OrderID183")) { System.out.println("消息消费失败。。。"); int a = 1 / 0; } // 内容 String msgBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET); System.out.println("topic = " + topic + "tags = " + tags + "keys = " + keys + "msgBody = " + msgBody); } catch (Exception exception) { exception.printStackTrace(); //重试次数大于三次则然会成功 System.out.println("重试次数" + messageExt.getReconsumeTimes()); if (messageExt.getReconsumeTimes() > 3) { //记录 System.out.println("重试三次还是失败"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } //过一段时间重试 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); } }
6.四种集群环境构建详解
6.1单点模式
这种方式风险较大,一旦 Broker 重启或者宕机时,会导致整个服务不可用,不建议线上环境使用。
6.2主从模式
1.主节点接收消息同步给从节点一条消息,如果主节点挂了,消息还没有被消费,从节点还有消息,消息也可以消费(不能接收消息),保证消息的及时性和可靠性。 2.投递一条消息后,关闭主节点 3.从节点继续可以提供消费者数据进行消费,但是不能接收消息 4.主节点上线后进行消费进度offset同步(数据内容 + 元数据信息)
6.2.1环境搭建
6.2.1.1准备安装包和创建存储路径
2.1上传安装包->2.2解压0->2.3创建存储路径
6.2.1.2修改broker slave配置文件
# brokerRole、brokerId、namesrvAddr vim /home/rocket/conf/2m-2s-async/broker-a-s.properties
brokerClusterName=rocketmq-cluster-1 #broker 名字,角色是Slave需与Master相同 brokerName=broker-a #0 表示 Master,>0 表示 Slave brokerId=1 #nameServer 地址,分号分割 namesrvAddr=rocketmq-nameserver2:20000;rocketmq-nameserver1:20000 brokerIP1=150.158.140.224 #在发送消息时,自动创建服务器不存在的 topic,默认创建的队列数 defaultTopicQueueNums=4 #是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true #Broker 对外服务的监听端口 listenPort=20001 #删除文件时间点,默认凌晨 4 点 deleteWhen=04 #文件保留时间,默认 48 小时 fileReservedTime=120 #commitLog 每个文件的大小默认 1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整 mapedFileSizeConsumeQueue=300000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 #存储路径 storePathRootDir=/home/rocket/store #commitLog 存储路径 storePathCommitLog=/home/rocket/store/commitlog #消费队列存储路径存储路径 storePathConsumeQueue=/home/rocket/store/consumequeue #消息索引存储路径 storePathIndex=/home/rocket/store/index #checkpoint 文件存储路径 storeCheckpoint=/home/rocket/store/checkpoint #abort 文件存储路径 abortFile=/home/rocket/store/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 #Broker 的角色 #- ASYNC_MASTER 异步复制 Master #- SYNC_MASTER 同步双写 Master #- SLAVE brokerRole=SLAVE #刷盘方式 #- ASYNC_FLUSH 异步刷盘 #- SYNC_FLUSH 同步刷盘 flushDiskType=ASYNC_FLUSH #checkTransactionMessageEnable=false #发消息线程池数量 #sendMessageThreadPoolNums=128 #拉消息线程池数量 #pullMessageThreadPoolNums=128
6.2.1.3修改其他配置文件
2.5->2.6 #打开配置文件修改主节点nameServer vim /home/rocket/conf/2m-2s-async/broker-a.properties #nameServer 地址,分号分割 namesrvAddr=47.97.228.218:9876;150.158.140.224:9876
6.2.1.4关闭主节点nameser和brokerser
# 查询mq进程 ps -ef | grep java # 按照下面顺序 kill掉 进程 kill -9 broker进程id kill -9 nameser进程id
6.2.1.5配置nameser启动端口并启动两台nameser
#添加nameser.properties vim /home/rocket/conf/nameser.properties # 文件内容 默认9876 listenPort=20000 #启动命令 nohup sh /home/rocket/bin/mqnamesrv -c /home/rocket/conf/nameser.properties & #查看启动日志 tail -f -n 500 /home/rocket/logs/rocketmqlogs/namesrv.log #打开nameserv端口 firewall-cmd --zone=public --add-port=20000/tcp --permanent #打开broker进程idserv端口 firewall-cmd --zone=public --add-port=20001/tcp --permanent firewall-cmd --zone=public --add-port=19999/tcp --permanent firewall-cmd --zone=public --add-port=20002/tcp --permanent firewall-cmd --reload
6.2.1.6启动Master brokerSer
nohup sh /home/rocket/bin/mqbroker -c /home/rocket/conf/2m-2s-async/broker-a.properties >/dev/null 2>&1 & # 查看日志 tail -f -n 500 /home/rocket/logs/rocketmqlogs/broker.log
6.2.1.7启动Slave brokerSer
nohup sh /home/rocket/bin/mqbroker -c /home/rocket/conf/2m-2s-async/broker-a-s.properties >/dev/null 2>&1 & # 查看日志 tail -f -n 500 /home/rocket/logs/rocketmqlogs/broker.log
6.3多主模式
一个集群无 Slave,全是 Master,例如 2 个 Master 或者 3 个 Master 优点:配置简单,单个 Master 宕机或重启维护对应用无影响,在磁盘配置为 RAID10 时,即使机器宕机不可恢复情况下,由与 RAID10 磁盘非常可靠,消息也不会丢(异步刷 盘丢失少量消息,同步刷盘一条不丢)。性能最高。 缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息 实时性会受到受到影响。 ### 先启动 NameServer ### 在机器 A,启动第一个 Master ### 在机器 B,启动第二个 Master
6.4多 Master 多 Slave 模式,异步复制
每个 Master 配置一个 Slave,有多对 Master-Slave,HA 采用异步复制方式,主备有短 暂消息延迟,毫秒级。 优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,因为 Master 宕 机后,消费者仍然可以从 Slave 消费,此过程对应用透明。不需要人工干预。性能同多 Master 模式几乎一样。 缺点:Master 宕机,磁盘损坏情况,会丢失少量消息。 ### 先启动 NameServer ### 在机器 A,启动第一个 Master ### 在机器 B,启动第二个 Master ### 在机器 C,启动第一个 Slave ### 在机器 D,启动第二个 Slave
6.5多 Master 多 Slave 模式,同步双写
每个 Master 配置一个 Slave,有多对 Master-Slave,HA 采用同步双写方式,主备都写 成功,向应用返回成功。 优点:数据与服务都无单点,Master 宕机情况下,消息无延迟,服务可用性与数据可 用性都非常高 缺点:性能比异步复制模式略低,大约低 10%左右,发送单个消息的 RT 会略高。目 前主宕机后,备机不能自动切换为主机,后续会支持自动切换功能。 ### 先启动 NameServer ### 在机器 A,启动第一个 Master ### 在机器 B,启动第二个 Master ### 在机器 C,启动第一个 Slave ### 在机器 D,启动第二个 Slave 以上 Broker 与 Slave 配对是通过指定相同的 brokerName 参数来配对,Master 的 BrokerId 必须是 0,Slave 的 BrokerId 必须是大与 0 的数。另外一个 Master 下面可以挂 载多个 Slave,同一 Master 下的多个 Slave 通过指定不同的 BrokerId 来区分。
7.主从模式集群环境构建与测试(故障演练)
1.正常启动主从 2.消费者不启动,生产者正常发消息 3.关闭主节点这样就有未消费的消息 3.从节点继续可以提供消费者数据进行消费,但是不能接收消息 4.主节点上线后进行消费进度offset同步
8.RocketMQ生产者核心参数详解
8.1producerGroup:组名
一个生产者组,代表着一群topic相同的Producer。即一个生产者组是同一类Producer的组合。 一个应用里面组名是唯一的;
8.2CreateTopicKey
使用RocketMQ进行发消息时,必须要指定topic,对于topic的设置有一个开关autoCreateTopicEnable,一般在开发测试环境中会使用默认设置autoCreateTopicEnable = true,但是这样就会导致topic的设置不容易规范管理,没有统一的审核等等,所以在正式环境中会在Broker启动时设置参数autoCreateTopicEnable = false。这样当需要增加topic时就需要在web管理界面上添加即可。
8.3DefaultTopicQueueNum(默认为4)
8.4setMessageTimeOut(单位:ms)
8.5getCompressMsgBodyOverHowmuch(默认压缩字节4096)
8.6RetryTimesWhenSendFailed(同步重发策略可配置)
8.7RetryTimesWhenSendAsyncFailed(异步重发策略可配置)
8.8RetryAnotherBrokerWhenNotStoreOK(没有存储成功找其他broker存储)
8.9MaxMessageSize(最大的消息限制默认128k)
9.生产者同步发送消息和异步发送消息
9.1同步发送消息
producer.send(Message msg)
9.2同步发送核心实现
DefaultMQProducerImpl.java
9.3异步发送消息
producer.send(Message msg, SendCallback SendCallback);
9.4异步发送消息核心实现
DefaultMQProducerImpl.java
10.生产者消息的返回状态
{ "SendResult":{ "SendStatus":{ "SEND_OK":"消息发送成功", "FLUSH_DISK_TIMEOUT":"消息发送到服务器,刷盘超时。可靠性投递的话考虑重发消息", "FLUSH_SLAVE_TIMEOUT":"消息同步到slave超时,slave宕机", "SLAVE_NOT_AVAILABLE":"消息发送成功了,slave不可用" } } }
11.延迟消息
延迟消息:消息发送到broker后,要特定的时间才会被Consumer消费 目前只支持固定精度的定时消息 MessageStoreConfig配置类&&ScheduleMessageService Message.setDelayTimeLevel();方法实现 1对应 1秒 2对应5秒 以此类推 messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
12如何把消息发送到指定的队列(Message Queue)
//发送到指定的队列 3指定的队列 SendResult send = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message message, Object o) { Integer arg = (Integer) o; return mqs.get(arg); } }, 3); System.out.println("send = " + send);
13.pushConsumer消费者核心参数详解
Consumer 的一种,应用通常吐 Consumer 对象注册一个 Listener 接口,一旦收到消息,Consumer 对象立 刻回调 Listener 接口方法。
14.消费模式-集群模式
15.消费模式广播模式
16.消费者存储核心偏移量
16.1
16.2
17.长轮询模式分析
18.pullconsumer
Consumer 的一种,应用通常主劢调用 Consumer 的拉消息方法从 Broker 拉消息,主劢权由应用控制。
标签:入门,rocket,--,Master,消息,home,RocketMQ4.3,rocketmq 来源: https://blog.csdn.net/adminBfl/article/details/122322041