RabbitMQ
作者:互联网
声明:此文是小白本人学习Spring所写,主要参考(搬运)了:
1、MQ的介绍与应用场景
1.1 什么是MQ
MQ(Message Quene) : 翻译为消息队列,就是指存储消息的一个容器。它是一个典型的生产者和消费者模型,生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,可以轻松的实现系统间解耦。别名为:消息中间件,通过利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。
下面是MQ最简单的模型,它包含了四个关键词:生产者、消费者、消息和队列。
- 生产者:就是用于生产消息的应用程序。
- 消息:就是要传输的数据,可以是最简单的文本字符串,也可以是自定义的复杂格式(只要能按预定格式解析出来即可)。
- 队列:大家应该再熟悉不过了,是一种先进先出数据结构。它是存放消息的容器,消息从队尾入队,从队头出队,入队即发消息的过程,出队即收消息的过程。
- 消费者:就是用于读取队列中消息的应用程序。
2、MQ的应用场景
使用消息中间件最主要的目的:
- [1] 应用解耦
- [2] 异步处理
- [3] 流量削峰
2.1 应用解耦
场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口。
传统模式的缺点:
- 假如库存系统无法访问,则订单减库存将失败,从而导致订单失败
- 由于订单系统调用了库存系统的接口,所以它们存在耦合
引入消息队列后的做法:
- 订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功
- 库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作
- 假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦
- 为了保证库存肯定有,可以将队列大小设置成库存数量,或者采用其他方式解决。
2.2 异步处理
场景说明:用户注册后,需要发注册邮件和注册短信。传统的做法有两种 1.串行的方式;2.并行方式
(1) 串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端
(2) 并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间
(3) 引入消息队列,将不是必须的业务逻辑,异步处理。改造后的架构如下:
2.3 流量削峰
流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。
应用场景:系统其他时间A系统每秒请求量就100个,系统可以稳定运行。系统每天晚间八点有秒杀活动,每秒并发请求量增至1万条,但是系统最大的处理能力只能每秒处理1000个请求,于是系统崩溃,服务器宕机。
传统架构:大量用户(100万用户)通过浏览器在晚上八点高峰期同时参与秒杀活动。大量的请求涌入我们的系统中,高峰期达到每秒钟5000个请求,大量的请求打到MySQL上,每秒钟预计执行3000条SQL。但是一般的MySQL每秒钟扛住2000个请求就不错了,如果达到3000个请求的话可能MySQL直接就瘫痪了,从而系统无法被使用。但是高峰期过了之后,就成了低峰期,可能也就1万用户访问系统,每秒的请求数量也就50个左右,整个系统几乎没有任何压力。
引入MQ:100万用户在高峰期的时候,每秒请求有5000个请求左右,将这5000请求写入MQ里面,系统A每秒最多只能处理2000请求,因为MySQL每秒只能处理2000个请求。系统A从MQ中慢慢拉取请求,每秒就拉取2000个请求,不要超过自己每秒能处理的请求数量即可。MQ,每秒5000个请求进来,结果只有2000个请求出去,所以在秒杀期间(将近一小时)可能会有几十万或者几百万的请求积压在MQ中。这个短暂的高峰期积压是没问题的,因为高峰期过了之后,每秒就只有50个请求进入MQ了,但是系统还是按照每秒2000个请求的速度在处理,所以说,只要高峰期一过,系统就会快速将积压的消息消费掉。我们在此计算一下,每秒在MQ积压3000条消息,1分钟会积压18万,1小时积压1000万条消息,高峰期过后,1个多小时就可以将积压的1000万消息消费掉。
1.3 使用MQ的优缺点
消息队列的优点:
- 解耦:将系统按照不同的业务功能拆分出来,消息生产者只管把消息发布到 MQ 中而不用管谁来取,消息消费者只管从 MQ 中取消息而不管是谁发布的。消息生产者和消费者都不知道对方的存在;
- 异步:主流程只需要完成业务的核心功能;对于业务非核心功能,将消息放入到消息队列之中进行异步处理,减少请求的等待,提高系统的总体性能;
- 削峰/限流:将所有请求都写到消息队列中,消费服务器按照自身能够处理的请求数从队列中拿到请求,防止请求并发过高将系统搞崩溃;
消息队列的缺点:
- 系统的可用性降低:系统引用的外部依赖越多,越容易挂掉,如果MQ 服务器挂掉,那么可能会导致整套系统崩溃。这时就要考虑如何保证消息队列的高可用了;
- 系统复杂度提高:加入消息队列之后,需要保证消息没有重复消费、如何处理消息丢失的情况、如何保证消息传递的有序性等问题;
- 数据一致性问题:A 系统处理完了直接返回成功了,使用者都以为你这个请求就成功了;但是问题是,要是 BCD 三个系统那里,BD 两个系统写库成功了,结果 C 系统写库失败了,就会导致数据不一致了;
1.4 不同MQ之间的区别
市场上常见的消息队列有如下:
- ActiveMQ:Apache出品,基于JMS,是最早使用的消息队列产品,时间比较长了,现在已经不再维护了。
- ZeroMQ:号称最快的消息队列系统,尤其针对大吞吐量的需求场景。扩展性好,开发比较灵活,采用C语言实现,实际上只是一个socket库的重新封装,如果做为消息队列使用,需要开发大量的代码。
- RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好
- RocketMQ:基于JMS,阿里开源的消息中间件,纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。
- Kafka:类似MQ的产品;分布式消息系统,高吞吐量
注:每种MQ没有绝对的好坏,主要依据使用场景,扬长避短,利用其优势,规避其劣势。
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
---|---|---|---|---|
厂商 | Rabbit | Apache | Alibaba | Apache |
开发语言 | Erlang | Java | Java | Scala&Java |
协议支持 | AMQP,XMPP,SMTP,STOMP | AMQP,OpenWire,STOMP,XMPP,REST | 自定义 | 自定义(基于TCP),社区封装了Http协议支持 |
优点 | 基于erlang,并发能力强,性能极好,延迟极低,稳定性和安全性很高,对性能和吞吐量的要求在其次,管理界面美观,社区非常活跃。 | 遵循JMS规范,安装方便,业界成为老牌,丰富的API和参考文档 | 在阿里被广泛应用于交易、充值、流计算、消 息推送、日志流式处理、binglog分发等场景。 | 依赖zk,可动态扩展节点,提供超高的吞吐量、极高的可用性以及可靠性 |
缺点 | Erlang语言难较大,不支持动态扩展;吞吐量会低一些,Rabbitmq的集群动态扩展很麻烦 | 有可能会丢失消息,该MQ不再维护,重心在下一代产品apolle | 社区活跃一般,随时会被阿里抛弃 | 严格的顺序机制,不支持消息优先级,不支持标准的消息协议,不利于平台迁移 |
时效 | 微妙级 | 毫秒级 | 毫秒级 | 毫秒以内 |
可用性 | 高,基于主从架构实现高可用 | 高,基于主从架构实现高可用 | 非常高,分布式架构 | 非常高,分布式架构 |
可靠性 | 基本不丢数据 | 有较低的概率丢失数据 | 通过参数优化,可以做到0丢失 | 通过参数优化,可以做到0丢失 |
应用 | 适合对稳定性要求高的企业级应用 | 适合中小企业,不适合上千个队列的应用 | 适合大型企业和大规模分布式系统应用 | 应用在大数据日志处理或对实时性、可靠性要求较低的应用场景(少量数据丢失) |
1.5 补充:QPS,TPS,PPV,UV,PR
[1]、QPS(Queries Per Second):即每秒查询率,是对一个特定的查询服务器在规定时间内所处理流量多少的衡量标准。每秒查询率:在因特网上,经常用每秒查询率来衡量域名系统服务器的机器的性能,即为QPS。对应fetches/sec,即每秒的响应请求数,也即是最大吞吐能力。
[2]、TPS(Transactions Per Second):每秒事务数,每秒系统能够处理的事务次数。
[3]、PV(page view):即页面浏览量,或点击量;通常是衡量一个网络新闻频道或网站甚至一条网络新闻的主要指标。对PV的解释是,一个访问者在24小时(0点到24点)内到底看了你网站几个页面。这里需要强调:同一个人浏览你网站同一个页面,不重复计算PV量,点100次也算1次。说白了,PV就是一个访问者打开了你的几个页面。PV之于网站,就像收视率之于电视,从某种程度上已成为投资者衡量商业网站表现的最重要尺度。
PV的计算:当一个访问着访问的时候,记录他所访问的页面和对应的IP,然后确定这个IP今天访问了这个页面没有。如果你的网站到了23点,单纯IP有60万条的话,每个访问者平均访问了3个页面,那么pv表的记录就要有180万条。
[4]、UV(Unique Visitor):指访问某个站点或点击某条新闻的不同IP地址的人数。在同一天内,uv只记录第一次进入网站的具有独立IP的访问者,在同一天内再次访问该网站则不计数。独立IP访问者提供了一定时间内不同观众数量的统计指标,而没有反应出网站的全面活动。
[5]、PR值:即PageRank,网页的级别技术,用来标识网页的等级/重要性。级别从1到10级,10级为满分。PR值越高说明该网页越受欢迎(越重要)。例如:一个PR值为1的网站表明这个网站不太具有流行度,而PR值为7到10则表明这个网站非常受欢迎(或者说极其重要)。
[6]、计算关系:
- QPS = 并发量 / 平均响应时间
- 并发量 = QPS * 平均响应时间
- 原理:每天80%的访问集中在20%的时间里,这20%时间叫做峰值时间。
- 公式:( 总PV数 * 80% ) / ( 每天秒数 * 20% ) = 峰值时间每秒请求数(QPS) 。
- 机器数量:峰值时间每秒QPS / 单台机器的QPS = 需要的机器 。
参考资料:
2、RabbitMQ的简单介绍
2.1 RabbitMQ的简介
RabbitMQ是由erlang语言开发,基于AMQP协议实现的消息队列,它的并发能力强,性能极好,延迟极低,稳定性和安全性很高,同时还支持集群。RabbitMQ在分布式系统开发中应用非常广泛,是最受欢迎的开源消息中间件之一。
注意:由于RabbitMQ是采用erlang语言开发的,所以必须有erlang环境才可以运行
AMQP 协议:AMQP(advanced message queuing protocol)在2003年时被提出,最早用于解决金融领不同平台之间的消息传递交互问题。顾名思义,AMQP是一种协议,更准确的说是一种binary wire-level protocol(链接协议),提供统一消息服务的应用层标准协议,基于此协议的客户端与消息中间件可传递消息,并不受客户端、中间件等不同产品,不同开发语言等条件的限制。这是其和JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。这使得实现了AMQP的provider天然性就是跨平台的。
JMS:即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。
AMQP 与 JMS 区别
- JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
- JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。
- JMS规定了两种消息模式;而AMQP的消息模式更加丰富
2.2 RabbitMQ的特点
RabbitMQ最初起源于金融系统,用在分布式系统中存储转发消息,在易用性、扩展性、高可用等方面表现不俗,具体特点包括:
- 可靠性:使用一些机制保证可靠性,如持久化、传输确认、发布确认。
- 灵活的路由:在消息进入队列之前,通过Exchange交换机来路由消息。对于典型的路由功能,RabbitMQ已经提供了一些内置的Exchange来实现。针对更复杂的路由功能,可以将多个Exchange绑定在一起或开发自己的Exchange。
- 消息集群:多个RabbitMQ组成一个集群,形成一个逻辑的broker。
- 高可用:队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列下仍然可用。
- 多种协议:支持多种消息队列协议,比如STOMP、MQTT等。
- 多语言客户端:支持几乎所有常用语言,比如java、rubu、.NET等。
- 管理界面:提供了一个易用的用户界面,使得用户可以监控和管理消息的broker
- 跟踪机制:如果消息异常,rabbitMQ提供了消息跟踪机制,使用者可以找出发生了什么。
2.3 RabbitMQ的基础架构
RabbitMQ是AMQP协议的一个开源实现,所以其内部实际上也是AMQP中的基本概念,如下图所示:
- Publisher:消息生产者,就是投递消息的程序,也是一个向交换机发布消息的客户端应用程序。生产者发送的消息一般包含两个部分:消息体和内容标签。
- Consumer:消息消费者,也就是接收消息的一方。消费者连接到RabbitMQ服务器,并订阅到队列上。消费消息时只消费消息体,丢弃标签。
- Connection :网络连接,比如一个TCP连接,用于连接到具体Broker。
- Channel: 信道,AMQP 命令都是在信道中进行的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接,一个TCP连接可以用多个信道。客户端可以建立多个channel,每个channel表示一个会话任务。
- Broker服务节点:表示消息队列服务器实体。一般情况下一个Broker可以看做一个RabbitMQ服务器。
- Exchange:交换器,接受生产者发送的消息,根据路由键将消息路由到绑定的队列上。
- Queue:消息队列,用来存放消息。一个消息可投入一个或多个队列,多个消费者可以订阅同一队列,这时队列中的消息会被平摊(轮询)给多个消费者进行处理。
- Message:消息,由消息体和标签组成。消息体是不透明的,而标签则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
- Routing Key: 路由关键字,用于指定这个消息的路由规则,需要与交换器类型和绑定键(Binding Key)联合使用才能最终生效。
- Binding:绑定,通过绑定将交换器和队列关联起来,一般会指定一个BindingKey,通过BindingKey,交换器就知道将消息路由给哪个队列了。
- Virtual host:虚拟主机,用于逻辑隔离,表示一批独立的交换器、消息队列和相关对象。一个Virtual host可以有若干个Exchange和Queue,同一个Virtual host不能有同名的Exchange或Queue。最重要的是,其拥有独立的权限系统,可以做到 vhost 范围的用户控制。当然,从 RabbitMQ 的全局角度,vhost 可以作为不同权限隔离的手段。
2.4 Exchange交换器的类型
Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers,由于headers交换器和direct交换器完全一致,且性能差很多,目前几乎用不到。这里只看direct、fanout、topic这三种类型:
- direct(直连):消息中的路由键(RoutingKey)如果和 Bingding 中的 bindingKey 完全匹配,交换器就将消息发到对应的队列中。是基于完全匹配、单播的模式。
- fanout(广播):把所有发送到fanout交换器的消息路由到所有绑定该交换器的队列中,fanout 类型转发消息是最快的。
- topic(主题):通过模式匹配的方式对消息进行路由,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。匹配规则:
- ① RoutingKey 和 BindingKey 为一个 点号 '.' 分隔的字符串。 比如: stock.usd.nyse;可以放任意的key在routing_key中,当然最长不能超过255 bytes。
- ② BindingKey可使用 * 和 # 用于做模糊匹配:*匹配一个单词,#匹配0个或者多个单词;
- headers:不依赖于路由键进行匹配,是根据发送消息内容中的headers属性进行匹配,除此之外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了。
2.5 RabbitMQ的几种模式
RabbitMQ提供了6种模式:
- 简单模式:一对一模式,只有一个生产者,一个队列,一个消费者,是最简单的模式。
- 工作队列模式:一对多模式,一个消息生产者,一个消息队列,多个消费者。
- Publish/Subscribe发布与订阅模式:无选择接收消息,一个消息生产者,一个交换器,多个消息队列,多个消费者。
- Routing路由模式:在发布/订阅模式的基础上,有选择的接收消息,也就是通过 routing 路由进行匹配条件是否满足接收消息。
- Topics主题模式:同样是在发布/订阅模式的基础上,根据主题匹配进行筛选是否接收消息,比第四类更灵活(也是最常用的一种)。
- RPC远程调用模式(远程调用,不太算MQ;不作介绍)
官网对应模式介绍:https://www.rabbitmq.com/getstarted.html
3、RabbitMQ的安装-Linux(CentOS7)
3.1 RabbitMQ所需环境
- JDK1.8
- CentOS7-64位
- Erlang-OTP erlang官网:https://www.erlang.org/
- RabbitMQ rabbitmq官网:https://www.rabbitmq.com/
3.2 MQ与erlang的版本
rabbitmq和erlang的版本对应关系(注意:它两版本关系必须对应):https://www.rabbitmq.com/which-erlang.html
3.3 安装erlang环境
由于rabbitmq是基于erlang语言开发的,所以必须先安装erlang。
这里使用包云进在线下载:https://packagecloud.io/rabbitmq
然后选择需要下载的版本,erlang的版本一定要与RabbitMQ的版本相对应才行。
进入之后分别复制最右边的两个命令执行。
①、首先执行安装脚本(注:每次下载前都需执行此脚本,可能有点慢,耐心等待)
curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash
②、然后下载安装erlang
#如果wget命令未找到,则执行下面的命令,有则忽略本命令
yum -y install wget
##注:下面的操作二选一
#[1]、下载并自动安装erlang
sudo yum install erlang-23.3.4.4-1.el7.x86_64 -y
#[2]、下载并手动安装erlang
wget --content-disposition https://packagecloud.io/rabbitmq/erlang/packages/el/7/erlang-23.3.4.4-1.el7.x86_64.rpm/download.rpm
rpm -ivh erlang-23.3.4.4-1.el7.x86_64.rpm
③、检查erlang的版本号
erl -version
3.4 安装RabbitMQ
RabbitMQ的安装操作和erlang几乎一致。
①、执行安装脚本(注:每次下载前都需执行此脚本,可能有点慢,耐心等待)
curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash
②、下载并安装rabbitmq
##注:下面的操作二选一
#[1]、下载并自动安装rabbitmq
sudo yum install rabbitmq-server-3.8.17-1.el7.noarch -y
#[2]、下载并手动安装rabbitmq
wget --content-disposition https://packagecloud.io/rabbitmq/rabbitmq-server/packages/el/7/rabbitmq-server-3.8.17-1.el7.noarch.rpm/download.rpm
rpm -ivh rabbitmq-server-3.8.17-1.el7.noarch.rpm
③、查看下载的安装包
rpm -qa | grep erlang
rpm -qa | grep rabbitmq-server
④、启用管理平台插件,启用插件后,可以可视化管理RabbitMQ
rabbitmq-plugins enable rabbitmq_management
⑤、RabbitMQ的相关命令
#启动
systemctl start rabbitmq-server
#重启
systemctl restart rabbitmq-server
#状态
systemctl status rabbitmq-server
#停止
systemctl stop rabbitmq-server
#开机自启
systemctl enable rabbitmq-server
3.5 开放相关端口
#查看已经开放的端口
firewall-cmd --list-ports
#开放指定端口
firewall-cmd --zone=public --add-port=5672/tcp --permanent
firewall-cmd --zone=public --add-port=15672/tcp --permanent
#重启防火墙
firewall-cmd --reload
3.6 添加登录用户
注:由于使用默认的用户名和密码guest需要修改配置文件,这里就不去改了,所以直接添加一个自定义登录用户,步骤如下。
①、添加一个用户名为admin,密码123456的用户
rabbitmqctl add_user admin 123456
②、设置admin为超级管理员
rabbitmqctl set_user_tags admin administrator
③、授权远程访问(也可以登录后,可视化配置)
rabbitmqctl set_permissions -p / admin "." "." ".*"
④、创建完成后,重启RabbitMQ
systemctl restart rabbitmq-server
⑤、查看当前可登录用户
rabbitmqctl list_users
⑥、删除相关用户
rabbitmqctl delete_user admin
3.7 卸载RabbitMQ卸载
#关闭rabbitmq
systemctl stop rabbitmq-server
#查看相关进程
ps aux | grep rabbitmq
#查看下载的安装包
rpm -qa | grep erlang
rpm -qa | grep rabbitmq-server
#卸载MQ(下面二选一)
#[1]推荐
rpm -qa | grep rabbitmq-server
rpm -evh rabbitmq-server-3.8.17-1.el7.noarch --nodeps
#[2]
yum list|grep rabbitmq
yum -y remove rabbitmq-server.noarch
#卸载erlang(下面二选一)
#[1]推荐
rpm -qa | grep erlang
rpm -evh erlang-23.3.4.4-1.el7.x86_64 --nodeps
#[2]
yum list | grep erlang
yum -y remove erlang.x86_64
#删除相关文件
rm -rf /usr/lib64/erlang
rm -rf /var/lib/rabbitmq
rm -rf /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.17/
rm -rf /etc/rabbitmq/
rm -rf /var/log/rabbitmq
3.8 控制台管理界面
打开浏览器输入网址:http://192.168.43.128:15672/
输入用户名(admin)和密码(123456),进入后台管理页面:
注:RabbitMQ的Web页面多点点就熟悉了。
- Overview:可以看到RabbitMQ大概的信息。
- Connections:在这里可以查看连接情况,无论生产者还是消费者,都需要与RabbitMQ建立连接后才可以完成消息的生产和消费。
- Channels:通道,建立连接后,会形成通道,消息的投递获取依赖通道。
- Exchanges:交换机,用来实现消息的路由。
- Queues:队列,即消息队列,消息会存放在队列中,等待消费,消费后从队列中移除。
- Admin:对用户、虚拟主机、集群等的一些操作。
相关端口介绍:
4、RabbitMQ常用的五种模式介绍
4.1 简单模式
简单模式:该模式是个一对一模式,只有一个生产者(用于生产消息),一个队列 Queue(用于存储消息),一个消费者 C (用于接收消息)。
注:简单模式也用到了交换机,使用的是默认的交换机(AMQP default)。
4.1.1 代码实现
[1] 创建一个Maven项目
- RabbitMQ:父工程
- rabbitmq-commons:存放工具类
- rabbitmq-consumer:消费者模块
- rabbitmq-producer:生产者模块
[2] 导入依赖
在父工程中的 pom.xml 文件导入如下依赖:
<!-- mq的依赖 -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.2</version>
</dependency>
<!-- 日志处理 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
[3] 封装工具类
在rabbitmq-commons模块中封装 rabbitmq 连接的工具类(注意:其它模块要使用工具类只需引入本模块即可!)
/**
* 封装连接工具类
*/
public class ConnectionUtils {
public static Connection getConnection() throws Exception {
// 1.定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2.设置服务器地址
factory.setHost("192.168.43.128");
// 3.设置协议端口号
factory.setPort(5672);
// 4.虚拟主机名称;默认为 /
factory.setVirtualHost("/");
// 5.设置用户名称
factory.setUsername("admin");
// 6.设置用户密码
factory.setPassword("123456");
// 7.创建连接
Connection connection = factory.newConnection();
return connection;
}
}
[4] 创建生产者
生产者负责创建消息并且将消息发送至指定的队列中,简单分为5步:
- 创建连接
- 创建通道
- 创建(声明)队列
- 发送消息
- 关闭资源
/**
* 生产者(简单模式)
*/
public class Producer {
// 队列名称
private static final String QUEUE_NAME = "simple_queue";
public static void main(String[] args) throws Exception {
// 1、获取连接
Connection connection = ConnectionUtils.getConnection();
// 2、创建通道(频道)
Channel channel = connection.createChannel();
// 3、声明(创建)队列
/*
* queue 参数1:声明通道中对应的队列名称
* durable 参数2:是否定义持久化队列,当mq重启之后队列还在
* exclusive 参数3:是否独占本次连接,为true则只能有一个消费者监听这个队列
* autoDelete 参数4:是否自动删除队列,如果为true表示没有消息也没有消费者连接自动删除队列
* arguments 参数5:队列其它参数(额外配置)
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 4.发送消息
/*
* exchange 参数1:交换机名称,如果没有指定则使用默认Default Exchange
* routingKey 参数2:队列名称或者routingKey,如果指定了交换机就是routingKey路由key,简单模式可以传递队列名称
* props 参数3:消息的配置信息
* body 参数4:要发送的消息内容
*/
String msg = "Hello RabbitMQ!!!";
System.out.println("生产者发送的消息:" + msg);
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
//关闭资源
channel.close();
connection.close();
}
}
[5] 创建消费者
消费者实现和生产者实现过程差不多,但是没有关闭通道和连接,因为消费者要一直等待随时可能发来的消息,大致分为如下3步:
- 获取连接
- 创建通道
- 监听队列,接收消息
/**
* 消费者(简单模式)
*/
public class Consumer {
// 队列名称
private static final String QUEUE_NAME = "simple_queue";
public static void main(String[] args) throws Exception {
// 1、获取连接对象
Connection connection = ConnectionUtils.getConnection();
// 2、创建通道(频道)
Channel channel = connection.createChannel();
// 3. 创建队列Queue,如果没有一个名字叫simple_world的队列,则会创建该队列,如果有则不会创建.
// 这里可有可无,但是发送消息是必须得有该队列,否则消息会丢失
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 4、监听队列,接收消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
/*
* handleDelivery回调方法,当收到消息后,会自动执行该方法
* consumerTag 参数1:消费者标识
* envelope 参数2:可以获取一些信息,如交换机,路由key...
* properties 参数3:配置信息
* body 参数4:读取到的消息
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者获取消息:" + new String(body));
}
};
/*
* queue 参数1:队列名称
* autoAck 参数2:是否自动确认,true表示自动确认接收完消息以后会自动将消息从队列移除。否则需要手动ack消息
* callback 参数3:回调对象,在上面定义了
*/
channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
//注意,消费者这里不建议关闭资源,让程序一直处于读取消息的状态
}
}
[6] 运行结果
把生产者的代码运行三次,表示向队列中发送了三次消息。
查看RabbitMQ控制台中的内容。
最后启动消费者,查看控制台打印的数据。
简单模式的不足之处:这种模式是一对一,一个生产者向一个队列中发送消息,一个消费者从绑定的队列中获取消息,这样耦合性过高,如果有多个消费者想消费队列中信息就无法实现了。
4.2 工作模式
工作模式:也被称为任务模型(Task Queues)。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用 work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行。
这种模式只有一个生产者 P,一个用于存储消息的队列 Queue、多个消费者 C 用于接收消息。
工作队列模式的特点有三:
- 一个生产者,一个队列,多个消费者同时竞争消息
- 任务量过高时可以提高工作效率
- 消费者获得的消息是无序的
4.2.1 代码实现
[1] 创建生产者
向队列中发送10条消息。
/**
* 生产者(工作模式)
*/
public class Producer {
// 队列名称
private static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception {
// 1、创建连接
Connection connection = ConnectionUtils.getConnection();
// 2、创建通道
Channel channel = connection.createChannel();
// 3、声明队列 queueDeclare(队列名称,是否持久化,是否独占本连接,是否自动删除,附加属性参数)
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 4、发送10条消息
for (int i = 1; i <= 10; i++) {
String msg = "Hello RabbitMQ!!!~~~" + i;
System.out.println("生产者发送消息:" + msg);
// basicPublish(交换机名称-""表示不用交换机,队列名称或者routingKey, 消息的属性信息, 消息内容的字节数组);
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
}
//释放资源
channel.close();
connection.close();
}
}
[2] 创建消费者
下面分别创建两个消费者Consumer1和Consumer2。
消费者Consumer1:
/**
* 消费者1(工作模式)
*/
public class Consumer1 {
// 队列名称
private static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception {
// 1、获取连接对象
Connection connection = ConnectionUtils.getConnection();
// 2、创建通道(频道)
Channel channel = connection.createChannel();
// 3、创建队列Queue,如果没有一个名字叫work_queue的队列,则会创建该队列,如果有则不会创建.
// 这里可有可无,但是发送消息是必须得有该队列,否则消息会丢失
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 4、监听队列,接收消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
// handleDelivery(消费者标识, 消息包的内容, 属性信息(生产者的发送时指定), 读取到的消息)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者获取消息:" + new String(body));
// 模拟消息处理延时,加个线程睡眠时间
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
// basicConsume(队列名称, 是否自动确认, 回调对象)
channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
//注意,消费者这里不建议关闭资源,让程序一直处于读取消息的状态
}
}
消费者Consumer2:和消费者1几乎一模一样。
/**
* 消费者2(工作模式)
*/
public class Consumer2 {
// 队列名称
private static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception {
// 1、获取连接对象
Connection connection = ConnectionUtils.getConnection();
// 2、创建通道(频道)
Channel channel = connection.createChannel();
// 3、创建队列Queue,如果没有一个名字叫work_queue的队列,则会创建该队列,如果有则不会创建.
// 这里可有可无,但是发送消息是必须得有该队列,否则消息会丢失
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 4、监听队列,接收消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
// handleDelivery(消费者标识, 消息包的内容, 属性信息(生产者的发送时指定), 读取到的消息)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者获取消息:" + new String(body));
// 模拟消息处理延时,加个线程睡眠时间
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
// basicConsume(队列名称, 是否自动确认, 回调对象)
channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
//注意,消费者这里不建议关闭资源,让程序一直处于读取消息的状态
}
}
[3] 运行结果
首先分别启动两个消费者(注意这里一定要先启动消费者)。
然后启动生产者,分别查看消费者控制台的打印信息,如下所示。
从结果来看,两个消费者对应的控制台是否竞争性的接收到消息。
4.2.2 轮询分发(round-robin)
上面的代码实现就是轮询分发的方式。现象:消费者1 处理完消息之后,消费者2 才能处理,它两这样轮着来处理消息,直到消息处理完成,这种方式叫轮询分发(round-robin)
,结果就是不管两个消费者谁忙,「数据总是你一个我一个」,不管消费者处理数据的性能,此时 autoAck = true。
注意:autoAck属性设置为true,表示消息自动确认。消费者在消费时消息的确认模式可以分为『自动确认和手动确认』。
- 自动确认:在队列中的消息被消费者读取之后会自动从队列中删除。不管消息是否被消费者消费成功,消息都会删除。
- 手动确认:当消费者读取消息后,消费端需要手动发送ACK用于确认消息已经消费成功了(也就是需要自己编写代码发送ACK确认),如果设为手动确认而没有发送ACK确认,那么消息就会一直存在队列中(前提是进行了持久化操作),后续就可能会造成消息重复消费,如果过多的消息堆积在队列中,还可能造成内存溢出,『手动确认消费者在处理完消息之后要及时发送ACK确认给队列』。
使用轮询分发的方式会有一个明显的缺点,例如消费者1 处理数据的效率很慢,消费者2 处理数据的效率很高,正常情况下消费者2处理的数据应该多一点才对,而轮询分发则不管你的性能如何,反正就是每次处理一个消息,对于这种情况可以使用公平分发的方式来解决。
4.2.3 公平分发(fair dipatch)
要实现公平分发
,操作分为两个步骤:
【1】、保证消息一次只分发一次,加一段关键性代码:
【2】、关闭自动确认,并且手动发送ACK给队列:
完整代码如下所示(分别修改两个消费者):
/**
* 消费者1(工作模式)
*/
public class Consumer1 {
// 队列名称
private static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception {
// 1、获取连接对象
Connection connection = ConnectionUtils.getConnection();
// 2、创建通道(频道)
Channel channel = connection.createChannel();
// 3、创建队列Queue,如果没有一个名字叫work_queue的队列,则会创建该队列,如果有则不会创建.
// 这里可有可无,但是发送消息是必须得有该队列,否则消息会丢失
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息
channel.basicQos(1);
// 4、监听队列,接收消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
// handleDelivery(消费者标识, 消息包的内容, 属性信息(生产者的发送时指定), 读取到的消息)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者获取消息:" + new String(body));
// 模拟消息处理延时,加个线程睡眠时间
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 手动回执消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// basicConsume(队列名称, 是否自动确认, 回调对象)
channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
//注意,消费者这里不建议关闭资源,让程序一直处于读取消息的状态
}
}
修改完成之后再次运行,由于消费者1 设置处理完一个消息后睡眠2秒,而消费者2 为1 秒,所以预计输出的结果为: 消费者2 处理的消息大概是消费者1 的两倍左右,结果如下图所示。
4.3 发布订阅模式
发布订阅模式(Publish/Subscribe):这种模式需要涉及到交换机了,也可以称它为广播模式,消息通过交换机广播到所有与其绑定的队列中。
详细介绍:一个消费者将消息首先发送到交换机上(这里的交换机类型为fanout),然后交换机绑定到多个队列,这样每个发到fanout类型交换器的消息会被分发到所有的队列中,最后被监听该队列的消费者所接收并消费。如下图所示:
4.3.1 代码实现
[1] 创建生产者
/**
* 生产者(发布订阅模式)
*/
public class Producer {
// 交换机名称
private static final String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) throws Exception {
// 1、创建连接
Connection connection = ConnectionUtils.getConnection();
// 2、创建通道
Channel channel = connection.createChannel();
// 3、连续发送10条消息
for (int i = 1; i <= 10; i++) {
String msg = "Hello RabbitMQ!!!~~~" + i;
System.out.println("生产者发送的消息:" + msg);
//basicPublish(交换机名称[默认Default Exchage],路由key[简单模式可以传递队列名称],消息其它属性,发送的消息内容)
channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
}
//关闭资源
channel.close();
connection.close();
}
}
[2] 创建消费者
由于从这里开始涉及到交换机了,使用这里介绍一下四种交换机的类型:
- direct(直连):消息中的路由键(RoutingKey)如果和 Bingding 中的 bindingKey 完全匹配,交换器就将消息发到对应的队列中。是基于完全匹配、单播的模式。
- fanout(广播):把所有发送到fanout交换器的消息路由到所有绑定该交换器的队列中,fanout 类型转发消息是最快的。
- topic(主题):通过模式匹配的方式对消息进行路由,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。匹配规则:
- ① RoutingKey 和 BindingKey 为一个 点号 '.' 分隔的字符串。 比如: stock.usd.nyse;可以放任意的key在routing_key中,当然最长不能超过255 bytes。
- ② BindingKey可使用 * 和 # 用于做模糊匹配:*匹配一个单词,#匹配0个或者多个单词;
- headers:不依赖于路由键进行匹配,是根据发送消息内容中的headers属性进行匹配,除此之外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了。
消费者1:
注意:在发送消息前,RabbitMQ服务器中必须的有队列,否则消息可能会丢失,如果还涉及到交换机与队列绑定,那么就得先声明交换机、队列并且设置绑定的路由值(Routing Key),以免程序出现异常,由于本例所有的声明都是在消费者中,所以我们首先要启动消费者。如果RabbitMQ服务器中已经存在了声明的队列或者交换机,那么就不在创建,如果没有则创建相应名称的队列或者交换机。
/**
* 消费者1(发布订阅模式)
*/
public class Consumer1 {
// 队列名称
private static final String QUEUE_NAME1 = "fanout_queue1";
// 交换机名称
private static final String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) throws Exception {
// 1、获取连接对象
Connection connection = ConnectionUtils.getConnection();
// 2、创建通道(频道)
Channel channel = connection.createChannel();
/* 3、声明交换机
* exchange 参数1:交换机名称
* type 参数2:交换机类型
* durable 参数3:交换机是否持久化
*/
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true);
// 4、声明队列Queue queueDeclare(队列名称,是否持久化,是否独占本连接,是否自动删除,附加参数)
channel.queueDeclare(QUEUE_NAME1, true, false, false, null);
// 5、绑定队列和交换机 queueBind(队列名, 交换机名, 路由key[交换机的类型为fanout ,routingKey设置为""])
channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "");
// 6、监听队列,接收消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//获取交换机信息
String exchange = envelope.getExchange();
//获取消息信息
String message = new String(body, "utf-8");
System.out.println("交换机名称:" + exchange + ",消费者获取消息: " + message);
}
};
channel.basicConsume(QUEUE_NAME1, true, defaultConsumer);
//注意,消费者这里不建议关闭资源,让程序一直处于读取消息的状态
}
}
消费者2:和消费者1几乎一模一样
/**
* 消费者2(发布订阅模式)
*/
public class Consumer2 {
// 队列名称
private static final String QUEUE_NAME2 = "fanout_queue2";
// 交换机名称
private static final String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) throws Exception {
// 1、获取连接对象
Connection connection = ConnectionUtils.getConnection();
// 2、创建通道(频道)
Channel channel = connection.createChannel();
// 3、声明交换机,如果没有名称为EXCHANGE_NAME的交换机则创建,有则不创建
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true);
// 4、声明队列Queue。channel.queueDeclare(队列名称,是否持久化,是否独占本连接,是否自动删除,附加参数)
channel.queueDeclare(QUEUE_NAME2, true, false, false, null);
// 5、绑定队列和交换机。channel.queueBind(队列名, 交换机名, 路由key[fanout交换机的routingKey设置为""])
channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "");
// 6、监听队列,接收消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//获取交换机信息
String exchange = envelope.getExchange();
//获取消息信息
String message = new String(body, "utf-8");
System.out.println("交换机名称:" + exchange + ",消费者获取消息: " + message);
}
};
channel.basicConsume(QUEUE_NAME2, true, defaultConsumer);
//注意,消费者这里不建议关闭资源,让程序一直处于读取消息的状态
}
}
[3] 运行结果
首先分别启动所有消费者,然后使用生产者发送消息;在每个消费者对应的控制台可以查看到生产者发送的所有消息;到达『广播』的效果,如下所示。
在执行完测试代码后,可以到RabbitMQ的管理后台找到Exchanges
选项卡,点击说明的 fanout_exchange
交换机,可以查看到如下的绑定:
[4] 简单总结
发布订阅模式引入了交换机的概念,所以相对前面的类型更加灵活广泛一些。这种模式需要设置类型为fanout的交换机,并且将交换机和队列进行绑定,当消息发送到交换机后,交换机会将消息发送到所有绑定的队列,最后被监听该队列的消费者所接收并消费。发布订阅模式也可以叫广播模式,不需要RoutingKey的判断。
发布订阅模式与工作队列模式的区别:
1、工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机。
2、发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)。
3、发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑 定到默认的交换机 。
4.4 路由模式(精确匹配)
路由模式(Routing)的特点:
- 该模式的交换机为direct,意思为定向发送,精准匹配。
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个
RoutingKey
(路由key) - 消息的发送方在 向 Exchange发送消息时,也必须指定消息的
RoutingKey
。 - Exchange不再把消息交给每一个绑定的队列,而是根据消息的
RoutingKey
进行判断,只有队列的Routingkey
与消息的Routing key
完全一致,才会接收到消息。
详细介绍:生产者将消息发送到direct交换器,同时生产者在发送消息的时候会指定一个路由key,而在绑定队列和交换器的时候又会指定一个路由key,那么消息只会发送到相应routing key相同的队列,然后由监听该队列的消费者进行消费消息。模型如下图所示:
4.4.1 代码实现
[1] 创建生产者
/**
* 生产者(路由模式)
*/
public class Producer {
// 交换机名称
private static final String EXCHANGE_NAME = "routing_exchange";
public static void main(String[] args) throws Exception {
// 1、创建连接
Connection connection = ConnectionUtils.getConnection();
// 2、创建通道(频道)
Channel channel = connection.createChannel();
// 3、发送消息,连续发3条
for (int i = 0; i < 3; i++) {
String routingKey = "";
//发送消息的时候根据相关逻辑指定相应的routing key。
switch (i) {
case 0: //假设i=0,为error消息
routingKey = "error";
break;
case 1: //假设i=1,为info消息
routingKey = "info";
break;
case 2: //假设i=2,为warning消息
routingKey = "warning";
break;
}
// 要发送的消息
String message = "Hello Message!!!~~~" + routingKey;
// 消息发送 channel.basicPublish(交换机名称,路由key,消息其它属性,消息内容)
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("utf-8"));
System.out.println("生产者发送的消息:" + message);
}
//释放资源
channel.close();
connection.close();
}
}
[2] 创建消费者
消费者1:
/**
* 消费者1(路由模式)
*/
public class Consumer1 {
// 队列名称
private static final String QUEUE_NAME1 = "routing_queue1";
// 交换机名称
private static final String EXCHANGE_NAME = "routing_exchange";
public static void main(String[] args) throws Exception {
// 1、获取连接对象
Connection connection = ConnectionUtils.getConnection();
// 2、创建通道(频道)
Channel channel = connection.createChannel();
// 3、声明交换机(有则不创建,无则创建) channel.exchangeDeclare(交换机名字,交换机类型,是否持久化)
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
// 4、声明队列Queue。channel.queueDeclare(队列名称,是否持久化,是否独占本连接,是否自动删除,附加参数)
channel.queueDeclare(QUEUE_NAME1, true, false, false, null);
// 5、根据指定的routingKey绑定队列和交换机 channel.queueBind(队列名, 交换机名, 路由key)
channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "error");
// 6、监听队列,接收消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//获取路由的key
String routingKey = envelope.getRoutingKey();
//获取交换机信息
String exchange = envelope.getExchange();
//获取消息信息
String message = new String(body, "utf-8");
System.out.println("路由Key:" + routingKey + ", 交换机名称:" + exchange + ", 消费者获取消息: " + message);
}
};
channel.basicConsume(QUEUE_NAME1, true, defaultConsumer);
//注意,消费者这里不建议关闭资源,让程序一直处于读取消息的状态
}
}
消费者2:
/**
* 消费者2(路由模式)
*/
public class Consumer2 {
// 队列名称
private static final String QUEUE_NAME2 = "routing_queue2";
// 交换机名称
private static final String EXCHANGE_NAME = "routing_exchange";
public static void main(String[] args) throws Exception {
// 1、获取连接对象
Connection connection = ConnectionUtils.getConnection();
// 2、创建通道(频道)
Channel channel = connection.createChannel();
// 3、声明交换机(有则不创建,无则创建) channel.exchangeDeclare(交换机名字,交换机类型,是否持久化)
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
// 4、声明队列Queue。channel.queueDeclare(队列名称,是否持久化,是否独占本连接,是否自动删除,附加参数)
channel.queueDeclare(QUEUE_NAME2, true, false, false, null);
// 5、根据指定的routingKey绑定队列和交换机 channel.queueBind(队列名, 交换机名, 路由key)
channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "error");
channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "info");
channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "warning");
// 6、监听队列,接收消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//获取路由的key
String routingKey = envelope.getRoutingKey();
//获取交换机信息
String exchange = envelope.getExchange();
//获取消息信息
String message = new String(body, "utf-8");
System.out.println("路由Key:" + routingKey + ", 交换机名称:" + exchange + ", 消费者获取消息: " + message);
}
};
channel.basicConsume(QUEUE_NAME2, true, defaultConsumer);
}
}
[3] 运行结果
首先分别启动所有消费者,然后使用生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应routing key对应队列的消息;到达『按照需要接收』的效果。
消费者1绑定的交换机和队列的路由Key为error,所以只要生产者发送消息时带有error的routingKey它都能够获取到消息。
消费者2绑定的交换机和队列的路由Key为error、info、warning,所以只要生产者发送消息时带有这3种的routingKey它都能够获取到消息。
[4] 简单总结
- Routing模式需要将交换机设置为Direct类型。
- Routing模式要求队列在绑定交换机时要指定routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列。
4.5 Topic模式(模糊匹配)
Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。但是Topic类型的Exchange可以让队列在绑定Routing key 的时候使用通配符进行匹配,也就是模糊匹配,这样与之前的模式比起来,它更加的灵活!
Topic主题模式的Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: log.insert ,它的通配符规则如下:
- *:匹配不多不少恰好1个词
- #:匹配0或多个单词
简单举例:
log.*:只能匹配log.error,log.info 等
log.#:能够匹配log.insert,log.insert.abc,log.news.update.abc 等
图解:
- 红色Queue:绑定的是
usa.#
,因此凡是以usa.
开头的routing key
都会被匹配到 - 黄色Queue:绑定的是
#.news
,因此凡是以.news
结尾的routing key
都会被匹配
4.5.代码实现
[1] 创建生产者
/**
* 生产者(Topic主题模式)
*/
public class Producer {
// 交换机名称
private static final String EXCHANGE_NAME = "topic_exchange";
public static void main(String[] args) throws Exception {
// 1、创建连接
Connection connection = ConnectionUtils.getConnection();
// 2、创建通道(频道)
Channel channel = connection.createChannel();
// 3、发送消息
for (int i = 0; i < 4; i++) {
String routingKey = "";
//发送消息的时候根据相关逻辑指定相应的routing key。
switch (i) {
case 0: //假设i=0,为select消息
routingKey = "log.select";
break;
case 1: //假设i=1,为info消息
routingKey = "log.delete";
break;
case 2: //假设i=2,为log.news.add消息
routingKey = "log.news.add";
break;
case 3: //假设i=3,为log.news.update消息
routingKey = "log.news.update";
break;
}
// 要发送的消息
String message = "Hello Message!!!~~~" + routingKey;
// 消息发送 channel.basicPublish(交换机名称,路由key,消息其它属性,消息内容)
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("utf-8"));
System.out.println("生产者发送的消息:" + message);
}
// 关闭资源
channel.close();
connection.close();
}
}
[2] 创建消费者
消费者1:接收所有与log.*
相匹配的路由key队列中的消息
/**
* 消费者(Topic模式)
*/
public class Consumer1 {
// 队列名称
private static final String QUEUE_NAME1 = "topic_queue1";
// 交换机名称
private static final String EXCHANGE_NAME = "topic_exchange";
public static void main(String[] args) throws Exception {
// 1、获取连接对象
Connection connection = ConnectionUtils.getConnection();
// 2、创建通道(频道)
Channel channel = connection.createChannel();
// 3、声明交换机(有则不创建,无则创建) channel.exchangeDeclare(交换机名字,交换机类型,是否持久化)
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
// 4、声明队列Queue channel.queueDeclare(队列名称,是否持久化,是否独占本连接,是否自动删除,附加参数)
channel.queueDeclare(QUEUE_NAME1, true, false, false, null);
// 5、根据指定的routingKey绑定队列和交换机,设置路由key channel.queueBind(队列名, 交换机名, 路由key)
channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "log.*");
// 6、监听队列,接收消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//获取路由的key
String routingKey = envelope.getRoutingKey();
//获取交换机信息
String exchange = envelope.getExchange();
//获取消息信息
String message = new String(body, "utf-8");
System.out.println("路由Key:" + routingKey + ", 交换机名称:" + exchange + ", 消费者获取消息: " + message);
}
};
channel.basicConsume(QUEUE_NAME1, true, defaultConsumer);
//注意,消费者这里不建议关闭资源,让程序一直处于读取消息的状态
}
}
消费者2:接收所有与log.#
相匹配的路由key队列中的消息
/**
* 消费者(Topic模式)
*/
public class Consumer2 {
// 队列名称
private static final String QUEUE_NAME2 = "topic_queue2";
// 交换机名称
private static final String EXCHANGE_NAME = "topic_exchange";
public static void main(String[] args) throws Exception {
// 1、获取连接对象
Connection connection = ConnectionUtils.getConnection();
// 2、创建通道(频道)
Channel channel = connection.createChannel();
// 3、声明交换机(有则不创建,无则创建) channel.exchangeDeclare(交换机名字,交换机类型,是否持久化)
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
// 4、声明队列Queue。channel.queueDeclare(队列名称,是否持久化,是否独占本连接,是否自动删除,附加参数)
channel.queueDeclare(QUEUE_NAME2, true, false, false, null);
// 5、根据指定的routingKey绑定队列和交换机 channel.queueBind(队列名, 交换机名, 路由key)
channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "log.#");
// 6、监听队列,接收消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//获取路由的key
String routingKey = envelope.getRoutingKey();
//获取交换机信息
String exchange = envelope.getExchange();
//获取消息信息
String message = new String(body, "utf-8");
System.out.println("路由Key:" + routingKey + ", 交换机名称:" + exchange + ", 消费者获取消息: " + message);
}
};
channel.basicConsume(QUEUE_NAME2, true, defaultConsumer);
}
}
[3] 运行结果
首先分别启动所有消费者,然后使用生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应routing key对应队列的消息;到达『按照需要接收』的效果。
消费者1的路由key匹配规则为log.*
,所有该路由规则的绑定的队列应该只有2条信息,结果如下所示:
消费者2的路由key匹配规则为log.#
,它能够匹配以log.
开头的所有路由key,所有该路由规则的绑定的队列应该只有4条信息,结果如下所示:
最后查看一下交换机与队列绑定的相关信息。
[4] 简单总结
Topic主题模式
需要设置类型为topic的交换机,交换机和队列进行绑定,并且指定通配符方式的routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列。Topic主题模式
可以实现Publish/Subscribe发布与订阅模式
和Routing路由模式
的功能;只是Topic在配置routing key 的时候可以使用通配符,所以显得更加灵活。
标签:String,队列,RabbitMQ,交换机,消息,channel,消费者 来源: https://www.cnblogs.com/angelzheng/p/16694713.html