其他分享
首页 > 其他分享> > Kotin 协程从零到一《五》

Kotin 协程从零到一《五》

作者:互联网

上一篇文章介紹了协程的运行与恢复,本篇介绍父子协程

什么是子协程

我们这里举一个例子
在这里插入图片描述
第一个launch 创建的协程代表的就死父协程,第二个launch创建的协程代表的就是子协程,注意第一个launch调用的是this对象方法。
在这里插入图片描述
如上第二个launch调用的就是StandaloneCoroutine的launch方法,StandaloneCoroutine表示一个新建的协程,其本身也实现了CoroutineScope。子协程还可以有自己的子协程,最终形成了一个协程树。
在这里插入图片描述

父子协程的关系

1父子协程的执行顺序

     fun testChild() {
         var con= GlobalScope.launch {
              println("我是父协程》》开始")
              launch {
                  println("我是子协程")
              }
              //打印完这句话之后,父协程可能还没有执行完
              println("我是父协程》》结束")
          }
         con.invokeOnCompletion {
             println("协程结束")
         }
    }

这里还是以上面的代码为例子,你能看出来所有的打印的顺序吗。
首先 println(“我是子协程”) 以及println(“我是父协程》》结束”) 这两个打印完全取决于线程的调度,因此谁先打印都有可能,这里假设先执行了 println(“我是父协程》》结束”),但是打印完这句话之后,父协程的逻辑此时还没有执行完实际它会等待所有的子协程的代码全部执行完成之后父协程才会结束,此时父协程才会调用invokeOnCompletion 注册的监听。

协程为什么这么设计呢?
这里主要考虑的异常处理的问题,例如子协程运行的过程中抛出了一个子协程未处理的异常,但是父协程与子协程可能是运行在两个不一样的线程里面,所以此时再父协程里面try catch 子协程的异常就毫无意义,因此父协程必须等待子协程执行完之后告诉自己有没有异常,收到这个异常之后,父协程再来决定是不是处理这个异常,加入父协程不处理这个异常,那么可能会将这个异常抛给父协程的父协程。

 public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle
 
public typealias CompletionHandler = (cause: Throwable?) -> Unit

CompletionHandler 的参数Throwable,这个Throwable 就可能是父协程自己执行的时候抛出的异常也有可能是子协程传递给父协程的一个异常。

关于这一部分,会在下一篇文章结合源码给大家解释。

2协程的取消

协程启动后可以在内部在启动一个子协程,当存在这样的结构化层级时父协程的取消会引起子协程立即跟随取消,协程启动后返回的Job对象时可以监听完成和取消事件。

父子关系的建立

Kotlin子协程并没有直接持有父协程的引用,而是类似父协程在子协程里面注册了一个监听者。我们结合源码理解。

在这里插入图片描述

    public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
        initParentJob()
        start(block, receiver, this)
    }
    internal fun initParentJob() {
        initParentJobInternal(parentContext[Job])
    }

parentContex 值得是父协程的context,也就是下图的context
在这里插入图片描述

    internal fun initParentJobInternal(parent: Job?) {
        check(parentHandle == null)
        if (parent == null) {
            parentHandle = NonDisposableHandle
            return
        }
        //确保父协程已经启动了
        parent.start() // make sure the parent is started
        @Suppress("DEPRECATION")
        //核心方法,建立父子关系
        val handle = parent.attachChild(this)
        parentHandle = handle
        // now check our state _after_ registering (see tryFinalizeSimpleState order of actions)
        if (isCompleted) {
            handle.dispose()
            parentHandle = NonDisposableHandle // release it just in case, to aid GC
        }
    }

parent 实际类型是JobSupport

    @Suppress("OverridingDeprecatedMember")
    public final override fun attachChild(child: ChildJob): ChildHandle {
      
        return invokeOnCompletion(onCancelling = true, handler = ChildHandleNode(this, child).asHandler) as ChildHandle
    }

