Kotlin协程launch启动流程原理详解

作者:无糖可乐爱好者 时间:2021-10-31 15:47:22 

1.launch启动流程

已知协程的启动方式之一是Globalscope.launch,那么Globalscope.launch的流程是怎样的呢,直接进入launch的源码开始看起。

fun main() {
   coroutineTest()
   Thread.sleep(2000L)
}
val block = suspend {
   println("Hello")
   delay(1000L)
   println("Kotlin")
}
private fun coroutineTest() {
   CoroutineScope(Job()).launch {
       withContext(Dispatchers.IO) {
           block.invoke()
       }
   }
}

反编译后的Java代码

public final class CoroutineDemoKt {
  @NotNull
  private static final Function1 block;
  public static final void main() {
     coroutineTest();
     Thread.sleep(2000L);
  }
  // $FF: synthetic method
  public static void main(String[] var0) {
     main();
  }
  @NotNull
  public static final Function1 getBlock() {
     return block;
  }
  private static final void coroutineTest() {
     BuildersKt.launch$default(CoroutineScopeKt.CoroutineScope((CoroutineContext)JobKt.Job$default((Job)null, 1, (Object)null)), (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
        int label;
        @Nullable
        public final Object invokeSuspend(@NotNull Object $result) {
           Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
           switch(this.label) {
           case 0:
              ResultKt.throwOnFailure($result);
              CoroutineContext var10000 = (CoroutineContext)Dispatchers.getIO();
              Function2 var10001 = (Function2)(new Function2((Continuation)null) {
                 int label;
                 @Nullable
                 public final Object invokeSuspend(@NotNull Object $result) {
                    Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
                    switch(this.label) {
                    case 0:
                       ResultKt.throwOnFailure($result);
                       Function1 var10000 = CoroutineDemoKt.getBlock();
                       this.label = 1;
                       if (var10000.invoke(this) == var2) {
                          return var2;
                       }
                       break;
                    case 1:
                       ResultKt.throwOnFailure($result);
                       break;
                    default:
                       throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
                    }
                    return Unit.INSTANCE;
                 }
                 @NotNull
                 public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
                    Intrinsics.checkNotNullParameter(completion, "completion");
                    Function2 var3 = new <anonymous constructor>(completion);
                    return var3;
                 }
                 public final Object invoke(Object var1, Object var2) {
                    return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
                 }
              });
              this.label = 1;
              if (BuildersKt.withContext(var10000, var10001, this) == var2) {
                 return var2;
              }
              break;
           case 1:
              ResultKt.throwOnFailure($result);
              break;
           default:
              throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
           }
           return Unit.INSTANCE;
        }
        @NotNull
        public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
           Intrinsics.checkNotNullParameter(completion, "completion");
           Function2 var3 = new <anonymous constructor>(completion);
           return var3;
        }
        public final Object invoke(Object var1, Object var2) {
           return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
        }
     }), 3, (Object)null);
  }
  static {
     Function1 var0 = (Function1)(new Function1((Continuation)null) {
        int label;
        @Nullable
        public final Object invokeSuspend(@NotNull Object $result) {
           Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
           String var2;
           switch(this.label) {
           case 0:
              ResultKt.throwOnFailure($result);
              var2 = "Hello";
              System.out.println(var2);
              this.label = 1;
              if (DelayKt.delay(1000L, this) == var3) {
                 return var3;
              }
              break;
           case 1:
              ResultKt.throwOnFailure($result);
              break;
           default:
              throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
           }
           var2 = "Kotlin";
           System.out.println(var2);
           return Unit.INSTANCE;
        }
        @NotNull
        public final Continuation create(@NotNull Continuation completion) {
           Intrinsics.checkNotNullParameter(completion, "completion");
           Function1 var2 = new <anonymous constructor>(completion);
           return var2;
        }
        public final Object invoke(Object var1) {
           return ((<undefinedtype>)this.create((Continuation)var1)).invokeSuspend(Unit.INSTANCE);
        }
     });
     block = var0;
  }
}

