Kotlin协程Dispatchers原理示例详解

作者:潇风寒月 时间:2022-09-26 00:09:45 

前置知识

Kotlin协程不是什么空中阁楼,Kotlin源代码会被编译成class字节码文件,最终会运行到虚拟机中。所以从本质上讲,Kotlin和Java是类似的,都是可以编译产生class的语言,但最终还是会受到虚拟机的限制,它们的代码最终会在虚拟机上的某个线程上被执行。

之前我们分析了launch的原理,但当时我们没有去分析协程创建出来后是如何与线程产生关联的,怎么被分发到具体的线程上执行的,本篇文章就带大家分析一下。

要想搞懂Dispatchers,我们先来看一下Dispatchers、CoroutineDispatcher、ContinuationInterceptor、CoroutineContext之间的关系

public actual object Dispatchers {
   @JvmStatic
   public actual val Default: CoroutineDispatcher = DefaultScheduler
   @JvmStatic
   public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
   @JvmStatic
   public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined
   @JvmStatic
   public val IO: CoroutineDispatcher = DefaultIoScheduler
}
public abstract class CoroutineDispatcher :
   AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
}
public interface ContinuationInterceptor : CoroutineContext.Element {}
public interface Element : CoroutineContext {}

Dispatchers中存放的是协程调度器(它本身是一个单例),有我们平时常用的IO、Default、Main等。这些协程调度器都是CoroutineDispatcher的子类,这些协程调度器其实都是CoroutineContext。

demo

我们先来看一个关于launch的demo:

fun main() {
   val coroutineScope = CoroutineScope(Job())
   coroutineScope.launch {
       println("Thread : ${Thread.currentThread().name}")
   }
   Thread.sleep(5000L)
}

在生成CoroutineScope时,demo中没有传入相关的协程调度器,也就是Dispatchers。那这个launch会运行到哪个线程之上?

运行试一下:

Thread : DefaultDispatcher-worker-1

居然运行到了DefaultDispatcher-worker-1线程上,这看起来明显是Dispatchers.Default协程调度器里面的线程。我明明没传Dispatchers相关的context,居然会运行到子线程上。说明运行到default线程是launch默认的。

它是怎么与default线程产生关联的?打开源码一探究竟:

public fun CoroutineScope.launch(
   context: CoroutineContext = EmptyCoroutineContext,
   start: CoroutineStart = CoroutineStart.DEFAULT,
   block: suspend CoroutineScope.() -> Unit
): Job {
   //代码1
   val newContext = newCoroutineContext(context)
   //代码2
   val coroutine = if (start.isLazy)
       LazyStandaloneCoroutine(newContext, block) else
       StandaloneCoroutine(newContext, active = true)
   //代码3
   coroutine.start(start, coroutine, block)
   return coroutine
}
  • 将传入的CoroutineContext构造出新的context

  • 启动模式,判断是否为懒加载,如果是懒加载则构建懒加载协程对象,否则就是标准的

  • 启动协程

我们重点关注代码1,这是与CoroutineContext相关的。

public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
   //从父协程那里继承过来的context+这次的context
   val combined = coroutineContext.foldCopiesForChildCoroutine() + context
   val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
   //combined可以简单的把它看成是一个map,它是CoroutineContext类型的
   //如果当前context不等于Dispatchers.Default,而且从map里面取ContinuationInterceptor(用于拦截之后分发线程的)值为空,说明没有传入协程应该在哪个线程上运行的相关参数
   return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
       debug + Dispatchers.Default else debug
}

调用launch的时候,我们没有传入context,默认参数是EmptyCoroutineContext。这里的combined,它其实是CoroutineContext类型的,可以简单的看成是map(其实不是,只是类似)。

通过combined[ContinuationInterceptor]可以将传入的线程调度相关的参数给取出来,这里如果取出来为空,是给该context添加了一个Dispatchers.Default,然后把新的context返回出去了。所以launch默认情况下,会走到default线程去执行。

