编程语言
首页 > 编程语言> > Java重试机制

Java重试机制

作者:互联网

重试作用:

对于重试是有场景限制的,不是什么场景都适合重试,比如参数校验不合法、写操作等(要考虑写是否幂等)都不适合重试。

远程调用超时、网络突然中断可以重试。在微服务治理框架中,通常都有自己的重试与超时配置,比如dubbo可以设置retries=1,timeout=500调用失败只重试1次,超过500ms调用仍未返回则调用失败。

比如外部 RPC 调用,或者数据入库等操作,如果一次操作失败,可以进行多次重试,提高调用成功的可能性

优雅的重试机制要具备几点:

优雅重试共性和原理:

优雅重试适用场景:

优雅重试解决思路:

切面方式

这个思路比较清晰,在需要添加重试的方法上添加一个用于重试的自定义注解,然后在切面中实现重试的逻辑,主要的配置参数则根据注解中的选项来初始化

优点:

缺点:

消息总线方式

这个也比较容易理解,在需要重试的方法中,发送一个消息,并将业务逻辑作为回调方法传入;由一个订阅了重试消息的consumer来执行重试的业务逻辑

优点:

缺点:

模板方式

优点:

缺点:

把这个单独捞出来,主要是某些时候我就一两个地方要用到重试,简单的实现下就好了,也没有必用用到上面这么重的方式;而且我希望可以针对代码快进行重试

这个的设计还是非常简单的,基本上代码都可以直接贴出来,一目了然:

public abstract class RetryTemplate {

    private static final int DEFAULT_RETRY_TIME = 1;
    private int retryTime = DEFAULT_RETRY_TIME; 
    private int sleepTime = 0;// 重试的睡眠时间

    public int getSleepTime() {
        return sleepTime;
    }

    public RetryTemplate setSleepTime(int sleepTime) {
        if(sleepTime < 0) {
            throw new IllegalArgumentException("sleepTime should equal or bigger than 0");
        }
        this.sleepTime = sleepTime;
        return this;
    }

    public int getRetryTime() {
        return retryTime;
    }

    public RetryTemplate setRetryTime(int retryTime) {
        if (retryTime <= 0) {
            throw new IllegalArgumentException("retryTime should bigger than 0");
        }
        this.retryTime = retryTime;
        return this;
    }

    /**
     * 重试的业务执行代码
     * 失败时请抛出一个异常
     *
     * todo 确定返回的封装类,根据返回结果的状态来判定是否需要重试
     *
     * @return
     */
    protected abstract Object doBiz() throws Exception; //预留一个doBiz方法由业务方来实现,在其中书写需要重试的业务代码,然后执行即可

    public Object execute() throws InterruptedException {
        for (int i = 0; i < retryTime; i++) {
            try {
                return doBiz();
            } catch (Exception e) {
                log.error("业务执行出现异常,e: {}", e);
                Thread.sleep(sleepTime);
            }
        }
        return null;
    }

    public Object submit(ExecutorService executorService) {
        if (executorService == null) {
            throw new IllegalArgumentException("please choose executorService!");
        }
        return executorService.submit((Callable) () -> execute());
    }
}

使用示例:

public void retryDemo() throws InterruptedException {
    Object ans = new RetryTemplate() {
        @Override
        protected Object doBiz() throws Exception {
            int temp = (int) (Math.random() * 10);
            System.out.println(temp);
            if (temp > 3) {
                throw new Exception("generate value bigger then 3! need retry");
            }
            return temp;
        }
    }.setRetryTime(10).setSleepTime(10).execute();
    System.out.println(ans);
}

spring-retry

Spring Retry 为 Spring 应用程序提供了声明性重试支持。 它用于Spring批处理、Spring集成、Apache Hadoop(等等)的Spring。

在分布式系统中,为了保证数据分布式事务的强一致性,在调用RPC接口或者发送MQ时,针对可能会出现网络抖动请求超时情况采取一下重试操作。 用的最多的重试方式就是MQ了,但是如果你的项目中没有引入MQ,就不方便了。

还有一种方式,是开发者自己编写重试机制,但是大多不够优雅。

缺陷

spring-retry 工具虽能优雅实现重试,但是存在两个不友好设计:

