其他分享
首页 > 其他分享> > 记TransmittableThreadLocal使用不当造成的一次线上事故

记TransmittableThreadLocal使用不当造成的一次线上事故

作者:互联网

前言

我们知道InheritableThreadLocal解决父子线程的问题,它是在线程创建的时候进行复制上下文的,即线程池在创建子线程时,会把父线程中共享变量的值初始化复制到子线程中,如果父线程再将共享变量中的值进行修改,那么子线程就无法感知到这个共享变量的修改。

那么对于线程池已经创建完的线程,这个线程中共享变量的值永远是初始化时从父线程中获取的,父线程中共享变量的值再怎么改变,这个线程中的共享变量值也不会改变。这时,TransmittableThreadLocal就诞生了,它可以解决上述问题。但如果要使TransmittableThreadLocal真正能解决上述问题,需要和TtlExecutors.getTtlExecutorService(Executors.newFixedThreadPool(1)) 配合着使用。而我们的线上事故就是由于使用的默认线程池导致TransmittableThreadLocal蜕变成了InheritableThreadLocal导致:子线程中共享变量的值不是获取的父线程中实时共享变量的值。

事故详情

我们的服务是以Thrift协议对外提供能力的,为了在接口入口处获取入参中的共享变量值,我们通过切面的方式将共享变量放入TransmittableThreadLocal中,方便下层代码获取这个共享变量,而无需层层传递:


import com.api.vo.common.request.CommonRequest;
import com.api.vo.common.response.CommonResponse;
import com.common.constant.Constants;
import com.common.util.CommonRequestUtil;
import com.server.aspect.exception.GlobalExceptionHandler;
import com.service.mcc.MccConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.Signature;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Arrays;
import java.util.Objects;
import java.util.Optional;

/**
 * @author : mazhen
 * Date    : 2021/11/11
 * Time    : 20:22
 * ---------------------------------------
 * Desc    : 接口切面, 目前做异常统一处理
 */
@Slf4j
@Aspect
@Component
public class AspectHandler {

    @Resource
    private GlobalExceptionHandler globalExceptionHandler;

    @Pointcut("execution(public * com.xxx.server.impl..*.*(..))")
    public void aspect() {

    }

    /**
     * @Description 接口切面, 统一异常处理
     * @Date 2021/11/11
     * @Param [pjp]
     * @Return java.lang.Object
     **/
    @Around("aspect()")
    public Object around(ProceedingJoinPoint pjp) throws Throwable {
        String requestArgs = Arrays.toString(pjp.getArgs());
        Signature signature = pjp.getSignature();
        Class<?> returnType = ((MethodSignature) signature).getReturnType();
        CommonResponse<?> commonResponse = (CommonResponse<?>) returnType.newInstance();
        String methodName = ((MethodSignature) signature).getMethod().getName();
        if (!MccConfig.COMMON_REQ_PROCESS_SWITCH) {
            setCommonRequest(pjp);
        }
        Object obj = null;
        try {
            obj = pjp.proceed();
        } catch (Exception e) {
            return globalExceptionHandler.handle(e, methodName, requestArgs, commonResponse);
        } finally {
            if (!MccConfig.COMMON_REQ_PROCESS_SWITCH) {
                removeCommonRequest();
            }
        }
        return obj;
    }

    /**
     * 设置CommonRequest
     * @param pjp 切点
     */
    private void setCommonRequest(ProceedingJoinPoint pjp) {
        try {
            Object[] params = Optional.ofNullable(pjp).map(ProceedingJoinPoint::getArgs).orElse(null);
            if (params == null || ArrayUtils.isEmpty(params)) {
                return;
            }
            Arrays.stream(params).filter(Objects::nonNull).forEach(param -> {
                if(!Objects.equals(param.getClass(), CommonRequest.class)) {
                    return;
                }
                CommonRequest commonRequest = (CommonRequest) param;
                if (Objects.isNull(commonRequest.getUserId())) {
                    commonRequest.setUserId(-1L);
                }
                if (Objects.isNull(commonRequest.getCityId())) {
                    commonRequest.setCityId(-1L);
                }
                CommonRequestUtil.setCommonRequest(commonRequest);
            });
        } catch (Exception e) {
            log.error("setCommonRequest-ERROR", e);
            Cat.logEvent(Constants.CAT_BUS_EXCEPTION_TYPE, "[ERROR]setCommonRequest");
        }
    }

