RabbitMQ的高级特性--TTL、死信队列、延迟队列
作者:互联网
目录
1.TTL机制
1.1 实现方案
目前的电商业务中订单创建成功,等待支付一般都会给一定的时间,开始倒计时。如果在这段时间内用户没有支付,则默认订单取消。
如何实现这个功能?
定时轮询(数据库等)
用户下单成功,将订单数据放入数据库,同时将支付状态放入数据库,用户付款更
改数据库状态。定期轮询数据库支付状态,如果超过30分钟就将该订单取消。
优点:设计实现简单。
缺点: 需要对数据库进行大量的IO操作,效率低下。
Timer
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("HH:mm:ss");
Timer timer = new Timer();
TimerTask timerTask = new TimerTask() {
@Override
public void run() {
System.out.println("用户没有付款,交易取消:" +
simpleDateFormat.format(new
Date(System.currentTimeMillis())));
timer.cancel();
}
};
System.out.println("等待用户付款:" + simpleDateFormat.format(new Date(System.currentTimeMillis())));
// 10秒后执行timerTask
timer.schedule(timerTask, 10 * 1000);
优点: 没有想到
缺点: 没有持久化机制。
不灵活 (只可以设置开始时间和重复间隔, 对于其它业务不太合适)。
不能利用线程池,一个timer一个线程。
没有真正的管理计划。
Scheduler及其它定时器
// 线程工厂
ThreadFactory factory = Executors.defaultThreadFactory();
// 使用线程池
ScheduledExecutorService service = new ScheduledThreadPoolExecutor(10, factory);
System.out.println("开始等待用户付款10秒:" + format.format(new Date()));
service.schedule(new Runnable() {
@Override
public void run() {
System.out.println("用户未付款,交易取消:" +
format.format(new Date()));
}// 等待10s 单位秒
}, 10, TimeUnit.SECONDS);
优点:可多线程执行,一定程度上避免任务互相影响,单个任务异常不影响其它任务
缺点:高并发下不建议使用定时任务,较浪费服务器性能
RabbitMQ 的 TTL
通过对 消息 和 队列 两个维度来设置 TTL
优点:无需消费过多的服务器性能即可实现定时的功能
缺点:任何中间件的容量和堆积能力都是有限的,且消息中间件的引入会增加许多问题
由于消息中间件的容量和堆积能力都是有限的,如果有些消息总是无法消费掉,就需要有一种东西进行兜底。
目前有两种方法设置消息的TTL,两种方法同时使用,则消息的TTL已两者之间 较小数值为准。
1. 通过 Queue 属性设置,队列中所有消息都有相同的过期时间。
2. 对消息自身进行单独设置,每条消息的TTL可以不同。
1.2 原生API实现
try(Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 创建队列(实际上使用的是AMQP default这个direct类型的交换器)
// 设置队列属性
Map<String, Object> arguments = new HashMap<>();
// 设置队列的TTL
arguments.put("x-message-ttl", 30000);
// 设置队列的空闲存活时间(如该队列根本没有消费者,一直没有使用,队列可以存活多久)
arguments.put("x-expires", 10000);
channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);
channel.basicPublish("",
QUEUE_NAME,
//设置消息本身的过期时间
new AMQP.BasicProperties().builder().expiration("30000").build(),
message.getBytes())
} catch (TimeoutException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
此外,还可以通过命令行方式设置全局TTL,执行如下命令:
rabbitmqctl set_policy TTL ".*" '{"message-ttl":30000}' --apply-to queues
默认规则:
1.如果不设置 x-message-ttl, 则表示消息不会过期。
2.如果 x-message-ttl 设置为0, 则表示除非消息可以直接将消息投递到消费者,否则消息会被立即丢弃 。
1.3 SpringBoot实现
@Configuration
public class RabbitConfig {
@Bean
public Queue queueTTLWaiting() {
Map<String, Object> props = new HashMap<>();
// 对于该队列中的消息,设置都等待10s
props.put("x-message-ttl", 10000);
// 设置队列的空闲存活时间(如该队列根本没有消费者,一直没有使用,队列可以存活多久)
props.put("x-expires", 1000);
Queue queue = new Queue("q.pay.ttl-waiting", false, false, false, props);
return queue;
}
}
@RestController
public class controller{
@RequestMapping("/pay/msgttl")
public String sendTTLMessage() throws UnsupportedEncodingException {
MessageProperties properties = new MessageProperties();
//设置消息本身过期时间
properties.setExpiration("5000");
Message message = new Message("发送了WAITINGMESSAGE".getBytes("utf-8"), properties);
rabbitTemplate.convertAndSend("ex.pay.waiting", "pay.waiting", message);
return "msg-ttl-ok";
}
}
2.死信队列
DLX,全称为 Dead-Letter-Excahnge, 死信交换机。
在各个外卖系统中,用户下单调用订单服务, 然后订单服务调用派单系统通知外卖人员送单,这时候订单系统与派单系统 采用 MQ异步通讯。
用户在下单之后,外卖员接单之后再取消接单应该如何处理,用户下单长时间未被接单如何处理?
这种场景下我们可以考虑定义一个死信交换机,并绑定一个死信队列。当消息变成死信时,该消息就会被发送到该死信队列上,这样方便我们查看消息失败的原因,从而j进行下一步的处理,是取消订单还是安排其他的外卖员。
消息在到达队列之后,被重新发送到一个特殊的交换机(DLX)中, 同时,绑定DLX的队列就称为“死信队列”。
一下几种情况导致消息变成死信:
1. 消息被拒绝 (Basic.Reject / Basic.Nack), 并且设置 requeue 参数为 false。
2. 消息过期。
3. 队列达到最大长度。
同时我们也可以在处理异常的时候,如果消息不能被消费者正常消费,可以放置到死信队列,后续进行分析程序中的异常情况,进而改善和优化系统。
2.1 原生API实现
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 定义一个死信交换器(也是一个普通的交换器)
channel.exchangeDeclare("exchange.dlx", "direct", true);
// 定义一个正常业务的交换器
channel.exchangeDeclare("exchange.biz", "fanout", true);
Map<String, Object> arguments = new HashMap<>();
// 设置队列TTL
arguments.put("x-message-ttl", 10000);
// 设置该队列所关联的死信交换器(当队列消息TTL到期后依然没有消费,则加入死信队列)
arguments.put("x-dead-letter-exchange", "exchange.dlx");
// 设置该队列所关联的死信交换器的routingKey,如果没有特殊指定,使用原队列的 routingKey
arguments.put("x-dead-letter-routing-key", "routing.key.dlx.test");
//定义交换机队列及绑定关系
channel.queueDeclare("queue.biz", true, false, false, arguments);
channel.queueBind("queue.biz", "exchange.biz", "");
channel.queueDeclare("queue.dlx", true, false, false, null);
// 死信队列和死信交换器
channel.queueBind("queue.dlx", "exchange.dlx", "routing.key.dlx.test");
channel.basicPublish("exchange.biz",
"",
MessageProperties.PERSISTENT_TEXT_PLAIN,
"dlx.test".getBytes());
} catch (Exception e) {
e.printStackTrace();
}
2.2 SpringBoot实现
@Configuration
public class RabbitConfig {
@Bean
public Queue queue() {
Map<String, Object> props = new HashMap<>();
// 消息在队列中的生存时间 10s
props.put("x-message-ttl", 10000);
// 设置该队列所关联的死信交换器(当队列消息TTL到期后依然没有消费,则加入死信队列)
props.put("x-dead-letter-exchange", "ex.go.dlx");
// 设置该队列所关联的死信交换器的routingKey,如果没有特殊指定,使用原队列的routingKey
props.put("x-dead-letter-routing-key", "go.dlx");
Queue queue = new Queue("q.go", true, false, false, props);
return queue;
}
}
一般来说,死信队列都会配合RabbitMQ的TTL来使用,在消息超时之后会被自动路由到死信队列中。
3.延迟队列
在一定的业务场景中,我们发送到消息队列中的消息不一定想立即被消费,就比如:
在12306中购买火车票,选中一个座位中订单未支付的时间段中系统会将这个座位锁定,如果超过时间还没有付款的话系统会自动把座位释放掉,怎么实现类似的功能呢?
1.可以用定时任务每分钟扫描一次,发现有超过15分钟的就释放掉,但这样非常浪费系统资源。
2. 可以用分布式锁、分布式缓存的被动过期时间,15分钟之后锁会自动释放,但是这样会长期占用系统的资源。
3. 可以通过TTL配合死信队列来实现。但是TTL扫描是从队列的头依此往后扫描,加入第一个消息没有超时,后续的消息是不会被扫描的,如果队列中的消息过期时间不是固定的,就会导致后面消息过期了,但是仍然没有被处理。
因此可以使用延迟队列,锁座成功之后会发送一条延迟消息到延时交换机,延时交换机轮询所有的消息,消息到指定的时间后会被消费,消费的过程就是检查这个座位是否是“已付款”状态;
RabbitMQ本身并没有提供延时队列的功能,可以用 rabbitmq_delayed_message_exchange 插件来实现,它与TTL最大的不同就是 TTL 存放消息在死信队列中,而它则是存放消息在延时交换机中。
1. 生产者将消息(msg)和路由键(routekey)发送指定的延时交换机(exchange)上
2. 延时交换机(exchange)存储消息持续扫描所有的消息,等待消息到期根据路由键(routekey)找到 绑定自己的队列(queue)并把消息给它
3. 队列(queue)再把消息发送给监听它的消费者(customer)
3.1 延时队列的使用
下载插件
下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
安装插件
、 将插件拷贝到rabbitmq-server的安装路径
# 启用插件
rabbitmq-plugins list
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
#重启rabbitmq-server
systemctl restart rabbitmq-server
@Configuration
public class RabbitConfig {
@Bean
public Exchange exchange() {
Map<String, Object> props = new HashMap<>();
props.put("x-delayed-type", ExchangeTypes.FANOUT);
Exchange exchange = new CustomExchange("ex.delayed",
"xdelayed-message",
true,
false,
props);
return exchange;
}
public class PublishController {
@Autowired
private AmqpTemplate rabbitTemplate;
@RequestMapping("/prepare/{seconds}")
public String toMeeting(@PathVariable Integer seconds) throws UnsupportedEncodingException {
// RabbitMQ只会检查队列头部的消息是否过期,如果过期就放到死信队列
// 假如第一个过期时间很长,10s,第二个消息3s,则系统先看第一个消息,等到第一个消息过期,放到DLX
// 此时才会检查第二个消息,但实际上此时第二个消息早已经过期了,但是并没有先于第一个消息放到DLX。
// 插件rabbitmq_delayed_message_exchange帮我们搞定这个。
MessageProperties properties = new MessageProperties();
properties.setHeader("x-delay", (seconds - 10) * 1000);
Message message = new Message((seconds + "秒后召开销售部门会议。").getBytes("utf-8"),properties);
rabbitTemplate.convertAndSend("ex.delayed", "key.delayed",message);
return "已经定好闹钟了,到时提前告诉大家";
}
}
标签:exchange,队列,--,死信,消息,TTL,new 来源: https://blog.csdn.net/qq_42029989/article/details/122778975