Kotlin select使用方法介绍

作者:且听真言 时间:2022-05-28 19:34:27 

一、select是什么

select——>用于选择更快的结果。

基于场景理解

比如客户端要查询一个商品的详情。两个服务:缓存服务,速度快但信息可能是旧的;网络服务,速度慢但信息一定是最新的。

如何实现上述逻辑:

runBlocking {
       suspend fun getCacheInfo(productId: String): Product {
           delay(100L)
           return Product(productId, 8.9)
       }
       suspend fun getNetworkInfo(productId: String): Product? {
           delay(200L)
           return Product(productId, 8.8)
       }
       fun updateUI(product: Product) {
           println("${product.productId} : ${product.price}")
       }
       val startTime = System.currentTimeMillis()
       val productId = "001"
       val cacheInfo = getCacheInfo(productId)
       if (cacheInfo != null) {
           updateUI(cacheInfo)
           println("Time cost: ${System.currentTimeMillis() - startTime}")
       }
       val latestInfo = getNetworkInfo(productId)
       if (latestInfo != null) {
           updateUI(latestInfo)
           println("Time cost: ${System.currentTimeMillis() - startTime}")
       }
   }

001 : 8.9
Time cost: 113
001 : 8.8
Time cost: 324

上述程序分为四步:第一步:查询缓存信息;第二步:缓存服务返回信息,更新 UI;第三步:查询网络服务;第四步:网络服务返回信息,更新 UI。

用户可以第一时间看到商品的信息,虽然它暂时会展示旧的信息,但由于我们同时查询了网络服务,旧缓存信息也马上会被替代成新的信息。但是可能存在一些问题:如果程序卡在了缓存服务,获取网络服务就会无法执行。是因为 getCacheInfo() 它是一个挂起函数,只有这个程序执行成功以后,才可以继续执行后面的任务。能否做到:两个挂起函数同时执行,谁返回的速度更快,就选择哪个结果。答案是使用select。

runBlocking {
       suspend fun getCacheInfo(productId: String): Product {
           delay(100L)
           return Product(productId, 8.9)
       }
       suspend fun getNetworkInfo(productId: String): Product {
           delay(200L)
           return Product(productId, 8.8)
       }
       fun updateUI(product: Product) {
           println("${product.productId} : ${product.price}")
       }
       val startTime = System.currentTimeMillis()
       val productId = "001"
       val product = select<Product?> {
           async {
               getCacheInfo(productId)
           }.onAwait {
               it
           }
           async {
               getNetworkInfo(productId)
           }.onAwait {
               it
           }
       }
       if (product != null) {
           updateUI(product)
           println("Time cost: ${System.currentTimeMillis() - startTime}")
       }
   }

001 : 8.9
Time cost: 134
 
Process finished with exit code 0

由于缓存的服务更快,所以,select 确实帮我们选择了更快的那个结果。我们的 select 可以在缓存服务出现问题的时候,灵活选择网络服务的结果。从而避免用户等待太长的时间,得到糟糕的体验。

在上述代码中,用户大概率是会展示旧的缓存信息。但实际场景下,我们是需要进一步更新最新信息的。

runBlocking {
       suspend fun getCacheInfo(productId: String): Product {
           delay(100L)
           return Product(productId, 8.9)
       }
       suspend fun getNetworkInfo(productId: String): Product {
           delay(200L)
           return Product(productId, 8.8)
       }
       fun updateUI(product: Product) {
           println("${product.productId} : ${product.price}")
       }
       val startTime = System.currentTimeMillis()
       val productId = "001"
       val cacheDeferred = async {
           getCacheInfo(productId)
       }
       val latestDeferred = async {
           getNetworkInfo(productId)
       }
       val product = select<Product?> {

cacheDeferred.onAwait {
               it.copy(isCache = true)
           }
           latestDeferred.onAwait {
               it.copy(isCache = false)
           }
       }
       if (product != null) {
           updateUI(product)
           println("Time cost: ${System.currentTimeMillis() - startTime}")
       }
       if (product != null && product.isCache) {
           val latest = latestDeferred.await() ?: return@runBlocking
           updateUI(latest)
           println("Time cost: ${System.currentTimeMillis() - startTime}")
       }
   }

001 : 8.9
Time cost: 124
001 : 8.8
Time cost: 228
 
Process finished with exit code 0

如果是多个服务的缓存场景呢?