先分析一下上面代码的流程:

  • 首先声明了一个Function1类型的block变量,这个变量就是demo中的block,然后会在static函数中会被赋值。

  • 接下来就是coroutineTest函数的调用。这个函数中的第一行代码就是CoroutineScope的传参和一些默认值

  • 然后通过89行的invoke进入到了外层状态机流转的过程

  • 95行的static表示的是内部的挂起函数就是demo中的block.invoke,它是以匿名内部类的方式实现,然后执行内部的状态机流转过程,最后给block赋值。

  • block被赋值后最终在Function1 var10000 = CoroutineDemoKt.getBlock();被调用

那么这个过程又是如何实现的,进入launch源码进行查看:

public fun CoroutineScope.launch(
   context: CoroutineContext = EmptyCoroutineContext,
   start: CoroutineStart = CoroutineStart.DEFAULT,
   block: suspend CoroutineScope.() -> Unit
): Job {
   val newContext = newCoroutineContext(context)
   val coroutine = if (start.isLazy)
       LazyStandaloneCoroutine(newContext, block) else
       StandaloneCoroutine(newContext, active = true)
   coroutine.start(start, coroutine, block)
   return coroutine
}

这里的block指的就是demo中的block代码段

再来看一下里面的几行代码的含义:

  • newCoroutineContext: 通过默认的或者传入的context创建一个新的Context;

  • coroutine: launch 会根据传入的启动模式来创建对应的协程对象。这里有两种,一种是标准的,一种是懒加载的。

  • coroutine.start: 尝试启动协程

2.协程是如何被启动的

通过launch的源码可知协程的启动是通过coroutine.start启动的,那么协程的启动流程又是怎样的?

public abstract class AbstractCoroutine<in T>(
   parentContext: CoroutineContext,
   initParentJob: Boolean,
   active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
   ...
   /**
    * 用给定的代码块启动这个协程并启动策略。这个函数在这个协程上最多调用一次。
    */
   public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
       start(block, receiver, this)
   }
}

start函数中传入了三个参数,只需要关注第一个参数即可。

public enum class CoroutineStart {
   ...
   /**
    * 用这个协程的启动策略启动相应的块作为协程。
    */
   public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit =
   when (this) {
       DEFAULT -> block.startCoroutineCancellable(completion)
        * IC -> block.startCoroutine(completion)
       UNDISPATCHED -> block.startCoroutineUndispatched(completion)
       LAZY -> Unit // will start lazily
   }
}

启动策略的具体实现有三种方式,这里只需要分析startCoroutine,另外两个其实就是它的基础上增加了一些功能,其中前者代表启动协程以后可以在等待调度时取消,后者表示协程启动后不会被分发。

/**
* 创建没有接收方且结果类型为T的协程,这个函数每次调用时都会创建一个新的可挂起的实例。
*/
public fun <T> (suspend () -> T).startCoroutine(
   completion: Continuation<T>
) {
   createCoroutineUnintercepted(completion).intercepted().resume(Unit)
}

createCoroutineUnintercepted在源代码中只是一个声明,它的具体实现是在IntrinsicsJvm.kt文件中。

//IntrinsicsJvm.kt#createCoroutineUnintercepted
/**
* 创建没有接收方且结果类型为T的非拦截协程。这个函数每次调用时都会创建一个新的可挂起的实例。
*/
public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(
   completion: Continuation<T>
): Continuation<Unit> {
   val probeCompletion = probeCoroutineCreated(completion)
   return if (this is BaseContinuationImpl)
       create(probeCompletion)
   else
       createCoroutineFromSuspendFunction(probeCompletion) {
           (this as Function1<Continuation<T>, Any?>).invoke(it)
       }
}

actual代表了 createCoroutineUnintercepted() 在 JVM 平台的实现。

createCoroutineUnintercepted是一个扩展函数,接收者类型是一个无参数,返回值为 T 的挂起函数或者 Lambda。

第9行代码中的this代表的是(suspend () -> T)也就是invoke函数中的block变量,这个block变量就是demo中的block代码段。

第9行的BaseContinuationImpl是一个抽象类它实现了Continuation

关于if (this is BaseContinuationImpl)的结果暂且不分析,先分析两种情况下的create函数:

  • create(probeCompletion):

//ContinuationImpl.kt#create
public open fun create(completion: Continuation<*>): Continuation<Unit> {
   throw UnsupportedOperationException("create(Continuation) has not been overridden")
}
public open fun create(value: Any?, completion: Continuation<*>): Continuation<Unit> {
   throw UnsupportedOperationException("create(Any?;Continuation) has not been overridden")
}