    /**
     * 删除CommonRequest
     */
    private void removeCommonRequest() {
        try {
            CommonRequestUtil.removeCommonRequest();
        } catch (Exception e) {
            log.error("removeCommonRequest-ERROR", e);
            Cat.logEvent(Constants.CAT_BUS_EXCEPTION_TYPE, "[ERROR]removeCommonRequest");
        }
    }
}
import com.api.enums.ResponseCodeEnum;
import com.api.vo.common.response.CommonResponse;
import com.common.util.ExceptionTool;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.*;

/**
 * @author : mazhen
 * Date    : 2021/9/14
 * Time    : 5:02 下午
 * ---------------------------------------
 * Desc    : 异常全局处理
 */
@Slf4j
@Component
public class GlobalExceptionHandler {

    private Map<Class<? extends Throwable>, Method> methodMap = new HashMap<>();
    private List<ExceptionHandleMethod> methodList = new ArrayList<>();
    private final Object exceptionHandler = ExceptionHandlerCollection.INSTANCE;

    public GlobalExceptionHandler() {

        Method[] methods = exceptionHandler.getClass().getDeclaredMethods();
        for (Method method : methods) {
            if(method.isAnnotationPresent(TargetException.class)) {
                TargetException annotation = method.getAnnotation(TargetException.class);
                if (!validMethod(method.getParameterTypes(), annotation.value())) {
                    continue;
                }

                methodMap.put(annotation.value(), method);
                ExceptionHandleMethod handleMethod = ExceptionHandleMethod.builder()
                        .method(method)
                        .exceptionValue(annotation.value())
                        .order(annotation.order())
                        .build();
                methodList.add(handleMethod);
            }
        }

        Collections.sort(methodList);
    }

    public CommonResponse<?> handle(Throwable e, String methodName, String requestArgs, CommonResponse<?> commonResponse) {
        try {
            Method method = methodMap.get(e.getClass());
            if (Objects.nonNull(method)) {
                try {
                    return (CommonResponse<?>) method.invoke(exceptionHandler, e, methodName, requestArgs, commonResponse);
                } catch (IllegalAccessException | InvocationTargetException illegalAccessException) {
                    log.error("异常全局处理异常, map error = {}", ExceptionTool.getStackTrace(illegalAccessException));
                    Cat.logError("异常全局处理异常", illegalAccessException);
                }
            }

            for (ExceptionHandleMethod handleMethod : methodList) {

                if (handleMethod.getExceptionValue().isAssignableFrom(e.getClass())) {
                    try {
                        return (CommonResponse<?>) handleMethod.getMethod().invoke(exceptionHandler, e, methodName, requestArgs, commonResponse);
                    } catch (IllegalAccessException | InvocationTargetException illegalAccessException) {
                        log.error("异常全局处理异常, list error = {}", ExceptionTool.getStackTrace(illegalAccessException));
                        Cat.logError("异常全局处理异常", illegalAccessException);
                    }
                }
            }
        } catch (Exception ex) {
            log.error("异常全局处理异常 error = {}", ExceptionTool.getStackTrace(ex));
            Cat.logError("异常全局处理异常", ex);
        }

        commonResponse.setCode(ResponseCodeEnum.UNKNOWN_ERROR.getCode());
        commonResponse.setMsg(e.getMessage());
        return commonResponse;
    }

