记TransmittableThreadLocal使用不当造成的一次线上事故
作者:互联网
前言
我们知道InheritableThreadLocal解决父子线程的问题,它是在线程创建的时候进行复制上下文的,即线程池在创建子线程时,会把父线程中共享变量的值初始化复制到子线程中,如果父线程再将共享变量中的值进行修改,那么子线程就无法感知到这个共享变量的修改。
那么对于线程池已经创建完的线程,这个线程中共享变量的值永远是初始化时从父线程中获取的,父线程中共享变量的值再怎么改变,这个线程中的共享变量值也不会改变。这时,TransmittableThreadLocal就诞生了,它可以解决上述问题。但如果要使TransmittableThreadLocal真正能解决上述问题,需要和TtlExecutors.getTtlExecutorService(Executors.newFixedThreadPool(1)) 配合着使用。而我们的线上事故就是由于使用的默认线程池导致TransmittableThreadLocal蜕变成了InheritableThreadLocal导致:子线程中共享变量的值不是获取的父线程中实时共享变量的值。
事故详情
我们的服务是以Thrift协议对外提供能力的,为了在接口入口处获取入参中的共享变量值,我们通过切面的方式将共享变量放入TransmittableThreadLocal中,方便下层代码获取这个共享变量,而无需层层传递:
import com.api.vo.common.request.CommonRequest;
import com.api.vo.common.response.CommonResponse;
import com.common.constant.Constants;
import com.common.util.CommonRequestUtil;
import com.server.aspect.exception.GlobalExceptionHandler;
import com.service.mcc.MccConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.Signature;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Arrays;
import java.util.Objects;
import java.util.Optional;
/**
* @author : mazhen
* Date : 2021/11/11
* Time : 20:22
* ---------------------------------------
* Desc : 接口切面, 目前做异常统一处理
*/
@Slf4j
@Aspect
@Component
public class AspectHandler {
@Resource
private GlobalExceptionHandler globalExceptionHandler;
@Pointcut("execution(public * com.xxx.server.impl..*.*(..))")
public void aspect() {
}
/**
* @Description 接口切面, 统一异常处理
* @Date 2021/11/11
* @Param [pjp]
* @Return java.lang.Object
**/
@Around("aspect()")
public Object around(ProceedingJoinPoint pjp) throws Throwable {
String requestArgs = Arrays.toString(pjp.getArgs());
Signature signature = pjp.getSignature();
Class<?> returnType = ((MethodSignature) signature).getReturnType();
CommonResponse<?> commonResponse = (CommonResponse<?>) returnType.newInstance();
String methodName = ((MethodSignature) signature).getMethod().getName();
if (!MccConfig.COMMON_REQ_PROCESS_SWITCH) {
setCommonRequest(pjp);
}
Object obj = null;
try {
obj = pjp.proceed();
} catch (Exception e) {
return globalExceptionHandler.handle(e, methodName, requestArgs, commonResponse);
} finally {
if (!MccConfig.COMMON_REQ_PROCESS_SWITCH) {
removeCommonRequest();
}
}
return obj;
}
/**
* 设置CommonRequest
* @param pjp 切点
*/
private void setCommonRequest(ProceedingJoinPoint pjp) {
try {
Object[] params = Optional.ofNullable(pjp).map(ProceedingJoinPoint::getArgs).orElse(null);
if (params == null || ArrayUtils.isEmpty(params)) {
return;
}
Arrays.stream(params).filter(Objects::nonNull).forEach(param -> {
if(!Objects.equals(param.getClass(), CommonRequest.class)) {
return;
}
CommonRequest commonRequest = (CommonRequest) param;
if (Objects.isNull(commonRequest.getUserId())) {
commonRequest.setUserId(-1L);
}
if (Objects.isNull(commonRequest.getCityId())) {
commonRequest.setCityId(-1L);
}
CommonRequestUtil.setCommonRequest(commonRequest);
});
} catch (Exception e) {
log.error("setCommonRequest-ERROR", e);
Cat.logEvent(Constants.CAT_BUS_EXCEPTION_TYPE, "[ERROR]setCommonRequest");
}
}
/**
* 删除CommonRequest
*/
private void removeCommonRequest() {
try {
CommonRequestUtil.removeCommonRequest();
} catch (Exception e) {
log.error("removeCommonRequest-ERROR", e);
Cat.logEvent(Constants.CAT_BUS_EXCEPTION_TYPE, "[ERROR]removeCommonRequest");
}
}
}
import com.api.enums.ResponseCodeEnum;
import com.api.vo.common.response.CommonResponse;
import com.common.util.ExceptionTool;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.*;
/**
* @author : mazhen
* Date : 2021/9/14
* Time : 5:02 下午
* ---------------------------------------
* Desc : 异常全局处理
*/
@Slf4j
@Component
public class GlobalExceptionHandler {
private Map<Class<? extends Throwable>, Method> methodMap = new HashMap<>();
private List<ExceptionHandleMethod> methodList = new ArrayList<>();
private final Object exceptionHandler = ExceptionHandlerCollection.INSTANCE;
public GlobalExceptionHandler() {
Method[] methods = exceptionHandler.getClass().getDeclaredMethods();
for (Method method : methods) {
if(method.isAnnotationPresent(TargetException.class)) {
TargetException annotation = method.getAnnotation(TargetException.class);
if (!validMethod(method.getParameterTypes(), annotation.value())) {
continue;
}
methodMap.put(annotation.value(), method);
ExceptionHandleMethod handleMethod = ExceptionHandleMethod.builder()
.method(method)
.exceptionValue(annotation.value())
.order(annotation.order())
.build();
methodList.add(handleMethod);
}
}
Collections.sort(methodList);
}
public CommonResponse<?> handle(Throwable e, String methodName, String requestArgs, CommonResponse<?> commonResponse) {
try {
Method method = methodMap.get(e.getClass());
if (Objects.nonNull(method)) {
try {
return (CommonResponse<?>) method.invoke(exceptionHandler, e, methodName, requestArgs, commonResponse);
} catch (IllegalAccessException | InvocationTargetException illegalAccessException) {
log.error("异常全局处理异常, map error = {}", ExceptionTool.getStackTrace(illegalAccessException));
Cat.logError("异常全局处理异常", illegalAccessException);
}
}
for (ExceptionHandleMethod handleMethod : methodList) {
if (handleMethod.getExceptionValue().isAssignableFrom(e.getClass())) {
try {
return (CommonResponse<?>) handleMethod.getMethod().invoke(exceptionHandler, e, methodName, requestArgs, commonResponse);
} catch (IllegalAccessException | InvocationTargetException illegalAccessException) {
log.error("异常全局处理异常, list error = {}", ExceptionTool.getStackTrace(illegalAccessException));
Cat.logError("异常全局处理异常", illegalAccessException);
}
}
}
} catch (Exception ex) {
log.error("异常全局处理异常 error = {}", ExceptionTool.getStackTrace(ex));
Cat.logError("异常全局处理异常", ex);
}
commonResponse.setCode(ResponseCodeEnum.UNKNOWN_ERROR.getCode());
commonResponse.setMsg(e.getMessage());
return commonResponse;
}
/**
* 异常处理方法的注解异常和方法参数中的异常类型必须一致
* @param classArr 参数类型数组
* @param exceptionValue 注解异常
* @return true 参数中包含处理的异常
*/
private boolean validMethod(Class<?>[] classArr, Class<? extends Throwable> exceptionValue) {
for (Class<?> klass : classArr) {
if (klass == exceptionValue) {
return true;
}
}
return false;
}
}
import lombok.Builder;
import lombok.Getter;
import org.jetbrains.annotations.NotNull;
import java.lang.reflect.Method;
/**
* @author : mazhen
* Date : 2021/9/14
* Time : 8:47 下午
* ---------------------------------------
* Desc : 异常处理实体
*/
@Builder
@Getter
public class ExceptionHandleMethod implements Comparable<ExceptionHandleMethod> {
/**
* 异常处理方法
*/
private final Method method;
/**
* 异常类class
*/
private final Class<? extends Throwable> exceptionValue;
/**
* 异常处理方法执行顺序, 从小到大排序
*/
private final int order;
@Override
public int compareTo(@NotNull ExceptionHandleMethod o) {
return order - o.order;
}
}
import com.alibaba.ttl.TransmittableThreadLocal;
import com.api.vo.common.request.CommonRequest;
/**
* @author : mazhen
* Date : 2021/9/29
* Time : 10:34 下午
* ---------------------------------------
* Desc :
*/
public final class CommonRequestUtil {
private static TransmittableThreadLocal<CommonRequest> threadLocalContext = new TransmittableThreadLocal<>();
public static CommonRequest getCommonRequest(){
return threadLocalContext.get();
}
public static void setCommonRequest(CommonRequest commonRequest){
threadLocalContext.set(commonRequest);
}
public static void removeCommonRequest(){
threadLocalContext.remove();
}
}
错误代码中未使用支持TransmittableThreadLoacl的线程池,此时默认使用JDK内部ForkJionPool来执行对应的业务逻辑,导致在执行任务的线程内部,从TransmittableThreadLoacl中获取commonRequest时,无法拿到正确的commonRequest,从commonRequest中也无法获取正确的用户ID。错误代码如下:
正确代码:
ExecutorService threadPoolExecutor = new ThreadPoolExecutor(2,6,100, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(), Executors.defaultThreadFactory(),new ThreadPoolExecutor.CallerRunsPolicy());
ExecutorService executorService = TtlExecutors.getTtlExecutorService(threadPoolExecutor);
couVOCompletableFuture = CompletableFuture.supplyAsync(() ->
getSkuPage(couRequest,pageRequest,commonRequest),executorService);
原因分析
修改前:
commonRequest是通过传参的方式传入“XXX”相关业务逻辑中使用,在“XXX”相关业务逻辑内部中将commonRequest设置到了TransmittableThreadLoacl中,并且在“XXX”相关业务逻辑执行完成后,将TransmittableThreadLoacl内的commonRequest清理掉。“XXX”业务逻辑中并没有开启多线程处理,不涉及在父子线程中传递commonRequest的问题,所以“XXX”在优化上线前,可以正常获取commonRequest。
修改后:
为扩大commonRequest的作用域,将设置commonRequest的逻辑,前置到了thrift接口接受请求时(thrift线程中完成设置commonRequest),“XXX”业务逻辑是在thrift线程的子线程中执行的,这样就存在了跨线程传递commonRequest的问题。代码中使用了不支持TransmittableThreadLoacl的线程池(JDK内部ForkJionPool),就导致了在“XXX”业务的逻辑中通过TransmittableThreadLoacl拿到的commonRequest是错误的。
标签:使用不当,return,线程,TransmittableThreadLocal,commonRequest,import,异常,public,线上 来源: https://blog.csdn.net/mameng1988/article/details/121276764