这个create函数抛出一个异常,意思就是这个create()没有被重写,而这个create()的重写就是在反编译后的Java代码中的create函数

@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
   Intrinsics.checkNotNullParameter(completion, "completion");
   Function2 var3 = new <anonymous constructor>(completion);
   return var3;
}
  • createCoroutineFromSuspendFunction(probeCompletion):

//IntrinsicsJvm.kt#createCoroutineFromSuspendFunction
/**
* 当一个被suspend修饰的lambda表达式没有继承BaseContinuationImpl类时,则通过此方法创建协程。
*
* 它发生在两种情况下:
* 1.lambda表达式中调用了其他的挂起方法
* 2.挂起方法是通过Java实现的
*
* 必须将它封装到一个扩展[BaseContinuationImpl]的实例中,因为这是所有协程机制的期望。
*/
private inline fun <T> createCoroutineFromSuspendFunction(
completion: Continuation<T>,
crossinline block: (Continuation<T>) -> Any?
): Continuation<Unit> {
val context = completion.context
// context为空创建一个受限协程
return if (context === EmptyCoroutineContext)
//受限协程:只能调用协程作用域中提供的挂起方式挂起,其他挂起方法不能调用
object : RestrictedContinuationImpl(completion as Continuation<Any?>) {
private var label = 0
override fun invokeSuspend(result: Result<Any?>): Any? =
when (label) {
0 -> {
label = 1
result.getOrThrow() // 如果试图以异常开始,则重新抛出异常(将被BaseContinuationImpl.resumeWith捕获)
block(this) // 运行块,可以返回或挂起
}
1 -> {
label = 2
result.getOrThrow() // 这是block挂起的结果
}
else -> error("This coroutine had already completed")
}
}
else
//创建一个正常的协程
object : ContinuationImpl(completion as Continuation<Any?>, context) {
private var label = 0
override fun invokeSuspend(result: Result<Any?>): Any? =
when (label) {
0 -> {
label = 1
result.getOrThrow() // 如果试图以异常开始,则重新抛出异常(将被BaseContinuationImpl.resumeWith捕获)
block(this) // 运行块,可以返回或挂起
}
1 -> {
label = 2
result.getOrThrow() // 这是block挂起的结果
}
else -> error("This coroutine had already completed")
}
}
}

createCoroutineFromSuspendFunction就是当一个被suspend修饰的Lambda表达式没有继承BaseContinuationImpl是才会被调用,然后根据上下文是否为空创建不同类型的协程。

两种情况都已经分析完了,那么现在if (this is BaseContinuationImpl)会执行哪一个呢,首先这里的this所指的就是demo中的block代码段,Kotlin编译器编译后会自动生成一个类就是上面的static,它会继承SuspendLambda类,而这个SuspendLambda类继承自ContinuationImpl,ContinuationImpl继承自BaseContinuationImpl,因此可以得到判断结果为true,

createCoroutineUnintercepted的过程就是协程创建的过程。

然后就是intercepted函数,这个函数的具体实现也在IntrinsicsJvm.kt中,那么intercepted又做了什么呢

public expect fun <T> Continuation<T>.intercepted(): Continuation<T>
//具体实现
//IntrinsicsJvm.kt#intercepted
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
   (this as? ContinuationImpl)?.intercepted() ?: this

首先有个强转,通过上面的分析这个强转是一定会成功的,到这里intercepted就进入到了ContinuationImpl中了

internal abstract class ContinuationImpl(
   completion: Continuation<Any?>?,
   private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
...
   @Transient
   private var intercepted: Continuation<Any?>? = null
//如果没有缓存,则从上下文获取 * ,调用interceptContinuation进行拦截
//将获取到的内容保存到全局变量
   public fun intercepted(): Continuation<Any?> =
       intercepted
           ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
               .also { intercepted = it }
}

这里的ContinuationInterceptor指的就是Demo中传输的Dispatcher.IO,默认值时Dispatcher.Default

再回到startContinue中还剩最后一个resume

