揭开Spring自身异步实现的秘密~@EnableAsync
作者:互联网
我们在开发过程中经常会碰到一些比较复杂的操作,这些操作可能不必及时相应结果,一般这个时候我们希望触发后,让其自己去慢慢执行,最后我们只需要查询到相关结果即可,这个时候一般就是采用异步操作的形式。通常大家为了并行操作,会选用创建一个线程去执行,这样就需要书写比较多的线程代码,无法便捷开发。
spring为我们提供一种注解的形式,可以实现异步操作。
一、应用
开启异步任务使用方法:
1).方法上或类加@Async注解
2).启动类或者配置类上@EnableAsync
二、源码解析
EnableAsync
/**
* NOTE: {@link AsyncConfigurer} configuration classes get initialized early
* in the application context bootstrap. If you need any dependencies on other beans
* there, make sure to declare them 'lazy' as far as possible in order to let them
* go through other post-processors as well.</b>
*
* 注意: AsyncConfigurer配置类在应用程序上下文引导程序的早期初始化。
* 如果您需要对其他 bean 的任何依赖,请确保尽可能将它们声明为“懒惰”,以便让它们也通过其他后处理器。
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
这段话很重要,如果使用spring的异步@Async方法或者类不当,就会报循环依赖异常,作者也是建议我们在使用相关的这些bean时,存在依赖关系的情况下采用lazy的形式,如果我们依赖了异步方法类,我们在引入的类上最好加上@Lazy的注解,以避免循环依赖。
@Import(AsyncConfigurationSelector.class)
spring在启动的过程中,会处理配置类的注解,如果发现@Import注解,则会去判断import的对象是否实现了ImportSelector.class接口,如果实现了则调用 其selectImports()方法,并将方法返回对象引入spring容器中,这些都是spring的扩展点之一,通过这个也可以发现@EnableAsync注解的使用特性,是需要配合@Configuration注解一起使用是最好的。
@Override
@Nullable
public String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
case PROXY:
return new String[] {ProxyAsyncConfiguration.class.getName()};
case ASPECTJ:
return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
default:
return null;
}
}
默认走的是PROXY代理,引入的ProxyAsyncConfiguration.class
生成AsyncAnnotationBeanPostProcessor的bean对象。
主要关注他的后置处理器的方法实现。
AsyncAnnotationBeanPostProcessor
重要参数
AsyncAnnotationBeanPostProcessor类
@Nullable
private Supplier<Executor> executor;//线程任务执行器
@Nullable
private Supplier<AsyncUncaughtExceptionHandler> exceptionHandler;//异常处理器
@Nullable
private Class<? extends Annotation> asyncAnnotationType;//异步注解类型
AbstractAdvisingBeanPostProcessor类
@Nullable
protected Advisor advisor;//重要的代理逻辑和切点
Advisor advisor 这里是在bean生命周期初始化之前有个Aware方法回调,生成的对应对象是AsyncAnnotationAdvisor类
BeanFactoryAware.setBeanFactory方法回调生成 - new AsyncAnnotationAdvisor
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
if (this.advisor == null || bean instanceof AopInfrastructureBean) {
// Ignore AOP infrastructure such as scoped proxies.
return bean;
}
if (bean instanceof Advised) {
Advised advised = (Advised) bean;
if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
// Add our local Advisor to the existing proxy's Advisor chain...
if (this.beforeExistingAdvisors) {
advised.addAdvisor(0, this.advisor);
}
else {
advised.addAdvisor(this.advisor);
}
return bean;
}
}
//主线:检查给定的类是否有资格使用此后处理器的Advisor
if (isEligible(bean, beanName)) {
//准备代理工厂
ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
if (!proxyFactory.isProxyTargetClass()) {
evaluateProxyInterfaces(bean.getClass(), proxyFactory);
}
//此时添加advisor = AsyncAnnotationAdvisor类
proxyFactory.addAdvisor(this.advisor);
customizeProxyFactory(proxyFactory);
//生成代理对象
return proxyFactory.getProxy(getProxyClassLoader());
}
// No proxy needed.
return bean;
}
AsyncAnnotationAdvisor类
构造方法
//构建 异步advisor
public AsyncAnnotationAdvisor(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
//@Async注解作为切点
asyncAnnotationTypes.add(Async.class);
//赋值advice 为 AnnotationAsyncExecutionInterceptor
this.advice = buildAdvice(executor, exceptionHandler);//会将 线程执行器和异常处理器放入进去,不过我们一般不去自定义的话,这个时候是null,他会处理走默认的
this.pointcut = buildPointcut(asyncAnnotationTypes);//根据@Async生成对应的切点
}
buildAdvice(executor, exceptionHandler)方法中
会将一个表达式放入对应的Supplier属性,在需要的时候触发表达式创建
AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
interceptor.configure(executor, exceptionHandler);
this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
//走默认的异常处理器
this.exceptionHandler = SingletonSupplier.of(SimpleAsyncUncaughtExceptionHandler::new);
return interceptor;
getDefaultExecutor(this.beanFactory)); 方法就是会去获取默认的线程执行器
会先去单例池中获取,如果没有才会自己创建。
在下方会调用到!
getDefaultExecutor的大概逻辑就是:
先去spring容器中获取 TaskExecutor.class 的实例 -> 没有再去spring容器中获取beanName = “taskExecutor” 的实例
-> 再没有 自己去创建 new SimpleAsyncTaskExecutor()
AnnotationAsyncExecutionInterceptor - advice类
很明显 :implements MethodInterceptor 接口 直接看invoke逻辑代码
@Override
@Nullable
public Object invoke(final MethodInvocation invocation) throws Throwable {
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
//确认 方法要被哪个 异步任务执行器来执行
AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
if (executor == null) {
throw new IllegalStateException(
"No executor specified and no default executor set on AsyncExecutionInterceptor either");
}
//构建 线程run方法
Callable<Object> task = () -> {
try {
//内部执行 目标对象逻辑
Object result = invocation.proceed();
if (result instanceof Future) {
return ((Future<?>) result).get();
}
}
catch (ExecutionException ex) { //默认的异常处理器
handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
}
catch (Throwable ex) {
handleError(ex, userDeclaredMethod, invocation.getArguments());
}
return null;
};
//将线程交给 线程池中执行 内部会判断方法的返回值类型 根据方法返回值类型来处理提交形式
return doSubmit(task, executor, invocation.getMethod().getReturnType());
}
determineAsyncExecutor(userDeclaredMethod); 确定线程执行器
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
AsyncTaskExecutor executor = this.executors.get(method);
if (executor == null) {
Executor targetExecutor;
//根据@Async注解上value值 ,判断是否有指定的TaskExecutor或Executor执行器
String qualifier = getExecutorQualifier(method);
if (StringUtils.hasLength(qualifier)) {
targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
}
else {
//调用 表达式方法 这个get实际上调用方法是 getDefaultExecutor(this.beanFactory))
//如果没有在spring容器中创建线程池的单例的话 拿到的是 SimpleAsyncTaskExecutor()对象
targetExecutor = this.defaultExecutor.get();
}
if (targetExecutor == null) {
return null;
}
executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
this.executors.put(method, executor);
}
//返回线程执行器
return executor;
}
@Async 注解是支持集中返回类型的 在 doSubmit(task, executor, invocation.getMethod().getReturnType());方法中也有体现
ListenableFuture 类型 Future类型
@Nullable
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
if (CompletableFuture.class.isAssignableFrom(returnType)) {
return CompletableFuture.supplyAsync(() -> {
try {
return task.call();
}
catch (Throwable ex) {
throw new CompletionException(ex);
}
}, executor);
}
//ListenableFuture 类型
else if (ListenableFuture.class.isAssignableFrom(returnType)) {
return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
}
//Future类型
else if (Future.class.isAssignableFrom(returnType)) {
return executor.submit(task);
}
else {
//直接 执行
executor.submit(task);
return null;
}
}
SimpleAsyncTaskExecutor 类
作者:
TaskExecutor实现,为每个任务启动一个新线程,异步执行它。
支持通过“concurrencyLimit”bean 属性限制并发线程。
默认情况下,并发线程数是无限的。
注意:此实现不重用线程! 考虑使用线程池 TaskExecutor 实现,特别是用于执行大量短期任务。
/**
* {@link TaskExecutor} implementation that fires up a new Thread for each task,
* executing it asynchronously.
*
* <p>Supports limiting concurrent threads through the "concurrencyLimit"
* bean property. By default, the number of concurrent threads is unlimited.
*
* <p><b>NOTE: This implementation does not reuse threads!</b> Consider a
* thread-pooling TaskExecutor implementation instead, in particular for
* executing a large number of short-lived tasks.
*/
@SuppressWarnings("serial")
public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator
implements AsyncListenableTaskExecutor, Serializable {
作者的备注也是很明显的说明这个线程执行器的弊端。
Spring异步EnableAsync的利弊
弊端
- 无限的线程执行—在并发高的情况下,会压垮我们的服务器。
- 每个任务启动一个新线程,不存在线程重用。
- 因为线程不重用在执行短期任务时消耗线程资源会过多。
优势 根据弊端可以优化
- 首先就是对我们开发上,省略了大量线程相关的代码,敏捷开发。
- 在获取Executor的过程中,其实spring是会去单例池获取执行器的实例的。也就是我们在项目中可以定义适用线程池,让其使用。
- 再者根据他的局限性可以选择场景,在一些执行时间比较长,并发不大的方法上使用spring的异步处理。
附上一个执行逻辑图方便大家理解:
感谢大家的阅读,有什么想法意见欢迎留言~
标签:EnableAsync,异步,return,Spring,bean,线程,executor,new,null 来源: https://blog.csdn.net/weixin_38899094/article/details/119150820