runBlocking {
       val startTime = System.currentTimeMillis()
       val productId = "001"
       suspend fun getCacheInfo(productId: String): Product {
           delay(100L)
           return Product(productId, 8.9)
       }
       suspend fun getCacheInfo2(productId: String): Product {
           delay(50L)
           return Product(productId, 8.85)
       }
       suspend fun getNetworkInfo(productId: String): Product {
           delay(200L)
           return Product(productId, 8.8)
       }
       fun updateUI(product: Product) {
           println("${product.productId} : ${product.price}")
       }
       val cacheDeferred = async {
           getCacheInfo(productId)
       }
       val cacheDeferred2 = async {
           getCacheInfo2(productId)
       }
       val latestDeferred = async {
           getNetworkInfo(productId)
       }
       val product = select<Product?> {
           cacheDeferred.onAwait {
               it.copy(isCache = true)
           }
           cacheDeferred2.onAwait {
               it.copy(isCache = true)
           }
           latestDeferred.onAwait {
               it.copy(isCache = true)
           }
       }
       if (product != null) {
           updateUI(product)
           println("Time cost: ${System.currentTimeMillis() - startTime}")
       }
       if (product != null && product.isCache) {
           val latest = latestDeferred.await()
           updateUI(latest)
           println("Time cost: ${System.currentTimeMillis() - startTime}")
       }
   }

Log
 
001 : 8.85
Time cost: 79
001 : 8.8
Time cost: 229
 
Process finished with exit code 0

select 代码模式,可以提升程序的整体响应速度。

二、select和Channel

runBlocking {
       val startTime = System.currentTimeMillis()
       val channel1 = produce {
           send(1)
           delay(200L)
           send(2)
           delay(200L)
           send(3)
       }
       val channel2 = produce {
           delay(100L)
           send("a")
           delay(200L)
           send("b")
           delay(200L)
           send("c")
       }
       channel1.consumeEach {
           println(it)
       }
       channel2.consumeEach {
           println(it)
       }
       println("Time cost: ${System.currentTimeMillis() - startTime}")
   }

Log
 
1
2
3
a
b
c
Time cost: 853
 
Process finished with exit code 0

上述代码串行执行,可以使用select进行优化。

runBlocking {
       val startTime = System.currentTimeMillis()
       val channel1 = produce {
           send(1)
           delay(200L)
           send(2)
           delay(200L)
           send(3)
       }
       val channel2 = produce {
           delay(100L)
           send("a")
           delay(200L)
           send("b")
           delay(200L)
           send("c")
       }
       suspend fun selectChannel(
           channel1: ReceiveChannel<Int>,
           channel2: ReceiveChannel<String>
       ): Any {
           return select<Any> {
               if (!channel1.isClosedForReceive) {
                   channel1.onReceive {
                       it.also {
                           println(it)
                       }
                   }
               }
               if (!channel2.isClosedForReceive) {
                   channel2.onReceive {
                       it.also {
                           println(it)
                       }
                   }
               }
           }
       }
       repeat(6) {
           selectChannel(channel1, channel2)
       }
       println("Time cost: ${System.currentTimeMillis() - startTime}")
   }

Log
1
a
2
b
3
c
Time cost: 574
 
Process finished with exit code 0

从代码执行结果可以发现程序的执行耗时有效减少。onReceive{} 是 Channel 在 select 当中的语法,当 Channel 当中有数据以后,它就会被回调,通过这个 Lambda,将结果传出去。执行了 6 次 select,目的是要把两个管道中的所有数据都消耗掉。

如果Channel1不生产数据了,程序会如何执行?

runBlocking {
       val startTime = System.currentTimeMillis()
       val channel1 = produce<String> {
           delay(5000L)
       }
       val channel2 = produce<String> {
           delay(100L)
           send("a")
           delay(200L)
           send("b")
           delay(200L)
           send("c")
       }
       suspend fun selectChannel(
           channel1: ReceiveChannel<String>,
           channel2: ReceiveChannel<String>
       ): String = select<String> {
           channel1.onReceive {
               it.also {
                   println(it)
               }
           }
           channel2.onReceive {
               it.also {
                   println(it)
               }
           }
       }
       repeat(3) {
           selectChannel(channel1, channel2)
       }
       println("Time cost: ${System.currentTimeMillis() - startTime}")
   }

Log
a
b
c
Time cost: 570
 
Process finished with exit code 0

如果不知道Channel的个数,如何避免ClosedReceiveChannelException?

使用:onReceiveCatching{}

runBlocking {
       val startTime = System.currentTimeMillis()
       val channel1 = produce<String> {
           delay(5000L)
       }
       val channel2 = produce<String> {
           delay(100L)
           send("a")
           delay(200L)
           send("b")
           delay(200L)
           send("c")
       }
       suspend fun selectChannel(
           channel1: ReceiveChannel<String>,
           channel2: ReceiveChannel<String>
       ): String = select<String> {
           channel1.onReceiveCatching {
               it.getOrNull() ?: "channel1 is closed!"
           }
           channel2.onReceiveCatching {
               it.getOrNull() ?: "channel2 is closed!"
           }
       }
       repeat(6) {
           val result = selectChannel(channel1, channel2)
           println(result)
       }
       println("Time cost: ${System.currentTimeMillis() - startTime}")
   }

Log
a
b
c
channel2 is closed!
channel2 is closed!
channel2 is closed!
Time cost: 584
 
Process finished with exit code 0

