Kotlin协程Channel源码示例浅析

作者:wayne214 时间:2023-06-14 22:54:08 

结论先行

Kotlin协程中的Channel用于处理多个数据组合的流,随用随取,时刻准备着,就像自来水一样,打开开关就有水了。

Channel使用示例

fun main() = runBlocking {
   logX("开始")
   val channel = Channel<Int> {  }
   launch {
       (1..3).forEach{
           channel.send(it)
           logX("发送数据: $it")
       }
       // 关闭channel, 节省资源
       channel.close()
   }
   launch {
       for (i in channel){
           logX("接收数据: $i")
       }
   }
   logX("结束")
}

示例代码 使用Channel创建了一组int类型的数据流,通过send发送数据,并通过for循环取出channel中的数据,最后channel是一种协程资源,使用结束后应该及时调用close方法关闭,以免浪费不必要的资源。

Channel的源码

public fun <E> Channel(
   capacity: Int = RENDEZVOUS,
   onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
   onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E> =
   when (capacity) {
       RENDEZVOUS -> {}
       CONFLATED -> {}
       UNLIMITED -> {}
       else -> {}
   }

可以看到Channel的构造函数包含了三个参数,分别是capacity、onBufferOverflow、onUndeliveredElement.

首先看capacity,这个参数代表了管道的容量,默认参数是RENDEZVOUS,取值是0,还有其他一些值:

  • UNLIMITED: Int = Int.MAX_VALUE,没有限量

  • CONFLATED: 容量为1,新的覆盖旧的值

  • BUFFERED: 添加缓冲容量,默认值是64,可以通过修改VM参数:kotlinx.coroutines.channels.defaultBuffer,进行修改

接下来看onBufferOverflow, 顾名思义就是管道容量满了,怎么办?默认是挂起,也就是suspend,一共有三种分别是: SUSPNED、DROP_OLDEST以及DROP_LATEST

public enum class BufferOverflow {
   /**
    * Suspend on buffer overflow.
    */
   SUSPEND,
   /**
    * Drop **the oldest** value in the buffer on overflow, add the new value to the buffer, do not suspend.
    */
   DROP_OLDEST,
   /**
    * Drop **the latest** value that is being added to the buffer right now on buffer overflow
    * (so that buffer contents stay the same), do not suspend.
    */
   DROP_LATEST
}
  • SUSPEND,当管道的容量满了以后,如果发送方还要继续发送,我们就会挂起当前的 send() 方法。由于它是一个挂起函数,所以我们可以以非阻塞的方式,将发送方的执行流程挂起,等管道中有了空闲位置以后再恢复,有点像生产者-消费者模型

  • DROP_OLDEST,顾名思义,就是丢弃最旧的那条数据,然后发送新的数据,有点像LRU算法。

  • DROP_LATEST,丢弃最新的那条数据。这里要注意,这个动作的含义是丢弃当前正准备发送的那条数据,而管道中的内容将维持不变。

最后一个参数是onUndeliveredElement,从名字看像是没有投递成功的回调,也确实如此,当管道中某些数据没有成功接收时,这个就会被调用。

综合这个参数使用一下

fun main() = runBlocking {
   println("开始")
   val channel = Channel<Int>(capacity = 2, onBufferOverflow = BufferOverflow.DROP_OLDEST) {
       println("onUndeliveredElement = $it")
   }
   launch {
       (1..3).forEach{
           channel.send(it)
           println("发送数据: $it")
       }
       // 关闭channel, 节省资源
       channel.close()
   }
   launch {
       for (i in channel){
           println("接收数据: $i")
       }
   }
   println("结束")
}
输出结果如下:
开始
结束
发送数据: 1
发送数据: 2
发送数据: 3
接收数据: 2
接收数据: 3

安全的从Channel中取数据

先看一个例子

val channel: ReceiveChannel<Int> = produce {
       (1..100).forEach{
           send(it)
           println("发送: $it")
       }
   }
while (!channel.isClosedForReceive){
   val i = channel.receive();
   println("接收: $i")
}
输出报错信息:
Exception in thread "main" kotlinx.coroutines.channels.ClosedReceiveChannelException: Channel was closed

可以看到使用isClosedForReceive判断是否关闭再使用receive方法接收数据,依然会报错,所以不推荐使用这种方式。

推荐使用上面for循环的方式取数据,还有kotlin推荐的consumeEach方式,看一下示例代码

val channel: ReceiveChannel<Int> = produce {
       (1..100).forEach{
           send(it)
           println("发送: $it")
       }
   }
channel.consumeEach {
   println("接收:$it")
}

所以,当我们想要获取Channel当中的数据时,我们尽量使用 for 循环,或者是channel.consumeEach {},不要直接调用channel.receive()。

&ldquo;热的数据流&rdquo;从何而来?

先看一下代码

println("开始")
   val channel = Channel<Int>(capacity = 3, onBufferOverflow = BufferOverflow.DROP_OLDEST) {
       println("onUndeliveredElement = $it")
   }
   launch {
       (1..3).forEach{
           channel.send(it)
           println("发送数据: $it")
       }
   }
   println("结束")
}
输出:
开始
结束
发送数据: 1
发送数据: 2
发送数据: 3

