Kotlin协程实现原理
作者:互联网
初看suspend关键字
下面的例子模拟一个网络请求:
class Temp {
suspend fun fetchData(argument: String): Boolean {
val result = netRequest(argument)
return result == 0
}
// 模拟网络请求
suspend fun netRequest(argument: String): Int {
delay(1000)
return argument.length
}
}
这两个方法都使用了suspend
关键字修饰,我们将这个文件的字节码反编译为等同效果的Java
代码:
public final class Temp {
@Nullable
public final Object fetchData(@NotNull String argument, @NotNull Continuation var2) {
Object $continuation;
label25: {
if (var2 instanceof <undefinedtype>) {
$continuation = (<undefinedtype>)var2;
if ((((<undefinedtype>)$continuation).label & Integer.MIN_VALUE) != 0) {
((<undefinedtype>)$continuation).label -= Integer.MIN_VALUE;
break label25;
}
}
$continuation = new ContinuationImpl(var2) {
// $FF: synthetic field
Object result;
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
this.result = $result;
this.label |= Integer.MIN_VALUE;
return Temp.this.fetchData((String)null, this);
}
};
}
Object $result = ((<undefinedtype>)$continuation).result;
Object var6 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
Object var10000;
switch(((<undefinedtype>)$continuation).label) {
case 0:
ResultKt.throwOnFailure($result);
((<undefinedtype>)$continuation).label = 1;
var10000 = this.netRequest(argument, (Continuation)$continuation);
if (var10000 == var6) {
return var6;
}
break;
case 1:
ResultKt.throwOnFailure($result);
var10000 = $result;
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
int result = ((Number)var10000).intValue();
return Boxing.boxBoolean(result == 0);
}
@Nullable
public final Object netRequest(@NotNull String argument, @NotNull Continuation var2) {
Object $continuation;
label20: {
if (var2 instanceof <undefinedtype>) {
$continuation = (<undefinedtype>)var2;
if ((((<undefinedtype>)$continuation).label & Integer.MIN_VALUE) != 0) {
((<undefinedtype>)$continuation).label -= Integer.MIN_VALUE;
break label20;
}
}
$continuation = new ContinuationImpl(var2) {
// $FF: synthetic field
Object result;
int label;
Object L$0;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
this.result = $result;
this.label |= Integer.MIN_VALUE;
return Temp.this.netRequest((String)null, this);
}
};
}
Object $result = ((<undefinedtype>)$continuation).result;
Object var5 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch(((<undefinedtype>)$continuation).label) {
case 0:
ResultKt.throwOnFailure($result);
((<undefinedtype>)$continuation).L$0 = argument;
((<undefinedtype>)$continuation).label = 1;
if (DelayKt.delay(1000L, (Continuation)$continuation) == var5) {
return var5;
}
break;
case 1:
argument = (String)((<undefinedtype>)$continuation).L$0;
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
return Boxing.boxInt(argument.length());
}
}
几行协程相关的代码,竟然对应了这么多的Java
代码,可见kotlin
编译器为我们做了很多事情。
上面代码的可读性不高,例如有<undefinedtype>
这种未定义的类型,我使用jd-gui
对Temp.class
文件再进行了一次反编译,获取到了更多信息,我将上面的反编译的一大串代码和jd-gui
反编译获取的信息进行整合,并且对一些类和变量进行适当的重命名,得出信息更完整且可读性更高的「Temp.class
反编译后对应的Java
代码」,首先是fetchData
相关的:
public final Object fetchData(@NotNull String argument,@NotNull Continuation completion) {
Object $continuation;
label25:
{
if (completion instanceof FetchDataStateMachine) {
$continuation = (FetchDataStateMachine) completion;
if (($continuation.label & Integer.MIN_VALUE) != 0) {
$continuation.label -= Integer.MIN_VALUE;
break label25;
}
}
$continuation = new FetchDataStateMachine(completion);
}
Object $result = $continuation.result;
Object resultTemp;
switch ($continuation.label) {
case 0:
ResultKt.throwOnFailure($result);
$continuation.label = 1;
resultTemp = this.netRequest(argument, (Continuation) $continuation);
if (resultTemp == COROUTINE_SUSPENDED) {
return COROUTINE_SUSPENDED;
}
break;
case 1:
ResultKt.throwOnFailure($result);
resultTemp = $result;
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
int result = ((Number) resultTemp).intValue();
return Boxing.boxBoolean(result == 0);
}
static final class FetchDataStateMachine extends ContinuationImpl {
Object result;
int label;
FetchDataStateMachine(Continuation $completion) {
super($completion);
}
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
this.result = $result;
this.label |= Integer.MIN_VALUE;
return Temp.this.fetchData(null, (Continuation<? super Boolean>) this);
}
}
netRequest
相关的代码,与fetchData
相关的代码,在结构和形式上类似:
public final Object netRequest(@NotNull String argument,@NotNull Continuation completion) {
Object $continuation;
label20:
{
if (completion instanceof NetRequestStateMachine) {
$continuation = (NetRequestStateMachine) completion;
if (($continuation.label & Integer.MIN_VALUE) != 0) {
$continuation.label -= Integer.MIN_VALUE;
break label20;
}
}
$continuation = new NetRequestStateMachine(completion);
}
Object $result = $continuation.result;
switch ($continuation.label) {
case 0:
ResultKt.throwOnFailure($result);
$continuation.functionParameter = argument;
$continuation.label = 1;
if (DelayKt.delay(1000L, (Continuation) $continuation) == COROUTINE_SUSPENDED) {
return COROUTINE_SUSPENDED;
}
break;
case 1:
argument = (String) ($continuation.functionParameter);
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
return Boxing.boxInt(argument.length());
}
static final class NetRequestStateMachine extends ContinuationImpl {
Object result;
int label;
Object functionParameter;
NetRequestStateMachine(Continuation $completion) {
super($completion);
}
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
this.result = $result;
this.label |= Integer.MIN_VALUE;
return Temp.this.netRequest(null, (Continuation<? super Integer>) this);
}
}
可以发现,反编译后的Java
代码中,fetchData
和netRequest
方法都多了一个Continuation completion
参数,这是Kotlin Compiler
帮我们做的,对于suspend
修饰的函数,编译的时候Kotlin Compiler
会帮我们在该函数中传入一个Continuation
参数,使用Continuation
参数代替了suspend
修饰符,这个参数有什么含义呢?
初识续体
续体是理解协程工作原理的一个关键。
先看传统的网络请求:
data class User(val id: Long, val name: String)
interface Callback {
fun success(user: User)
fun failure(t: Throwable)
}
class Model {
fun getUserInfo(callback: Callback) {
Thread.sleep(1000) // 模拟网络请求
callback.success(User(1, "giagor"))
}
}
class Business {
val model = Model()
fun getUserInfo() {
model.getUserInfo(object : Callback {
override fun success(user: User) {
showMsg(user.toString())
}
override fun failure(t: Throwable) {
showMsg(t.message ?: "")
}
})
}
fun showMsg(msg: String) {
// ...
}
}
在使用Model
进行网络请求的时候,使用Callback
接收网络请求的结果,我们这时候可以将Callback
看作一个续体,即网络请求的续体,用于接收网络请求的结果。
在协程中使用Continuation
接口表示一个续体,它代表一个挂起点之后的延续,即 挂起点之后的剩余应执行的代码:
public interface Continuation<in T> {
// 与该续体对应的协程的上下文
public val context: CoroutineContext
// 恢复对应协程的执行,并且传递一个表示成功或失败的result作为最后一个挂起点的返回值
public fun resumeWith(result: Result<T>)
}
在Kotlin 1.3
,也有可以方便地调用resumeWith
的扩展函数:
public inline fun <T> Continuation<T>.resume(value: T): Unit =
resumeWith(Result.success(value))
public inline fun <T> Continuation<T>.resumeWithException(exception: Throwable): Unit =
resumeWith(Result.failure(exception))
正如前面所说,对于suspend
修饰的函数,Kotlin Compiler
会帮我们在该函数中传入一个Continuation
参数,使用Continuation
参数代替了suspend
修饰符,通过Continuation
参数,Kotlin Compiler
可以将我们的协程代码转化为等价的回调代码,也就是说,Kt
编译器帮我们写好了那些回调的代码,至于怎么帮我们写的后面会分析,这种通过传递Continuation
来控制异步调用流程被称作CPS
变换(Continuation-Passing-Style Transformation
)。
状态机
fetchData
函数编译时会生成下面的一个静态内部类(续体):
static final class FetchDataStateMachine extends ContinuationImpl {
Object result;
int label;
FetchDataStateMachine(Continuation $completion) {
super($completion);
}
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
this.result = $result;
this.label |= Integer.MIN_VALUE;
return Temp.this.fetchData(null, (Continuation<? super Boolean>) this);
}
}
FetchDataStateMachine
的继承关系如下:
FetchDataStateMachine -> ContinuationImpl -> BaseContinuationImpl -> Continuation
FetchDataStateMachine
接收一个名称为$completion
的Continuation
参数,$completion
被保存在父类BaseContinuationImpl
中:
internal abstract class BaseContinuationImpl(
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {...}
通过$completion
可以将fetchData
函数的执行结果传递回给调用fetchData
的函数,有了$completion
,才有能力去实现回调。
状态机FetchDataStateMachine
声明了result
和label
两个变量
result
:表示上一个Continuation
的结果,比如有函数A
和B
,函数内部分别声明了ContinuationA
和ContinuationB
,A
调用B
并且将ContinuationA
传入B
中保存。在后续回调的过程中,ContinuationA
可以从result
变量中拿到ContinuationB::invokeSuspend
的执行结果。label
:Kotlin Compiler
可以识别函数内部哪个地方会挂起,每一个挂起点(suspension point
)被表示为状态机的一个状态(state
),这些状态通过switch case
语句表示出来。label
表示当前应该执行状态机的哪一个状态,具体来说就是要进入哪一个case
,通过label
变量就记录下了状态机当前的状态。
再看下fetchData
的前半部分代码:
public final Object fetchData(@NotNull String argument,@NotNull Continuation completion) {
Object $continuation;
label25:
{
if (completion instanceof FetchDataStateMachine) {
$continuation = (FetchDataStateMachine) completion;
if (($continuation.label & Integer.MIN_VALUE) != 0) {
$continuation.label -= Integer.MIN_VALUE;
break label25;
}
}
$continuation = new FetchDataStateMachine(completion);
}
...
}
它会判断传入的completion
是否为FetchDataStateMachine
类型,若是则对它的label
变量做些操作,若不是则直接创建一个FetchDataStateMachine
并且传入completion
(completion
会被保存下来)。
再看下fetchData
的后半部分代码:
public final Object fetchData(@NotNull String argument,@NotNull Continuation completion) {
Object $continuation;
...
Object $result = $continuation.result;
Object resultTemp;
switch ($continuation.label) {
case 0:
ResultKt.throwOnFailure($result);
$continuation.label = 1;
resultTemp = this.netRequest(argument, (Continuation) $continuation);
if (resultTemp == COROUTINE_SUSPENDED) {
return COROUTINE_SUSPENDED;
}
break;
case 1:
ResultKt.throwOnFailure($result);
resultTemp = $result;
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
int result = ((Number) resultTemp).intValue();
return Boxing.boxBoolean(result == 0);
}
fetchData
方法原先的代码语句会被划分为switch
下的多个case
语句,在这里就是
FetchDataStateMachine
中的label
变量就是控制当前要执行哪个case
分支。
可见,函数与续体构成了一个有限状态机(FSM,即 Finite-State Machine),来控制协程代码的执行。
何为「非阻塞式挂起」?
在netRequest
方法中,调用了delay(1000)
挂起了当前的协程,简单看下delay
方法反编译后的代码:
public static final Object delay(long timeMillis, @NotNull Continuation $completion) {
if (timeMillis <= 0L) {
return Unit.INSTANCE;
} else {
// 实现类
CancellableContinuationImpl cancellableContinuationImpl = new CancellableContinuationImpl(IntrinsicsKt.intercepted($completion), 1);
cancellableContinuationImpl.initCancellability();
// 向上转型
CancellableContinuation cont = (CancellableContinuation)cancellableContinuationImpl;
if (timeMillis < Long.MAX_VALUE) {
// 延时操作
getDelay(cont.getContext()).scheduleResumeAfterDelay(timeMillis, cont);
}
// 获取执行结果
Object result = cancellableContinuationImpl.getResult();
if (result == COROUTINE_SUSPENDED) {
DebugProbesKt.probeCoroutineSuspended($completion);
}
// 返回结果
return result;
}
}
在该方法里会执行延时操作,如果需要挂起,就会返回COROUTINE_SUSPENDED
值给调用者。
结合fetchData
、netRequest
和delay
反编译的代码,我们可以得出下面的这个调用图:
图中红色的线表示函数返回COROUTINE_SUSPENDED
,需要挂起。当delay
方法需要挂起的时候,它返回COROUTINE_SUSPENDED
,接着netRequest
方法返回COROUTINE_SUSPENDED
,接着fetchData
方法返回COROUTINE_SUSPENDED
,重复这个过程直到调用栈的最上层。
通过这种「结束方法调用」的方式,让协程暂时不在这个线程上面执行,让线程可以去处理其它的任务(包括执行其它的协程),这也就是为什么协程的挂起不会阻塞当前的线程,这也是「非阻塞式挂起」的由来。
如何恢复?
既然协程挂起了,那就有相应的协程的恢复。先说结论:协程恢复的实质是对续体进行回调。
暂时还没有研究delay
函数的具体实现,但是delay
函数会在某个子线程执行等待操作,等延时时间到达之后,就会调用传给delay
函数的$completion
的resumeWith
方法,也就是调用NetRequestStateMachine
的resumeWith
方法。NetRequestStateMachine
的继承关系、父类如下:
NetRequestStateMachine -> ContinuationImpl -> BaseContinuationImpl -> Continuation
BaseContinuationImpl
目前是我们分析的一个重点,它主要做了下面的几件事情:
- 保存
completion
:它保存了fetchData
方法的FetchDataStateMachine
实例,使得可以一级一级地向上回调续体。 - 重写
resumeWith
方法:BaseContinuationImpl
重写了Continuation
接口的resumeWith
方法,该方法用于恢复协程,也是协程恢复的核心逻辑。
我们查看BaseContinuationImpl
类的定义:
internal abstract class BaseContinuationImpl(
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
// This implementation is final. This fact is used to unroll resumeWith recursion.
public final override fun resumeWith(result: Result<Any?>) {
// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
var current = this
var param = result
while (true) {
// 在每个恢复的continuation进行调试探测,使得调试库可以精确跟踪挂起的调用栈中哪些部分
// 已经恢复了。
probeCoroutineResumed(current)
with(current) {
val completion = completion!! // fail fast when trying to resume continuation without completion
val outcome: Result<Any?> =
try {
val outcome = invokeSuspend(param)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
// unrolling recursion via loop
current = completion
param = outcome
} else {
// top-level completion reached -- invoke and return
completion.resumeWith(outcome)
return
}
}
}
}
protected abstract fun invokeSuspend(result: Result<Any?>): Any?
protected open fun releaseIntercepted() {
// does nothing here, overridden in ContinuationImpl
}
...
}
重点是resumeWith
方法的实现,它在一个while(true)
循环下面执行回调的逻辑。我们结合前面给出的fetchData
和netRequest
反编译后的代码,看看delay
函数的延时时间到达时调用NetRequestStateMachine
的resumeWith
方法,后续的执行流程是怎样的:
- 执行
NetRequestStateMachine
父类BaseContinuationImpl
的resumeWith
方法。 - 执行当前续体也就是
NetRequestStateMachine
的invokeSuspend
方法(NetRequestStateMachine
有实现该方法,忘记了的话可以回头看看之前的反编译代码)。 - 在
NetRequestStateMachine
的invokeSuspend
方法调用了netRequest
方法,并且将续体自身作为参数传入。 - 在
netRequest
方法中,由于completion
的类型就是NetRequestStateMachine
,因此可以直接使用该续体,不用像之前第一次进入netRequest
方法那样需要创建一个新的续体。此时续体的label
值为1
,于是进入netRequest
的case 1
语句分支。
实际上这个过程有对续体的
label
进行一些运算转化的操作,但是最终label
的值都是1
,做的运算转化操作不影响我们的分析,因此并不是重点
- 从续体中取出一开始传入
netRequest
方法的参数,也就是argument
,返回argument.length
。为了方便后面阐述,这里将该返回值argument.length
记为netRequest-Return
。 - 接着
netRequest
方法结束,NetRequestStateMachine::invokeSuspend
方法也执行结束,netRequest-Return
也作为invokeSuspend
方法的返回值,该返回值会传递到BaseContinuationImpl
的resumeWith
方法中,在resumeWith
方法中,将netRequest-Return
包装为Result
保存到outcome
变量中。 - 判断
NetRequestStateMachine
持有的completion
是否为BaseContinuationImpl
类型,我们知道它持有的实例其实就是FetchDataStateMachine
,因此肯定是BaseContinuationImpl
,于是进行了变量的更新
// 把current更新为FetchDataStateMachine实例
current = completion
// 把param更新为outcome(包装了netRequest-Return的Result)
param = outcome
通过这种方式,其实就可以实现回调,我们继续往后看。
- 继续进行下一轮
while
循环,在with
块中会执行FetchDataStateMachine::invokeSuspend
,在invokeSuspend
里,将传入的参数param
保存到result
变量里(其实这和传统的回调类似,传统的回调中也是要将下层的执行结果回调给上层),接着调用了fetchData
方法。 - 在
fetchData
方法中,由于传入的completion
已是FetchDataStateMachine
类型,因此无需再去创建新的续体。由于此时续体label
的值为1
,所以会进入case 1
语句,并且将netRequest
方法的执行结果保存在resultTemp
变量中,最终fetchData
方法结束并返回结果result == 0
,为了方便阐述,将fetchData
方法的执行结果记为fetchData-Return
。 FetchDataStateMachine::invokeSuspend
方法也会结束并返回fetchData-Return
,然后在BaseContinuationImpl
的resumeWith
方法中将fetchData-Return
包装为Result
。然后会判断FetchDataStateMachine
持有的completion
是否为BaseContinuationImpl
类型。- 代码的后续走向,我们目前是不清楚的,我们得知道在协程中调用
fetchData
方法的时候会做些什么,才能清楚后续的代码走向。
从上面的流程分析中,我们对协程的恢复有了一个基本的认识,下面给出流程图进行总结:
再看看上面续体的调用过程,其实就是层层往上地调用续体的invokeSuspend
方法,从过程来看有点像递归调用,但是BaseContinuationImpl::resumeWith
的实现却和递归不太一样,它的实现是在while(true)
循环中,对续体调用一次invokeSuspend
方法,然后记录它的返回结果,将这个返回结果作为下一个续体invokeSuspend
的方法参数。
简单来讲,就是在调用一个续体的invokeSuspend
方法,待这个方法执行结束后,再调用下一个续体的invokeSuspend
方法。这样做的一个原因是避免调用栈过深,在BaseContinuationImpl::resumeWith
也有相关的注释说明:
This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
启动协程
我们在一个协程中去调用fetchData
方法:
class Temp2 {
fun execute() {
GlobalScope.launch(Dispatchers.Main) {
Temp().fetchData("argument")
}
}
}
通过launch
方法可以启动一个协程,其源码如下:
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
协程中的代码会被包装为一个block
,默认情况下会创建一个StandaloneCoroutine
,然后调用它的start
方法并返回StandaloneCoroutine
。
StandaloneCoroutine
间接的实现了Job
接口和Continuation<T>
接口,如下:
private open class StandaloneCoroutine(
parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active) {
override fun handleJobException(exception: Throwable): Boolean {
handleCoroutineException(context, exception)
return true
}
}
public abstract class AbstractCoroutine<in T>(
/**
* The context of the parent coroutine.
*/
@JvmField
protected val parentContext: CoroutineContext,
active: Boolean = true
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {...}
可以看出StandaloneCoroutine
身兼多职,实现了Job, Continuation<T>, CoroutineScope
接口。后面代码跟踪可以得出一个结论,最顶层的续体实现是协程自身,也就是协程恢复的时候续体会一层层地往上回调,最顶层的续体就是协程coroutine
自身,即StandaloneCoroutine
(这里以StandaloneCoroutine
为例)。
另外还要注意一点,launch
方法中传入的 block
块类型:
block: suspend CoroutineScope.() -> Unit
它等价于下面的这种函数类型:
// CoroutineScope:扩展函数转化而来
// Continuation:suspend关键字转化而来,Continuation参数由编译器传入
block : (CoroutineScope,Continuation) -> Unit
// 或者通过Function2的形式表示
block : Function2<CoroutineScope,Continuation,Unit>
接着跟踪下启动协程的调用过程。在launch
方法中,调用了AbstractCoroutine::start
方法:
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
initParentJob()
// 语法糖,实际是调用CoroutineStart.invoke方法
start(block, receiver, this)
}
CoroutineStart::invoke
方法:
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
when (this) {
DEFAULT -> block.startCoroutineCancellable(receiver, completion)
ATOMIC -> block.startCoroutine(receiver, completion)
UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
LAZY -> Unit // will start lazily
}
从launch
方法可以知道CoroutineStart
的默认值是CoroutineStart.DEFAULT
,因此会调用到block
的startCoroutineCancellable
方法:
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
receiver: R, completion: Continuation<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
) =
runSafely(completion) {
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
}
我在AS
跟踪createCoroutineUnintercepted
的代码调用时,发现会跳转到IntrinsicsKt.class
文件,这个文件里面找不到方法的源代码,最后找到了IntrinsicsJvm.kt
文件,找到createCoroutineUnintercepted
方法源码,如下:
# R:CoroutineScope
# T:Unit
@SinceKotlin("1.3")
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
receiver: R,
completion: Continuation<T>
): Continuation<Unit> {
// probeCoroutineCreated方法直接返回completion
val probeCompletion = probeCoroutineCreated(completion)
return if (this is BaseContinuationImpl)
create(receiver, probeCompletion)
else {
createCoroutineFromSuspendFunction(probeCompletion) {
(this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
}
}
}
这里会判断this
的类型是否为BaseContinuationImpl
,this
就是我们之前在launch
中传入的lambda
块,那么这个lambda
代码块是什么类型的呢?想要知道这个答案,我们得对这一节刚开始给出的代码进行反编译
kotlin
代码:
class Temp2 {
fun execute() {
GlobalScope.launch(Dispatchers.Main) {
Temp().fetchData("argument")
}
}
}
对反编译后的java
代码进行适当的重命名和调整,得出:
public final class Temp2 {
...
static final class LaunchLambda extends SuspendLambda implements Function2<CoroutineScope, Continuation<? super Unit>, Object> {
int label;
LaunchLambda(Continuation $completion) {
super(2, $completion);
}
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
switch (this.label) {
case 0:
ResultKt.throwOnFailure(SYNTHETIC_LOCAL_VARIABLE_1);
this.label = 1;
if ((new Temp()).fetchData("argument", (Continuation<? super Boolean>) this) == COROUTINE_SUSPENDED)
return COROUTINE_SUSPENDED;
(new Temp()).fetchData("argument", (Continuation<? super Boolean>) this);
return Unit.INSTANCE;
case 1:
ResultKt.throwOnFailure(SYNTHETIC_LOCAL_VARIABLE_1);
return Unit.INSTANCE;
}
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
@NotNull
public final Continuation<Unit> create(@Nullable Object value, @NotNull Continuation<? super LaunchLambda> $completion) {
return (Continuation<Unit>) new LaunchLambda($completion);
}
@Nullable
public final Object invoke(@NotNull CoroutineScope p1, @Nullable Continuation<?> p2) {
return ((LaunchLambda) create(p1, p2)).invokeSuspend(Unit.INSTANCE);
}
}
}
可以看出在Temp2
里面会自动生成一个静态内部类LaunchLambda
,它对应着launch
方法中传入的lambda
块。LaunchLambda
的继承关系(由上到下,子类到父类的顺序):
LaunchLambda
-> SuspendLambda // 用suspend修饰的lambda块都会继承至这个类
-> ContinuationImpl
-> BaseContinuationImpl // 重写了resumeWith函数
-> Continuation
OK,回到createCoroutineUnintercepted
方法中,现在可以回答刚刚提出的问题了,lambda
传入的lambda
块是不是BaseContinuationImpl
类型呢?根据上面的继承关系得出,当然是!那么它就会调用LaunchLambda
的create
方法,注意第二个参数传入的是completion
(代码中写的是probeCompletion
),它最终会被保存在父类BaseContinuationImpl
的completion
变量中,这个completion
参数就是launch
方法中创建的StandaloneCoroutine
,即协程本身,它作为协程恢复时的最顶层续体。
通过调用create
方法获取到一个LaunchLambda
实例,createCoroutineUnintercepted
方法执行结束并返回LaunchLambda
实例,接着代码执行又回到startCoroutineCancellable
中,回顾下该方法:
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
receiver: R, completion: Continuation<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
) =
runSafely(completion) {
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
}
这里有两部分调用,先是调用intercepted
方法,然后再调用resumeCancellableWith
方法。intercepted
方法与续体拦截机制有关,后面会介绍,这里先忽略,这里直接认为调用了LaunchLambda
实例的resumeCancellableWith
方法即可,该方法如下:
public fun <T> Continuation<T>.resumeCancellableWith(
result: Result<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
else -> resumeWith(result)
}
那么会走到resumeWith
方法,前面提到过该方法在父类BaseContinuationImpl
实现,在该方法里面会调用invokeSuspend
方法,invokeSuspend
方法在LaunchLambda
中实现了,如下:
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
switch (this.label) {
case 0:
ResultKt.throwOnFailure(SYNTHETIC_LOCAL_VARIABLE_1);
this.label = 1;
if ((new Temp()).fetchData("argument", (Continuation<? super Boolean>) this) == COROUTINE_SUSPENDED)
return COROUTINE_SUSPENDED;
(new Temp()).fetchData("argument", (Continuation<? super Boolean>) this);
return Unit.INSTANCE;
case 1:
ResultKt.throwOnFailure(SYNTHETIC_LOCAL_VARIABLE_1);
return Unit.INSTANCE;
}
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
一开始label
的值为0
,所以会进入case 0
语句分支,在该语句分支里面,会设置label
的值为1
,然后创建一个Temp
对象并且调用它的fetchData
方法,并把LaunchLambda
自身作为参数传入,也就是LaunchLambda
实例会被保存在fetchData
方法创建的续体的completion
变量里,方便协程恢复的时候进行回调。
现在续体的持有图:
到了这里,从启动一个协程到协程最终是如何挂起的,我们已经可以串联起来了。在「如何恢复?」一节中,协程恢复的最后几个步骤我们还没有分析,这里把它分析完,然后整个协程恢复的流程也可以串起来了。
协程恢复的后续流程:
- 当
FetchDataStateMachine::invokeSuspend
执行完后,会在BaseContinuationImpl
的resumeWith
方法中判断FetchDataStateMachine
所持有的completion
(即LaunchLambda
)是否为BaseContinuationImpl
类型,由LaunchLambda
的继承关系,容易得出答案为「是」,所以会进入下一轮while
循环,调用LaunchLambda
的invokeSuspend
方法。 - 由于
label = 1
所以会进入case 1
语句,里面直接return Unit
。接着判断LaunchLambda
持有的completion
(即StandaloneCoroutine
)是否为BaseContinuationImpl
类型,根据StandaloneCoroutine
的继承关系容易得出答案为「不是」,所以会调用StandaloneCoroutine
的resumeWith
方法。 StandaloneCoroutine
的resumeWith
方法在父类AbstractCoroutine
中实现:
public final override fun resumeWith(result: Result<T>) {
val state = makeCompletingOnce(result.toState())
// 如果在等子协程完成,则返回
if (state === COMPLETING_WAITING_CHILDREN) return
// 应该是做一些后续处理
afterResume(state)
}
此时最顶层的续体(协程自身)也恢复了。
BaseContinuationImpl::resumeWith
方法执行结束,整个协程的恢复也完成了。
在之前流程图的基础上进行补充完善:
一、协程至上而下调用的流程图(协程挂起)
其中蓝色的文本和线条表示新增的,红色的文本和线条表示挂起的过程。
二、协程至下而上恢复的流程图(协程恢复)
其中蓝色的文本和线条表示新增的,橙色的文本和线条表示方法调用的结束。
协程上下文
协程上下文CoroutineContext
定义了协程的行为,它记录了当前协程所持有的信息,是协程运行中一个重要的数据对象。CoroutineContext
是一个接口:
public interface CoroutineContext {...}
在续体中就有CoroutineContext
的相关信息:
public interface Continuation<in T> {
// 与该续体对应的协程的上下文
public val context: CoroutineContext
// 恢复对应协程的执行,并且传递一个表示成功或失败的result作为最后一个挂起点的返回值
public fun resumeWith(result: Result<T>)
}
下面几种元素都是「协程上下文」的元素:
Job
:控制协程的生命周期。CoroutineDispatcher
:将工作分派到适当的线程。CoroutineName
:协程的名称,可用于调试。CoroutineExceptionHandler
:处理未捕获的异常。
CoroutineContext
可以看做是CoroutineContext.Element
的一个集合,集合中的每个元素都可以使用CoroutineContext.Key
进行定位,且每个元素的Key
都是不同的。
CoroutineContext.Element
的定义:
public interface Element : CoroutineContext {...}
可以看到Element
本身也实现了CoroutineContext
接口,这很奇怪,看上去好像是Int
实现了List<Int>
接口一样,为什么元素本身也是集合了呢?其实这主要是为了方便API的设计,这样的话,一个元素比如Job
也可以直接作为一个CoroutineContext
,而不需要创建一个只包含一个元素的List
,多个元素之间也可以通过「+」进行拼接,如:
scope.launch(CoroutineName("coroutine") + Dispatchers.Main) {...}
这里的「+」其实是操作符重载,对应CoroutineContext
声明的plus
方法:
public operator fun plus(context: CoroutineContext): CoroutineContext = ...
「协程上下文」存储元素的方式比较巧妙,它内部并不是创建一个集合,集合的每个位置都存放一个元素。它借助了一个CombinedContext
结构来实现数据的存取,CombinedContext
的定义及get
方法:
internal class CombinedContext(
private val left: CoroutineContext,
private val element: Element
) : CoroutineContext, Serializable {
override fun <E : Element> get(key: Key<E>): E? {
var cur = this
while (true) {
cur.element[key]?.let { return it }
val next = cur.left
if (next is CombinedContext) {
cur = next
} else {
return next[key]
}
}
}
...
}
从构造函数中可以看出它包含两部分内容:left
和element
。也就是说一个CombinedContext
内部可能包含多个元素。
- left:可能是普通的上下文元素(
CoroutineContext.Element
),也可能又是一个CombinedContext
(又包含多个上下文元素)。 - element:一个协程上下文元素。
在CombinedContext
的get
方法中,有一个while(true)
循环,执行过程如下:
- 它会先判断当前
element
元素与传入的key
是否相符,是的话直接返回该元素,否则获取到left
部分。 - 若
left
是CombinedContext
部分,则对left
变量重复步骤1。 - 若
left
不是CombinedContext
部分,则直接调用它的get
方法获取元素(获取不到则返回null
)。
另外,也可以看出element
先于left
被访问,所以越靠右边的上下文元素,其优先级越高。
Key
用于标识协程上下文元素,看看它的定义:
public interface CoroutineContext {
...
public interface Key<E : Element>
public interface Element : CoroutineContext {
// 用于标识元素的Key
public val key: Key<*>
...
}
}
CoroutineContext.Element
有个抽象类实现,可以让我们更方便地实现上下文元素:
public abstract class AbstractCoroutineContextElement(public override val key: Key<*>) : Element
以CoroutineName
为例,分析如何实现一个协程上下文元素:
public data class CoroutineName(
val name: String
/* CoroutineName.Key可以简写为CoroutineName */
) : AbstractCoroutineContextElement(CoroutineName) {
public companion object Key : CoroutineContext.Key<CoroutineName>
...
}
首先声明一点,传入父类AbstractCoroutineContextElement
的参数是CoroutineName.Key
,只是它可以简写为CoroutineName
。其实这也很好理解,在Kotlin
中,我们调用伴生对象方法的时候,是可以省去伴生对象的类名的,这里也是同样的道理。
CoroutineName
内部声明了一个继承至CoroutineContext.Key
的伴生对象Key
,并将其作为构造参数传入父类AbstractCoroutineContextElement
中,以此作为该协程上下文元素的Key
。
上面是实现协程上下文元素的一种普遍做法,即在协程上下文元素里面定义一个伴生对象,以伴生对象为Key
,标识该上下文元素。
最后再看一下CoroutineContext
的完整定义:
public interface CoroutineContext {
// 根据key获取元素
public operator fun <E : Element> get(key: Key<E>): E?
// 翻译为"折叠",它与上下文元素的累加有关
public fun <R> fold(initial: R, operation: (R, Element) -> R): R
// 协程上下文元素的累加
public operator fun plus(context: CoroutineContext): CoroutineContext = ...
// 当前CoroutineContext中,去掉key标识的元素后,剩下的上下文元素(以CoroutineContext形式返回)
public fun minusKey(key: Key<*>): CoroutineContext
public interface Key<E : Element>
public interface Element : CoroutineContext {
// 标识上下文元素的Key
public val key: Key<*>
// key相同则返回元素自身,否则返回null
public override operator fun <E : Element> get(key: Key<E>): E? =
@Suppress("UNCHECKED_CAST")
if (this.key == key) this as E else null
// 执行传入的operation函数
public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
operation(initial, this)
public override fun minusKey(key: Key<*>): CoroutineContext =
if (this.key == key) EmptyCoroutineContext else this
}
}
CoroutineContext
的plus
方法:
public operator fun plus(context: CoroutineContext): CoroutineContext =
if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
context.fold(this) { acc, element ->
val removed = acc.minusKey(element.key)
if (removed === EmptyCoroutineContext) element else {
// make sure interceptor is always last in the context (and thus is fast to get when present)
val interceptor = removed[ContinuationInterceptor]
if (interceptor == null) CombinedContext(removed, element) else {
val left = removed.minusKey(ContinuationInterceptor)
if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
CombinedContext(CombinedContext(left, element), interceptor)
}
}
}
为了方便后面阐述,记调用形式为A + B,假设A是含有多个元素的协程上下文,B是单个上下文元素。该方法的大致执行流程如下:
- 若元素B是空的,则返回原来的上下文A。
- 在fold的lambda块中,可以认为acc为A,element为B。
- 若A中减去element.key元素后(记为C),C为空上下文,则返回B(相当于元素B替换了上下文A)。
- 查看C中是否有ContinuationInterceptor元素,没有则将C和B拼接后返回。
- C中剔除ContinuationInterceptor,记为D,若D是空的,则将B和ContinuationInterceptor拼接然后返回。
- D不是空的,则将D和B和ContinuationInterceptor拼接然后返回。
简单来说,这里就是要将「传入的协程上下文元素」与「原来的协程上下文元素」进行拼接,若传入的元素与原来集合中的元素的key
有冲突,则用传入的元素替换掉原来集合中key
冲突的元素。在上下文元素拼接的时候,若有ContinuationInterceptor
元素则要确保它在「协程上下文元素集合」的最右边,这样它的优先级最高,从协程上下文获取该元素的时候可以更快地获取到(至于为什么元素在右边,元素的优先级就高、获取快,在前面介绍CombinedContext
中已经说明过了)。
plus
方法的执行流程很难用文字叙述清楚,如果想要知道它的实现流程,可以代入几个例子试试。但是它具体的执行流程并不是要分析的重点,有个大概的印象即可。
续体拦截机制
这里算是协程实现原理解析的最后一环了。我们在使用协程的时候,会使用到一些调度器如Dispatchers.Main
和Dispatchers.IO
等调度器来调度线程,在前面的分析中并没有提到协程是如何进行线程调度的。
线程的调度与续体拦截器ContinuationInterceptor
有关,它也是一种「协程上下文元素」:
public interface ContinuationInterceptor : CoroutineContext.Element {
// 续体拦截器对应的Key
companion object Key : CoroutineContext.Key<ContinuationInterceptor>
// 返回一个续体,该续体对原始的续体进行包装(原始的续体作为方法参数传入)。
// 如果该方法不想拦截传入的续体,也可以直接返回原来的续体。
// 当原始续体完成时,如果该续体之前被拦截了,协程框架会调用releaseInterceptedContinuation
// 方法,传入的参数就是「续体的包装类」。
public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
// 该函数只有在interceptContinuation成功拦截的情况下,才会被调用。
// 若原始续体成功被拦截,当原始续体完成且不再被使用时,该方法会被调用,传入的参数是「续体的包装类」。
public fun releaseInterceptedContinuation(continuation: Continuation<*>) {
/* do nothing by default */
}
...
}
续体拦截器可以用于拦截一个续体,最常见的续体拦截器就是协程调度器CoroutineDispatcher
,可以通过单例类Dispatchers
获取到相应的协程调度器。查看CoroutineDispatcher
的实现:
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
@ExperimentalStdlibApi
public companion object Key : AbstractCoroutineContextKey<ContinuationInterceptor, CoroutineDispatcher>(
ContinuationInterceptor,
{ it as? CoroutineDispatcher })
public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
public abstract fun dispatch(context: CoroutineContext, block: Runnable)
public open fun dispatchYield(context: CoroutineContext, block: Runnable): Unit = dispatch(context, block)
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
@InternalCoroutinesApi
public override fun releaseInterceptedContinuation(continuation: Continuation<*>) {
(continuation as DispatchedContinuation<*>).reusableCancellableContinuation?.detachChild()
}
...
}
- 拦截器:
CoroutineDispatcher
继承至ContinuationInterceptor
,所以它也是一种续体拦截器。 - 上下文元素的标识:
CoroutineDispatcher
继承至AbstractCoroutineContextElement
,并传入ContinuationInterceptor.Key
构造参数,以此来标识自身。 - isDispatchNeeded:若需要使用
dispatch
方法进行调度则返回true
,否则返回false
。该方法默认返回true
。协程调度器可以重写该方法,提供一个性能优化以避免不必要的dispatch
,例如主线程调度器Dispatchers.Main
会判断当前协程是否已经在UI
线程中,如果是的话该方法就会返回false
,没有必要再去执行dispatch
方法进行不必要的线程调度。 - dispatch:在给定的上下文和线程中,去执行
block
块。
假设使用的协程调度器是主线程调度器Dispatchers.Main
:
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
查看MainDispatcherLoader.dispatcher
:
@JvmField
val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()
private fun loadMainDispatcher(): MainCoroutineDispatcher {
return try {
val factories = if (FAST_SERVICE_LOADER_ENABLED) {
FastServiceLoader.loadMainDispatcherFactory()
} else {
// We are explicitly using the
// `ServiceLoader.load(MyClass::class.java, MyClass::class.java.classLoader).iterator()`
// form of the ServiceLoader call to enable R8 optimization when compiled on Android.
ServiceLoader.load(
MainDispatcherFactory::class.java,
MainDispatcherFactory::class.java.classLoader
).iterator().asSequence().toList()
}
@Suppress("ConstantConditionIf")
factories.maxBy { it.loadPriority }?.tryCreateDispatcher(factories)
?: createMissingDispatcher()
} catch (e: Throwable) {
// Service loader can throw an exception as well
createMissingDispatcher(e)
}
}
调用了tryCreateDispatcher
:
public fun MainDispatcherFactory.tryCreateDispatcher(factories: List<MainDispatcherFactory>): MainCoroutineDispatcher =
try {
createDispatcher(factories)
} catch (cause: Throwable) {
createMissingDispatcher(cause, hintOnError())
}
继续跟踪,发现createDispatcher
是MainDispatcherFactory
接口的一个方法,其中的一个实现在AndroidDispatcherFactory
中:
internal class AndroidDispatcherFactory : MainDispatcherFactory {
override fun createDispatcher(allFactories: List<MainDispatcherFactory>) =
HandlerContext(Looper.getMainLooper().asHandler(async = true))
...
}
HandlerContext
其实就是调度器Dispatchers.Main
的最终实现:
# handler:主线程的Handler
internal class HandlerContext private constructor(
private val handler: Handler,
private val name: String?,
private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
public constructor(
handler: Handler,
name: String? = null
) : this(handler, name, false)
...
override fun isDispatchNeeded(context: CoroutineContext): Boolean {
return !invokeImmediately || Looper.myLooper() != handler.looper
}
override fun dispatch(context: CoroutineContext, block: Runnable) {
handler.post(block)
}
...
}
-
isDispatchNeeded:通过
looper
判断协程当前是否在主线程上,是的话返回false
,表示不需要再进行线程调度,否则返回true
表示需要进行线程调度。 -
dispatch:使用主线程的
handler
对传入的block
块进行post
操作。
对「续体拦截器」「协程调度器」有了一定的了解之后,我们再回过头看一下协程调度器是如何发挥作用的。我们前面分析过Cancellable
文件的startCoroutineCancellable
方法:
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
receiver: R, completion: Continuation<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
) =
runSafely(completion) {
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
}
在createCoroutineUnintercepted
方法中返回了LaunchLambda
实例,在之前的分析中,我们忽略了intercepted
方法,直接分析为LaunchLambda
会调用resumeCancellableWith
方法,若没有为协程设定续体拦截器,那么确实是LaunchLambda
会直接调用到resumeCancellableWith
方法。我们看看,如果为协程设定了续体拦截器,会发生什么?
查看LaunchLambda
调用的intercepted
方法,它在IntrinsicsJVM
文件中:
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
(this as? ContinuationImpl)?.intercepted() ?: this
LaunchLambda
是ContinuationImpl
类型,因此会调用到父类ContinuationImpl::intercepted
:
internal abstract class ContinuationImpl(
completion: Continuation<Any?>?,
private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)
...
@Transient
private var intercepted: Continuation<Any?>? = null
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
...
}
刚开始intercepted
为null
,所以会判断协程上下文中是否有ContinuationInterceptor
元素,若没有则会返回this
(即LaunchLambda
自身,并将intercepted
变量设置为LaunchLambda
),有的话则会调用interceptContinuation
方法,假设使用的续体拦截器是Dispatchers.Main
,那么就是调用到CoroutineDispatcher
的interceptContinuation
方法,该方法会返回一个DispatchedContinuation
(并将DispatchedContinuation
设置到intercepted
变量中)。
查看CoroutineDispatcher::interceptContinuation
:
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
DispatchedContinuation
类:
internal class DispatchedContinuation<in T>(
@JvmField val dispatcher: CoroutineDispatcher,
@JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {...}
在这里的例子中,dispatcher
就是Dispatchers.Main
,continuation
就是LaunchLambda
。
再回到Cancellable
文件的startCoroutineCancellable
方法:
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
receiver: R, completion: Continuation<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
) =
runSafely(completion) {
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
}
在有续体拦截器(Dispatchers.Main
)的情况下,intercepted
方法会返回DispatchedContinuation
,接着调用它的resumeCancellableWith
方法:
public fun <T> Continuation<T>.resumeCancellableWith(
result: Result<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
else -> resumeWith(result)
}
调用到另外一个resumeCancellableWith
方法,这个方法就是在DispatchedContinuation
中实现的了:
inline fun resumeCancellableWith(
result: Result<T>,
noinline onCancellation: ((cause: Throwable) -> Unit)?
) {
val state = result.toState(onCancellation)
if (dispatcher.isDispatchNeeded(context)) { // 需要线程调度
_state = state
resumeMode = MODE_CANCELLABLE
// 线程调度,将自身以Runnable块形式传入
dispatcher.dispatch(context, this)
} else { // 不需要线程调度
executeUnconfined(state, MODE_CANCELLABLE) {
if (!resumeCancelled(state)) {
// 最终会调用continuation.resumeWith,即LaunchLambda.resumeWith
resumeUndispatchedWith(result)
}
}
}
}
可以看到,它调用了dispatcher.isDispatchNeeded
来判断是否需要进行线程调度,以Dispatchers.Main
为例,就是判断当前协程是否在主线程中运行,是的话则不需要调度,否则需要将协程调度到主线程中运行。
- 不需线程调度:最终会调用到
LaunchLambda.resumeWith
,它后续的执行流程之前已经分析过了。 - 需要线程调度:(以主线程的协程调度器为例)最终会将传入的
Runnable
在主线程中执行。
Runnable
的run
方法在哪实现的呢?在DispatchedContinuation
的父类DispatchedTask
中有run
方法的实现:
public final override fun run() {
...
try {
// 获取到的delegate其实就是DispatchedContinuation
val delegate = delegate as DispatchedContinuation<T>
// 获取到的continuation其实就是LaunchLambda
val continuation = delegate.continuation
val context = continuation.context
val state = takeState() // NOTE: Must take state in any case, even if cancelled
withCoroutineContext(context, delegate.countOrElement) {
val exception = getExceptionalResult(state)
/*
* Check whether continuation was originally resumed with an exception.
* If so, it dominates cancellation, otherwise the original exception
* will be silently lost.
*/
val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null
if (job != null && !job.isActive) {
val cause = job.getCancellationException()
cancelCompletedResult(state, cause)
continuation.resumeWithStackTrace(cause)
} else {
if (exception != null) {
continuation.resumeWithException(exception)
} else {
// 正常情况下,会执行到这里,调用LaunchLambda的resume方法
continuation.resume(getSuccessfulResult(state))
}
}
}
} catch (e: Throwable) {
...
} finally {
...
}
}
在run
方法中,最终会调用到LaunchLambda
的resume
方法(内部又会调用到resumeWith
方法)。所以这里做的线程调度,其实就是通过主线程的handler
,将代码post
到主线程中去运行,从而完成线程的调度工作。
另外,还有几个未研究的地方与自己的猜想:
一、releaseIntercepted方法:在BaseContinuationImpl::resumeWith
中,每执行完一个续体的invokeSuspend
方法,就会调用该续体的releaseIntercepted
方法
protected override fun releaseIntercepted() {
val intercepted = intercepted
// intercepted不为null且不为自身(即之前成功拦截续体),就进入If块
if (intercepted != null && intercepted !== this) {
// 调用续体拦截器的releaseInterceptedContinuation方法,并传入续体包装类
context[ContinuationInterceptor]!!.releaseInterceptedContinuation(
intercepted)
}
// 将intercepted变量设置为CompletedContinuation
this.intercepted = CompletedContinuation // just in case
}
续体拦截器的releaseInterceptedContinuation
方法应该是做一些资源清理的工作。
二、像withContext
这样的函数:
scope.launch(Dispatchers.Main) {
withContext(Dispatchers.IO) {}
}
public suspend fun <T> withContext(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T
): T {...}
在block
块执行完后,会将线程自动切回「启动协程时的协程调度器所指定」的线程,那么它是如何切回来的呢?个人猜测,在协程至上而下调用的时候,协程上下文会一层一层地向下传递,withContext
的block
块执行的时候,协程上下文会被保存在某个地方,等到block
块执行结束的时候,会从之前保存的协程上下文中取出协程调度器,将剩余的代码(协程恢复)调度到相应的线程中去执行,从而实现了 block
块执行完后,线程会自动切回「启动协程时的协程调度器所指定」的线程。
参考
标签:completion,协程,Kotlin,result,Continuation,原理,continuation,public 来源: https://www.cnblogs.com/giagor/p/15823857.html