Spring Retry 提倡以注解的方式对方法进行重试,重试逻辑是同步执行的,当抛出相关异常后执行重试, 如果你要以返回值的某个状态来判定是否需要重试,可能只能通过自己判断返回值然后显式抛出异常了。只读操作可以重试,幂等写操作可以重试,但是非幂等写操作不能重试,重试可能导致脏写,或产生重复数据。

@Recover 注解在使用时无法指定方法,如果一个类中多个重试方法,就会很麻烦。

spring-retry 结构

RetryPolicy提供了如下策略实现:

BackOffPolicy 提供了如下策略实现:

RetryTemplate主要流程实现:

//示例一
public void upload(final Map<String, Object> map) throws Exception {
        // 构建重试模板实例
        RetryTemplate retryTemplate = new RetryTemplate();
        // 设置重试策略,主要设置重试次数
        SimpleRetryPolicy policy = 
        new SimpleRetryPolicy(3, Collections.<Class<? extends Throwable>, Boolean> singletonMap(Exception.class, true)); // 设置重试回退操作策略,主要设置重试间隔时间 FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy(); fixedBackOffPolicy.setBackOffPeriod(100); retryTemplate.setRetryPolicy(policy); retryTemplate.setBackOffPolicy(fixedBackOffPolicy); // 通过RetryCallback 重试回调实例包装正常逻辑逻辑,第一次执行和重试执行执行的都是这段逻辑 final RetryCallback<Object, Exception> retryCallback = new RetryCallback<Object, Exception>() { //RetryContext 重试操作上下文约定,统一spring-try包装 public Object doWithRetry(RetryContext context) throws Exception { System.out.println("do some thing"); Exception e = uploadToOdps(map); System.out.println(context.getRetryCount()); throw e;//这个点特别注意,重试的根源通过Exception返回 } }; // 通过RecoveryCallback 重试流程正常结束或者达到重试上限后的退出恢复操作实例 final RecoveryCallback<Object> recoveryCallback = new RecoveryCallback<Object>() { public Object recover(RetryContext context) throws Exception { System.out.println("do recory operation"); return null; } }; try { // 由retryTemplate 执行execute方法开始逻辑执行 retryTemplate.execute(retryCallback, recoveryCallback); } catch (Exception e) { e.printStackTrace(); } } //示例二 protected <T, E extends Throwable> T doExecute(RetryCallback<T, E> retryCallback,RecoveryCallback<T> recoveryCallback,
  RetryState state) throws E, ExhaustedRetryException { //重试策略 RetryPolicy retryPolicy = this.retryPolicy; //退避策略 BackOffPolicy backOffPolicy = this.backOffPolicy; //重试上下文,当前重试次数等都记录在上下文中 RetryContext context = open(retryPolicy, state); try { //拦截器模式,执行RetryListener#open boolean running = doOpenInterceptors(retryCallback, context); //判断是否可以重试执行 while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) { try {//执行RetryCallback回调 return retryCallback.doWithRetry(context); } catch (Throwable e) {//异常时,要进行下一次重试准备 //遇到异常后,注册该异常的失败次数 registerThrowable(retryPolicy, state, context, e); //执行RetryListener#onError doOnErrorInterceptors(retryCallback, context, e); //如果可以重试,执行退避算法,比如休眠一小段时间后再重试 if (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) { backOffPolicy.backOff(backOffContext); } //state != null && state.rollbackFor(context.getLastThrowable()) //在有状态重试时,如果是需要执行回滚操作的异常,则立即抛出异常 if (shouldRethrow(retryPolicy, context, state)) { throw RetryTemplate.<E>wrapIfNecessary(e); } } //如果是有状态重试,且有GLOBAL_STATE属性,则立即跳出重试终止;
      //当抛出的异常是非需要执行回滚操作的异常时,才会执行到此处,CircuitBreakerRetryPolicy会在此跳出循环; if (state != null && context.hasAttribute(GLOBAL_STATE)) { break; } } //重试失败后,如果有RecoveryCallback,则执行此回调,否则抛出异常 return handleRetryExhausted(recoveryCallback, context, state); } catch (Throwable e) { throw RetryTemplate.<E>wrapIfNecessary(e); } finally { //清理环境 close(retryPolicy, context, state, lastException == null || exhausted); //执行RetryListener#close,比如统计重试信息 doCloseInterceptors(retryCallback, context, lastException); } }

有状态or无状态

RetryTemplate template = new RetryTemplate();
//重试策略:次数重试策略
RetryPolicy retryPolicy = new SimpleRetryPolicy(3);
template.setRetryPolicy(retryPolicy);
//退避策略:指数退避策略
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(100);
backOffPolicy.setMaxInterval(3000);
backOffPolicy.setMultiplier(2);
backOffPolicy.setSleeper(new ThreadWaitSleeper());
template.setBackOffPolicy(backOffPolicy);

//当重试失败后,抛出异常
String result = template.execute(new RetryCallback<String, RuntimeException>() {
    @Override
    public String doWithRetry(RetryContext context) throws RuntimeException {
        throw new RuntimeException("timeout");
    }
});
//当重试失败后,执行RecoveryCallback
String result = template.execute(new RetryCallback<String, RuntimeException>() {
    @Override
    public String doWithRetry(RetryContext context) throws RuntimeException {
        System.out.println("retry count:" + context.getRetryCount());
        throw new RuntimeException("timeout");
    }
}, new RecoveryCallback<String>() {
    @Override
    public String recover(RetryContext context) throws Exception {
        return "default";
    }
});

事务操作需要回滚场景时,当整个操作中抛出的是数据库异常DataAccessException,则不能进行重试需要回滚,而抛出其他异常则可以进行重试,可以通过RetryState实现:

//当前状态的名称,当把状态放入缓存时,通过该key查询获取
Object key = "mykey";
//是否每次都重新生成上下文还是从缓存中查询,即全局模式(如熔断器策略时从缓存中查询)
boolean isForceRefresh = true;
//对DataAccessException进行回滚
BinaryExceptionClassifier rollbackClassifier =
        new BinaryExceptionClassifier(Collections.<Class<? extends Throwable>>singleton(DataAccessException.class));
RetryState state = new DefaultRetryState(key, isForceRefresh, rollbackClassifier);

String result = template.execute(new RetryCallback<String, RuntimeException>() {
    @Override
    public String doWithRetry(RetryContext context) throws RuntimeException {
        System.out.println("retry count:" + context.getRetryCount());
        throw new TypeMismatchDataAccessException("");
    }
}, new RecoveryCallback<String>() {
    @Override
    public String recover(RetryContext context) throws Exception {
        return "default";
    }
}, state);

RetryTemplate中在有状态重试时,回滚场景时直接抛出异常处理代码:

//state != null && state.rollbackFor(context.getLastThrowable())
//在有状态重试时,如果是需要执行回滚操作的异常,则立即抛出异常
if (shouldRethrow(retryPolicy,context, state)) {
    throw RetryTemplate.<E>wrapIfNecessary(e);
}

熔断器场景。在有状态重试时,且是全局模式,不在当前循环中处理重试,而是全局重试模式(不是线程上下文),如熔断器策略时测试代码如下所示。

RetryTemplate template = new RetryTemplate();
CircuitBreakerRetryPolicy retryPolicy =
        new CircuitBreakerRetryPolicy(new SimpleRetryPolicy(3));
retryPolicy.setOpenTimeout(5000);
retryPolicy.setResetTimeout(20000);
template.setRetryPolicy(retryPolicy);

for (int i = 0; i < 10; i++) {
    try {
        Object key = "circuit";
        boolean isForceRefresh = false;
        RetryState state = new DefaultRetryState(key, isForceRefresh);
        String result = template.execute(new RetryCallback<String, RuntimeException>() {
            @Override
            public String doWithRetry(RetryContext context) throws RuntimeException {
                System.out.println("retry count:" + context.getRetryCount());
                throw new RuntimeException("timeout");
            }
        }, new RecoveryCallback<String>() {
            @Override
            public String recover(RetryContext context) throws Exception {
                return "default";
            }
        }, state);
        System.out.println(result);
    } catch (Exception e) {
        System.out.println(e);
    }
}

为什么说是全局模式呢?我们配置了isForceRefresh为false,则在获取上下文时是根据key “circuit”从缓存中获取,从而拿到同一个上下文。

Object key = "circuit";
boolean isForceRefresh = false;
RetryState state = new DefaultRetryState(key,isForceRefresh); 

如下RetryTemplate代码说明在有状态模式下,不会在循环中进行重试。

if (state != null && context.hasAttribute(GLOBAL_STATE)) {
   break;
}

判断熔断器电路是否打开的代码:

public boolean isOpen() {
   long time = System.currentTimeMillis() - this.start;
   boolean retryable = this.policy.canRetry(this.context);
   if (!retryable) {//重试失败
      //在重置熔断器超时后,熔断器器电路闭合,重置上下文
      if (time > this.timeout) {
         this.context = createDelegateContext(policy, getParent());
         this.start = System.currentTimeMillis();
         retryable = this.policy.canRetry(this.context);
      } else if (time < this.openWindow) {
         //当在熔断器打开状态时,熔断器电路打开,立即熔断
         if ((Boolean) getAttribute(CIRCUIT_OPEN) == false) {
            setAttribute(CIRCUIT_OPEN, true);
         }
         this.start = System.currentTimeMillis();
         return true;
      }
   } else {//重试成功
      //在熔断器电路半打开状态时,断路器电路闭合,重置上下文
      if (time > this.openWindow) {
         this.start = System.currentTimeMillis();
         this.context = createDelegateContext(policy, getParent());
      }
   }
   setAttribute(CIRCUIT_OPEN, !retryable);
   return !retryable;
}

从如上代码可看出spring-retry的熔断策略相对简单:

注解介绍

@EnableRetry

表示是否开始重试。

序号属性类型默认值说明
1 proxyTargetClass boolean false 指示是否要创建基于子类的(CGLIB)代理,而不是创建标准的基于Java接口的代理。当proxyTargetClass属性为true时,使用CGLIB代理。默认使用标准JAVA注解

@Retryable

标注此注解的方法在发生异常时会进行重试

序号属性类型默认值说明
1 interceptor String ”” 将 interceptor 的 bean 名称应用到 retryable()
2 value class[] {} 可重试的异常类型
3 include class[] {} 和value一样,默认空,当exclude也为空时,所有异常都重试
4 exclude class[] {} 指定异常不重试,默认空,当include也为空时,所有异常都重试 
5 label String ”” 统计报告的唯一标签。如果没有提供,调用者可以选择忽略它,或者提供默认值。
6 maxAttempts int 3 尝试的最大次数(包括第一次失败),默认为3次。
7 backoff @Backoff @Backoff() 重试补偿机制,指定用于重试此操作的backoff属性。默认为空

@Backoff

不设置参数时,默认使用FixedBackOffPolicy(指定等待时间),重试等待1000ms

序号属性类型默认值说明
1 delay long 0 指定延迟后重试 ,如果不设置则默认使用 1000 milliseconds
2 maxDelay long 0 最大重试等待时间
3 multiplier long 0 指定延迟的倍数,比如delay=5000l,multiplier=2时,第一次重试为5秒后,第二次为10秒,第三次为20秒(大于0生效)
4 random boolean false 随机重试等待时间

@Recover

用于恢复处理程序的方法调用的注释。返回类型必须与@retryable方法匹配。 可抛出的第一个参数是可选的(但是没有它的方法只会被调用)。 从失败方法的参数列表按顺序填充后续的参数。

用于@Retryable重试失败后处理方法,此注解注释的方法参数一定要是@Retryable抛出的异常,否则无法识别,可以在该方法中进行日志处理。

说明

  1. 使用了@Retryable的方法不能在本类被调用,不然重试机制不会生效。也就是要标记为@Service,然后在其它类使用@Autowired注入或者@Bean去实例才能生效。
  2. 要触发@Recover方法,那么在@Retryable方法上不能有返回值,只能是void才能生效。
  3. 使用了@Retryable的方法里面不能使用try...catch包裹,要在发放上抛出异常,不然不会触发。
  4. 在重试期间这个方法是同步的,如果使用类似Spring Cloud这种框架的熔断机制时,可以结合重试机制来重试后返回结果。
  5. Spring Retry不只能注入方式去实现,还可以通过API的方式实现,类似熔断处理的机制就基于API方式实现会比较宽松。

标签:RetryTemplate,Java,策略,重试,state,context,new,机制
来源: https://www.cnblogs.com/ExMan/p/16394538.html