Kotlin协程之Flow基础原理示例解析

作者:李萧蝶 时间:2021-10-17 21:07:44 

引言

本文分析示例代码如下:

launch(Dispatchers.Main) {
   flow {
       emit(1)
       emit(2)
   }.collect {
       delay(1000)

withContext(Dispatchers.IO) {
           Log.d("liduo", "$it")
       }

Log.d("liduo", "$it")
   }
}

一.Flow的创建

在协程中,可以通过flow方法创建一个Flow对象,一个Flow对象代表一个冷流。其中参数block是FlowCollector的扩展方法,并且可挂起。代码入下:

public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

FlowCollector是一个接口,用于收集上游的流发出的值,代码如下:

public interface FlowCollector<in T> {
   // 可挂起,非线程安全
   public suspend fun emit(value: T)
}

调用flow方法,会返回一个Flow接口指向的对象,代码如下:

public interface Flow<out T> {

@InternalCoroutinesApi
   public suspend fun collect(collector: FlowCollector<T>)
}

这里flow方法的返回对象是一个SafeFlow类型的对象。至此Flow就创建完毕了。

二.Flow的消费

在协程中,当需要消费流时,会调用collect方法,触发流的消费,代码如下:

public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
   collect(object : FlowCollector<T> {
       override suspend fun emit(value: T) = action(value)
   })

这里的collect方法不是Flow接口定义的方法,而是Flow的扩展方法,内部创建了一个匿名的FlowCollector对象,并且把action封装到了FlowCollector对象的emit方法中,最后将FlowCollector对象作为参数传入到了另一个collect方法,这个collect方法才是Flow接口定义的方法。

1.SafeFlow类

根据上面的分析,Flow对象最后返回的是一个SafeFlow类型的对象。因此,这里调用的另一个collect方法,就是SafeFlow类中的collect方法,代码如下:

private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
   override suspend fun collectSafely(collector: FlowCollector<T>) {
       collector.block()
   }
}

SafeFlow类继承自AbstractFlow类,类中重写了collectSafely方法。因此调用的collect方法实际上是AbstractFlow类的方法。

2.AbstractFlow类

AbstractFlow类是一个抽象类,实现了Flow接口和CancellableFlow接口。实际上CancellableFlow接口继承自Flow接口,因此AbstractFlow类只重写了collect方法,代码如下:

@FlowPreview
public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {

// 核心方法
   @InternalCoroutinesApi
   public final override suspend fun collect(collector: FlowCollector<T>) {
       // 创建SafeCollector对象,对collector进行包裹
       val safeCollector = SafeCollector(collector, coroutineContext)
       try {
           // 调用collectSafely方法
           collectSafely(safeCollector)
       } finally {
           // 释放拦截的续体
           safeCollector.releaseIntercepted()
       }
   }

public abstract suspend fun collectSafely(collector: FlowCollector<T>)
}

collect方法内部调用了collectSafely方法,collectSafely方法在SafeFlow中被重写。collectSafely方法中会调用flow中的block,并提供一个SafeCollector类的环境。

3. SafeCollector类

当flow方法中的代码在执行时,会调用emit方法发射数据,这时由于block执行在SafeCollector类的环境中,因此调用的emit方法是SafeCollector类的方法。

SafeCollector类实现了FlowCollector接口并且继承自ContinuationImpl类,代码如下:

