RocketMQ 轻松入门
作者:互联网
1. 文档汇总
源码地址:https://github.com/apache/rocketmq
中文文档:https://github.com/apache/rocketmq/tree/master/docs/cn
商业版:https://www.aliyun.com/product/rocketmq
官网翻译:http://www.itmuch.com/books/rocketmq/
FAQ:http://rocketmq.apache.org/docs/faq/
RocketMQ常用管理命令:https://blog.csdn.net/gwd1154978352/article/details/80829534
RocketMQ默认配置:https://www.cnblogs.com/jice/p/11981107.html
2. 架构设计
一般见到的架构图都是这个样子的,我们来看下各个角色的作用。
2.1 Broker
RocketMQ的服务,或者说一个进程,叫做Broker。Broker的作用是存储和转发消息。RocketMQ单机大约能承受10万QPS的请求。
为了提升Broker的可用性(防止单点故障),以及提升服务器的性能(实现负载均衡),通常会做集群部署。
和Kafka一样,RocketMQ集群的每一个Broker节点保存总数据的一部分,因此可以实现横向扩展。
为了提升可靠性(防止数据丢失),每个Broker都有自己的副本(slave)。
默认情况下,读写都放在master上。在slaveReadEnable=true的情况下,slave也可以参与读负载。但是默认只有Broker=1的slave才会参与读负载,而且实在master消费慢的情况下,由whichBrokerWhenConsumeSlowly这个参数决定。
2.2 Topic
Topic用于将消息按主题做划分。比如订单消息、物流消息等。注意,跟Kafka不同的是,在RocketMQ中,topic是一个逻辑概念,消息不是按照Topic划分存储的。
Producer 将消息发往指定的Topic,Consumer订阅这个Topic就可以收到相应的消息。
跟Kafka一样,如果topic不存在则自动创建。
private Boolean autoCreateTopicEnable=tru;
Topic 和生产者,消费者都是多对多的关系,一个生产者可以发送消息给多个topic,一个消费者也可以订阅多个topic。
2.3 NameServer
当不同的消息存储在不同的Broker上,生产者和消费者对于Broker的获取,或者说路由选择是一个非常关键的问题。
所以,和分布式服务中的注册中心一样,RocketMQ也需要一个角色来统一管理Broker的信息。
在Kafka里面,是使用zookeeper来统一管理的。但是,RocketMQ偏偏没有这么做,他实现了一个自己的服务——NameServer。
我们可以将其理解为RocketMQ的路由中心,每一个NameServer节点都保存着全量的路由信息,为了保证高可用,NameServer自身也可以做集群的部署。作用类似于Eureka或者Redis Sentinel.
也就是说,Broker会在NameServer上注册自己,Producer和Consumer用NameServer来发现Broker。
2.3.1NameServer作为路由中心是怎么工作的呢?
每个Broker节点在启动时,都会根据配置遍历NameServer列表。
rocket/conf/broke.conf
namesrv=localhost:9876
与每个NameServer建立TCP长连接,注册自己的信息,之后每隔30S发送心跳信息(服务主动注册)。
如果Broker挂掉了,不发送心跳了,NameServer怎么发现呢?
所以除了主动注册,还有定时探活。每个NameServer每隔10S检查一下各个Broker的最近一次心跳时间,如果发现某个Broker超过120S没有发送心跳,就认为这个Broker已经挂掉了,会将其从路由信息里移除。
既然都是注册中心,那为什么不使用Zookeeper呢?
实际上在早期的版本里,RocketMQ也采用的zookeeper,但是去掉了zookeeper依赖,选择采用自己的NameServer。
这因为RocketMQ的架构设计决定了只需要一个轻量级的元数据数据库就足够了,只需要保持最终一致即可,而不需要zookeeper这样的强一致性解决方案,不需要再依赖另一个中间件,从而减少维护成本。
根据著名的CAP理论:一致性,可用性,分区容错性。zookeeper选择了CP,而NameServer选择了AP,放弃了实时一致性。
2.3.2 一致性问题
既然NameServer之间是互不通信的,也没有主从之分,那他们是怎么保持一致性的?
我们从以下三点分析:
2.3.2.1 服务注册
如果新增了Broker,怎么新增到所有的NameServer中?
因为没有master,Broker每隔30秒会向所有的NameServer发送心跳消息,所以还是能保持一致的。
2.3.2.2 服务剔除
如果一个Broker挂了,怎么从所有的NameServer中移除信息?
- 如果Broker正常关闭,Netty的通道关闭监听器会监听到连接断开事件,然后会将这个Broker信息剔除。
- 如果Broker异常关闭:NameServer的定时任务会每10秒扫描Broker列表,如果某个Broker的心跳包最新时间戳超过当前时间120S就会被移除。
通过以上两点,不管Broker挂了,还是恢复了,增加了还是减少了,NameServer都能够保持数据一致。
2.3.2.3 路由发现
如果Broker的信息更新了(增加或者减少节点),客户端怎么获取最新的Broker列表呢?
先说生产者。发送第一条消息的时候,根据Topic从nameserver获取路由消息。
然后是消费者。消费者一般是订阅固定的Topic,在启动的时候就要获取Broker的信息。
这之后如果Broker信息变了之后该怎么办?
因为NameServer不会主动推送服务信息给客户端,客户端也不会发送心跳到NameServer,所以在建立连接之后,需要生产者和消费者定期更新。
我们可以通过查看源码的时候了解他是怎么操作的。
在MQClientInstance类的send方法中,启动了一个定时任务。
其中第二个任务updateTopicRouteInfoFromNameServer方法,是用来定期更新NameServer信息的,默认是30S获取一次。
2.3.2.4 总结
各个NameServer的数据通过主动注册和定时探活是能够保持数据一致的。而消费者和生产者是通过定期更新路由从而获取最新的信息。
问题1:如果Broker挂掉,客户端30秒以后才更新路由信息,那是不是会出现最多30秒的数据延迟?
答:由如下几个解决思路
- 重试
- 把无法连接的Broker隔离
- 或者优先选择延迟小的节点,就能避免连接到容易挂的Broker了。
问题2:如果作为路由中心的NameServer全部挂掉了,而且暂时没有恢复呢?
答:这个也没问题,客户端会缓存Broker的信息,不完全依赖与NameServer。
2.4 Producer
生产者,用于生产消息,会定时从NameServer拉取路由信息(不用配置RocketMQ的服务地址),然后根据路由信息与指定的Broker建立TCP长连接,从而将消息发送到Broker中,发送逻辑一致的Producer可以组成一个Group。
RocketMQ的生产者同样支持批量发送,不过List要自己传过去。
Producer写数据只能操作master节点。
2.5 Consumer
消息的消费者,通过NameServer集群获得Topic的路由信息,连接到对应的Broker上消费消息。消费逻辑一致的Consumer 可以组成一个Group,这时候消息会在Consumer之间负载。
由于Master和Slave都可以读取消息,因此Consumer 会与Master和Slave都建立连接。
注意:同一个consumer group内的消费者应该订阅同一个topic。或者反过来,消费不同topic的消费者不应该采用相同的consumer group名字。如果不一样,后面的消费者的订阅,会覆盖前面的订阅。
消费者有两种消费方式:一种是集群消费(消息轮询),一种是广播消费(全部收到相同副本)。
2.5.1 pull
从消费模型来说,RocketMQ支持 pull和push两种模式。
Pull模式是consumer轮询从broker拉取消息。实现类:DefaultMQPullConsumer(过时),替代类:DefaultLitePullConsumer。
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("my_test_consumer_group");
Pull有两种实现方式:一种是普通的轮询(Polling)。不管服务端数据有无更新,客户端每隔定长时间请求拉取一次数据,可能有更新数据返回,也可能什么都没有。
普通轮询的缺点:因为大部分时候没有数据,这些无效的请求会大大地浪费服务器的资源。而且定时请求的间隔过长的时候,会导致消息延迟。
RocketMQ的pull用长轮询来实现。
客户端发起Long Polling,如果此时服务端没有相关数据,会hold住请求,直到服务端有相关数据,或者等待一定时间超时才会返回。返回后,客户端又会立即再次发起下一次Long Polling (所谓的hold住请求指的服务端暂时不回复结果,保存相关请求,不关闭请求连接,等相关数据准备好,写回客户端)。
长轮询解决了轮询的问题,唯一的缺点是服务器在挂起的时候比较耗内存。
2.5.2 push
Push模式是 Broker推送消息给consumer,实现类:DefaultMQPushConsumer。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_test_consumer_group");
RocketMQ的 push 模式实际上是基于pull模式实现的,只不过是在 pull模式上封装了一层,所以RocketMQ push模式并不是真正意义上的“推模式”。
在RocketMQ中,PushConsumer 会注册MessageListener 监听器,取到消息后,唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送过来的。
2.6 Message Queue
RocketMQ支持多master的架构。
思考一个这样的问题:当我们有多个master的时候,发往一个Topic的多条消息会在多个master的 Broker上存储。
那么,发往某一个Topic的多条消息,是不是在所有的Broker上存储完全相同的内容?
肯定不是的。如果所有的master存储相同的内容,而slave又跟master存储相同的内容:第一个,浪费了存储空间。第二个,无法通过增加机器数量线性地提升Broker的性能,也就是只能垂直扩展,通过升级硬件的方式提升性能,无法实现横向(水平)扩展。那么在分布式的环境中,RocketMQ的性能肯定会受到非常大的限制。
一句话:不符合分片的思想。
那么最关键的问题来了,怎么把发到一个Topic里面的消息分布到不同的master上呢?
在kafka里面设计了一个partition,一个Topic可以拆分成多个 partition,这些partition可以分布在不同的 Broker上,这样就实现了数据的分片,也决定了kafka可以实现横向扩展。
RocketMQ有没有这样的设计呢?
在一个Broker上,RocketMQ只有一个存储文件,并没有像Kafka一样按照不同的Topic分开存储。数据目录:
也就是说,如果有3个Broker,也就是只有3个用来存储不同数据的commitlog。那问题就来了,如果不按照分区去分布,数据应该根据什么分布呢?
RocketMQ里面设计了一个叫做Message Queue的逻辑概念,作用跟partition类似。
首先,我们创建Topic的时候会指定队列的数量,一个叫 writeQueueNums (写队列数量),一个readQueueNums(读队列数量)。
写队列的数量决定了有几个Message Queue,读队列的数量决定了有几个线程来消费这些Message Queue(只是用来负载的)。
那不指定MQ的时候呢?默认有几个MQ呢?
服务端创建一个Topic默认8个队列(BrokerConfig):
private int defaultTopicQueueNums=8
topi不存在,生产者发送消息时创建默认4个队列(DefaultMQProducer):
private volatile int defaultTopicQueueNums=4;
MessageQueue在磁盘上是可以看到的,但是数量只跟写队列相关。
比如我用producer发送消息,consumerqueue目录下面就会出现四个目录。
客户端封装了一个MessageQueue对象,里面其实就是三块内容:
private String topic;
private String brokerName;
private int queueId;
topic表示它是哪个topic 的队列。Broker代表它在哪个Broker 上,比如有两个master,一个叫broker-a,一个叫 broker-b。queueld 代表它是第几个分片。
举例:一个Topic有3个Message Queue,编号是1、2、3。刚好有三个Broker,第一个MQ指向Broker1,第二个MQ指向Broker2,第三个MQ指向Broker3。
发送消息的时候,生产者会根据一定的规则,获得MessageQueue,只要拿到了queueld,就知道要发往哪个Broker,然后在 commitlog 写入消息。
磁盘上看到的队列数量,是由写队列的数量决定的,而且在所有的master上个数是一样的(但是数据存储不一样)。
举例:集群有两个master。如果创建一个topic ,有2个写队列、1个读队列(topic名字:q-2-1)。
那么两台机器的consumequeue目录会出现2个队列,一共4个队列。/opt/rocketmq/store/broker-a/consumequeue/q-2-1也就是总队列数量是:写队列数*节点数。
如果我们发送6条消息,给消息依次编号,会选择什么队列发送呢?
我们用代码测试以下:
先启动消费者代码:
public class SimpleConsumer {
public static void main(String[] args) throws MQClientException {
// Instantiate with specified consumer group name.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_test_consumer_group");
// Specify name server addresses.
consumer.setNamesrvAddr("192.168.44.163:9876;192.168.44.164:9876");
consumer.setMessageModel(MessageModel.BROADCASTING);
// Subscribe one more more topics to consume.
consumer.subscribe("q-2-1", "*");
// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
for (MessageExt msg : msgs) {
String topic = msg.getTopic();
String messageBody = "";
try {
messageBody = new String(msg.getBody(), "utf-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
// 重新消费
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
String tags = msg.getTags();
System.out.println("topic:" + topic + ",tags:" + tags + ",msg:" + messageBody);
}
// 消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//Launch the consumer instance.
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
然后启动生产者:
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("my_test_producer_group");
producer.setNamesrvAddr("192.168.44.163:9876;192.168.44.164:9876");
producer.start();
for (int i = 0; i < 6; i++) {
try {
// tags 用于过滤消息 keys 索引键,多个用空格隔开,RocketMQ可以根据这些key快速检索到消息
Message msg = new Message("q-2-1",
"TagA",
"2673",
("RocketMQ " + String.format("%05d", i)).getBytes());
SendResult sendResult = producer.send(msg);
System.out.println(String.format("%05d", i) + " : " + sendResult);
} catch (Exception e) {
e.printStackTrace();
}
}
producer.shutdown();
}
}
因为消费者只有一个读队列,所以它只能消费编号为0的队列。
反过来,如果是1个写队列,2个读队列:
那么broker的consumerqueue目录会出现1个队列。
因为目前有2个读队列,所以所有消息都可以接收到。
总结
读写队列数量最好一致,否则会出现消费不了的情况。
3. Java开发
3.1 Java API
官方提供了Java客户端API,只需要引入相关依赖即可。
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.1</version>
</dependency>
官方也提供了详细的代码案例。
package | 作用 |
---|---|
batch | 批量消息,用List发送 |
benchmart | 性能测试 |
broadcast | 广播消息 |
delay | 延迟消息msg.setDelayTimeLevel |
filter | 基于tag或sql表达式过滤 |
operation | 命令行 |
ordermessage | 顺序消息 |
quickstart | 入门 |
rpc | 实现RPC调用 |
simple | ACL,异步,assign,subscribe |
tracemessage | 消息追踪 |
transaction | 事务消息 |
3.1.1 生产者
下面我们看看简单的生产者代码:
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("my_test_producer_group");
producer.setNamesrvAddr("192.168.44.163:9876;192.168.44.164:9876");
producer.start();
for (int i = 0; i < 6; i++) {
try {
// tags 用于过滤消息 keys 索引键,多个用空格隔开,RocketMQ可以根据这些key快速检索到消息
Message msg = new Message("q-2-1",
"TagA",
"2673",
("RocketMQ " + String.format("%05d", i)).getBytes());
SendResult sendResult = producer.send(msg);
System.out.println(String.format("%05d", i) + " : " + sendResult);
} catch (Exception e) {
e.printStackTrace();
}
}
producer.shutdown();
}
}
代码解读:
- 多个发送同一类消息的生产者称之为一个生产者组。
- 生产者需要通过NameServer获取所有Broker的路由信息,多个用分号隔开。跟Redis 的哨兵一样,通过哨兵获取服务端地址。
- Message代表一条消息,必须指定Topic,代表消息的分类,是一个逻辑概念,比如订单类的消息、资金类的消息。
- Tags:可选,用于消费端过滤消息,是对消息用途的细分,比如Topic是订单类的消息, tag可以是create,update, delete。
- Keys:消息关键词。如果有多个,用空格隔开。RocketMQ可以根据这些key快速检索到消息,相当于消息的索引,可以设置为消息的唯一编号(主键)。
- SendResult是发送结果的封装,包括消息状态、消息ld、选择的队列等等,只要不抛异常,就代表发送是成功的。
SendResult [sendStatus=SEND_OK, msgId=COA8006F5B6418B4AAC299CF37140009, offsetMsgld=null,messageQueue=MessageQueue [topic=ransaction-test-topic, brokerName=broker-b, queueld=3], queueOffset=5]
msgld:生产者生成的唯一编号,全局唯一,也叫uniqld。
offsetMsgld:消息偏移ID,该ID记录了消息所在集群的物理地址,主要包含所存储Broker服务器的地址(IP与端口号)以及所在commitlog 文件的物理偏移量。
3.1.2 消费者
public class SimpleConsumer {
public static void main(String[] args) throws MQClientException {
// Instantiate with specified consumer group name.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_test_consumer_group");
// Specify name server addresses.
consumer.setNamesrvAddr("192.168.44.163:9876;192.168.44.164:9876");
consumer.setMessageModel(MessageModel.BROADCASTING);
// Subscribe one more more topics to consume.
consumer.subscribe("q-2-1", "*");
// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
for (MessageExt msg : msgs) {
String topic = msg.getTopic();
String messageBody = "";
try {
messageBody = new String(msg.getBody(), "utf-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
// 重新消费
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
String tags = msg.getTags();
System.out.println("topic:" + topic + ",tags:" + tags + ",msg:" + messageBody);
}
// 消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//Launch the consumer instance.
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
消费者代码解读:
1、消费者组别是消费同一topic的消费者分组。
2、消费者需要从nameServer拿到topic的 queue 所在的 Broker地址,多个用分号隔开。
3、Consumer可以以两种模式启动,广播(Broadcast)和集群(Cluster),广播模式下,一条消息会发送给所有Consumer,集群模式下消息只会发送给一个Consumer.
4、订阅可以使用通配符,topic名字后面的参数跟生产者消息的tag就对应起来了,可以使用通配符,*代表匹配所有消息;||隔开多个。
consumer.subscribe("q-2-1", "*");
5、return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;这句话是告诉Broker消费成功,可以更新offset了。也就是发送ACK。
3.2 Spring boot
3.2.1 依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.4</version>
</dependency>
在spring boot中使用RocketMQ可以起到简化配置,管理对象,提供模板方法的目的。
3.2.2 配置
客户端的配置可以直接写在配置文件中
server.port=9096
spring.application.name=demo
rocketmq.name-server=localhost:9876
rocketmq.producer.group=test-group
rocketmq.producer.send-message-timeout=3000
3.2.3 消费者
创建消费者类,加上@RocketMQMessageListener注解监听消息。
/**
* MessageModel:集群模式;广播模式
* ConsumeMode:顺序消费;无序消费
*/
@Component
@RocketMQMessageListener(topic = "springboot-topic", consumerGroup = "consumer-group",
//selectorExpression = "tag1",selectorType = SelectorType.TAG,
messageModel = MessageModel.CLUSTERING, consumeMode = ConsumeMode.CONCURRENTLY)
public class MessageConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
try {
System.out.println("----------接收到rocketmq消息:" + message);
} catch (Exception e) {
e.printStackTrace();
}
}
}
1、使用注解中的selectorExpression属性过滤消息。
2、注解属性中的MessageModel是消息模型,两个值:
CLISTERING:多个消息者轮询消费消息(默认)。
BROADCASTING:所有消费者都收到同样的消息。
3、consumeMode是消费模型,两个值:
- CONCURRENTLY:并发的(默认)。消费端并发消费,消息顺序得不到保证。到底有多少个线程并发消费,取决于线程池的大小。
- ORDERLY:有序的。消费端有序消费,也就是生产者发送的顺序跟消费者消费的顺序是一致的。
两者的区别:顺序消费需要对要处理的队列加锁,确保同一队列,同一时间,只允许一个消费线程处理。很显然并发消费效率更高。
3.2.4 生产者
@Component
public class MessageSender {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void syncSend() {
SendResult result = rocketMQTemplate.syncSend("springboot-topic:tag", "这是一条同步消息", 10000);
System.out.println(result);
}
}
生产者的代码更加简单,只需要注入RocketMQTemplate就可以发送消息。
发送消息有几种类型:
1、同步(syncSend方法)∶指消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方式。
2、异步(asyncSend方法)︰指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。MQ的异步发送,需要用户实现异步发送回调接口(SendCallback),在执行消息的异步发送时,应用不需要等待服务器响应即可直接返回,通过回调接口接收务器响应,并对服务器的响应结果进行处理。
3、单向(sendOneWay方法)∶特点为只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。应用场景:适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。
发送方法如何选择?
- 当发送的消息不重要时,采用one-way方式,以提高吞吐量;
- 当发送的消息很重要时,且对响应时间不敏感的时候采用sync方式;
- 当发送的消息很重要,且对响应时间非常敏感的时候采用async方式。
4. 项目地址
标签:入门,Broker,轻松,topic,消息,NameServer,consumer,RocketMQ 来源: https://blog.csdn.net/qq_41432730/article/details/121984159