Kotlin协程的线程调度示例详解

2022-12-13 08:58:05
目录
引言一、协程的分发器作用1.1 测试代码1.2 CoroutineScope.launch1.2.1 newCoroutineContext1.3 startCoroutineCancellable1.3.1 intercepted()1.3.2 CoroutineDispatcher1.3.3 小结1.4 DispatchedContinuation1.5 DefaultScheduler1.5.1 SchedulerCoroutineDispatcher1.5.2 CoroutineScheduler1.6 DispatchedTask.run()1.7 总结二、协程中的线程切换2.1 反编译代码2.1.1 MainActivityonCreateonCreateonCreate12.1.2 AnonymousClass12.2 withContext2.2.1 startCoroutineCancellable2.3 resumeWith2.4 DispatchedCoroutine2.4.1 DispatchedCoroutine 的继承关系2.5 协程线程的恢复2.5.1 AbstractCoroutine.resumeWith()2.5.2 afterResume2.6 总结2.7 Dispatchers.Main2.7.1 MainDispatcherLoader2.7.2 AndroidDispatcherFactory2.7.3 HandlerContext

引言

在第一篇文章中我们分析了协程启动创建过程启动过程,在本文中,我们将着重剖析协程中协程调度的逻辑流程。主要是分析解答如下2个问题:

    涉及到协程方法器是如何将协程代码调度到特定的线程执行?子协程执行完又是如何切换0回父协程的线程环境?

    一、协程的分发器作用

    1.1>
    GlobalScope.launch {
        //协程体1
        Log.d(TAG, "before suspend job.")
        withContext(Dispatchers.Main) {
            //协程体2
            Log.d(TAG, "print in Main thread.")
        }
        Log.d(TAG, "after suspend job.")
    }
    
      此次的协程测试用例中,我们默认的launch一个协程,我们简单的将launch需要执行的这外层逻辑为协程体1。在协程体1中,我们使用withContext将协程切换到主线程执行,打印日志。我们将这里面执行的协程逻辑为协程体2。协程体2执行完成后,切回协程体1中执行并打印Log。注意,根据我们之前《协程的创建与启动》文章中分析的,Kotlin编译器针对协程体1和协程体2分别生成一个继承与SuspenLamabda的类型,比如:class MainActivity#onCreate$1 : SuspenLambda{...}。我们在讲协程体时,也同时代指这个类实例。

      继续跟踪launch()函数执行逻辑,这次跟踪过程不同与《协程的创建与启动》篇章,我们会将侧重点放在启动过程中协程调度器是如何起作用的?接下来见1.2

      1.2>
      public fun CoroutineScope.launch(
          context: CoroutineContext = EmptyCoroutineContext,
          start: CoroutineStart = CoroutineStart.DEFAULT,
          block: suspend CoroutineScope.() -> Unit
      ): Job {
          //1. 见1.2.1
          val newContext = newCoroutineContext(context)
          val coroutine = if (start.isLazy)
              LazyStandaloneCoroutine(newContext, block) else
              StandaloneCoroutine(newContext, active = true)
          //2. 详见1.3
          coroutine.start(start, coroutine, block)
          return coroutine
      }
      
        这里会新建一个CoroutineContext,详见1.2.1根据之前的分析,这个里最终会调用到startCoroutineCancellable()方法,详见1.3流程。

        1.2.1>
        public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
            val combined = foldCopies(coroutineContext, context, true)
            val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
            return 
            if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
                debug + Dispatchers.Default
            else 
            	debug
        }
        

        coroutineContextcoroutineContextCoroutineScope的成员变量,当此时为GlobalScope.coroutineContext==EmptyCoroutineContext

        context:由于调用launch时没有指定Context,所以传到此处也是EmptyCoroutineContextfoldCopies()函数将2个context相加并拷贝,最终combied==EmptyCoroutineContext

        而在return这最后判断返回的是debug+Dispatchers.Defatult,所以此时默认的分发器为Dispatchers.Defatult

        这里涉及到的协程Context运算不做深入剖析,简单可以认为协程重写了“+”运算,使得Context之间可以使用“+”来叠加,没有的Element类型会被添加到Element集合,集合中已有的Element类型会被覆盖。

        1.3>
        internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
            receiver: R, completion: Continuation<T>,
            onCancellation: ((cause: Throwable) -> Unit)? = null
        ) =
            runSafely(completion) {
            	//1. 创建SuspendLambda协程体
                createCoroutineUnintercepted(receiver, completion)
                    //2. 拦截:取出分发器,并构建方法器Continuation。详见1.3.1
                    .intercepted()
                    //3. 调用方法器Continuation的resume方法,详见1.4
                    .resumeCancellableWith(Result.success(Unit), onCancellation)
            }
        
          这里的构建协程体在《协程的创建与启动》一节中已经剖析,不再赘述。进行拦截,注意:这里其实会根据方法器再构建出一个DispatchedContinuation对象,它也是一个续体类型,这是对协程体的一次包装。详见1.3.1小节。调用拦截器续体的resumeCancellableWith()开始状态机流转,执行分发流程详见1.4小节。

          1.3.1>
           public fun intercepted(): Continuation<Any?> =
                  intercepted?: (
                          //1. 取出拦截器
                          context[ContinuationInterceptor]?
                              //2.构建拦截器续体
                              .interceptContinuation(this)?: this)
                          .also { intercepted = it }
          
            取出当前上下文中的拦截器类型,根据之前1.2.1小节的分析,这里取出来的是Dispatchers.DefatultinterceptContinuation(this)为构建拦截器续体,注意这里传入的this是协程体1。 详见1.3.2。

            1.3.2>
            //Base class to be extended by all coroutine dispatcher implementations.
            public abstract class CoroutineDispatcher :
                AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
            public final override fun <T> interceptContinuation(continuation: Continuation<T>):
                    //详见1.4
                    Continuation<T> = DispatchedContinuation(this, continuation)
            }
            

            直接新建了一个DispatchedContinuation对象实例这里需要注意传入的构建参数:

              this:当前Dispatcher,也就是Dispatchers.Defatultcontinuation:协程体1。

              1.3.3>

              自此Continuation.intercepted()方法就分析结束,最终的结果是:用上下文中的Dispatcher和当前Contination对象也就是协程体1,共同作为构建参数,新建了一个DispatchedContinuation对象。

              接下来接着1.3中的第三点,调用DispatchedContinuation.resumeCancellableWith()方法开始分析。

              1.4>
              internal class DispatchedContinuation<in T>(
                  //1. 分发器
                  @JvmField val dispatcher: CoroutineDispatcher,
              	//2. 注意这里将Continuation的实现委托给了continuation成员变量。
                  @JvmField val continuation: Continuation<T>
              ) : DispatchedTask<T>(MODE_UNINITIALIZED)
              , CoroutineStackFrame,
              Continuation<T> by continuation {
                  	//3. 复写属性delegate为自己
              	    override val delegate: Continuation<T>
                      get() = this
                  ...
                  // We inline it to save an entry on the stack in cases where it shows (unconfined dispatcher)
                  // It is used only in Continuation<T>.resumeCancellableWith
                  @Suppress("NOTHING_TO_INLINE")
                  inline fun resumeCancellableWith(
                      result: Result<T>,
                      noinline onCancellation: ((cause: Throwable) -> Unit)?
                  ) {
                      val state = result.toState(onCancellation)
                      //默认为true
                      if (dispatcher.isDispatchNeeded(context)) {
                          _state = state
                          resumeMode = MODE_CANCELLABLE
                          //4. 详细见
                          dispatcher.dispatch(context, this)
                      } else {
                          executeUnconfined(state, MODE_CANCELLABLE) {
                              if (!resumeCancelled(state)) {
                                  resumeUndispatchedWith(result)
                              }
                          }
                      }
                  }
              }
              

              这里的dispatcher==Dispatchers.Defatult,所以接下来需要解析Dispatchers.Defatult到底是什么东西。详见1.5

                成员变量dispatcher==Dispatchers.Default成员变量continucation==协程体1(SuspenLambda类型实例)。同时DispatchedContinuation继承于Continuation接口,它将Continuation接口的实现委托给了成员变量continuationdeleagte为复写了DispatchedTask.delegate属性,将其返回自己。调用分发器也就是Dispatchers.Defatultdispatch()方法,注意这里传入的参数:

                context:来自Continuation接口的属性,由于委托给了成员变量continuation,所以此context==continuation.context

                this:分发器本身Dispatchers.Defatult

                自此这个方法的分析结束:调用分发器的进行分发,接下来分析就开始分析协程方法器CoroutineDispatcher

                1.5>
                //Dispathcer.kt
                @JvmStatic
                public actual val Default: CoroutineDispatcher = DefaultScheduler
                //Dispathcer.kt
                // Instance of Dispatchers.Default
                internal object DefaultScheduler : SchedulerCoroutineDispatcher(
                    CORE_POOL_SIZE, MAX_POOL_SIZE,
                    IDLE_WORKER_KEEP_ALIVE_NS, DEFAULT_SCHEDULER_NAME
                ) {
                    ...
                }
                

                实际上是继承 SchedulerCoroutineDispatcher类型。详见1.5.1

                1.5.1>
                internal open class SchedulerCoroutineDispatcher(
                    private val corePoolSize: Int = CORE_POOL_SIZE,
                    private val maxPoolSize: Int = MAX_POOL_SIZE,
                    private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
                    private val schedulerName: String = "CoroutineScheduler",
                ) : ExecutorCoroutineDispatcher() {
                    override val executor: Executor
                        get() = coroutineScheduler
                    // This is variable for test purposes, so that we can reinitialize from clean state
                    private var coroutineScheduler = createScheduler()
                    private fun createScheduler() =
                        //1. 详见1.5.2
                        CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
                    //2. 详见1.5.2
                    override fun dispatch(context: CoroutineContext, block: Runnable): Unit 
                    = coroutineScheduler.dispatch(block)
                    ...
                }
                //Executors.kt
                //2. 实际上是继承ExecutorCoroutineDispatcher
                public abstract class ExecutorCoroutineDispatcher: CoroutineDispatcher(), Closeable {
                    ...
                }
                
                  可以看到实际上调用了CoroutineScheduler.dispatch方法。此时发现,第二个参数是Runnable类型的,而在1.4小节中,我们知道传入的是this也就是DispatchedContinuation,所以DispatchedContinuation继承的父类中,必定有继承了Runnable接口,而他的run方法的实现也在父类中,这块我们暂时按下不表,接着看继续跟踪coroutineScheduler.dispatch(block)

                  1.5.2>
                  internal class CoroutineScheduler(
                      @JvmField val corePoolSize: Int,
                      @JvmField val maxPoolSize: Int,
                      @JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
                      @JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
                  ) : Executor, Closeable {
                  	... 
                      override fun execute(command: Runnable) = dispatch(command)
                      fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
                          trackTask() // this is needed for virtual time support
                          val task = createTask(block, taskContext)
                          // try to submit the task to the local queue and act depending on the result
                          val currentWorker = currentWorker()
                          val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
                          if (notAdded != null) {
                              if (!addToGlobalQueue(notAdded)) {
                                  // Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
                                  throw RejectedExecutionException("$schedulerName was terminated")
                              }
                          }
                          val skipUnpark = tailDispatch && currentWorker != null
                          // Checking 'task' instead of 'notAdded' is completely okay
                          if (task.mode == TASK_NON_BLOCKING) {
                              if (skipUnpark) return
                              signalCpuWork()
                          } else {
                              // Increment blocking tasks anyway
                              signalBlockingWork(skipUnpark = skipUnpark)
                          }
                      }
                  }
                  
                    该类继承了Executor类,而且它的构建参数可看到是线程池的参数,所以可以知道这个其实是Kotlin协程实现的一个线程池,具体就不跟进去了。execute()过程也是dispatch过程:将任务投递到任务队列,然后通知线程去取任务执行,自此完成了线程切换动作。而在新线程里执行的Runnable为1.4中的调用代码:dispatcher.dispatch(context, this)中的this,也就是DispatchedContinuationDispatchedContinuation.kt并没有实现run方法,那么一定是他继承的父类实现了Runnable接口并实现,所以需要接着看它继承的父类:DispatchedTask类。

                    1.6>
                    internal abstract class DispatchedTask<in T>(
                        @JvmField public var resumeMode: Int
                    ) : SchedulerTask() {
                    	...
                        internal abstract val delegate: Continuation<T>
                        @Suppress("UNCHECKED_CAST")
                        internal open fun <T> getSuccessfulResult(state: Any?): T =
                            state as T
                        internal open fun getExceptionalResult(state: Any?): Throwable? =
                            (state as? CompletedExceptionally)?.cause
                        public final override fun run() {
                            assert { resumeMode != MODE_UNINITIALIZED } // should have been set before dispatching
                            val taskContext = this.taskContext
                            var fatalException: Throwable? = null
                            try {
                                val delegate = delegate as DispatchedContinuation<T>
                                //1. 取出代理商的续体
                                val continuation = delegate.continuation
                                withContinuationContext(continuation, delegate.countOrElement) {
                                    val context = continuation.context
                                    val state = takeState() // NOTE: Must take state in any case, even if cancelled
                                    val exception = getExceptionalResult(state)
                                    val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null
                                    if (job != null && !job.isActive) {
                                        val cause = job.getCancellationException()
                                        cancelCompletedResult(state, cause)
                                        continuation.resumeWithStackTrace(cause)
                                    } else {
                                        if (exception != null) {
                                            continuation.resumeWithException(exception)
                                        } else {
                                            //1. 被包装的续体的resume方法,真正的开始出发其协程状态机代码。
                                            continuation.resume(getSuccessfulResult(state))
                                        }
                                    }
                                }
                            } catch (e: Throwable) {
                                // This instead of runCatching to have nicer stacktrace and debug experience
                                fatalException = e
                            } finally {
                                val result = runCatching { taskContext.afterTask() }
                                handleFatalException(fatalException, result.exceptionOrNull())
                            }
                        }
                    }
                    
                      delegate转为DispatchedContinuation,应该注意1.4 小节中DispatchedContinuation继承DispatchTask时,便对此delegate进行了复写:

                      override val delegate: Continuation

                      get() = this

                      而此delegate.continucation便是当初newDispatchedContinuation(this)时传入的this,此this就是Kotlin编译器一开始为协程体生成的SuspendLambda类型对象。具体可以回看1.3小节。

                        调用了continuation.resume()方法触发了协程的状态机进而开始执行协程业务逻辑代码,结合之前1.5.2的分析可以知道,这个方法的调用已经是被dispatch到特定线程,完成线程切换后执行的。所以协程状态机的代码也是跑在新线程上的。

                        1.7>

                        至此,协程的线程调度分析结束,关键有如下几个要点:

                          创建SuspendLambda时,他的协程上下文对象来自于comletion.context,默认就是Dispatcher.DefaultSuspendLambda启动时调用了intercept()进行一层包装,得到DispatchedContinuation,后续协程启动是启动的DispatchedContinuation协程。DispatchedContinuation继承于Runnable接口,协程启动时将自己投递到分发器dispatcher执行run方法,从而达到了线程切换效果。DispatchedContinuationrun方法中,调用SuspendLambda.resume()启动状态机。在新线程执行协程状态机代码。

                          这一小节中,介绍了如何将协程调度到目的线程执行,接下来分析如何做到随意切换线程后,然后再恢复到原来线程的。

                          二、协程中的线程切换

                          在第一小节中,我们搞清楚了协程启动时,协程调度器是如何在其中起作用的。这一小节旨在剖析在协程用分发器切换线程执行新的挂起函数后,是如何切换会原来线程继续执行剩下的逻辑的。

                          为此,我们需要将1.1的测试代码反编译出来实际代码进而分析。

                          2.1>

                          2.1.1>
                          final class MainActivity$onCreate$1 extends SuspendLambda implements Function2<CoroutineScope, Continuation<? super Unit>, Object> {
                              ...
                              @Override // kotlin.coroutines.jvm.internal.BaseContinuationImpl
                              public final Object invokeSuspend(Object $result) {
                                  Object coroutine_suspended = IntrinsicsKt.getCOROUTINE_SUSPENDED();
                                  switch (this.label) {
                                      case 0:
                                          ResultKt.throwOnFailure($result);
                                          Log.d(MainActivity.TAG, LiveLiterals$MainActivityKt.INSTANCE.m4147xf96cab04());
                                          this.label = 1;
                                          //1. 新建编译器自动生成的继承于SuspendLambda的类型。
                                          AnonymousClass1 anonymousClass1 = new AnonymousClass1(null);
                                          //2. 调用withContext
                                      	Object res = BuildersKt.withContext(Dispatchers.getIO(), anonymousClass1, this);
                                          if (res != coroutine_suspended) {
                                              break;
                                          } else {
                                              //挂起
                                              return coroutine_suspended;
                                          }
                                      case 1:
                                          ResultKt.throwOnFailure($result);
                                          break;
                                      default:
                                          throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
                                  }
                                  Log.d(MainActivity.TAG, LiveLiterals$MainActivityKt.INSTANCE.m4148xe0c1b328());
                                  return Unit.INSTANCE;
                              }
                          }
                          

                          根据之前的文章分析,这里suspend lambda 的类型都自动生成继承于SuspendLambda的类型。详见2.1.2。

                          anonymousClass1传入withContext,而且注意这里传入了this==MainActivity$onCreate$1,详见2.2。

                          2.1.2>
                          /* compiled from: MainActivity.kt */
                          public static final class AnonymousClass1 extends SuspendLambda implements Function2<CoroutineScope, Continuation<? super Integer>, Object> {
                              int label
                              ...
                              @Override // kotlin.coroutines.jvm.internal.BaseContinuationImpl
                              public final Object invokeSuspend(Object obj) {
                                  IntrinsicsKt.getCOROUTINE_SUSPENDED();
                                  switch (this.label) {
                                      case 0:
                                          ResultKt.throwOnFailure(obj);
                                          return Boxing.boxInt(Log.d(MainActivity.TAG, LiveLiterals$MainActivityKt.INSTANCE.m4146x7c0f011f()));
                                      default:
                                          throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
                                  }
                              }
                          }
                          

                          2.2>
                          public suspend fun <T> withContext(
                              context: CoroutineContext,
                              block: suspend CoroutineScope.() -> T
                          ): T {
                              contract {
                                  callsInPlace(block, InvocationKind.EXACTLY_ONCE)
                              }
                              //1. 获取当前协程, 注意这里的uCont就是当前续体,也就是MainActivity$onCreate$1
                              return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
                                  //2. 计算获的新的协程上下文
                                  val oldContext = uCont.context
                                  val newContext = oldContext + context
                                  //3. 快速判断:新上下文和旧上下文一致的情况快速处理。
                                  // always check for cancellation of new context
                                  newContext.ensureActive()
                                  // FAST PATH #1 -- new context is the same as the old one
                                  if (newContext === oldContext) {
                                      val coroutine = ScopeCoroutine(newContext, uCont)
                                      return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
                                  }
                                  // FAST PATH #2 -- the new dispatcher is the same as the old one (something else changed)
                                  // `equals` is used by design (see equals implementation is wrapper context like ExecutorCoroutineDispatcher)
                                  if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
                                      val coroutine = UndispatchedCoroutine(newContext, uCont)
                                      // There are changes in the context, so this thread needs to be updated
                                      withCoroutineContext(newContext, null) {
                                          return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
                                      }
                                  }
                                  // SLOW PATH -- use new dispatcher
                                  //4. 新建一个DispatchedCoroutine
                                  val coroutine = DispatchedCoroutine(newContext, uCont)
                                  //5. 启动协程
                                  block.startCoroutineCancellable(coroutine, coroutine)
                                  coroutine.getResult()
                              }
                          }
                          
                            suspendCoroutineUninterceptedOrReturn这个函数直接步进是看不到实现的,它的实现是由Kotlin编译器生成的,它的作用是用来获取当前续体的,并且通过uCont返回,这里就是MainActivity$onCreate$1将旧协程上下文和新的上下文一起。计算得到最终的上下文。这里的context==Dispatchers.getIO()快速判断,不用看。新建一个DispatchedCoroutine,注意这里传入了新的协程上下文和当前续体对象。调用startCoroutineCancellable()启动协程。这里的同1.3.2小节分析一样,详见 2.2.1

                            2.2.1>
                            internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
                                receiver: R, completion: Continuation<T>,
                                onCancellation: ((cause: Throwable) -> Unit)? = null
                            ) =
                                runSafely(completion) {
                                	//1. 创建SuspendLambda协程体
                                    createCoroutineUnintercepted(receiver, completion)
                                        //2. 拦截:取出分发器,并构建方法器Continuation。详见1.3.1
                                        .intercepted()
                                        //3. 调用方法器Continuation的resume方法,详见1.4
                                        .resumeCancellableWith(Result.success(Unit), onCancellation)
                                }
                            

                            此方法在之前1.3小节已经分析过,针对此此次调用,其中的改变是协程上下文中的分发器已经被设置为Dispatchers.Main

                              创建了SuspendLambda对象,此对象的CoroutineContextcompletion.context。而其中的ContinuationInterceptor类型Element就是我们之前传入的Dispatchers.Main创建一个DispatchedContinuation将协程SuspendLambda的状态机逻辑通过Dispatcher.Main调度到主线程执行,调度过程参考第一下节。分发逻辑详见2.7小节。SuspendLambda的状态机invokeSuspend()逻辑执行完成后,会返回到BaseContinuationImpl.resumeWith(),我们需要接此方法分析,来得到协程在切换到主线程执行后,又是怎么切回协程体1的执行线程的,详见2.3。

                              2.3>
                              public final override fun resumeWith(result: Result<Any?>) {
                                      // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
                                      var current = this
                                      var param = result
                                      while (true) {
                                          // Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
                                          // can precisely track what part of suspended callstack was already resumed
                                          probeCoroutineResumed(current)
                                          with(current) {
                                              val completion = completion!! // fail fast when trying to resume continuation without completion
                                              val outcome: Result<Any?> =
                                                  try {
                                                      val outcome = invokeSuspend(param)
                                                      if (outcome === COROUTINE_SUSPENDED) return
                                                      Result.success(outcome)
                                                  } catch (exception: Throwable) {
                                                      Result.failure(exception)
                                                  }
                                              releaseIntercepted() // this state machine instance is terminating
                                              if (completion is BaseContinuationImpl) {
                                                  // unrolling recursion via loop
                                                  current = completion
                                                  param = outcome
                                              } else {
                                                  //1. 进入此判断
                                                  // top-level completion reached -- invoke and return
                                                  completion.resumeWith(outcome)
                                                  return
                                              }
                                          }
                                      }
                                  }
                              

                              当状态机执行完后, 后进入到completion的类型判断,由2.2和2.2.1可以知道,当初传入的completion是DispatchedCoroutine类型,所以加入到else分支,调用了DispatchedCoroutine.resumeWith(),接下来分析此方法。

                              在此之前,我们需要看下DispatchedCoroutine的继承关系,详见2.4.1。如果想直接跟踪流程,可以直接看2.4.2。

                              2.4>

                              2.4.1>
                              internal class DispatchedCoroutine<in T>(
                                  context: CoroutineContext,
                                  uCont: Continuation<T>
                              ) : ScopeCoroutine<T>(context, uCont) {
                              }
                              

                              继承于ScopeCoroutine

                              internal open class ScopeCoroutine<in T>(
                                  context: CoroutineContext,
                                  @JvmField val uCont: Continuation<T> // unintercepted continuation
                              ) : AbstractCoroutine<T>(context, true, true), CoroutineStackFrame {
                              }
                              

                              继承于AbstractCoroutine

                              public abstract class AbstractCoroutine<in T>(
                                  parentContext: CoroutineContext,
                                  initParentJob: Boolean,
                                  active: Boolean
                              ) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
                              }
                              

                              2.5>

                              2.5.1>
                                  public final override fun resumeWith(result: Result<T>) {
                                      val state = makeCompletingOnce(result.toState())
                                      if (state === COMPLETING_WAITING_CHILDREN) return
                                      afterResume(state)
                                  }
                              

                              调用了afterResume方法,此方法在DispatchedCoroutine类型有具体实现。见2.5.2

                              2.5.2>
                              //DispatchedCoroutine
                              override fun afterResume(state: Any?) {
                                      if (tryResume()) return // completed before getResult invocation -- bail out
                                      // Resume in a cancellable way because we have to switch back to the original dispatcher
                                      uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont))
                              }
                              
                                取出当前续体uCont,这个续体根据之前的分析:2.2小节,可以知道它等于MainActivity$onCreate$1intercepted():取出其分发拦截器resumeCancellableWith:使用方法拦截器协程体,将uCont续体的状态机逻辑调度到相对应的线程环境执行,这里就是之前的Dispatcher.Default。注意其注释:“将其切换到原先的分发器”。2⃣而这一过程其实和1.3小节的过程一致。恢复到Dispatcher.Default继续执行状态机时,由于label已经被更新,所以会往下继续执行,打印最后一句log。

                                2.6>

                                withContext(Dispatcher.Main)启动的协程时,取得当前协程续体uCount也就是MainActivity$onCreate$1,会计算出新的协程context,然后用它们创建一个DispatchedCoroutine

                                AnonymousClass1协程启动时,用DispatchedCoroutine作为completion参数,然后启动,此时会调度主线程执行协程。

                                当协程执行完成后,AnonymousClass1.resumeWith()方法会调用completion.resumeWith()

                                DispatchedCoroutine.resumeWith()方法会调用uCount.intercepted().resumeCancellableWith(),使得父协程进行调度并接着执行状态机逻辑。

                                2.7>
                                    @JvmStatic
                                    public actual val Main: MainCoroutineDispatcher get() 
                                = MainDispatcherLoader.dispatcher
                                

                                直接详见2.7.1

                                2.7.1>
                                internal object MainDispatcherLoader {
                                    private val FAST_SERVICE_LOADER_ENABLED = systemProp(FAST_SERVICE_LOADER_PROPERTY_NAME, true)
                                    @JvmField
                                    val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()
                                    private fun loadMainDispatcher(): MainCoroutineDispatcher {
                                        return try {
                                            val factories = if (FAST_SERVICE_LOADER_ENABLED) {
                                                FastServiceLoader.loadMainDispatcherFactory()
                                            } else {
                                                // We are explicitly using the
                                                // `ServiceLoader.load(MyClass::class.java, MyClass::class.java.classLoader).iterator()`
                                                // form of the ServiceLoader call to enable R8 optimization when compiled on Android.
                                                // 1.获得MainDispatcherFactory的实现类
                                                ServiceLoader.load(
                                                        MainDispatcherFactory::class.java,
                                                        MainDispatcherFactory::class.java.classLoader
                                                ).iterator().asSequence().toList()
                                            }
                                            @Suppress("ConstantConditionIf")
                                            factories.maxByOrNull { it.loadPriority }?.tryCreateDispatcher(factories)
                                                ?: createMissingDispatcher()
                                        } catch (e: Throwable) {
                                            // Service loader can throw an exception as well
                                            createMissingDispatcher(e)
                                        }
                                    }
                                }
                                
                                  通过ServiceLoad机制获取MainDispatcherFactory的实现类,而在源码里面,其实现类为AndroidDispatcherFactory调用tryCreateDispatcher()创建分发器,详见2.7.2。

                                  2.7.2>
                                  internal class AndroidDispatcherFactory : MainDispatcherFactory {
                                      override fun createDispatcher(allFactories: List<MainDispatcherFactory>) =
                                          HandlerContext(Looper.getMainLooper().asHandler(async = true))
                                      override fun hintOnError(): String = "For tests Dispatchers.setMain from kotlinx-coroutines-test module can be used"
                                      override val loadPriority: Int
                                          get() = Int.MAX_VALUE / 2
                                  }
                                  

                                  根据createDispatcher分发,主线程分发器的实现类为HandlerContext类型,传入用MainLooper构建的Handler。详见2.7.3。

                                  2.7.3>
                                  internal class HandlerContext private constructor(
                                      private val handler: Handler,
                                      private val name: String?,
                                      private val invokeImmediately: Boolean
                                  ) : HandlerDispatcher(), Delay {
                                      /**
                                       * Creates [CoroutineDispatcher] for the given Android [handler].
                                       *
                                       * @param handler a handler.
                                       * @param name an optional name for debugging.
                                       */
                                      constructor(
                                          handler: Handler,
                                          name: String? = null
                                      ) : this(handler, name, false)
                                      @Volatile
                                      private var _immediate: HandlerContext? = if (invokeImmediately) this else null
                                      override val immediate: HandlerContext = _immediate ?:
                                          HandlerContext(handler, name, true).also { _immediate = it }
                                      override fun isDispatchNeeded(context: CoroutineContext): Boolean {
                                          return !invokeImmediately || Looper.myLooper() != handler.looper
                                      }
                                      override fun dispatch(context: CoroutineContext, block: Runnable) {
                                          if (!handler.post(block)) {
                                              cancelOnRejection(context, block)
                                          }
                                      }
                                      override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
                                          val block = Runnable {
                                              with(continuation) { resumeUndispatched(Unit) }
                                          }
                                          if (handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))) {
                                              continuation.invokeOnCancellation { handler.removeCallbacks(block) }
                                          } else {
                                              cancelOnRejection(continuation.context, block)
                                          }
                                      }
                                     ...
                                  }
                                  

                                  HandlerContext继承于HandlerDispatcher,而他的dispatch方法,可以看到,就是将block丢到设置MainLooperhandler执行。所以续体将会在主线程执行状态机,达到切换到主线程执行协程的目的。

                                  以上就是Kotlin协程的线程调度示例详解的详细内容,更多关于Kotlin协程的线程调度的资料请关注易采站长站其它相关文章!