补充一点:CoroutineContext能够通过+连接是因为它内部有个public operator fun plus函数。能够通过combined[ContinuationInterceptor]这种方式访问元素是因为有个public operator fun get函数。

public interface CoroutineContext {
    /**
    * Returns the element with the given [key] from this context or `null`.
    */
   public operator fun <E : Element> get(key: Key<E>): E?
    /**
    * Returns a context containing elements from this context and elements from  other [context].
    * The elements from this context with the same key as in the other one are dropped.
    */
   public operator fun plus(context: CoroutineContext): CoroutineContext {
       ......
   }
}

startCoroutineCancellable

上面我们分析了launch默认情况下,context中会增加Dispatchers.Default的这个协程调度器,到时launch的Lambda会在default线程上执行,其中具体流程是怎么样的,我们分析一下。

在之前的文章 Kotlin协程之launch原理 中我们分析过,launch默认情况下会最终执行到startCoroutineCancellable函数。

public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) {
   //构建ContinuationImpl
   createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))
}
public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(
   completion: Continuation<T>
): Continuation<Unit> {
   val probeCompletion = probeCoroutineCreated(completion)
   return if (this is BaseContinuationImpl)
       //走这里
       create(probeCompletion)
   else
       createCoroutineFromSuspendFunction(probeCompletion) {
           (this as Function1<Continuation<T>, Any?>).invoke(it)
       }
}

在Kotlin协程之launch原理 文章中,咱们分析过create(probeCompletion)这里创建出来的是launch的那个Lambda,编译器会产生一个匿名内部类,它继承自SuspendLambda,而SuspendLambda是继承自ContinuationImpl。

所以 createCoroutineUnintercepted(completion)一开始构建出来的是一个ContinuationImpl,接下来需要去看它的intercepted()函数。

intercepted()函数

internal abstract class ContinuationImpl(
   completion: Continuation<Any?>?,
   private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
   constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)
   public override val context: CoroutineContext
       get() = _context!!
   @Transient
   private var intercepted: Continuation<Any?>? = null
   public fun intercepted(): Continuation<Any?> =
       intercepted
           ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
               .also { intercepted = it }
}

第一次走到intercepted()函数时,intercepted肯定是为null的,还没初始化。此时会通过context[ContinuationInterceptor]取出Dispatcher对象,然后调用该Dispatcher对象的interceptContinuation()函数。这个Dispatcher对象在demo这里其实就是Dispatchers.Default。

public actual object Dispatchers {
   @JvmStatic
   public actual val Default: CoroutineDispatcher = DefaultScheduler
}

可以看到,Dispatchers.Default是一个CoroutineDispatcher对象,interceptContinuation()函数就在CoroutineDispatcher中。

public abstract class CoroutineDispatcher :
   AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
   public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
       DispatchedContinuation(this, continuation)
}
public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) {
   createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))
}

这个方法非常简单,就是新建并且返回了一个DispatchedContinuation对象,将this和continuation给传入进去。这里的this是Dispatchers.Default。

所以,最终我们发现走完startCoroutineCancellable的前2步之后,也就是走完intercepted()之后,创建的是DispatchedContinuation对象,最后是调用的DispatchedContinuation的resumeCancellableWith函数。最后这步比较关键,这是真正将协程的具体执行逻辑放到线程上执行的部分。

internal class DispatchedContinuation<in T>(
   //这里传入的dispatcher在demo中是Dispatchers.Default
   @JvmField val dispatcher: CoroutineDispatcher,
   @JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {
   inline fun resumeCancellableWith(
       result: Result<T>,
       noinline onCancellation: ((cause: Throwable) -> Unit)?
   ) {
       val state = result.toState(onCancellation)
       //代码1
       if (dispatcher.isDispatchNeeded(context)) {
           _state = state
           resumeMode = MODE_CANCELLABLE
           //代码2
           dispatcher.dispatch(context, this)
       } else {
           //代码3
           executeUnconfined(state, MODE_CANCELLABLE) {
               if (!resumeCancelled(state)) {
                   resumeUndispatchedWith(result)
               }
           }
       }
   }
}
internal abstract class DispatchedTask<in T>(
   @JvmField public var resumeMode: Int
) : SchedulerTask() {
   ......
}
internal actual typealias SchedulerTask = Task
internal abstract class Task(
   @JvmField var submissionTime: Long,
   @JvmField var taskContext: TaskContext
) : Runnable {
   ......
}
public abstract class CoroutineDispatcher :
   AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
   public abstract fun dispatch(context: CoroutineContext, block: Runnable)
   public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
}