    /**
     * 异常处理方法的注解异常和方法参数中的异常类型必须一致
     * @param classArr 参数类型数组
     * @param exceptionValue 注解异常
     * @return true 参数中包含处理的异常
     */
    private boolean validMethod(Class<?>[] classArr, Class<? extends Throwable> exceptionValue) {

        for (Class<?> klass : classArr) {
            if (klass == exceptionValue) {
                return true;
            }
        }

        return false;
    }
}
import lombok.Builder;
import lombok.Getter;
import org.jetbrains.annotations.NotNull;

import java.lang.reflect.Method;

/**
 * @author : mazhen
 * Date    : 2021/9/14
 * Time    : 8:47 下午
 * ---------------------------------------
 * Desc    : 异常处理实体
 */
@Builder
@Getter
public class ExceptionHandleMethod implements Comparable<ExceptionHandleMethod> {

    /**
     * 异常处理方法
     */
    private final Method method;
    /**
     * 异常类class
     */
    private final Class<? extends Throwable> exceptionValue;
    /**
     * 异常处理方法执行顺序, 从小到大排序
     */
    private final int order;

    @Override
    public int compareTo(@NotNull ExceptionHandleMethod o) {
        return order - o.order;
    }
}
import com.alibaba.ttl.TransmittableThreadLocal;
import com.api.vo.common.request.CommonRequest;

/**
 * @author : mazhen
 * Date    : 2021/9/29
 * Time    : 10:34 下午
 * ---------------------------------------
 * Desc    :
 */
public final class CommonRequestUtil {

    private static TransmittableThreadLocal<CommonRequest> threadLocalContext = new TransmittableThreadLocal<>();

    public static CommonRequest getCommonRequest(){
        return threadLocalContext.get();
    }

    public static void setCommonRequest(CommonRequest commonRequest){
        threadLocalContext.set(commonRequest);
    }

    public static void removeCommonRequest(){
        threadLocalContext.remove();
    }
}

错误代码中未使用支持TransmittableThreadLoacl的线程池,此时默认使用JDK内部ForkJionPool来执行对应的业务逻辑,导致在执行任务的线程内部,从TransmittableThreadLoacl中获取commonRequest时,无法拿到正确的commonRequest,从commonRequest中也无法获取正确的用户ID。错误代码如下:

 正确代码:

 ExecutorService threadPoolExecutor = new ThreadPoolExecutor(2,6,100, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(), Executors.defaultThreadFactory(),new ThreadPoolExecutor.CallerRunsPolicy());

        ExecutorService executorService = TtlExecutors.getTtlExecutorService(threadPoolExecutor);

couVOCompletableFuture = CompletableFuture.supplyAsync(() ->
                    getSkuPage(couRequest,pageRequest,commonRequest),executorService);

原因分析

修改前:

commonRequest是通过传参的方式传入“XXX”相关业务逻辑中使用,在“XXX”相关业务逻辑内部中将commonRequest设置到了TransmittableThreadLoacl中,并且在“XXX”相关业务逻辑执行完成后,将TransmittableThreadLoacl内的commonRequest清理掉。“XXX”业务逻辑中并没有开启多线程处理,不涉及在父子线程中传递commonRequest的问题,所以“XXX”在优化上线前,可以正常获取commonRequest。

修改后:

为扩大commonRequest的作用域,将设置commonRequest的逻辑,前置到了thrift接口接受请求时(thrift线程中完成设置commonRequest),“XXX”业务逻辑是在thrift线程的子线程中执行的,这样就存在了跨线程传递commonRequest的问题。代码中使用了不支持TransmittableThreadLoacl的线程池(JDK内部ForkJionPool),就导致了在“XXX”业务的逻辑中通过TransmittableThreadLoacl拿到的commonRequest是错误的。

标签:使用不当,return,线程,TransmittableThreadLocal,commonRequest,import,异常,public,线上
来源: https://blog.csdn.net/mameng1988/article/details/121276764