ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

SpringAOP[N]-@Async异步

2021-11-18 01:00:06  阅读:277  来源: 互联网

标签:异步 return builder public 线程 executor SpringAOP Async class


@Async 注解的方法被调用后异步执行,注意 SpringBoot 中也需要显式开启 @EnableAsync

 

原理肯定是动态代理 + BeanPostProcessor

代码:org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory#initializeBean(java.lang.String, java.lang.Object, org.springframework.beans.factory.support.RootBeanDefinition)

  1. aware 接口调用
  2. 初始化前调用
  3. 初始化
  4. 初始化后调用

注:上面是正常流程,粗略看了一下,如果存在循环依赖,得到的也并不是代理类,也就是不会提前处理

 

在初始化后调用中进行了代理 AsyncAnnotationBeanPostProcessor

BeanPostProcessor 的代理逻辑待续,先看一下拦截逻辑 AnnotationAsyncExecutionInterceptor

public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport implements MethodInterceptor, Ordered {

    // MethodInterceptor 实现
    @Override
    @Nullable
    public Object invoke(final MethodInvocation invocation) throws Throwable {
        // 获得被代理的 Class
        Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
        Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
        final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

        // 获取异步执行所需要的线程池, 父类 AsyncExecutionAspectSupport 方法
        AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
        if (executor == null) {
            throw new IllegalStateException(
                    "No executor specified and no default executor set on AsyncExecutionInterceptor either");
        }
        
        // 封装为 Callable
        Callable<Object> task = () -> {
            try {
                // target 执行
                Object result = invocation.proceed();
                // 有返回值类型
                if (result instanceof Future) {
                    // 获取值
                    return ((Future<?>) result).get();
                }
            }
            catch (ExecutionException ex) {
                // 父类 AsyncExecutionAspectSupport 方法
                handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
            }
            catch (Throwable ex) {
                handleError(ex, userDeclaredMethod, invocation.getArguments());
            }
            return null;
        };

        // 子类 AsyncExecutionAspectSupport, 略, 反正是提交
        return doSubmit(task, executor, invocation.getMethod().getReturnType());
    }
    
}

public abstract class AsyncExecutionAspectSupport implements BeanFactoryAware {
    protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
        // private final Map<Method, AsyncTaskExecutor> executors = new ConcurrentHashMap<>(16);
        // 方法为键
        AsyncTaskExecutor executor = this.executors.get(method);
        // 没有
        if (executor == null) {
            Executor targetExecutor;
            // 子类 AnnotationAsyncExecutionInterceptor 实现, 获取 @Async 注解值, 也就是线程池的BeanName
            String qualifier = getExecutorQualifier(method);
            if (StringUtils.hasLength(qualifier)) {
                // 找的是 Executor 类型的指定 BeanName, 且唯一
                targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
            }
            else {    
                // 默认线程池
                targetExecutor = this.defaultExecutor.get();
            }
            if (targetExecutor == null) {
                return null;
            }
            // 适配器模式
            executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
                    (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
            // 缓存
            this.executors.put(method, executor);
        }
        return executor;
    }
    
    protected void handleError(Throwable ex, Method method, Object... params) throws Exception {
        if (Future.class.isAssignableFrom(method.getReturnType())) {
            ReflectionUtils.rethrowException(ex);
        }
        else {
            // Could not transmit the exception to the caller with default executor
            try {
                // 默认 SimpleAsyncUncaughtExceptionHandler, 默认实现是打印 Error 日志
                this.exceptionHandler.obtain().handleUncaughtException(ex, method, params);
            }
            catch (Throwable ex2) {
                logger.warn("Exception handler for async method '" + method.toGenericString() +
                        "' threw unexpected exception itself", ex2);
            }
        }
    }
    
    protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
        // .
        if (CompletableFuture.class.isAssignableFrom(returnType)) {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    return task.call();
                }
                catch (Throwable ex) {
                    throw new CompletionException(ex);
                }
            }, executor);
        }
        else if (ListenableFuture.class.isAssignableFrom(returnType)) {
            return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
        }
        else if (Future.class.isAssignableFrom(returnType)) {
            return executor.submit(task);
        }
        else {
            executor.submit(task);
            return null;
        }
    }

}
  1. 找线程池
  2. 提交

 

 

 

@EnableAsync

@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {

    private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
            "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";

    @Override
    @Nullable
    public String[] selectImports(AdviceMode adviceMode) {
        switch (adviceMode) {
            case PROXY:
                // 这个
                return new String[] {ProxyAsyncConfiguration.class.getName()};
            case ASPECTJ:
                return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
            default:
                return null;
        }
    }

}

 

@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {

    @Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
        // @EnableAsync 注解数据
        Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
        // BeanPostProcessor 处理器
        AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
        // 注入线程池和异常处理器
        bpp.configure(this.executor, this.exceptionHandler);
        Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
        if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
            bpp.setAsyncAnnotationType(customAsyncAnnotation);
        }
        bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
        bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
        return bpp;
    }

}

父类

@Configuration(proxyBeanMethods = false)
public abstract class AbstractAsyncConfiguration implements ImportAware {

    @Nullable
    protected AnnotationAttributes enableAsync;

    @Nullable
    protected Supplier<Executor> executor;

    @Nullable
    protected Supplier<AsyncUncaughtExceptionHandler> exceptionHandler;

    // 解析 @EnableAsync 注解信息
    @Override
    public void setImportMetadata(AnnotationMetadata importMetadata) {
        this.enableAsync = AnnotationAttributes.fromMap(
                importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false));
        if (this.enableAsync == null) {
            throw new IllegalArgumentException(
                    "@EnableAsync is not present on importing class " + importMetadata.getClassName());
        }
    }

    // AsyncConfigurerSupport 是 AsyncConfigurer 实现类, 默认返回 null、null
    // 且并非强制要求注入
    @Autowired(required = false)
    void setConfigurers(Collection<AsyncConfigurer> configurers) {
        if (CollectionUtils.isEmpty(configurers)) {
            return;
        }
        if (configurers.size() > 1) {
            throw new IllegalStateException("Only one AsyncConfigurer may exist");
        }
    // 欸,这里只选取了第一个配置 AsyncConfigurer configurer = configurers.iterator().next(); this.executor = configurer::getAsyncExecutor; this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler; } }

注意上面的 AsyncConfigurer 配置,默认是啥也没有的,也就是线程池和异常处理器默认均为null 

 

AsyncAnnotationBeanPostProcessor

注意这里创建了 AsyncAnnotationAdvisor ,而 AsyncAnnotationAdvisor 内部又创建了 AnnotationAsyncExecutionInterceptor,于是它们持有的线程池和异常处理器均为null

    @Override
    public void setBeanFactory(BeanFactory beanFactory) {
        super.setBeanFactory(beanFactory);

        AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
        if (this.asyncAnnotationType != null) {
            advisor.setAsyncAnnotationType(this.asyncAnnotationType);
        }
        advisor.setBeanFactory(beanFactory);
        this.advisor = advisor;
    }

 

AnnotationAsyncExecutionInterceptor

public abstract class AsyncExecutionAspectSupport implements BeanFactoryAware {
    public AsyncExecutionAspectSupport(@Nullable Executor defaultExecutor) {
        // 当前类模式实现是: 没有配置线程池就使用 BeanFactor 获取唯一实例TaskExecutor.class
        // 子类重写了
        this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
        // 默认打印 Error 日志
        this.exceptionHandler = SingletonSupplier.of(SimpleAsyncUncaughtExceptionHandler::new);
    }
}

public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport implements MethodInterceptor, Ordered {
    protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
        // 先从 BeanFactory 获取 TaskExecutor 实例
        Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
        // 默认实现 SimpleAsyncTaskExecutor, 来一个新建一个线程
        return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
    }
}

 

 