internal actual class SafeCollector<T> actual constructor(
   @JvmField internal actual val collector: FlowCollector<T>,
   @JvmField internal actual val collectContext: CoroutineContext
) : FlowCollector<T>, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext), CoroutineStackFrame {

...
   // 保存上下文中元素数量,用于检查上下文是否变化
   @JvmField
   internal actual val collectContextSize = collectContext.fold(0) { count, _ -> count + 1 }
   // 保存上一次的上下文
   private var lastEmissionContext: CoroutineContext? = null
   // 执行结束后的续体
   private var completion: Continuation<Unit>? = null

// 协程上下文
   override val context: CoroutineContext
       get() = completion?.context ?: EmptyCoroutineContext

// 挂起的核心方法
   override fun invokeSuspend(result: Result<Any?>): Any? {
       result.onFailure { lastEmissionContext = DownstreamExceptionElement(it) }
       completion?.resumeWith(result as Result<Unit>)
       return COROUTINE_SUSPENDED
   }

// 释放拦截的续体
   public actual override fun releaseIntercepted() {
       super.releaseIntercepted()
   }

// 发射数据
   override suspend fun emit(value: T) {
       // 获取当前suspend方法续体
       return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
           try {
               // 调用重载的方法
               emit(uCont, value)
           } catch (e: Throwable) {
                // 出现异常时,将异常封装成上下文,保存到lastEmissionContext
               lastEmissionContext = DownstreamExceptionElement(e)
               // 抛出异常
               throw e
           }
       }
   }

// 重载的emit方法
   private fun emit(uCont: Continuation<Unit>, value: T): Any? {
       // 从续体中获取上下文
       val currentContext = uCont.context
       // 保证当前协程的Job是active的
       currentContext.ensureActive()
       // 获取上次的上下文
       val previousContext = lastEmissionContext
       // 如果前后上下文发生变化
       if (previousContext !== currentContext) {
           // 检查上下文是否发生异常
           checkContext(currentContext, previousContext, value)
       }
       // 保存续体
       completion = uCont
       // 调用emitFun方法,传入collector,value,continuation
       return emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
   }

// 检查上下文变化,防止并发
   private fun checkContext(
       currentContext: CoroutineContext,
       previousContext: CoroutineContext?,
       value: T
   ) {
       // 如果上次执行过程中发生了异常
       if (previousContext is DownstreamExceptionElement) {
           // 抛出异常
           exceptionTransparencyViolated(previousContext, value)
       }
       // 检查上下文是否发生变化,如果变化,则抛出异常
       checkContext(currentContext)
       lastEmissionContext = currentContext
   }

// 用于抛出异常
   private fun exceptionTransparencyViolated(exception: DownstreamExceptionElement, value: Any?) {
       error("""
           Flow exception transparency is violated:
               Previous 'emit' call has thrown exception ${exception.e}, but then emission attempt of value '$value' has been detected.
               Emissions from 'catch' blocks are prohibited in order to avoid unspecified behaviour, 'Flow.catch' operator can be used instead.
               For a more detailed explanation, please refer to Flow documentation.
           """.trimIndent())
   }
}

emit方法最终会调用emitFun方法方法,代码如下:

private val emitFun =
   FlowCollector<Any?>::emit as Function3<FlowCollector<Any?>, Any?, Continuation<Unit>, Any?>

emitFun是一个lambda表达式,它将只有一个参数的emit方法转换成三个参数的方法。emitFun方法在编译时会被编译器处理,反编译后的代码逻辑大致如下:

@Nullable
public final Object invoke(@NotNull FlowCollector p1, @Nullable Object p2, @NotNull Continuation continuation) {
  InlineMarker.mark(0);
  // 核心执行
  Object var10000 = p1.emit(p2, continuation);
  InlineMarker.mark(2);
  InlineMarker.mark(1);
  return var10000;
}

可以看到,emitFun方法内部会调用FlowCollector类对象的emit方法,同时传入value和continuation作为参数。

而这个FlowCollector类对象就是一开始的collect方法封装的匿名类对象,代码如下:

public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
   collect(object : FlowCollector<T> {
       override suspend fun emit(value: T) = action(value)
   })

调用它的emit方法,会直接调用action的invoke方法,并传入发射的数据,流在这里被最终消费。

通过上面的分析,可以知道消费的过程是在emit方法中被调用的,如果在消费的过程,没有发生挂起,那么emit方法执行完毕后,会继续执行flow方法里剩下的代码,而如果在消费的过程中发生了挂起,情况会稍有不同。

4.消费过程中的挂起

