rabbitmq(适合初学者)
作者:互联网
rabbitmq
安装rabbitmq
安装版本3.8.9
github 地址:https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.8.9
erlang环境推荐下载地址:https://www.erlang-solutions.com/resources/download.html
![image.png](https://www.icode9.com/i/ll/?i=img_convert/e728c346b785f39b1b69ef8aad0b6ccf.png#align=left&display=inline&height=665&margin=[object Object]&name=image.png&originHeight=665&originWidth=725&size=57582&status=done&style=none&width=725)
因为rabbitmq依赖erlang语言版本很强,所以 erlang语言 23版本
#安装erlang和rabbitmq
#erlang语言安装
rpm -ivh esl-erlang_23.2.1-1_centos_7_amd64.rpm
#rabbitmq安装
rpm -ivh rabbitmq-server-3.8.9-1.el6.noarch.rpm
#因为erlang语言环境可能会依赖一些内部环境,所以需要配置安装
#启动rabbitmq : service rabbitmq-server start
#开启可视化界面 : rabbitmq-plugins enable rabbitmq_management
启动
在web浏览器中输入地址:http://127.0.0.1:15672/
输入默认账号: guest 密码: guest
解决客户端不能登录问题:(User can only log in via localhost) 针对最新版
1.环境变量可用于覆盖配置文件的位置:(官方推荐)
# overrides primary config file location
RABBITMQ_CONFIG_FILE=/path/to/a/custom/location/rabbitmq.conf
# overrides advanced config file location
RABBITMQ_ADVANCED_CONFIG_FILE=/path/to/a/custom/location/advanced.config
# overrides environment variable file location
RABBITMQ_CONF_ENV_FILE=/path/to/a/custom/location/rabbitmq-env.conf
rabbitmq.conf 样式地址:https://github.com/rabbitmq/rabbitmq-server/blob/v3.8.x/deps/rabbit/docs/rabbitmq.conf.example
1.在/etc/rabbitmq/目录下创建文件 rabbitmq.conf
2.编辑
## The default "guest" user is only permitted to access the server
## via a loopback interface (e.g. localhost).
## {loopback_users, [<<"guest">>]},
##
# loopback_users.guest = true
## Uncomment the following line if you want to allow access to the
## guest user from anywhere on the network.
loopback_users.guest = false
#重启即可访问
rabbitmq后台管理界面
1.添加虚拟主机
![image.png](https://www.icode9.com/i/ll/?i=img_convert/d37ef91ce7266662bbaabda8b0ea023a.png#align=left&display=inline&height=656&margin=[object Object]&name=image.png&originHeight=656&originWidth=1849&size=58308&status=done&style=shadow&width=1849)
2.添加账户
![image.png](https://www.icode9.com/i/ll/?i=img_convert/c762a16fab539acc46673268842829bf.png#align=left&display=inline&height=676&margin=[object Object]&name=image.png&originHeight=676&originWidth=1867&size=64716&status=done&style=shadow&width=1867)
账户设置虚拟主机
![image.png](https://www.icode9.com/i/ll/?i=img_convert/60fd75aae11a57699c82512ee9e94387.png#align=left&display=inline&height=762&margin=[object Object]&name=image.png&originHeight=762&originWidth=1878&size=62710&status=done&style=shadow&width=1878)
RabbitMQ模型
1. 直连
在本教程的这一部分中,我们将在 Java 中编写两个程序;发送单个消息的生产者,以及接收消息并打印出消息的使用者。我们将介绍Java API中的一些细节,专注于这个非常简单的事情,只是为了开始。这是一个"Hello World"的消息。
在下面的图中,"P"是我们的生产者,"C"是我们的消费者。中间的框是一个队列 - RabbitMQ 代表使用者保留的消息缓冲区。
![image.png](https://www.icode9.com/i/ll/?i=img_convert/c37347c3f6e702f305c8504f63f59aed.png#align=left&display=inline&height=122&margin=[object Object]&name=image.png&originHeight=122&originWidth=355&size=5392&status=done&style=none&width=355)
(1) 整合spring依赖
<!--rabbitmq-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.2</version>
</dependency>
生产者
/**
* 生产者
* @throws IOException
* @throws TimeoutException
*/
@Test
void provider() throws IOException, TimeoutException {
//创建连接mq的连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接rabbitmq主机
connectionFactory.setHost("192.168.128.144");
//设置端口号
connectionFactory.setPort(5672);
//设置虚拟主机
connectionFactory.setVirtualHost("/test");
//设置用户名和密码
connectionFactory.setUsername("nf");
connectionFactory.setPassword("123456");
//获取连接对象
Connection connection = connectionFactory.newConnection();
//获取连接中的通道
Channel channel = connection.createChannel();
//通道绑定对应的消息队列
/*
* 参数1:队列名称, 如果队列不存在就自动创建
* 参数2:用来定义队列特性是否持久化 true 持久队列,false 不持久队列
* 参数3:exclusive 是否独占队列 true独占队列 false 不独占队列
* 参数4:autoDelete 是否在消费完成后自动删除队列,true 自动删除 false 不自动删除
* 参数5:arguments 附加参数
* */
channel.queueDeclare("hello", false, false, false, null);
//发布消息
/*
* 参数1: 交换机名称
* 参数2: 队列名称
* 参数3: 传递消息额外设置 MessageProperties.PERSISTENT_TEXT_PLAIN 消息持久化
* 参数4: 消息具体内容
* */
channel.basicPublish("", "hello", null, "hello rabbitmq".getBytes());
channel.close();
connection.close();
}
消费者
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.128.144");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/test");
connectionFactory.setUsername("nf");
connectionFactory.setPassword("123456");
//创建连接对象
Connection connection = connectionFactory.newConnection();
//创建通道
Channel channel = connection.createChannel();
//通道绑定对象
channel.queueDeclare("hello", false, false, false, null);
//消费消息
/*
* 参数1: 消费那个对象的消息
* 参数2: 开始消息的自动确认机制
* 参数3: 消费时回调接口
* */
channel.basicConsume("hello", true, new DefaultConsumer(channel) {
/**
*
* @param consumerTag
* @param envelope
* @param properties
* @param body 消息队列中取出的消息
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消息: " + new String(body));
}
});
}
2. Work queues工作队列 (task queues)
**
在第一个教程中,我们编写了从命名队列发送和接收消息的程序。在本例中,我们将创建一个工作队列,用于在多个工作人员之间分配耗时的任务。
工作队列(又名:任务队列)的主要思想是避免立即执行资源密集型任务,并且必须等待任务完成。相反,我们把任务安排在以后完成。我们将任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行该作业。当您运行多个worker时,任务将在他们之间共享。
这个概念在web应用程序中特别有用,因为在短的HTTP请求窗口期间无法处理复杂的任务。
![image.png](https://www.icode9.com/i/ll/?i=img_convert/f808c8a00a9a5529889ed6feb70c8c3b.png#align=left&display=inline&height=127&margin=[object Object]&name=image.png&originHeight=127&originWidth=385&size=7871&status=done&style=none&width=385)p:生产者 中间:消息队列 C:消费者
生产者
package com.nf.rabbitmq.work;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 生产者
*/
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接mq的连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接rabbitmq主机
connectionFactory.setHost("192.168.128.144");
//设置端口号
connectionFactory.setPort(5672);
//设置虚拟主机
connectionFactory.setVirtualHost("/test");
//设置用户名和密码
connectionFactory.setUsername("nf");
connectionFactory.setPassword("123456");
//获取连接对象
Connection connection = connectionFactory.newConnection();
//获取连接中的通道
Channel channel = connection.createChannel();
channel.queueDeclare("work", false, false, false, null);
for (int i = 0; i < 10; i++) {
channel.basicPublish("", "work", null, ("hello World ------------" + i).getBytes());
}
//关闭流
channel.close();
//关闭连接
connection.close();
}
}
消费者1
package com.nf.rabbitmq.work;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 第一个消费者
*/
public class Customer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接mq的连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接rabbitmq主机
connectionFactory.setHost("192.168.128.144");
//设置端口号
connectionFactory.setPort(5672);
//设置虚拟主机
connectionFactory.setVirtualHost("/test");
//设置用户名和密码
connectionFactory.setUsername("nf");
connectionFactory.setPassword("123456");
//获取连接对象
Connection connection = connectionFactory.newConnection();
//获取连接中的通道
Channel channel = connection.createChannel();
channel.queueDeclare("work", false, false, false, null);
channel.basicConsume("work", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1:" + new String(body));
}
});
}
}
消费者2
package com.nf.rabbitmq.work;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 第二个消费者
*/
public class Customer2 {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接mq的连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接rabbitmq主机
connectionFactory.setHost("192.168.128.144");
//设置端口号
connectionFactory.setPort(5672);
//设置虚拟主机
connectionFactory.setVirtualHost("/test");
//设置用户名和密码
connectionFactory.setUsername("nf");
connectionFactory.setPassword("123456");
//获取连接对象
Connection connection = connectionFactory.newConnection();
//获取连接中的通道
Channel channel = connection.createChannel();
channel.queueDeclare("work", false, false, false, null);
channel.basicConsume("work", true, new DefaultConsumer(channel) {
/**
* No-op implementation of {@link Consumer#handleDelivery}.
*
* @param consumerTag
* @param envelope
* @param properties
* @param body
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2:" + new String(body));
}
});
}
}
演示结果:
消费者1:
![image.png](https://www.icode9.com/i/ll/?i=img_convert/18f0f3fb4bdd75d756a5843e7bde48e4.png#align=left&display=inline&height=175&margin=[object Object]&name=image.png&originHeight=175&originWidth=498&size=38730&status=done&style=shadow&width=498)
消费者2:
![image.png](https://www.icode9.com/i/ll/?i=img_convert/34feb57b996309720b379bbf7315b3d3.png#align=left&display=inline&height=183&margin=[object Object]&name=image.png&originHeight=183&originWidth=504&size=39270&status=done&style=shadow&width=504)
注意:消息队列中默认情况是平均分配:即如上图所示,生产者将消息推送到队列中后,消费者1和消费者2平均消费,且消息自动确认为true(消费者自动向消息队列自动确认了消息)
让消息队列“能者多劳”:
![image.png](https://www.icode9.com/i/ll/?i=img_convert/6a096ad6ca22c08be3d977cea8e75655.png#align=left&display=inline&height=345&margin=[object Object]&name=image.png&originHeight=345&originWidth=931&size=90337&status=done&style=shadow&width=931)
![image.png](https://www.icode9.com/i/ll/?i=img_convert/a156de464cd65aa9387ba2f09666b12b.png#align=left&display=inline&height=438&margin=[object Object]&name=image.png&originHeight=438&originWidth=931&size=91595&status=done&style=shadow&width=931)
结果:
![image.png](https://www.icode9.com/i/ll/?i=img_convert/d335f210bfd668e40df53e89f89d6b8e.png#align=left&display=inline&height=345&margin=[object Object]&name=image.png&originHeight=345&originWidth=780&size=85759&status=done&style=shadow&width=780)
![image.png](https://www.icode9.com/i/ll/?i=img_convert/a26180f6cf9e0dd22efbc9d77e7ab30b.png#align=left&display=inline&height=216&margin=[object Object]&name=image.png&originHeight=216&originWidth=845&size=45735&status=done&style=shadow&width=845)
Publish/Subscribe (发布订阅模式、广播)
![image.png](https://www.icode9.com/i/ll/?i=img_convert/a3afa9bb43b28cd5f69cb3958426f374.png#align=left&display=inline&height=128&margin=[object Object]&name=image.png&originHeight=128&originWidth=375&size=6420&status=done&style=shadow&width=375)
在广播模式下,消息发送流程:
- 可以有多个消费者
- 每个消费者拥有自己的queue(队列)
- 每个队列都要绑定到Exchange(交换机)
- 生产者发送的消息,只能发送到交换机,交换机来决定发送给那个队列,生产者无法决定
- 交换机把消息发动给绑定过得所有队列
- 队列的消费者都能拿到消息,实现一条消息被多个消费者消费
生产者把消息发送给交换机 -> 交换机把消息发送给队列
生产者:
package com.nf.rabbitmq.fanout;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 生产者
*/
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接mq的连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接rabbitmq主机
connectionFactory.setHost("192.168.128.144");
//设置端口号
connectionFactory.setPort(5672);
//设置虚拟主机
connectionFactory.setVirtualHost("/test");
//设置用户名和密码
connectionFactory.setUsername("nf");
connectionFactory.setPassword("123456");
//获取连接对象
Connection connection = connectionFactory.newConnection();
//获取连接中的通道
Channel channel = connection.createChannel();
/*
* 参数1: 交换机的名称
* 参数2: 交换机类型 ,广播类型:fanout
*
* */
channel.exchangeDeclare("log", "fanout");
/*
* 参数1:指定的交换机
* 参数2:
* 参数3:
*
*
* */
channel.basicPublish("logs", "", null, "我是生产者 ,我在发消息".getBytes());
//关闭流
channel.close();
//关闭连接
connection.close();
}
}
消费者1
package com.nf.rabbitmq.fanout;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者1
*/
public class Customer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接mq的连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接rabbitmq主机
connectionFactory.setHost("192.168.128.144");
//设置端口号
connectionFactory.setPort(5672);
//设置虚拟主机
connectionFactory.setVirtualHost("/test");
//设置用户名和密码
connectionFactory.setUsername("nf");
connectionFactory.setPassword("123456");
//获取连接对象
Connection connection = connectionFactory.newConnection();
//获取连接中的通道
Channel channel = connection.createChannel();
//通到绑定交换机
channel.exchangeDeclare("logs", "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "logs", "");
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
/**
* No-op implementation of {@link Consumer#handleDelivery}.
*
* @param consumerTag
* @param envelope
* @param properties
* @param body
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1:" + new String(body));
}
});
}
}
消费者2
package com.nf.rabbitmq.fanout;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者1
*/
public class Customer2 {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接mq的连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接rabbitmq主机
connectionFactory.setHost("192.168.128.144");
//设置端口号
connectionFactory.setPort(5672);
//设置虚拟主机
connectionFactory.setVirtualHost("/test");
//设置用户名和密码
connectionFactory.setUsername("nf");
connectionFactory.setPassword("123456");
//获取连接对象
Connection connection = connectionFactory.newConnection();
//获取连接中的通道
Channel channel = connection.createChannel();
//通到绑定交换机
channel.exchangeDeclare("logs", "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "logs", "");
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
/**
* No-op implementation of {@link Consumer#handleDelivery}.
*
* @param consumerTag
* @param envelope
* @param properties
* @param body
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2:" + new String(body));
}
});
}
}
结果
![image.png](https://www.icode9.com/i/ll/?i=img_convert/200807b06021f4ee2bc1c8c114fe1d17.png#align=left&display=inline&height=82&margin=[object Object]&name=image.png&originHeight=82&originWidth=329&size=17026&status=done&style=none&width=329)
![image.png](https://www.icode9.com/i/ll/?i=img_convert/7340b839b9b08bc8863c4546829035e3.png#align=left&display=inline&height=111&margin=[object Object]&name=image.png&originHeight=111&originWidth=349&size=17678&status=done&style=none&width=349)
Routing(路由选择)
![image.png](https://www.icode9.com/i/ll/?i=img_convert/448fd82db13c67a6c8dffd696c0503f5.png#align=left&display=inline&height=165&margin=[object Object]&name=image.png&originHeight=165&originWidth=458&size=12753&status=done&style=none&width=458)
在上一个教程中,我们构建了一个简单的日志记录系统。我们可以向许多接收器广播日志信息。
在本教程中,我们将向它添加一个特性-我们将使它能够只订阅消息的一个子集。例如,我们将只能将关键错误消息定向到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。
生产者(产生日志)生产不同的日志,如error,info,debug,warn,fatal,让不同消费者来消费处理这些消息
生产者:
package com.nf.rabbitmq.routing;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者,只消费error
*/
public class Customer1 {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setVirtualHost("/test");
connectionFactory.setHost("192.168.128.144");
connectionFactory.setPort(5672);
connectionFactory.setUsername("nf");
connectionFactory.setPassword("123456");
Connection connection = connectionFactory.newConnection();
//创建连接通道
Channel channel = connection.createChannel();
//通道声明交换机
channel.exchangeDeclare("logs_direct", "direct");
//创建一个临时队列
String queueName = channel.queueDeclare().getQueue();
//基于route key绑定队列和交换机
channel.queueBind(queueName, "logs_direct", "error");
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
/**
* No-op implementation of {@link Consumer#handleDelivery}.
*
* @param consumerTag
* @param envelope
* @param properties
* @param body
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1:" + new String(body));
}
});
}
}
消费者1:
package com.nf.rabbitmq.routing;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者,只消费error
*/
public class Customer1 {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setVirtualHost("/test");
connectionFactory.setHost("192.168.128.144");
connectionFactory.setPort(5672);
connectionFactory.setUsername("nf");
connectionFactory.setPassword("123456");
Connection connection = connectionFactory.newConnection();
//创建连接通道
Channel channel = connection.createChannel();
//通道声明交换机
channel.exchangeDeclare("logs_direct", "direct");
//创建一个临时队列
String queueName = channel.queueDeclare().getQueue();
//基于route key绑定队列和交换机
channel.queueBind(queueName, "logs_direct", "error");
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
/**
* No-op implementation of {@link Consumer#handleDelivery}.
*
* @param consumerTag
* @param envelope
* @param properties
* @param body
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1:" + new String(body));
}
});
}
}
消费者2
package com.nf.rabbitmq.routing;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者,只消费info error
*/
public class Customer2 {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setVirtualHost("/test");
connectionFactory.setHost("192.168.128.144");
connectionFactory.setPort(5672);
connectionFactory.setUsername("nf");
connectionFactory.setPassword("123456");
Connection connection = connectionFactory.newConnection();
//创建连接通道
Channel channel = connection.createChannel();
//通道声明交换机
channel.exchangeDeclare("logs_direct", "direct");
//创建一个临时队列
String queueName = channel.queueDeclare().getQueue();
//基于route key绑定队列和交换机
channel.queueBind(queueName, "logs_direct", "error");
channel.queueBind(queueName, "logs_direct", "info");
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
/**
* No-op implementation of {@link Consumer#handleDelivery}.
*
* @param consumerTag
* @param envelope
* @param properties
* @param body
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2:" + new String(body));
}
});
}
}
结果 :
消费者1
![image.png](https://www.icode9.com/i/ll/?i=img_convert/3574610c65e5faca5cbccac31a4b819e.png#align=left&display=inline&height=108&margin=[object Object]&name=image.png&originHeight=108&originWidth=495&size=23448&status=done&style=none&width=495)
消费者2:
![image.png](https://www.icode9.com/i/ll/?i=img_convert/3e3d6f1e1821df87375a94eac2b26ba6.png#align=left&display=inline&height=130&margin=[object Object]&name=image.png&originHeight=130&originWidth=488&size=30678&status=done&style=none&width=488)
Topics
topic类型的Exchange与Direct相比,都可以根据RoutingKey把消息路由到不同的队列,只不过Topic类型Exchange可以让队列在绑定Routing key的时候使用通配符!这种模型Routingkey一般都是有一个或者多个单词组成,多个单词之间以“.”分割,例如: item.insert
![image.png](https://www.icode9.com/i/ll/?i=img_convert/f2850d0d4ec7074f868a5b2d296e5694.png#align=left&display=inline&height=166&margin=[object Object]&name=image.png&originHeight=166&originWidth=475&size=13171&status=done&style=none&width=475)
![image.png](https://www.icode9.com/i/ll/?i=img_convert/a0fc1590205649478d5b05b9dc08be45.png#align=left&display=inline&height=36&margin=[object Object]&name=image.png&originHeight=36&originWidth=665&size=11977&status=done&style=none&width=665)
#:占位多个
*:只能占位一个
生产者:
package com.nf.rabbitmq.topics;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class producer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.128.144");
connectionFactory.setPort(5672);
connectionFactory.setUsername("nf");
connectionFactory.setPassword("123456");
connectionFactory.setVirtualHost("/test");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
/*
* 参数1:交换机名称 参数2:类型
*
* */
channel.exchangeDeclare("topics", "topic");
/*
* 参数1:交换机名称
* 参数2:路由key
* 参数3:保留
* 参数4:发送的消息
*
* */
channel.basicPublish("topics", "user.save", null, ("我是生产者,我在生产消息: user.save").getBytes());
channel.basicPublish("topics", "user.save.delete", null, ("我是生产者,我在生产消息: user.save.delete").getBytes());
channel.basicPublish("topics", "person.user.save", null, ("我是生产者,我在生产消息: person.user.save").getBytes());
channel.basicPublish("topics", "user", null, ("我是生产者,我在生产消息: user").getBytes());
channel.close();
connection.close();
}
}
消费者1:
package com.nf.rabbitmq.topics;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者,只消费info error
*/
public class Customer1 {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setVirtualHost("/test");
connectionFactory.setHost("192.168.128.144");
connectionFactory.setPort(5672);
connectionFactory.setUsername("nf");
connectionFactory.setPassword("123456");
Connection connection = connectionFactory.newConnection();
//创建连接通道
Channel channel = connection.createChannel();
//通道声明交换机
channel.exchangeDeclare("topics", "topic");
//创建一个临时队列
String queueName = channel.queueDeclare().getQueue();
//绑定队列和交换机,动态通配符形式route key
channel.queueBind(queueName, "topics", "user.*");
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1:" + new String(body));
}
});
}
}
消费者2:
package com.nf.rabbitmq.topics;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者,只消费info error
*/
public class Customer2 {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setVirtualHost("/test");
connectionFactory.setHost("192.168.128.144");
connectionFactory.setPort(5672);
connectionFactory.setUsername("nf");
connectionFactory.setPassword("123456");
Connection connection = connectionFactory.newConnection();
//创建连接通道
Channel channel = connection.createChannel();
//通道声明交换机
channel.exchangeDeclare("topics", "topic");
//创建一个临时队列
String queueName = channel.queueDeclare().getQueue();
//绑定队列和交换机,动态通配符形式route key
channel.queueBind(queueName, "topics", "user.#");
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2:" + new String(body));
}
});
}
}
结果:
消费者1
![image.png](https://www.icode9.com/i/ll/?i=img_convert/7fe0f3709a61a40832de9bad92d9ab01.png#align=left&display=inline&height=104&margin=[object Object]&name=image.png&originHeight=104&originWidth=424&size=14975&status=done&style=none&width=424)
消费者2
![image.png](https://www.icode9.com/i/ll/?i=img_convert/bf986edaae6e9f6be32a9826978b2e5e.png#align=left&display=inline&height=138&margin=[object Object]&name=image.png&originHeight=138&originWidth=526&size=33873&status=done&style=none&width=526)
rabbit整合springboot
依赖
<!--rabbitmq 依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置文件
spring:
rabbitmq:
host: 192.168.16.132
port: 5672
username: nf
password: 123456
virtual-host: /test
直连
package com.nf.rabbitmq.hello;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component //默认 持久化 非独占 不是自动删除
@RabbitListener(queuesToDeclare = @Queue(value = "hello",durable = "false",autoDelete = "true"))
public class HelloCustomer {
@RabbitHandler
public void test(String message) {
System.out.println(message);
}
}
word queues 工作队列
package com.nf.rabbitmq.hello;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class WorkCustomer {
@RabbitListener(queuesToDeclare = @Queue("work"))
public void test1(String message) {
System.out.println("消费者1:" + message);
}
@RabbitListener(queuesToDeclare = @Queue("work"))
public void test2(String message) {
System.out.println("消费者2:" + message);
}
}
Publish/Subscribe (发布订阅模式、广播)**
package com.nf.rabbitmq.hello;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 广播 fanout
* Publish/Subscribe
*/
@Component
public class FanoutCustomer {
@RabbitListener(bindings = {
@QueueBinding(value = @Queue, //创建临时队列
exchange = @Exchange( //绑定交换机
value = "logs",
type = "fanout"
))
})
public void test1(String message) {
System.out.println("消费者1:" + message);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(
value = "logs",
type = ExchangeTypes.FANOUT
))
})
public void test2(String message) {
System.out.println("消费者2:" + message);
}
}
Routing(路由选择)
package com.nf.rabbitmq.hello;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Component
public class RouteCustomer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "directs", type = ExchangeTypes.DIRECT),//自定义交换名称和类型
key = {
"info", "error", "warn"
}
)
})
public void test1(String message) {
System.out.println("消费者1:" + message);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "directs", type = ExchangeTypes.DIRECT),//自定义交换名称和类型
key = {
"info",
}
)
})
public void test2(String message) {
System.out.println("消费者2:" + message);
}
}
Topics
**
package com.nf.rabbitmq.hello;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class TopicCustomer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(
name = "topics",
type = ExchangeTypes.TOPIC),
key = {"user.save", "user.*"})
})
public void test1(String message) {
System.out.println("消费者1: " + message);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(
name = "topics",
type = ExchangeTypes.TOPIC),
key = {"user.#"})
})
public void test2(String message) {
System.out.println("消费者2: " + message);
}
}
测试
package com.nf.rabbitmq;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest(classes = RabbitmqApplication.class)
class RabbitmqApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void hello() {
rabbitTemplate.convertAndSend("hello", "hello world");
}
/**
* 工作队列
*/
@Test
void work() {
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("work", "hello world + :" + i);
}
}
/**
* 广播 fanout
* Publish/Subscribe
*/
@Test
public void fanout() {
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("logs", "", "fanout");
}
}
/**
* 路由模式
*/
@Test
public void routeDirects() {
rabbitTemplate.convertAndSend("directs", "info", "发送的 info信息");
rabbitTemplate.convertAndSend("directs", "error", "发送的 error信息");
}
/**
* 动态路由
*/
@Test
public void topicRoute() {
rabbitTemplate.convertAndSend("topics", "user.save", "发送的 user.save 信息");
rabbitTemplate.convertAndSend("topics", "user.save.delete", "发送的 user.save.delete 信息");
rabbitTemplate.convertAndSend("topics", "user.delete", "发送的 user.delete 信息");
}
}
总结
rabbitmq一些配置问题
rabbitmq除了可以手动创建队列,交换机,还有两种创建方式,一种是配置类的方式,另一种是注解的方式
1.配置类方式
package com.nf.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RabbitMQ配置类
*/
@Configuration
public class RabbitMQConfig {
//交换机名称
public static final String ITEM_TOPIC_EXCHANGE = "item_topic_exchange";
//队列名称
public static final String ITEM_QUEUE = "item_queue";
//声明交换机
@Bean("itemTopicExchange")
public Exchange topicExchange() {
return ExchangeBuilder.topicExchange(ITEM_TOPIC_EXCHANGE).durable(true).build();
}
//声明队列
@Bean("itemQueue")
public Queue itemQueue() {
return QueueBuilder.durable(ITEM_QUEUE ).build();
}
//绑定队列和交换机
@Bean
public Binding itemQueueExchange(@Qualifier("itemQueue") Queue queue,
@Qualifier("itemTopicExchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("item1.#").noargs();
}
//声明队列
@Bean("itemQueue2")
public Queue itemQueue2() {
return QueueBuilder.durable(ITEM_QUEUE+ 2).build();
}
/*
* 绑定队列和交换机,"BindingBuilder.bind(queue).to(exchange).with("item2.#").noargs()"含义是将queue绑定到exchange交换机上 ,并且路由的方式为with("item2.#")
*/
@Bean
public Binding itemQueueExchange2(@Qualifier("itemQueue") Queue queue,
@Qualifier("itemTopicExchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("item2.#").noargs();
}
}
2.注解方式
package com.nf.rabbitmq.config;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 消费者监听类
*/
@Component
public class MyListner {
// @RabbitListener(queues = "item_queue")
@RabbitListener(
bindings =
{
@QueueBinding(value = @Queue("item_queue"),
exchange = @Exchange(value = "item_topic_exchange", type = ExchangeTypes.TOPIC),key = {"item1.#"})
})
public void msg(String msg) {
System.out.println("消费者消费消息了:" + msg);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue("item_queue2"),
exchange = @Exchange(value = "item_topic_exchange", type = ExchangeTypes.TOPIC),
key = {"item2.#"}
)}
)
public void msg2(String msg) {
System.out.println("消费者消费消息了2:" + msg);
}
}
标签:connectionFactory,png,适合,rabbitmq,初学者,import,public,channel 来源: https://blog.csdn.net/qq_41564402/article/details/113430380