可以看到实际这里是通过invokeOnCompletion 建立的父子关系,

    public final override fun invokeOnCompletion(
        onCancelling: Boolean,
        invokeImmediately: Boolean,
        handler: CompletionHandler
    ): DisposableHandle {
        var nodeCache: JobNode<*>? = null
        loopOnState { state ->
            when (state) {
                is Empty -> { // EMPTY_X state -- no completion handlers
                    if (state.isActive) {
                        // try move to SINGLE state
                        val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
                        if (_state.compareAndSet(state, node)) return node
                    } else
                        promoteEmptyToNodeList(state) // that way we can add listener for non-active coroutine
                }
                is Incomplete -> {
                    val list = state.list
                    if (list == null) { // SINGLE/SINGLE+
                        promoteSingleToNodeList(state as JobNode<*>)
                    } else {
                        var rootCause: Throwable? = null
                        var handle: DisposableHandle = NonDisposableHandle
                        if (onCancelling && state is Finishing) {
                            synchronized(state) {
                                // check if we are installing cancellation handler on job that is being cancelled
                                rootCause = state.rootCause // != null if cancelling job
                                // We add node to the list in two cases --- either the job is not being cancelled
                                // or we are adding a child to a coroutine that is not completing yet
                                if (rootCause == null || handler.isHandlerOf<ChildHandleNode>() && !state.isCompleting) {
                                    // Note: add node the list while holding lock on state (make sure it cannot change)
                                    val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
                                    if (!addLastAtomic(state, list, node)) return@loopOnState // retry
                                    // just return node if we don't have to invoke handler (not cancelling yet)
                                    if (rootCause == null) return node
                                    // otherwise handler is invoked immediately out of the synchronized section & handle returned
                                    handle = node
                                }
                            }
                        }
                        if (rootCause != null) {
                            // Note: attachChild uses invokeImmediately, so it gets invoked when adding to cancelled job
                            if (invokeImmediately) handler.invokeIt(rootCause)
                            return handle
                        } else {
                            val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
                            if (addLastAtomic(state, list, node)) return node
                        }
                    }
                }
                else -> { // is complete
                    // :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension,
                    // because we play type tricks on Kotlin/JS and handler is not necessarily a function there
                    if (invokeImmediately) handler.invokeIt((state as? CompletedExceptionally)?.cause)
                    return NonDisposableHandle
                }
            }
        }
    }

这里首先介绍一下loopOnState,可以看到loopOnState内部是个死循环,但是 loopOnState 是一个inline 方法,所以只需要block方法内部返回,那么loopOnState 就整个结束了。

    private inline fun loopOnState(block: (Any?) -> Unit): Nothing {
        while (true) {
            block(state)
        }
    }

这里将重点放到block 方法

    loopOnState { state ->
    when (state) {
        is Empty -> { 
            // 第一种情形
          
        }
        is Incomplete -> {
         //第二种情形
            
        }
        else -> { // is complete
         //第三种情形
        }
    }
    }

这里分为了三种情形
1 Empty
此时表示协程的初始状态,当我们第一次调用iinvokeOnCompletion 一定会走这个分支。并且会将state设置为InvokeOnCancelling,而
InvokeOnCancelling 又继承了Incomplete ,所以第二次调用iinvokeOnCompletion 会走第二个分支。

2 Incomplete
第二次调用iinvokeOnCompletion 会走这个分支,此时会将state修改为NodeList,并且将之前添加的第一个节点加入到这个列表,后续的iinvokeOnCompletion 添加节点都会添加到这个NodeList这个列表里面

3else
此时表示整个协程已经完全结束了,所以会直接调用handler.invokeIt,实际就是调用子协程的parentCancelled,通知子协程父协程结束了,

好我们具体的分析

第一种

                    if (state.isActive) {
                         //isActive 表示协程是否已经启动,这里是true
                        // 如果协程是懒启动也就是创建的的协程是LazyStandaloneCoroutine
                        //上面的parrent.start 也会启动协程将isActive 设置为true
                        
                        //makeNode 就不贴代码了,这里就是将state修改为INComplete ,表示协程正在运行
                        //并且通过invokeOnCompletion   注册了一个节点。
                        val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
                        if (_state.compareAndSet(state, node)) return node
                    } else
                        promoteEmptyToNodeList(state) // 

