继续来,同我一起撸Kotlin Channel 深水区
作者:互联网
1. Channel的引入及简单使用
初级版协程间通信
先来看一个简单的通信Demo:
fun testChannel() { //协程1 var deferred= GlobalScope.async { //假装在加工数据 Thread.sleep(2000) "Hello fishforest" } //协程2 GlobalScope.launch { var result = deferred.await() println("get result from coroutine1: $result") } }
如上,协程2拿到了协程1的值,这就是一次简单的协程间通信过程。 现在需求变了,协程1一直在生产数据,协程2也需要不断地从中取数据,此时靠async/await 配合无能为力了。当然,我们很容易想到的方案是:
共享一个变量,这个变量可以是个队列。
于是Demo改造如下:
fun testChannel2() { //阻塞队列 var queue = ArrayBlockingQueue<String>(5) //协程1 GlobalScope.launch { var count = 0 while (true) { //假装在加工数据 Thread.sleep(1000) queue.put("fish ${ count++}") } } //协程2 GlobalScope.launch { while (true) { Thread.sleep(1000) println("get result from coroutine1:${ queue.take()}") } } }
通过阻塞队列,当协程2取数据时,如果队列是空,那么等待协程1往队列里放数据;当协程1放数据时,如果队列满了,那么等待协程2从队列里取出数据。如此,就是简单的协程通信。 看似美好,实际上此处有个很大的漏洞:
队列满/队列空 时,此时等待动作阻塞的是线程,而我们知道协程的挂起并不阻塞线程,因此此种方式并没有利用到协程的优势。
我们期望协程发现队列满/空时将自己挂起等待,此时就引入了Channel。
升级版协程间通信-Channel
同样的需求,我们用Channel 实现:
fun testChannel3() { //定义Channel var channel = Channel<String>() //协程1 GlobalScope.launch { var count = 0 while (true) { //假装在加工数据 Thread.sleep(1000) var sendStr = "fish ${ count++}" println("send $sendStr") channel.send("$sendStr") } } //协程2 GlobalScope.launch { while (true) { Thread.sleep(1000) println("receive:${ channel.receive()}") } } }
与之前的实现方案相比,仅仅只是将队列换成了Channel,可以看出,Channel 和队列比较类似,而Channel的send/recevie 函数并没有阻塞线程,仅仅只是挂起了协程。 查看打印结果: 你可能发现了端倪:发送者和接收者是成对出现的,难道Channel的内部实现不是队列? 要想解开这个谜题,最好的方法是从源码入手深究其原理。
2. Channel的原理
Channel的构造
先从Channel 构造开始:
#Channel.kt public fun <E> Channel( //Channel 容量/叫做Channel类型更合理一些 capacity: Int = Channel.RENDEZVOUS, //缓冲区满后,发送端的处理方式 onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND, //信息没有传递出去时的回调 onUndeliveredElement: ((E) -> Unit)? = null ): Channel<E> = when (capacity) { //默认是约会模式 Channel.RENDEZVOUS -> { //默认挂起 if (onBufferOverflow == BufferOverflow.SUSPEND) RendezvousChannel(onUndeliveredElement) // an efficient implementation of rendezvous channel else ArrayChannel(1, onBufferOverflow, onUndeliveredElement) // support buffer overflow with buffered channel } //... }
此处的Channel() 并不是构造函数,而是顶层函数,Kotlin里有很多伪装为构造函数的顶层函数。该顶层函数默认构造并返回RendezvousChannel类型的Channel。 RendezvousChannel 类本身很简单,就重写了一些属性,它继承自AbstractChannel。
重点在AbstractChannel/AbstractSendChannel及其子类里。