利用Redisson实现分布式延时任务调度功能
作者:互联网
定时任务
定时任务是在编码世界中经常遇到的问题,比如定时备份数据库、定时刷新缓存等,可以通过Linux定时任务完成,也可以通过框架如Spring完成,但是在分布式场景中传统单机可以完成功能就不太行了,所以需要借助其他工具来实现任务调度的功能
场景:在一些订单场景中,用户下单后会锁定一些资源,然后用户非正常退出(没有触发取消订单操作),导致订单资源占用无法释放的问题。
借助工具:redisson分布式服务中的分布式调度任务服务(Scheduler Service)
代码
关单任务
定时执行具体任务,主要实现关单,释放相关资源(优惠券等),设置相关状态标志位
注意:Runnable、Callable接口二选一,必须实现序列化字段,因为任务最终要被序列化存储在Redis中
@Slf4j
@Data
public class CloseOrderTask implements Runnable, Serializable {
private static final long serialVersionUID = -8193920383968460660L;
/**
* 订单id
*/
private String orderId;
@Override
@SneakyThrows
public void run() {
// 获取SpringBean,因为此对象不能为Spring所管理,所以需要通过工具获取SpringBean
RedissonClient redissonClient = SpringUtils.getBean(RedissonClient.class);
OrderService orderService = SpringUtils.getBean(OrderService.class);
// 任务必须加锁,因为同一任务可能被多个实例所执行
RLock lock = redissonClient.getLock(ProductProperties.ORDER_TASK_LOCK + orderId);
boolean lockFlag = false;
try {
//尝试获取锁
lockFlag = lock.tryLock(0L, TimeUnit.SECONDS);
if (!lockFlag){ return; }
//获取到锁,正常执行关单操作
log.info("get lock order:{} ", orderId);
OrderDTO order = orderService.getOrder(orderId);
if(orderDetails.getStatus() == OrderStatus.NEW){
// 自动关单操作
orderService.autoCloseOrder(orderDetails);
}
}catch (Exception e){
//TODO 异常情况应添加邮件通知
}finally {
if(lockFlag) {
lock.unlock();
}
}
}
}
关单订单任务调度器
注册一个任务调度器的Bean
注意:Bean的destory方法必须重写,否则在进行关闭Spring容器时,任务调度中心会被关闭,再次启动后不会唤醒
/**
* 关单定时任务
*/
@Bean(destroyMethod = "")
public RScheduledExecutorService rScheduledExecutorService(@Autowired RedissonClient redissonClient){
WorkerOptions workerOptions = WorkerOptions.defaults().workers(CPU_NUM + 1);
ExecutorOptions executorOptions = ExecutorOptions.defaults()
.taskRetryInterval(10 * 60, TimeUnit.SECONDS);
RScheduledExecutorService executorService = redissonClient
.getExecutorService(ProductProperties.CLOSE_ORDER_TASK_EXECUTOR, executorOptions);
executorService.registerWorkers(workerOptions);
return executorService;
}
服务层
例子实现了两个方法,开启一个定时任务和取消一个定时任务。
因为定时任务的取消时通过taskId取消的,所以在提交任务或获取taskId,并对orderId和taskId做了一下映射,在取消订单的时候就比较容易了
@Slf4j
@Service
public class CloseOrderServiceImpl implements CloseOrderService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private RScheduledExecutorService executorService;
@Override
public void submitScheduleCloseTask(String orderId) {
CloseOrderTask closeOrderTask = new CloseOrderTask();
closeOrderTask.setOrderId(orderId);
RScheduledFuture<?> schedule = executorService.schedule(closeOrderTask, ProductProperties.ORDER_TTL_MIN, TimeUnit.MINUTES);
redisTemplate.opsForValue().set(ProductProperties.ORDER_TASK_MAPPING + orderId, schedule.getTaskId(), ProductProperties.ORDER_TTL_MIN, TimeUnit.MINUTES);
log.info("submit automatic close task. orderId: {}, taskId : {}", orderId, schedule.getTaskId());
}
@Override
public void cancelScheduleCloseTask(String orderId) {
String taskId = (String) redisTemplate.opsForValue().get(ProductProperties.ORDER_TASK_MAPPING + orderId);
log.info("cancel automatic close task. orderId: {}, taskId : {}", orderId, taskId);
if (!StringUtils.isEmptyStr(taskId)){
executorService.cancelTask(taskId);
}
}
}
具体使用地点
在用户进行下单操作时可以提交一个定时任务
在用户取消订单时可以取消orderId对应的定时任务
原理分析
数据结构
提交一个任务后,Redis中会添加4个键值对
-
{任务队列名:包路径}:counter
-
string类型
-
记录当前有多少任务待执行
-
-
{任务队列名:包路径}:shceduler
-
zset类型
-
通过score排名,任务队列。通过value关联tasks具体任务
-
-
{任务队列名:包路径}:retry-interval
-
string类型
-
任务重试时间,默认5000
-
-
{任务队列名:包路径}:tasks
-
Hash类型
-
所有具体任务,通过key与shceduler关联执行顺序
-
value序列化的任务
-
执行流程
提交一个延时任务调度任务,会在:scheduler中产生两条数据,分别是任务下一次执行时间和任务下一次执行时间+重试时间
Spring在注册ExecutorService时指定了工人(worker)的数量,会在本地起线程来执行这些待执行的任务。
问题探究
1、任务过期,客户端挂了,然后过一度时间重启,任务是否还会执行。
客户端重启后,过期的任务都会被拿到客户端里面进行消费。
存在同时启动多个客户端,是否会发生任务抢占问题,内部是否有锁机制?答案是会的
2、同一个任务是否会被多个客户端执行
通过两台服务器,每台提交10个任务,多轮测试没有发生同一个任务被多次执行的情况。
加上线程执行sleep后同样进行测试,发现同一任务被不同客户端消费,所以需要有锁机制
3、任务(schedule 单次)执行完毕后,Redis是否会删除任务队列中的任务
任务执行完毕后,会删除tasks、scheduler中的任务,同时counter-1
4、任务队列的过期策略
所有任务的过期时间都是-1(永不过期),保证任务不会丢
5、任务执行异常情况
能够正常捕捉到异常,并进行处理,同时任务会从Redis中删除
标签:orderId,执行,Redisson,延时,任务,taskId,定时,任务调度 来源: https://blog.csdn.net/cong____cong/article/details/109999330