/**
* 恢复执行相应的协程传递值作为最后一个挂起点的返回值。
*/
public inline fun <T> Continuation<T>.resume(value: T): Unit =
resumeWith(Result.success(value))
public interface Continuation<in T> {
/**
    * 与此延续相对应的协程的上下文。
    */
public val context: CoroutineContext
/**
    * 恢复执行相应的协程传递值作为最后一个挂起点的返回值。
    */
public fun resumeWith(result: Result<T>)
}

这里的resume(Unit)作用就相当与启动了一个协程。

上面的启动流程中为了方便分析的是CoroutineStart. * IC,而默认的是CoroutineStart.DEFAULT,下面分析一下DEFAULT的流程

//Cancellable.kt#startCoroutineCancellable
public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) {
   createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))
}

startCoroutineCancellable对于协程的创建和拦截与 * IC是一样的,区别就在于resumeCancellableWith

//DispatchedContinuation#resumeCancellableWith
public fun <T> Continuation<T>.resumeCancellableWith(
result: Result<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
else -> resumeWith(result)
}
// 我们内联它来保存堆栈上的一个条目,在它显示的情况下(无限制调度程序)
// 它只在Continuation<T>.resumeCancellableWith中使用
@Suppress("NOTHING_TO_INLINE")
inline fun resumeCancellableWith(
result: Result<T>,
noinline onCancellation: ((cause: Throwable) -> Unit)?
) {
val state = result.toState(onCancellation)
//是否需要分发
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_CANCELLABLE
//将可运行块的执行分派给给定上下文中的另一个线程
dispatcher.dispatch(context, this)
} else {
executeUnconfined(state, MODE_CANCELLABLE) {
//协程未被取消
if (!resumeCancelled(state)) {
// 恢复执行
resumeUndispatchedWith(result)
}
}
}
}
//恢复执行前判断协程是否已经取消执行
inline fun resumeCancelled(state: Any?): Boolean {
//获取当前协程任务
val job = context[Job]
//如果不为空且不活跃
if (job != null && !job.isActive) {
val cause = job.getCancellationException()
cancelCompletedResult(state, cause)
//抛出异常
resumeWithException(cause)
return true
}
return false
}
//我们需要内联它来在堆栈中保存一个条目
inline fun resumeUndispatchedWith(result: Result<T>) {
withContinuationContext(continuation, countOrElement) {
continuation.resumeWith(result)
}
}

来源:https://juejin.cn/post/7174204360699117605

标签:Kotlin,协程,launch,启动流程
0
投稿

猜你喜欢

  • C#中定时任务被阻塞问题的解决方法

    2023-10-27 00:56:02
  • C#实现日期格式转换的公共方法类实例

    2023-05-16 10:06:54
  • java如何使用Lombok更优雅地编码

    2022-07-24 23:24:50
  • Java聊天室之使用Socket实现通信功能

    2022-03-08 09:46:27
  • Android利用FlexboxLayout轻松实现流动布局

    2021-06-24 02:41:53
  • httpwebreqeust读取httponly的cookie方法

    2022-04-19 19:47:05
  • 利用C#代码实现图片旋转360度

    2022-04-11 05:24:38
  • 完美解决Android Studio集成crashlytics后无法编译的问题

    2023-06-23 16:49:07
  • Java为什么占用四个字节你知道吗

    2021-06-16 18:05:22
  • MyBatis动态SQL中的trim标签的使用方法

    2022-08-30 21:38:13
  • idea mybatis配置log4j打印sql语句的示例

    2023-11-25 10:32:39
  • springboot热部署知识点总结

    2021-08-23 12:05:43
  • jQuery.event.trigger()的简单解释

    2023-05-24 03:41:37
  • java,android,MD5加密算法的实现代码(16位,32位)

    2022-07-12 20:40:10
  • spring mvc中的@PathVariable获得请求url中的动态参数

    2023-08-22 22:08:40
  • 从零实现一个简单的Spring Bean容器的代码案例

    2022-07-24 11:42:16
  • 使用Jenkins来构建SVN+Maven项目的实现

    2023-07-30 12:45:22
  • Mybatis插件之自动生成不使用默认的驼峰式操作

    2023-11-19 01:20:03
  • SpringBoot一个非常蛋疼的无法启动的问题解决

    2023-12-12 15:24:36
  • 全面分析Java文件上传

    2021-12-09 13:22:52
  • asp之家 软件编程 m.aspxhome.com