可以看到上述代码中并没有 取channel中的数据,但是发送的代码正常执行了,这种&ldquo;不管有没有接收方,发送方都会工作&rdquo;的模式,就是我们将其认定为&ldquo;热&rdquo;的原因。

举个例子,就像去海底捞吃火锅一样,你不需要主动要求服务员加水,服务员看到你的杯子中水少了,会自动给你添加,你只管拿起水杯喝水就行了。

总的来说,不管接收方是否存在,Channel 的发送方一定会工作。

Channel能力的来源

通过源码可以看到Channel只是一个接口,它的能力来源于SendChannel和ReceiveChannel,一个发送管道,一个接收管道,相当于做了一个组合。

这也是一种良好的设计思想,&ldquo;对读取开放,对写入封闭&rdquo;的开闭原则。

来源:https://juejin.cn/post/7169078590640750599

标签:Kotlin,Channel,协程
0
投稿

猜你喜欢

  • TKmybatis的框架介绍和原理解析

    2022-08-28 21:35:42
  • SpringBoot下载Excel文件时,报错文件损坏的解决方案

    2023-01-09 15:49:16
  • 详解OpenCV For Java环境搭建与功能演示

    2023-05-27 09:13:50
  • C++编写DLL动态链接库的步骤与实现方法

    2023-01-30 12:59:33
  • Unity实现全屏截图以及QQ截图

    2023-07-25 17:12:22
  • Android仿今日头条APP实现下拉导航选择菜单效果

    2023-09-15 07:21:05
  • C#操作XML方法详解

    2022-11-10 21:21:14
  • Java 判断字符串中是否包含中文的实例详解

    2023-11-06 13:17:18
  • 详解基于java的Socket聊天程序——初始设计(附demo)

    2023-02-01 06:46:21
  • springmvc+shiro自定义过滤器的实现代码

    2021-08-11 21:23:11
  • Android so的热升级尝试

    2023-08-07 22:41:59
  • c# rsa加密解密详解

    2023-06-11 00:54:17
  • Android编程之菜单实现方法

    2023-12-21 16:41:24
  • Maven的安装配置详解

    2023-11-24 08:52:05
  • JavaWeb使用Session和Cookie实现登录认证

    2023-12-11 19:13:29
  • 多线程(多窗口卖票实例讲解)

    2021-09-02 02:47:26
  • MyBatis 动态拼接Sql字符串的问题

    2021-08-09 05:23:46
  • Java 多线程并发编程提高数据处理效率的详细过程

    2021-06-29 04:19:39
  • Java读取txt文件中的数据赋给String变量方法

    2022-08-04 22:32:19
  • javaweb实战之商城项目开发(三)

    2023-04-15 14:27:34
  • asp之家 软件编程 m.aspxhome.com