第二种

 这种比较复杂,考虑的情况比较多。
                is Incomplete -> {
                    val list = state.list
                    if (list == null) { // SINGLE/SINGLE+
                       //
                        promoteSingleToNodeList(state as JobNode<*>)
                    } else {
                        var rootCause: Throwable? = null
                        var handle: DisposableHandle = NonDisposableHandle
                         //state is Finishing 表示协程运行结束了或者是协程取消
                        if (onCancelling && state is Finishing) {
                            synchronized(state) {
                                // rootCause 是协程的第一个异常
                                rootCause = state.rootCause // != null if cancelling job
                                 //handler.isHandlerOf<ChildHandleNode> 这是是真,所以下面的判断可以简化为
                                 //rootCause == null || !state.isCompleting,
                                 //rootCause == null 表示协程正常结束
                                 //!state.isCompleting 表示协程取消
                                if (rootCause == null || handler.isHandlerOf<ChildHandleNode>() && !state.isCompleting) {
                                    // Note: add node the list while holding lock on state (make sure it cannot change)
                                    val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
                                    if (!addLastAtomic(state, list, node)) return@loopOnState // retry
                                    // just return node if we don't have to invoke handler (not cancelling yet)
                                    if (rootCause == null) return node
                                    // otherwise handler is invoked immediately out of the synchronized section & handle returned
                                    handle = node
                                }
                            }
                        }
                        if (rootCause != null) {
                            // Note: attachChild uses invokeImmediately, so it gets invoked when adding to cancelled job
                            //invokeImmediately 这里是false
                            if (invokeImmediately) handler.invokeIt(rootCause)
                            return handle
                        } else {
                            //正常情况下都会走这个分支,在NodeList 添加注册的节点
                            val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
                            if (addLastAtomic(state, list, node)) return node
                        }
                    }
                }

第一种情形 list == null ,此时表示第二次调用iinvokeOnCompletion 添加节点。promoteSingleToNodeList 创建了一个list节点作为一个新的状态,然后将之前创建的第一个Node节点加入到这个list

第二种情形 list != null , 此时表示第三次以及更多次调用iinvokeOnCompletion ,此时state 为NodeList,NodeList 实现了Incomplete 接口,所在这里会进入list != null 分支。if (onCancelling && state is Finishing)此处onCancelling 为真,state is Finishing 协程状态为Finishing ,可能是协程正常结束,可能是协程取消了。
在这里插入图片描述
当协程执行完 print(“协程最后一句代码”) 就会将自身的状态设置为Finishing ,但是此时isCompleting 为true, 但是此时实际协程还没有真正执行完,假如协程还有子协程,那么父协程会等待全部的子协程执行完。
当我们调用cancel方法的时候协程也会将自身的状态设置为Finishing ,但是isCompleting 为false.实际这个时候launch方法的代码块可能还在执行,
此时虽然是Finish状态但是实际还是可以继续调用iinvokeOnCompletion 添加Node节点,因为在Fininsh状态之后并且全部的子协程也执行完之后还有一个状态,这个状态表示的协程的返回值,如果没有返回值那么就是UNIT,若是有异常这个状态保存的就是这个异常。进入这个状态此时就不可以添加子协程了。

第三种

也就是上面说的Finish之后的状态,invokeIt 就是直接通知要创建的子协程父协程已经结束或者是取消了。

在这里插入图片描述

最后 给大家推荐一篇文章也是过于父子协程的https://blog.csdn.net/qfanmingyiq/article/details/105623638 ,对于Node节点的添加写的非常详细,并且配有插图。

标签:node,协程,state,父协程,零到,Kotin,handler,null
来源: https://blog.csdn.net/qq_31469589/article/details/113271964