从DispatchedContinuation的继承结构来看,它既是一个Continuation(通过委托给传入的continuation参数),也是一个Runnable。

  • 首先看代码1:这个dispatcher在demo中其实是Dispatchers.Default ,然后调用它的isDispatchNeeded(),这个函数定义在CoroutineDispatcher中,默认就是返回true,只有Dispatchers.Unconfined返回false

  • 代码2:调用Dispatchers.Default的dispatch函数,将context和自己(DispatchedContinuation,也就是Runnable)传过去了

  • 代码3:对应Dispatchers.Unconfined的情况,它的isDispatchNeeded()返回false

现在我们要分析代码2之后的执行逻辑,也就是将context和Runnable传入到dispatch函数之后是怎么执行的。按道理,看到Runnable,那可能这个与线程执行相关,应该离我们想要的答案不远了。回到Dispatchers,我们发现Dispatchers.Default是DefaultScheduler类型的,那我们就去DefaultScheduler中或者其父类中去找dispatch函数。

DefaultScheduler中找dispatch函数

public actual object Dispatchers {
   @JvmStatic
   public actual val Default: CoroutineDispatcher = DefaultScheduler
}
internal object DefaultScheduler : SchedulerCoroutineDispatcher(
   CORE_POOL_SIZE, MAX_POOL_SIZE,
   IDLE_WORKER_KEEP_ALIVE_NS, DEFAULT_SCHEDULER_NAME
) {
   ......
}
internal open class SchedulerCoroutineDispatcher(
   private val corePoolSize: Int = CORE_POOL_SIZE,
   private val maxPoolSize: Int = MAX_POOL_SIZE,
   private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
   private val schedulerName: String = "CoroutineScheduler",
) : ExecutorCoroutineDispatcher() {
   private var coroutineScheduler = createScheduler()
   private fun createScheduler() =
       CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
    override fun dispatch(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block)
}

最后发现dispatch函数在其父类SchedulerCoroutineDispatcher中,在这里构建了一个CoroutineScheduler,直接调用了CoroutineScheduler对象的dispatch,然后将Runnable(也就是上面的DispatchedContinuation对象)传入。

Runnable传入

internal class CoroutineScheduler(
   @JvmField val corePoolSize: Int,
   @JvmField val maxPoolSize: Int,
   @JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
   @JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor, Closeable {
   override fun execute(command: Runnable) = dispatch(command)
   fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
       trackTask() // this is needed for virtual time support
       //代码1:构建Task,Task实现了Runnable接口
       val task = createTask(block, taskContext)
       //代码2:取当前线程转为Worker对象,Worker是一个继承自Thread的类
       val currentWorker = currentWorker()
       //代码3:尝试将Task提交到本地队列并根据结果执行相应的操作
       val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
       if (notAdded != null) {
           //代码4:notAdded不为null,则再将notAdded(Task)添加到全局队列中
           if (!addToGlobalQueue(notAdded)) {
               throw RejectedExecutionException("$schedulerName was terminated")
           }
       }
       val skipUnpark = tailDispatch && currentWorker != null
       // Checking 'task' instead of 'notAdded' is completely okay
       if (task.mode == TASK_NON_BLOCKING) {
           if (skipUnpark) return
           //代码5: 创建Worker并开始执行该线程
           signalCpuWork()
       } else {
           // Increment blocking tasks anyway
           signalBlockingWork(skipUnpark = skipUnpark)
       }
   }
   private fun currentWorker(): Worker? = (Thread.currentThread() as? Worker)?.takeIf { it.scheduler == this }
   internal inner class Worker private constructor() : Thread() {
       .....
   }
}

