RabbitMq——核心API即概念的详细介绍与使用
作者:互联网
Exchange:接收生产者的消息,并根据路由键转发消息到所绑定的队列。
一、Exchange属性
Name:交换机名称
Type:交换机类型 direct、topic、fanout、headers
Durability:是否持久化,true为持久化。如果是false,那么在RabbitMq服务重启之后,交换机就会没有了。
Auto Delete:当最后一个绑定到Exchange上的队列删除后,自动删除改Exchange。即当Exchange没有队列的时候自动删除。实际生产环境中是不会设置自动删除的。
Internal:当前Exchange是否用于RabbitMQ内部使用,默认为false。
Arguments:扩展参数,用于扩展AMQP协议自制定化使用。
Exchange和队列是多对多的关系,即一个exchange可以发送消息到多个队列,一个队列也可以接受多个exchange发送的消息,推荐在实际生产环境中,一个Exchange对应多个队列,尽量不使用多个exchange对一个队列。
在实际的生产时,不推荐在代码中声明exchange、queue以及exchange和queue的绑定关系,最好是在控制台就做好这些声明,在代码中直接使用。如果非要在代码中声明,一定要在生产者端和消费者端同时声明,以免启动时报错(找不到exchange和queue而报错)。
一个exchange和一个queue可以存在多个绑定关系,比如exchangeA和queueA可以同时有两个绑定关系:“log.info.*” 和“log.warn.*”
二、Direct模式
生产者只关注发送的Exchange和RoutingKey
direct模式的routingKey不允许使用通配符*,必须完全匹配
示例:
生产者:
package com.bxp.rabbitmqapi.DirectExchange;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class Sender {
// RabbitMq 生产者入门示例
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.40.110");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
// 2、生产连接
Connection connection = connectionFactory.newConnection();
// 3、创建通信通道
Channel channel = connection.createChannel();
// 4、创建队列,如果队列已经存在,则不会执行任何操作,如果队列不存在,会执行创建队列的操作
//String queueName = "test-001";
// 参数:队列名称、是否持久化、是否独占(队列仅供此连接使用,即只能有一个消费者)、不使用时是否自动删除、其他参数
//channel.queueDeclare("test-001", false, false, false , null);
// 5、创建消息的属性
Map<String, Object> headers = new HashMap<>();
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.headers(headers).build();
// 生产消息
String exchangeName = "test-direct-exchange";
String routeKey = "test-direct-routeKey";
String messagePrefix = "RabbitMq Message test00";
for(int i = 0; i < 5; i++){
String message = messagePrefix + i;
// 生产者端在发送消息时,只需要关心exchange和routeKey即可
channel.basicPublish(exchangeName, routeKey, properties, message.getBytes());
}
}
}
消费者:
package com.bxp.rabbitmqapi.DirectExchange;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Receiver {
// RabbitMq 消费者示例
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.40.110");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
// 2、生产连接
Connection connection = connectionFactory.newConnection();
// 3、生产通信通道
Channel channel = connection.createChannel();
String exchangeName = "test-direct-exchange";
String queueName = "test-direct-queue";
String routeKey = "test-direct-routeKey";
// 4、创建队列,如果队列已经存在,则不会执行任何操作,如果队列不存在,会执行创建队列的操作
channel.exchangeDeclare(exchangeName, "direct", true, false, false, null);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, exchangeName, routeKey);
//创建队列消费者
DefaultConsumer consumer = new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("接收消息: " + new String(body, "utf-8"));
}
};
// 参数介绍:队列名称、是否自动Ack、消费者
channel.basicConsume(queueName, true, consumer);
}
}
在上面的代码中,要先启动消费者,然后在启动生产者,因为要先启动消费者进行交换机、队列的创建以及交换机和队列的绑定。如果先启动生产者,生产者会报错。在实际的使用中,在生产者和消费者中都会写好这些创建的代码,以免报错。
三、Topic模式
Topic Exchange模式下,exchange会将消息的RouteKey和绑定关系的RouteKey进行模糊匹配,如果能够匹配,消息将被发送到对应的队列。
注意:可以使用通配符进行模糊匹配
符号“#”匹配一个或者多个词
符号“*”匹配一个词
例如:“log.#”能够匹配到“log.info.abc”和“log.warn”
"log.*"只能匹配到“log.warn” 而不能匹配到“log.info.abc”
示例:
生产者:
package com.bxp.rabbitmqapi.TopicExchange;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class Sender {
// RabbitMq 生产者入门示例
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.40.110");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
// 2、生产连接
Connection connection = connectionFactory.newConnection();
// 3、创建通信通道
Channel channel = connection.createChannel();
// 4、创建队列,如果队列已经存在,则不会执行任何操作,如果队列不存在,会执行创建队列的操作
//String queueName = "test-001";
// 参数:队列名称、是否持久化、是否独占(队列仅供此连接使用,即只能有一个消费者)、不使用时是否自动删除、其他参数
//channel.queueDeclare("test-001", false, false, false , null);
// 5、创建消息的属性
Map<String, Object> headers = new HashMap<>();
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.headers(headers).build();
// 生产消息
String exchangeName = "test-topic-exchange";
String routeKey1 = "user.save";
String routeKey2 = "user.update";
String routeKey3 = "user.delete.abc";
String message = "RabbitMq Message test";
// 生产者端在发送消息时,只需要关心exchange和routeKey即可
channel.basicPublish(exchangeName, routeKey1, properties, message.getBytes());
channel.basicPublish(exchangeName, routeKey2, properties, message.getBytes());
channel.basicPublish(exchangeName, routeKey3, properties, message.getBytes());
channel.close();
connection.close();
}
}
消费者1:
package com.bxp.rabbitmqapi.TopicExchange;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Receiver {
// RabbitMq 消费者示例
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.40.110");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
// 2、生产连接
Connection connection = connectionFactory.newConnection();
// 3、生产通信通道
Channel channel = connection.createChannel();
String exchangeName = "test-topic-exchange";
String queueName = "test-topic-queue";
String routeKey = "user.#";
// 4、创建队列,如果队列已经存在,则不会执行任何操作,如果队列不存在,会执行创建队列的操作
channel.exchangeDeclare(exchangeName, "topic", true, false, false, null);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, exchangeName, routeKey);
//创建队列消费者
DefaultConsumer consumer = new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("接收消息: " + new String(body, "utf-8") + ",routingKey:" + envelope.getRoutingKey());
}
};
// 参数介绍:队列名称、是否自动Ack、消费者
channel.basicConsume(queueName, true, consumer);
}
}
消费者2:(在执行消费者2的时候,先在控制台对exchange和queue进行解绑,否则会受到消费者1中的绑定关系的影响)
package com.bxp.rabbitmqapi.TopicExchange;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Receiver2 {
// RabbitMq 消费者示例
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.40.110");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
// 2、生产连接
Connection connection = connectionFactory.newConnection();
// 3、生产通信通道
Channel channel = connection.createChannel();
String exchangeName = "test-topic-exchange";
String queueName = "test-topic-queue";
String routeKey = "user.*";
// 4、创建队列,如果队列已经存在,则不会执行任何操作,如果队列不存在,会执行创建队列的操作
channel.exchangeDeclare(exchangeName, "topic", true, false, false, null);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, exchangeName, routeKey);
//创建队列消费者
DefaultConsumer consumer = new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("接收消息: " + new String(body, "utf-8") + ",routingKey:" + envelope.getRoutingKey());
}
};
// 参数介绍:队列名称、是否自动Ack、消费者
channel.basicConsume(queueName, true, consumer);
}
}
四、Fanout Exchange
fanout类型的exchange:
不处理路由键、只需要简单的将队列绑定到交换机上
发送到交换机的消息都会被转发到与该交换机绑定的所有队列上
Fanout交换机转发消息是最快的,direct交换机转发消息次之,Topic交换机转发消息最慢。
Fanout交换机转发消息的模型图如下:
示例:
生产者:
package com.bxp.rabbitmqapi.FanoutExchange;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class Sender {
// RabbitMq 生产者入门示例
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.40.110");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
// 2、生产连接
Connection connection = connectionFactory.newConnection();
// 3、创建通信通道
Channel channel = connection.createChannel();
// 5、创建消息的属性
Map<String, Object> headers = new HashMap<>();
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.headers(headers).build();
// 生产消息
String messagePrefix = "RabbitMq Message test00";
for(int i = 0; i < 5; i++){
String message = messagePrefix + i;
channel.basicPublish("test-fanout-exchange", "", properties, message.getBytes());
}
channel.close();
connection.close();
}
}
消费者:
package com.bxp.rabbitmqapi.FanoutExchange;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class Sender {
// RabbitMq 生产者入门示例
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.40.110");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
// 2、生产连接
Connection connection = connectionFactory.newConnection();
// 3、创建通信通道
Channel channel = connection.createChannel();
// 5、创建消息的属性
Map<String, Object> headers = new HashMap<>();
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.headers(headers).build();
// 生产消息
String messagePrefix = "RabbitMq Message test00";
for(int i = 0; i < 5; i++){
String message = messagePrefix + i;
channel.basicPublish("test-fanout-exchange", "", properties, message.getBytes());
}
channel.close();
connection.close();
}
}
五、其他关键概念讲解
1、Binding——绑定关系
绑定关系是建立在Exchange和Exchange、Exchange和Queue之间的关系。虽然Exchange和Exchange之间也能建立绑定关系,但是不推荐这么使用,这样会使得业务变得复杂,尽量使用Exchange和Queue之间的绑定关系。
Binding中可以包含RoutingKey或者参数
2、Queue——消息队列
消息队列,实际存储消息的载体
Durability:是否持久化,Durable:是,Transient:否
Auto Delete:如果选yes,代表当最后一个监听被移除后,该Queue会被自动删除,即队列没有消费者的时候,会自动被删除。
3、Message——消息
服务器和应用程序之间传送的数据
本质上就是一段数据,有Properties和Payload(Body)组成
常用属性:
content-type: 内容体的类型,如application/json
content-encoding: 压缩或编码格式
message-id和correlation-id: 唯一标识消息和消息响应,用于工作流程中实现消息跟踪,可用于消息消费的幂等性。
timestamp: 减少消息大小,描述消息创建时间
expiration: 表明消息过期
delivery-mode: 将消息写入磁盘或内存队列,即是否持久化:1 不持久化;2 持久化
app-id和user-id: 帮助追踪出现问题的消息发布者应用程序
type: 定义消息类型的自由格式字符串值
reply-to: 实现响应消息的路由
headers: 是一个映射表,定义自由格式的属性和实现rabbitmq路由
4、Virtual Host——虚拟主机
虚拟地址,用于进行逻辑隔离,最上层的消息路由
一个Virtual Host里面可以有若干个Exchange和Queue
同一个Virtual Host里面不能有相同名称的Exchange或queue
标签:connectionFactory,String,队列,RabbitMq,API,详细,import,com,channel 来源: https://blog.csdn.net/b15735105314/article/details/114750804