总结

  1. 可以使用 AsyncConfigurer 进行全局性的配置,配置线程池和异常处理器, 但是配置多个只会选取一个(顺序?)
  2. 默认没有任何全局配置,默认异常处理器打印 Error 日志,线程池从 BeanFactory 获取 TaskExecutor 实例,没有则创建类似  Executors.newCachedThreadPool() 的线程池,来一个任务创建一个线程
  3. 全局配置使用 AsyncConfigurer 注入即可,@Async 可以单独配置独有的线程池

注意

SpringBoot是配置了默认的线程池的,这个线程池可以被覆盖,且这个线程池的名称有两个{taskExecutor, applicationTaskExecutor}

AnnotationAsyncExecutionInterceptor的策略是如果没有配置,先找 TaskExecutor 类型的,如果不唯一,再找名称 "taskExecutor" 的,没有则使用默认的 Simple 线程池

也就是说如果你覆盖了这个线程池,名称又不特殊,还没有进行配置,@Async也没有单独配置,那么异步线程执行的使用的是简单 Simple 线程池

 

调试发现除了 SpringBoot 的这个默认线程池,还有一个名为 "scheduleTask"的 TaskExecutor

 

 

SpringBoot默认线程池

ThreadPoolTaskExecutorSpringBoot 默认提供的全局的,我们可以直接注入,注意条件 @ConditionalOnMissingBean(Executor.class),一般实际应用肯定是要进行自定义化的
emm,它下面Builder怎么会设计成这样的,每次创建一个 Builder 对象?
@ConditionalOnClass(ThreadPoolTaskExecutor.class)
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(TaskExecutionProperties.class)
public class TaskExecutionAutoConfiguration {

    /**
     * Bean name of the application {@link TaskExecutor}.
     */
    public static final String APPLICATION_TASK_EXECUTOR_BEAN_NAME = "applicationTaskExecutor";

    @Bean
    @ConditionalOnMissingBean
    public TaskExecutorBuilder taskExecutorBuilder(TaskExecutionProperties properties,
            ObjectProvider<TaskExecutorCustomizer> taskExecutorCustomizers,
            ObjectProvider<TaskDecorator> taskDecorator) {
        TaskExecutionProperties.Pool pool = properties.getPool();
        TaskExecutorBuilder builder = new TaskExecutorBuilder();
        builder = builder.queueCapacity(pool.getQueueCapacity());
        builder = builder.corePoolSize(pool.getCoreSize());
        builder = builder.maxPoolSize(pool.getMaxSize());
        builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout());
        builder = builder.keepAlive(pool.getKeepAlive());
        Shutdown shutdown = properties.getShutdown();
        builder = builder.awaitTermination(shutdown.isAwaitTermination());
        builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod());
        builder = builder.threadNamePrefix(properties.getThreadNamePrefix());
        builder = builder.customizers(taskExecutorCustomizers.orderedStream()::iterator);
        builder = builder.taskDecorator(taskDecorator.getIfUnique());
        return builder;
    }

    @Lazy
    @Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME,
            AsyncAnnotationBeanPostProcessor.DEFAULT_TASK_EXECUTOR_BEAN_NAME })
    @ConditionalOnMissingBean(Executor.class)
    public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
        return builder.build();
    }

}
  1. 核心线程数8
  2. 最大线程数 MAX
  3. 空闲时间 60s
  4. 队列容量MAX
  5. 默认拒绝策略是抛出异常

注意 ThreadPoolTaskExecutor 是交给了 Spring 管理的,因此也有自己的初始化方法

 

 

注意:@Async 使用时要注意在注解中配置或使用配置类配置

 

 

 

 

  1. AsyncConfigurer 全局配置
  2. TaskExecutor + 名称"taskExecutor" ==> fallback 线程池
  3. Simple 线程池

 

标签:异步,return,builder,public,线程,executor,SpringAOP,Async,class
来源: https://www.cnblogs.com/chenxingyang/p/15570352.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有