观察发现,原来CoroutineScheduler类实现了java.util.concurrent.Executor接口,同时实现了它的execute方法,这个方法也会调用dispatch()。

  • 代码1:首先是通过Runnable构建了一个Task,这个Task其实也是实现了Runnable接口,只是把传入的Runnable包装了一下

  • 代码2:将当前线程取出来转换成Worker,当然第一次时,这个转换不会成功,这个Worker是继承自Thread的一个类

  • 代码3:将task提交到本地队列中,这个本地队列待会儿会在Worker这个线程执行时取出Task,并执行Task

  • 代码4:如果task提交到本地队列的过程中没有成功,那么会添加到全局队列中,待会儿也会被Worker取出来Task并执行

  • 代码5:创建Worker线程,并开始执行

开始执行Worker线程之后,我们需要看一下这个线程的run方法执行的是啥,也就是它的具体执行逻辑。

Worker线程执行逻辑

internal inner class Worker private constructor() : Thread() {
   override fun run() = runWorker()
   private fun runWorker() {
       var rescanned = false
       while (!isTerminated && state != WorkerState.TERMINATED) {
           //代码1
           val task = findTask(mayHaveLocalTasks)
           if (task != null) {
               rescanned = false
               minDelayUntilStealableTaskNs = 0L
               //代码2
               executeTask(task)
               continue
           } else {
               mayHaveLocalTasks = false
           }
           if (minDelayUntilStealableTaskNs != 0L) {
               if (!rescanned) {
                   rescanned = true
               } else {
                   rescanned = false
                   tryReleaseCpu(WorkerState.PARKING)
                   interrupted()
                   LockSupport.parkNanos(minDelayUntilStealableTaskNs)
                   minDelayUntilStealableTaskNs = 0L
               }
               continue
           }
           tryPark()
       }
       tryReleaseCpu(WorkerState.TERMINATED)
   }
   fun findTask(scanLocalQueue: Boolean): Task? {
       if (tryAcquireCpuPermit()) return findAnyTask(scanLocalQueue)
       // If we can't acquire a CPU permit -- attempt to find blocking task
       val task = if (scanLocalQueue) {
           localQueue.poll() ?: globalBlockingQueue.removeFirstOrNull()
       } else {
           globalBlockingQueue.removeFirstOrNull()
       }
       return task ?: trySteal(blockingOnly = true)
   }
   private fun executeTask(task: Task) {
       val taskMode = task.mode
       idleReset(taskMode)
       beforeTask(taskMode)
       runSafely(task)
       afterTask(taskMode)
   }
   fun runSafely(task: Task) {
       try {
           task.run()
       } catch (e: Throwable) {
           val thread = Thread.currentThread()
           thread.uncaughtExceptionHandler.uncaughtException(thread, e)
       } finally {
           unTrackTask()
       }
   }
}

run方法直接调用的runWorker(),在里面是一个while循环,不断从队列中取Task来执行。

  • 代码1:从本地队列或者全局队列中取出Task

  • 代码2:执行这个task,最终其实就是调用这个Runnable的run方法。

也就是说,在Worker这个线程中,执行了这个Runnable的run方法。还记得这个Runnable是谁么?它就是上面我们看过的DispatchedContinuation,这里的run方法执行的就是协程任务,那这块具体的run方法的实现逻辑,我们应该到DispatchedContinuation中去找。