得到所有结果以后,程序不会立即退出,因为 channel1 一直在 delay()。

所以我们需要在6次repeat之后将channel关闭。

runBlocking {
       val startTime = System.currentTimeMillis()
       val channel1 = produce<String> {
           delay(15000L)
       }
       val channel2 = produce<String> {
           delay(100L)
           send("a")
           delay(200L)
           send("b")
           delay(200L)
           send("c")
       }
       suspend fun selectChannel(
           channel1: ReceiveChannel<String>,
           channel2: ReceiveChannel<String>
       ): String = select<String> {
           channel1.onReceiveCatching {
               it.getOrNull() ?: "channel1 is closed!"
           }
           channel2.onReceiveCatching {
               it.getOrNull() ?: "channel2 is closed!"
           }
       }
       repeat(6) {
           val result = selectChannel(channel1, channel2)
           println(result)
       }
       channel1.cancel()
       channel2.cancel()
       println("Time cost: ${System.currentTimeMillis() - startTime}")
   }

Log
a
b
c
channel2 is closed!
channel2 is closed!
channel2 is closed!
Time cost: 612
 
Process finished with exit code 0

Deferred、Channel 的 API:

public interface Deferred : CoroutineContext.Element {
   public suspend fun join()
   public suspend fun await(): T
   public val onJoin: SelectClause0
   public val onAwait: SelectClause1<T>
}
public interface SendChannel<in E>
   public suspend fun send(element: E)
   public val onSend: SelectClause2<E, SendChannel<E>>
}
public interface ReceiveChannel<out E> {
   public suspend fun receive(): E
   public suspend fun receiveCatching(): ChannelResult<E>
   public val onReceive: SelectClause1<E>
   public val onReceiveCatching: SelectClause1<ChannelResult<E>>
}

当 select 与 Deferred 结合使用的时候,当并行的 Deferred 比较多的时候,你往往需要在得到一个最快的结果以后,去取消其他的 Deferred。

通过 async 并发执行协程,也可以借助 select 得到最快的结果。

runBlocking {
       suspend fun <T> fastest(vararg deferreds: Deferred<T>): T = select {
           fun cancelAll() = deferreds.forEach {
               it.cancel()
           }
           for (deferred in deferreds) {
               deferred.onAwait {
                   cancelAll()
                   it
               }
           }
       }
       val deferred1 = async {
           delay(100L)
           println("done1")
           "result1"
       }
       val deferred2 = async {
           delay(200L)
           println("done2")
           "result2"
       }
       val deferred3 = async {
           delay(300L)
           println("done3")
           "result3"
       }
       val deferred4 = async {
           delay(400L)
           println("done4")
           "result4"
       }
       val deferred5 = async {
           delay(5000L)
           println("done5")
           "result5"
       }
       val fastest = fastest(deferred1, deferred2, deferred3, deferred4, deferred5)
       println(fastest)
   }

Log
 
done1
result1
 
Process finished with exit code 0

来源:https://blog.csdn.net/zhangying1994/article/details/127485681

标签:Kotlin,select
0
投稿

猜你喜欢

  • 浅析MMAP零拷贝在RocketMQ中的运用

    2021-11-21 01:59:47
  • 谷歌被屏蔽后如何搭建安卓环境

    2022-10-23 02:07:36
  • java中String的一些方法深入解析

    2023-11-25 21:48:56
  • Java接口的作用_动力节点Java学院整理

    2021-12-23 20:46:14
  • c# Struct的一些问题分析

    2023-08-31 08:27:36
  • java 创建线程的方法总结

    2023-02-25 20:20:30
  • 横竖屏切换导致页面频繁重启screenLayout解析

    2021-06-14 05:16:22
  • Java技巧函数方法实现二维数组遍历

    2023-09-12 23:25:00
  • Java中mybatis的三种分页方式

    2021-06-25 11:14:05
  • Java设计模式之备忘录模式

    2023-08-24 06:17:05
  • Java实现FTP上传与下载功能

    2021-09-22 18:28:51
  • JFileChooser实现对选定文件夹内图片自动播放和暂停播放实例代码

    2021-10-02 15:41:18
  • VsCode搭建Java开发环境的方法

    2023-06-17 13:29:19
  • Android控件PopupWindow模仿ios底部弹窗

    2023-03-09 20:42:10
  • Java开发之内部类对象的创建及hook机制分析

    2023-11-27 04:45:20
  • SpringAOP+RabbitMQ+WebSocket实战详解

    2023-01-01 18:40:25
  • C#调用摄像头实现拍照功能的示例代码

    2023-02-14 16:21:41
  • Java实战之飞翔的小鸟小游戏

    2022-10-04 20:37:20
  • C#用链式方法表达循环嵌套

    2023-04-14 06:54:23
  • Android手机获取IP地址的两种方法

    2021-05-28 14:18:51
  • asp之家 软件编程 m.aspxhome.com