分布式定时任务qutz利用redis 实现防重
作者:互联网
1.设计思路:
①利用spring aop做定时任务的拦截
②利用redis实现注册中心
③利用redis ttl机制结合java代码实现心跳检测,机器淘汰,故障转移
2.拦截器代码
package cn.togeek.conf; import cn.togeek.tools.UUIDUtil; import org.apache.commons.lang3.StringUtils; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Pointcut; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.lang.reflect.Method; import java.util.Objects; import java.util.Set; import java.util.concurrent.TimeUnit; @Aspect @Component public class ScheduledStatisticsHandler2 { @Autowired private StringRedisTemplate redisTemplate; @Value("${server.port}") private String port; private String mId = UUIDUtil.getUUID(); //机器id,模拟k8s每次都不一样 private final Logger logger = LoggerFactory.getLogger(ScheduledStatisticsHandler2.class); @Pointcut("@annotation(org.springframework.scheduling.annotation.Scheduled)") public void proxyAspect() {} private double permitRate = 0.01; @Around("proxyAspect()") public void doInvoke(ProceedingJoinPoint joinPoint) throws Throwable{ Object target = joinPoint.getTarget(); String clazzName = target.getClass().getName(); String methodName = joinPoint.getSignature().getName(); Object[] args = joinPoint.getArgs(); String id = clazzName.concat("#").concat(methodName); Method method = joinPoint.getTarget().getClass().getDeclaredMethod(joinPoint.getSignature().getName()); Scheduled scheduled = method.getAnnotation(Scheduled.class); long fixedRate = scheduled.fixedRate(); if (fixedRate == 0 && StringUtils.isNotEmpty(scheduled.cron())) { logger.error("unsupported scheduled task,the aop current only support fixrate"); return; } String setId = id.concat("&").concat("set"); Set<String> onlineNodes = redisTemplate.opsForSet().members(setId); //在线机器列表 // logger.info("在线机器列表:{}",onlineNodes); String taskId = mId.concat("#").concat(id); boolean isMember = onlineNodes.contains(taskId); redisTemplate.opsForValue().set(taskId,"1",fixedRate,TimeUnit.MILLISECONDS); //保留5s的taskId onlineNodes.stream().forEach(x -> { //淘汰key Long expire = redisTemplate.getExpire(x); if (expire < 0) { redisTemplate.opsForSet().remove(setId,x); } }); Object permitKey = redisTemplate.opsForValue().get(id); //alive key // logger.info("当前默认执行定时任务的机器:{}",permitKey); if (!isMember) { Set<String> keys = redisTemplate.keys("*#".concat(id)); redisTemplate.opsForSet().add(setId,taskId); if (keys.isEmpty()) { process(joinPoint, args, id, setId,taskId); } else { if (!keys.contains(permitKey)) { //存活节点已经不存活了,重新选举 redisTemplate.opsForValue().set(id,taskId); redisTemplate.opsForSet().remove(setId,permitKey); execute(joinPoint,args); } else { if (taskId.equals(permitKey)) { execute(joinPoint,args); } else { logger.info("其他节点执行任务"); } } } } else { Object object = permitKey; String permittaskId = null; if (Objects.isNull(object)) { process(joinPoint, args, id, setId,taskId); } else { permittaskId = ((String) object); if (onlineNodes.contains(permittaskId)) { if (taskId.equals(permittaskId)) { execute(joinPoint,args); } else { logger.info("其他节点执行任务"); } } else { redisTemplate.opsForValue().set(id,taskId); execute(joinPoint,args); } } } } private void execute (ProceedingJoinPoint joinPoint,Object[] args) { try { joinPoint.proceed(args); } catch (Throwable throwable) {} } private void process(ProceedingJoinPoint joinPoint, Object[] args, String id, String setId,String taskId) throws Throwable { redisTemplate.opsForValue().set(id, taskId); redisTemplate.opsForSet().add(setId, taskId); execute(joinPoint,args); } private Long calculate(String corn) { //本方法仅支持简单表达式 String[] args = corn.split(" "); Long times = 0l; for (int i = 0; i < args.length; i++) { String arg = args[i]; boolean ignore = arg.equals("*"); if (i==0) { //秒 times += ignore?0:Integer.valueOf(arg); } else if (i == 1) { //分 } else if (i == 2) { //时 } else if (i == 3) { //day of month } else if (i == 4) { //month } else if (i == 5) { //星期 } else { //年 } } return times; } }
3.application.yml
spring: redis: host: 127.0.0.1 port: 6379
4.缺陷:只能处理fixrate类型的定时任务
@Scheduled(fixedRate = 5 * SECONDS) public void test() throws Exception { String format = new SimpleDateFormat("hh:mm:ss").format(new Date()); System.out.println("task execute in time:" + format); }
标签:args,String,qutz,joinPoint,redis,taskId,防重,import,redisTemplate 来源: https://www.cnblogs.com/g177w/p/15747119.html