internal class DispatchedContinuation<in T>(
   @JvmField val dispatcher: CoroutineDispatcher,
   @JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {
   ......
}
internal abstract class DispatchedTask<in T>(
   @JvmField public var resumeMode: Int
) : SchedulerTask() {
   public final override fun run() {
       assert { resumeMode != MODE_UNINITIALIZED } // should have been set before dispatching
       val taskContext = this.taskContext
       var fatalException: Throwable? = null
       try {
           val delegate = delegate as DispatchedContinuation<T>
           val continuation = delegate.continuation
           withContinuationContext(continuation, delegate.countOrElement) {
               val context = continuation.context
               val state = takeState() // NOTE: Must take state in any case, even if cancelled
               val exception = getExceptionalResult(state)
               /*
                * Check whether continuation was originally resumed with an exception.
                * If so, it dominates cancellation, otherwise the original exception
                * will be silently lost.
                */
               val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null
               //非空,且未处于active状态
               if (job != null && !job.isActive) {
                   //开始之前,协程已经被取消,将具体的Exception传出去
                   val cause = job.getCancellationException()
                   cancelCompletedResult(state, cause)
                   continuation.resumeWithStackTrace(cause)
               } else {
                   //有异常,传递异常
                   if (exception != null) {
                       continuation.resumeWithException(exception)
                   } else {
                       //代码1
                       continuation.resume(getSuccessfulResult(state))
                   }
               }
           }
       } catch (e: Throwable) {
           // This instead of runCatching to have nicer stacktrace and debug experience
           fatalException = e
       } finally {
           val result = runCatching { taskContext.afterTask() }
           handleFatalException(fatalException, result.exceptionOrNull())
       }
   }
}

我们主要看一下代码1处,调用了resume开启协程。前面没有异常,才开始启动协程,这里才是真正的开始启动协程,开始执行launch传入的Lambda表达式。这个时候,协程的逻辑是在Worker这个线程上执行的了,切到某个线程上执行的逻辑已经完成了。

ps: rusume会走到BaseContinuationImpl的rusumeWith,然后走到launch传入的Lambda匿名内部类的invokeSuspend方法,开始执行状态机逻辑。前面的文章 Kotlin协程createCoroutine和startCoroutine原理 我们分析过这里,这里就只是简单提一下。

到这里,Dispatchers的执行流程就算完了,前后都串起来了。

小结

Dispatchers是协程框架中与线程交互的关键。底层会有不同的线程池,Dispatchers.Default、IO,协程任务来了的时候会封装成一个个的Runnable,丢到线程中执行,这些Runnable的run方法中执行的其实就是continuation.resume,也就是launch的Lambda生成的SuspendLambda匿名内部类,也就是开启协程状态机,开始协程的真正执行。

来源:https://juejin.cn/post/7127492385923137549

标签:Kotlin,协程,Dispatchers
0
投稿

猜你喜欢

  • Android简单实现天气预报App

    2022-11-05 05:50:32
  • java实现学籍管理系统

    2023-04-03 00:32:19
  • SpringBoot接口调用之后报404问题的解决方案

    2021-08-31 15:25:03
  • springboot整合quartz项目使用案例

    2023-02-13 19:57:12
  • Hibernate中的多表查询及抓取策略

    2022-02-22 18:58:28
  • C#获取所有进程的方法

    2022-06-13 16:02:24
  • Android九宫格程序设计代码

    2022-05-04 13:11:49
  • 通过实例解析spring环绕通知原理及用法

    2022-12-26 23:33:20
  • Android自定义wheelview随机选号效果

    2021-09-12 06:36:53
  • C#遍历操作系统下所有驱动器的方法

    2022-06-29 09:12:14
  • Android 如何获取手机总内存和可用内存等信息

    2023-07-27 13:11:42
  • Android 三级NestedScroll嵌套滚动实践

    2022-11-12 07:45:21
  • springmvc实现自定义类型转换器示例

    2021-09-29 23:46:53
  • C#移除字符串中的不可见Unicode字符 案例代码

    2023-04-28 19:06:06
  • 一篇文章带你了解Java SpringMVC返回null

    2023-11-24 15:18:03
  • java实现163邮箱发送邮件到qq邮箱成功案例

    2023-09-18 02:38:09
  • java使用软引用实现缓存机制示例

    2021-08-26 18:06:12
  • OpenGL绘制Bezier曲线的方法

    2023-11-01 03:59:30
  • 利用AOP实现SqlSugar自动事务

    2021-11-24 11:56:42
  • 详解springboot项目带Tomcat和不带Tomcat的两种打包方式

    2023-11-28 08:23:41
  • asp之家 软件编程 m.aspxhome.com