如果消费过程中发生挂起,那么emit方法会返回一个COROUTINE_SUSPENDED对象,suspendCoroutineUninterceptedOrReturn方法在收到COROUTINE_SUSPENDED对象后,会挂起当前协程。代码如下:

override suspend fun emit(value: T) {
   // 获取当前suspend方法续体
   return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
       try {
           // 调用重载的方法
           emit(uCont, value)
       } catch (e: Throwable) {
           // 出现异常时,将异常封装成上下文,保存到lastEmissionContext
           lastEmissionContext = DownstreamExceptionElement(e)
           // 抛出异常
           throw e
       }
   }
}

当消费过程执行完毕时,会通过传入的续体唤起外部协程恢复挂起状态。根据emitFun可以知道,这里传入的续体为this,也就是当前的SafeCollector类对象,代码如下:

emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)

恢复挂起需要调用续体的resumeWith方法,上面提到SafeCollector类继承自ContinuationImpl类,SafeCollector类中没有重写resumeWith方法,而ContinuationImpl类中也没有重写resumeWith方法,因此实际调用的是ContinuationImpl类的父类BaseContinuationImpl类的resumeWith方法。如下图所示:

Kotlin协程之Flow基础原理示例解析

在Kotlin协程:创建、启动、挂起、恢复中提到过,调用BaseContinuationImpl类的resumeWith方法,内部会调用invokeSuspend方法,而SafeCollector类重写了invokeSuspend方法,代码如下:

override fun invokeSuspend(result: Result<Any?>): Any? {
   // 尝试获取异常
   result.onFailure { lastEmissionContext = DownstreamExceptionElement(it) }
   // 如果没有异常,则恢复flow方法续体的执行
   completion?.resumeWith(result as Result<Unit>)
   // 返回挂起标识,这里挂起的是消费过程
   return COROUTINE_SUSPENDED
}

在invokeSuspend方法中,会调用resumeWith方法恢复生产过程&mdash;&mdash;flow方法的执行,同时挂起消费过程的执行。全部过程如下图所示:

Kotlin协程之Flow基础原理示例解析

来源:https://juejin.cn/post/7137647612286468132

标签:Kotlin,Flow,协程
0
投稿

猜你喜欢

  • Java创建线程的两种方式

    2023-09-12 05:20:55
  • Java工程中使用Mybatis (工程结合Mybatis,数据结合Swing使用))

    2023-05-27 11:52:38
  • SpringBoot深入探究@Conditional条件装配的使用

    2021-08-18 00:06:53
  • java实现百度云OCR文字识别 高精度OCR识别身份证信息

    2023-10-24 13:50:37
  • Java利用Strategy模式实现堆排序

    2022-07-16 08:01:04
  • spring-boot-maven-plugin 配置有啥用

    2022-08-27 19:09:25
  • 基于Springboot一个注解搞定数据字典的实践方案

    2022-12-23 01:12:38
  • Android使用phonegap从相册里面获取照片(代码分享)

    2023-07-24 18:53:03
  • Java编程接口回调一般用法代码解析

    2023-11-11 06:55:11
  • unity 切换场景不销毁物体问题的解决

    2022-04-29 11:26:06
  • Java几种常用的断言风格你怎么选

    2021-10-30 23:30:32
  • Android实现自动变换大小的ViewPager

    2023-03-19 06:56:15
  • C#中抽象方法与虚拟方法的区别

    2021-10-30 07:30:15
  • MyBatis图文并茂讲解注解开发一对多查询

    2023-02-18 08:18:40
  • Java多线程实现第三方数据同步

    2023-06-30 10:57:57
  • SpringBoot Jpa分页查询配置方式解析

    2023-03-02 10:04:02
  • Android中使用Toast.cancel()方法优化toast内容显示的解决方法

    2021-12-14 05:17:03
  • Spring的IOC控制反转详解

    2023-08-24 02:50:50
  • 详解maven中profiles使用实现

    2022-11-13 23:14:24
  • Android进阶Handler应用线上卡顿监控详解

    2022-12-21 11:31:00
  • asp之家 软件编程 m.aspxhome.com