02-RabbitMQ入门
作者:互联网
二、RabbitMQ入门
2.1、RabbitMQ安装
-
①、拉取镜像
docker pull rabbitmq:3.8-management
-
②、创建容器
-
docker run \ -e RABBITMQ_DEFAULT_USER=root \ -e RABBITMQ_DEFAULT_PASS=root \ -v mq-plugins:/plugins \ --name mq \ --hostname mq \ -p 15672:15672 \ -p 5672:5672 \ -d \ rabbitmq:3-management
-
2.2、RabbitMQ角色模型
- RabbitMQ中的一些角色
- publisher
- 生产者
- consumer
- 消费者
- exchange
- 交换机,负责消息路由
- queue
- 队列,存储消息
- virtualHost
- 虚拟主机,隔离不同租户(可以理解为不同业务)的exchange、queue、消息的隔离
- publisher
- 如下图所示
2.3、RabbitMQ消息模型
- 官网地址
- RabbitMQ官方提供了5个不同的Demo实例,对应了不同的消息模型
- 基本消息队列(BasicQueue)
- 工作消息队列(WorkQueue)
- 发布订阅(Publish、Subscribe),又根据交换机类型不同分为三种
- Fanout Exchange:广播
- Direct Exchange:路由
- Topic Exchange:主题
- Fanout Exchange:广播
- 基本消息队列(BasicQueue)
2.4、RabbitMQ入门案例
- 工程导入
- 项目比较简单,就是一个父工程加两个SpringBoot子工程,自行创建即可
- 简单队列模式的模型图
- 官方的HelloWorld是基于最基础的消息队列模型来实现的,只包括三个角色
- publisher:消息发布者,将消息发送到队列queue
- queue:消息队列,负责接受并缓存模型
- consumer:订阅队列,处理队列中的消息
2.4.1、publisher实现
-
实现思路
- ①、建立连接
- ②、创建Channel
- ③、声明队列
- ④、发送消息
- ⑤、关闭连接和Channel
-
代码实现
-
@Test public void testSendMessage() throws IOException, TimeoutException { // 1. 建立连接 ConnectionFactory factory = new ConnectionFactory(); // 1.1 设置连接参数,分别是:主机名称、端口号、virtualhost、用户名、密码 factory.setHost("192.168.222.135"); factory.setPort(5672); factory.setVirtualHost("/"); // 虚拟主机的地址(在mq的可视化界面中可以找到) factory.setUsername("root"); factory.setPassword("root"); // 1.2 建立连接 Connection connection = factory.newConnection(); // System.out.println("connection = " + connection); // 2. 建立通道 Channel channel = connection.createChannel(); // 3. 建立队列 String queueName = "simple.queue"; channel.queueDeclare(queueName, false, false, false, null); // 4. 发送消息 String message = "hello, rabbitmq!"; channel.basicPublish("", queueName, null, message.getBytes()); System.out.println("发送消息成功:【" + message + "】"); // 5. 关闭通道和连接 channel.close(); connection.close(); }
-
-
Idea终端输出
-
查看rabiitmq可视化管理界面(宿主机IP:15672)
-
查看该队列中的message属性是否存储我们发送的值
2.4.2、consumer实现
-
实现思路
- ①、建立连接
- ②、创建Channel
- ③、声明队列
- ④、订阅消息
-
代码实现
-
@Test public void testGetSimpleQueueMessage() throws IOException, TimeoutException { // 1. 建立连接 ConnectionFactory factory = new ConnectionFactory(); // 1.1 设置连接参数,分别是:主机名,端口号,virtualhost、用户名、密码 factory.setHost("192.168.222.135"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("root"); factory.setPassword("root"); // 1.2 建立连接 Connection connection = factory.newConnection(); // 2. 创建通道 Channel channel = connection.createChannel(); // 3. 声明队列 String queueName = "simple.queue"; channel.queueDeclare(queueName, false, false, false, null); // 4. 订阅消息 channel.basicConsume(queueName, true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 5. 处理消息 String message = new String(body); System.out.println("接收到消息:【"+ message + "】"); } }); System.out.println("等待接收消息......"); }
-
-
idea终端输出
- 程序运行第二次可以发现,已经取不到消息了,这是因为队列中的消息被consumer消费后,即会删除
2.5、小结
- 基本消息队列的消息发送流程
- ①、建立connection
- ②、创建channel
- ③、利用channel声明队列
- ④、利用channel向队列发送消息
- 基本消息队列的消息接收流程
- ①、建立connection
- ②、创建channel
- ③、利用channel声明队列
- ④、定义consumer的消费行为handleDelivery()
- ⑤、利用channel将消费者与队列绑定
标签:02,入门,队列,factory,RabbitMQ,connection,消息,channel 来源: https://www.cnblogs.com/OnlyOnYourself-lzw/p/16491776.html