如何保证消息不丢失
作者:互联网
当发送者发送消息到 RabbitMQ 后,RabbitMQ 会将消息缓存在内存中,而如果此时 RabbitMQ 宕机了,默认情况下,内存中的 queue 和 message 都会全部丢失。
而如果我们需要保证消息不丢失,那么需要告诉 RabbitMQ 如何做;此时我们需要做的是:将 queue 和 message 都设置为持久化。
queue 持久化:
private static final String queueName = "hyf.work.queue";
boolean durable = true;
channel.queueDeclare(queueName, durable, false, false, null);
注意:如果一开始 queue 已经定义为不持久化,那么我们不能重定义为持久化;当 RabbitMQ 检测到 queue 被重定义了,那么会返回一个错误来提示我们。
message 持久化:
private static final String queueName = "hyf.work.queue";
channel.basicPublish("", queueName,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
三、发布订阅模式(Publish/Subscribe)
上面的 work queue,每一个消息只能被一个消费者消费。而有些场景,我们需要一个消息可以被多个消费者消费;例如:用户下了订单,短信通知模块需要给用户发送一个短信通知,库存模块需要根据用户下单信息减去商品的库存等等,此时我们需要使用发布订阅模式。
1、交换器 exchange
要做发布订阅模式,我们首先需要使用到交换器,生产者不再直接利用 channel
往 queue 发送消息,而是将消息发送到交换器,让交换器来决定发送到哪些 queue 中。
RabbitMQ 提供了几个类型的交换器:direct
、topic
、headers
和 fanout
。
使用发布订阅模式,我们只需要使用 fanout
类型的交换器,fanout
类型的交换器,会将消息发送到所有绑定到此交换器的 queue。
2、生产者发送消息:
利用 channel 声明交换器:
// 声明交换器名字和类型
channel.exchangeDeclare(exchangeName,"fanout");
接着我们就可以直接指定交换器进行消息发布:
// 第二个参数是 queueName/routingKey
channel.basicPublish(exchangeName , "", null, message.getBytes())
完整代码:
public class Send {
private static final String exchangeName = "hyf.ps.exchange";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = ConnectionFactoryUtils.getFactory();
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()){
// 声明 fanout 类型的交换器
channel.exchangeDeclare(exchangeName,"fanout");
for (int i = 0; i <= 10; i++){
String message = "消息"+i;
// 直接指定交换器进行消息发布
channel.basicPublish(exchangeName,"", null, message.getBytes());
}
}
}
}
我们可以发现,我们不再需要指定 queueName,而是直接指定 exchangeName,将消息发送到交换器,由交换器决定发布到哪些 queue。
3、消费者:queue 与 exchange 建立绑定关系
建立绑定前,我们还是需要先声明 fanout
类型的交换器,并且命名要和生产者声明时的名字一致:
channel.exchangeDeclare(exchangeName, "fanout");
接着,将 queue 和 fanout
类型的交换器建立绑定消息,交换器会将消息发送到和它有绑定关系的 queue。
channel.queueBind(queueName, exchangeName, "");
此时,队列已经和交换器成功建立绑定关系,交换器接收到消息时,会发送到与交换器绑定的所有队列中。
最后,我们再调用 channel.basicConsume() 进行队列监听和 绑定回调,借此来接收和消费消息:
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(message);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
完整代码:
public class Receive1 {
private static final String exchangeName = "hyf.ps.exchange";
private static final String queueName = "hyf.ps.queue1";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = ConnectionFactoryUtils.getFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(exchangeName,"fanout");
channel.queueDeclare(queueName,false, false, false, null);
channel.queueBind(queueName, exchangeName,"");
DeliverCallback callback = (s, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
};
channel.basicQos(1);
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, callback, consumerTag -> {});
}
}
关于发布订阅模式,我们可以理解为下图:
4、发布丁订阅模式中使用工作模式
发布订阅模式中,我们还是可以继续使用上面的工作模式(多个消费者订阅同一个队列)。因为在分布式系统中,一个服务往往有多个实例,例如库存模块可以有多个实例,我们利用手动 ack 和 prefetchCount = 1,还是可以让 fanout 类型交换器的其中一个 queue 进入工作模式。
四、路由模式(routing)
上面的发布订阅模式,只要是与 fanout
类型交换器绑定的 queue,都会接收到交换器发布的消息。而我们现在的场景需要更加灵活消息分配机制。例如:error 队列只会接收到 error 类型的信息,info 队列只会接收都 info 类型的信息等等。
那么我们需要是使用灵活的路由模式,而这种模式还是需要由交换器来完成,但是此时需要使用 direct
类型的交换器来替代 fanout
类型的交换器。
bindingKey 和 routingKey
做到路由模式,不但要使用 direct
类型的交换器,还需要利用 bindingKey
和 routingKey
来完成。bindingKey 是消费者端的概念,而 routingKey 是生产者端的概念。
1、bingdingKey
发布订阅模式的消费者代码中,我们可以发现:将 queue 与交换器建立绑定关系的 queueBind() 方法中,第三个参数是空的,其实这就是配置 bindingKey 的地方。当然了,即使第三个参数不为空,fanout 类型的交换器还是会直接忽略掉的。
channel.queueBind(queueName, exchangeName, "");
例如现在我们的消费者要监听 error 类型的信息,我们需要声明 direct 类型的交换器,并且给 queue 绑定值为 error 的 bindingKey 。
public class ErrorReceive {
private static final String exchangeName = "hyf.routing.exchange";
private static final String queueName = "hyf.routing.error.queue";
private static final String bindingKey = "error";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = ConnectionFactoryUtils.getFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明 exchange 和 queue
channel.exchangeDeclare(exchangeName, "direct");
channel.queueDeclare(queueName, false, false, false, null);
// 进行绑定
channel.queueBind(queueName, exchangeName, bindingKey);
DeliverCallback callback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(),"utf-8");
System.out.println("ErrorReceive 接收到" + delivery.getEnvelope().getRoutingKey() + "消息:"+message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
};
channel.basicQos(1);
channel.basicConsume(queueName, false, callback, consumerTag -> {});
}
}
例如现在我们的消费者2要监听 info 类型的信息,这也是非常简单,同样是上面的代码,只需要修改 queueName 和 bindingKey 即可。
// ... 省略
private static final String queueName = "hyf.info.queue";
private static final String bindingKey = "info";
// ... 省略
2、queue 绑定多个 bindingKey
上面的 hyf.error.queue
队列,只绑定了值为 error 的 bindingKey,如果现在我们不但需要接收 error 类型的信息,还需要 info 类型的信息,那么我们可以为 hyf.error.queue
再绑定多一个值为 info 的 bindingKey。
private static final String bindingKey = "error";
private static final String bindingKey2 = "info";
// 进行绑定
channel.queueBind(queueName, exchangeName, bindingKey);
channel.queueBind(queueName, exchangeName, bindingKey2);
此时,hyf.error.queue
队列同时绑定了 error 和 info 这两个 bindingKey,那么它就能同时接收到 error 类型和 info 类型的信息。
3、routingKey
在发布订阅模式中。我们可以看到发布消息的 basicPublish() 方法的第二参数是空的,而第二个参数其实就是 routingKey。
channel.basicPublish( exchangeName, "", null, message.getBytes());
我们可以发现,在普通队列和工作模式中,我们都是指定 queueName 去发送消息,而 queueName 在 basicPublish 也是第二个位置。所以,在我们不使用交换器时,routingKey 指定的就是 queueName。而当我们使用交换器时,那么 routingKey 就有更丰富的含义了,它不再只是简单直接的 queueName,而是各种各样的路由含义。
要使得上面绑定了 bindingKey 为 error 和 info 的 hyf.error.queue
队列接收到消息,那么需要消息发送者指定 routingKey 为 error 或 info ,然后使用 direct
类型的交换器发布消息。
private static final String exchangeName = "hyf.log.exchange";
private static final String routingKey = "error";
private static final String routingKey2 = "info";
channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
channel.basicPublish(exchangeName, routingKey2, null, message.getBytes());
当执行上面代码,hyf.error.queue
队列能收到两条消息,而 hyf.info.queue
只能收到 routingKey 为 info 的消息。
即当 queue 绑定的 bindingKey 和发送消息时的 routingKey 完全一致,那么 queue 就能接收到交换器发送的消息,我们可以理解为下图:
五、主题模式(topic)
上面的路由模式虽然能让我们根据业务更加灵活的去接收指定(多种)类型的消息;但是我们可以发现,如果现在我们想让消费者接收所有类型的信息,例如 error、info、debug、fail 等消息全部都要接收,那么就要调用多次 queueBind() 方法给 queue 绑定多个 bindingKey,这就显得有点麻烦了。
此时我们可以使用主题模式,即使用 topic
类型的交换器,然后利用 *
和 #
这两个符号来搞定上面的需求。
1、* 和 # 的使用
"*" 表示匹配一个字符,"#" 表示匹配0个或多个字符
2、场景
我们现在有多个 routingKey 的消息,例如用户登陆信息 user.login.info
,订单信息 order.detail.info
,用户的注册信息 user.register.info
,库存信息stock.detail.info
等等。
3、消费者
假设消费者1想读取到所有关于用户的信息,例如登陆信息和注册时心,那么我们可以使用 topic
类型的交换器,并且将 bindingKey 设置为 user.#
。
public class UserReceive {
private static final String exchangeName = "hyf.topic.exchange";
private static final String bindingKey = "user.#";
private static final String queueName = "hyf.topic.user.queue";
@SneakyThrows
public static void main(String[] args){
ConnectionFactory factory = ConnectionFactoryUtils.getFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(exchangeName, "topic");
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, exchangeName, bindingKey);
DeliverCallback callBack = (consumerTag, delivery) -> {
String msg = new String(delivery.getBody(), "utf-8");
System.out.println("接收到一条user消息:"+msg);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
channel.basicQos(1);
channel.basicConsume(queueName, false, callBack, consumerTag -> {});
}
}
假设消费者2 要接收所有上面关于信息的消息,那么他的 bindingKey 可以设置为 *.*.info
。
public class InfoReceive {
private static final String exchangeName = "hyf.topic.exchange";
private static final String bindingKey = "*.*.info";
private static final String queueName = "hyf.topic.info.queue";
@SneakyThrows
public static void main(String[] args){
ConnectionFactory factory = ConnectionFactoryUtils.getFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(exchangeName, "topic");
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, exchangeName, bindingKey);
DeliverCallback callback = (consumerTag, delivery) -> {
String msg = new String(delivery.getBody(), "utf-8");
System.out.println("接收到一条info消息:"+msg);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
channel.basicQos(1);
channel.basicConsume(queueName, false, callback, consumerTag -> {});
}
}
4、生产者
生产者也需要使用 topic
类型的交换器发送消息。
public class Send {
private static final String exchangeName = "hyf.topic.exchange";
private static final String routingkeyByLogin = "user.login.info";
private static final String routingkeyByRegister = "user.register.info";
private static final String routingkeyByOrder = "order.detail.info";
private static final String routingkeyByStock = "stock.detail.info";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = ConnectionFactoryUtils.getFactory();
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()){
channel.exchangeDeclare(exchangeName, "topic");
String msg1 = "用户张三登陆了";
String msg2 = "新用户李四注册了";
String msg3 = "张三买了一台iphone12";
String msg4 = "iphone12库存减一";
channel.basicPublish(exchangeName, routingkeyByLogin, null, msg1.getBytes());
channel.basicPublish(exchangeName, routingkeyByRegister, null, msg2.getBytes());
channel.basicPublish(exchangeName, routingkeyByOrder, null, msg3.getBytes());
channel.basicPublish(exchangeName, routingkeyByStock, null, msg4.getBytes());
}
}
}
经过上面的代码发布消息,消费者1就能读取到消息 msg1、msg2;而消费者2可以读取到所有的消息。
关于主题模式,大家可以理解为下图:
标签:交换器,String,queue,保证,丢失,static,queueName,channel,消息 来源: https://www.cnblogs.com/gangkaitong/p/13275147.html