目录引言一、协程的分发器作用1.1测试代码1.2CoroutineScope.launch1.2.1newCoroutineContext1.3startCoroutineCancellabl...
目录
引言一、协程的分发器作用
1.1 测试代码
1.2 CoroutineScope.launch
1.2.1 newCoroutineContext
1.3 startCoroutineCancellable
1.3.1 intercepted()
1.3.2 CoroutineDispatcher
1.3.3 小结
1.4 DispatchedContinuation
1.5 DefaultScheduler
1.5.1 SchedulerCoroutineDispatcher
1.5.2 CoroutineScheduler
1.6 DispatchedTask.run()
1.7 总结
二、协程中的线程切换
2.1 反编译代码
2.1.1 MainActivityonCreateonCreateonCreate1
2.1.2 AnonymousClass1
2.2 withContext
2.2.1 startCoroutineCancellable
2.3 resumeWith
2.4 DispatchedCoroutine
2.4.1 DispatchedCoroutine 的继承关系
2.5 协程线程的恢复
2.5.1 AbstractCoroutine.resumeWith()
2.5.2 afterResume
2.6 总结
2.7 Dispatchers.Main
2.7.1 MainDispatcherLoader
2.7.2 androidDispatcherFactory
2.7.3 HandlerContext
引言
在第一篇文章中我们分析了协程启动创建过程启动过程,在本文中,我们将着重剖析协程中协程调度的逻辑流程。主要是分析解答如下2个问题:
涉及到协程方法器是如何将协程代码调度到特定的线程执行?子协程执行完又是如何切换0回父协程的线程环境?
一、协程的分发器作用
1.1 测试代码
GlobalScope.launch {
//协程体1
Log.d(TAG, "before suspend job.")
withContext(Dispatchers.Main) {
//协程体2
Log.d(TAG, "print in Main thread.")
}
Log.d(TAG, "after suspend job.")
}
此次的协程测试用例中,我们默认的launch一个协程,我们简单的将launch需要执行的这外层逻辑为协程体1。在协程体1中,我们使用
withContext将协程切换到主线程执行,打印日志。我们将这里面执行的协程逻辑为协程体2。协程体2执行完成后,切回协程体1中执行并打印Log。
注意,根据我们之前《协程的创建与启动》文章中分析的,Kotlin编译器针对协程体1和协程体2分别生成一个继承与
SuspenLamabda的类型,比如:class MainActivity#onCreate$1 : SuspenLambda{...}。我们在讲协程体时,也同时代指这个类实例。继续跟踪launch()函数执行逻辑,这次跟踪过程不同与《协程的创建与启动》篇章,我们会将侧重点放在启动过程中协程调度器是如何起作用的?接下来见1.2
1.2 CoroutineScope.launch
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
//1. 见1.2.1
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
//2. 详见1.3
coroutine.start(start, coroutine, block)
return coroutine
}
这里会新建一个CoroutineContext,详见1.2.1根据之前的分析,这个里最终会调用到
startCoroutineCancellable()方法,详见1.3流程。1.2.1 newCoroutineContext
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
val combined = foldCopies(coroutineContext, context, true)
val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
return
if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
debug + Dispatchers.Default
else
debug
}
coroutineContext:coroutineContext是CoroutineScope的成员变量,当此时为GlobalScope.coroutineContext==EmptyCoroutineContext
context:由于调用launch时没有指定Context,所以传到此处也是EmptyCoroutineContext。foldCopies()函数将2个context相加并拷贝,最终combied==EmptyCoroutineContext。
而在return这最后判断返回的是debug+Dispatchers.Defatult,所以此时默认的分发器为Dispatchers.Defatult。
这里涉及到的协程Context运算不做深入剖析,简单可以认为协程重写了“+”运算,使得Context之间可以使用“+”来叠加,没有的Element类型会被添加到Element集合,集合中已有的Element类型会被覆盖。
1.3 startCoroutineCancellable
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
receiver: R, completion: Continuation<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
) =
runSafely(completion) {
//1. 创建SuspendLambda协程体
createCoroutineUnintercepted(receiver, completion)
//2. 拦截:取出分发器,并构建方法器Continuation。详见1.3.1
.intercepted()
//3. 调用方法器Continuation的resume方法,详见1.4
.resumeCancellableWith(Result.success(Unit), onCancellation)
}
这里的构建协程体在《协程的创建与启动》一节中已经剖析,不再赘述。进行拦截,注意:这里其实会根据方法器再构建出一个
DispatchedContinuation对象,它也是一个续体类型,这是对协程体的一次包装。详见1.3.1小节。调用拦截器续体的
resumeCancellableWith()开始状态机流转,执行分发流程详见1.4小节。1.3.1 intercepted()
public fun intercepted(): Continuation<Any?> =
intercepted?: (
//1. 取出拦截器
context[ContinuationInterceptor]?
//2.构建拦截器续体
.interceptContinuation(this)?: this)
.also { intercepted = it }
取出当前上下文中的拦截器类型,根据之前1.2.1小节的分析,这里取出来的是Dispatchers.Defatult。interceptContinuation(this)为构建拦截器续体,注意这里传入的this是协程体1。 详见1.3.2。1.3.2 CoroutineDispatcher
//Base class to be extended by all coroutine dispatcher implementations.
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
public final override fun <T> interceptContinuation(continuation: Continuation<T>):
//详见1.4
Continuation<T> = DispatchedContinuation(this, continuation)
}
直接新建了一个DispatchedContinuation对象实例这里需要注意传入的构建参数:
Dispatcher,也就是Dispatchers.Defatult。continuation:协程体1。
1.3.3 小结
自此Continuation.intercepted()方法就分析结束,最终的结果是:用上下文中的Dispatcher和当前Contination对象也就是协程体1,共同作为构建参数,新建了一个DispatchedContinuation对象。
接下来接着1.3中的第三点,调用DispatchedContinuation.resumeCancellableWith()方法开始分析。
1.4 DispatchedContinuation
internal class DispatchedContinuation<in T>(
//1. 分发器
@JvmField val dispatcher: CoroutineDispatcher,
//2. 注意这里将Continuation的实现委托给了continuation成员变量。
@JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED)
, CoroutineStackFrame,
Continuation<T> by continuation {
//3. 复写属性delegate为自己
override val delegate: Continuation<T>
get() = this
...
// We inline it to save an entry on the stack in cases where it shows (unconfined dispatcher)
// It is used only in Continuation<T>.resumeCancellableWith
@Suppress("NOTHING_TO_INLINE")
inline fun resumeCancellableWith(
result: Result<T>,
noinline onCancellation: ((cause: Throwable) -> Unit)?
) {
val state = result.toState(onCancellation)
//默认为true
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_CANCELLABLE
//4. 详细见
dispatcher.dispatch(context, this)
} else {
executeUnconfined(state, MODE_CANCELLABLE) {
if (!resumeCancelled(state)) {
resumeUndispatchedwith(result)
}
}
}
}
}
这里的dispatcher==Dispatchers.Defatult,所以接下来需要解析Dispatchers.Defatult到底是什么东西。详见1.5
dispatcher==Dispatchers.Default。成员变量
continucation==协程体1(SuspenLambda类型实例)。同时DispatchedContinuation继承于Continuation接口,它将Continuation接口的实现委托给了成员变量continuation。deleagte为复写了DispatchedTask.delegate属性,将其返回自己。调用分发器也就是
Dispatchers.Defatult的dispatch()方法,注意这里传入的参数:context:来自Continuation接口的属性,由于委托给了成员变量continuation,所以此context==continuation.context。
this:分发器本身Dispatchers.Defatult
自此这个方法的分析结束:调用分发器的进行分发,接下来分析就开始分析协程方法器CoroutineDispatcher
1.5 DefaultScheduler
//Dispathcer.kt
@JvmStatic
public actual val Default: CoroutineDispatcher = DefaultScheduler
//Dispathcer.kt
// Instance of Dispatchers.Default
internal object DefaultScheduler : SchedulerCoroutineDispatcher(
CORE_POOL_SIZE, MAX_POOL_SIZE,
IDLE_WORKER_KEEP_ALIVE_NS, DEFAULT_SCHEDULER_NAME
) {
...
}
实际上是继承 SchedulerCoroutineDispatcher类型。详见1.5.1
1.5.1 SchedulerCoroutineDispatcher
internal open class SchedulerCoroutineDispatcher(
private val corePoolSize: Int = CORE_POOL_SIZE,
private val maxPoolSize: Int = MAX_POOL_SIZE,
private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
private val schedulerName: String = "CoroutineScheduler",
) : ExecutorCoroutineDispatcher() {
override val executor: Executor
get() = coroutineScheduler
// This is variable for test purposes, so that we can reinitialize from clean state
private var coroutineScheduler = createScheduler()
private fun createScheduler() =
//1. 详见1.5.2
CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
//2. 详见1.5.2
override fun dispatch(context: CoroutineContext, block: Runnable): Unit
= coroutineScheduler.dispatch(block)
...
}
//Executors.kt
//2. 实际上是继承ExecutorCoroutineDispatcher
public abstract class ExecutorCoroutineDispatcher: CoroutineDispatcher(), Closeable {
...
}
可以看到实际上调用了CoroutineScheduler.dispatch方法。此时发现,第二个参数是Runnable类型的,而在1.4小节中,我们知道传入的是this也就是DispatchedContinuation,所以DispatchedContinuation继承的父类中,必定有继承了Runnable接口,而他的run方法的实现也在父类中,这块我们暂时按下不表,接着看继续跟踪coroutineScheduler.dispatch(block)。1.5.2 CoroutineScheduler
internal class CoroutineScheduler(
@JvmField val corePoolSize: Int,
@JvmField val maxPoolSize: Int,
@JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
@JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor, Closeable {
...
override fun execute(command: Runnable) = dispatch(command)
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
trackTask() // this is needed for virtual time support
val task = createTask(block, taskContext)
// try to submit the task to the local queue and act depending on the result
val currentWorker = currentWorker()
val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
if (notAdded != null) {
if (!addToGlobalQueue(notAdded)) {
// Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
throw RejectedExecutionException("$schedulerName was terminated")
}
}
val skipUnpark = tailDispatch && currentWorker != null
// Checking 'task' instead of 'notAdded' is completely okay
if (task.mode == TASK_NON_BLOCKING) {
if (skipUnpark) return
signalCpuWork()
} else {
// Increment blocking tasks anyway
signalBlockingWork(skipUnpark = skipUnpark)
}
}
}
该类继承了Executor类,而且它的构建参数可看到是线程池的参数,所以可以知道这个其实是Kotlin协程实现的一个线程池,具体就不跟进去了。execute()过程也是dispatch过程:将任务投递到任务队列,然后通知线程去取任务执行,自此完成了线程切换动作。而在新线程里执行的
Runnable为1.4中的调用代码:dispatcher.dispatch(context, this)中的this,也就是DispatchedContinuation。DispatchedContinuation.kt并没有实现run方法,那么一定是他继承的父类实现了Runnable接口并实现,所以需要接着看它继承的父类:DispatchedTask类。1.6 DispatchedTask.run()
internal abstract class DispatchedTask<in T>(
@JvmField public var resumeMode: Int
) : SchedulerTask() {
...
internal abstract val delegate: Continuation<T>
@Suppress("UNCHECKED_CAST")
internal open fun <T> getSuccessfulResult(state: Any?): T =
state as T
internal open fun getExceptionalResult(state: Any?): Throwable? =
(state as? CompletedExceptionally)?.cause
public final override fun run() {
assert { resumeMode != MODE_UNINITIALIZED } // should have been set before dispatching
val taskContext = this.taskContext
var fatalException: Throwable? = null
try {
val delegate = delegate as DispatchedContinuation<T>
//1. 取出代理商的续体
val continuation = delegate.continuation
withContinuationContext(continuation, delegate.countOrElement) {
val context = continuation.context
val state = takeState() // NOTE: Must take state in any case, even if cancelled
val exception = getExceptionalResult(state)
val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null
if (job != null && !job.isActive) {
val cause = job.getCancellationException()
cancelCompletedResult(state, cause)
continuation.resumeWithStackTrace(cause)
} else {
if (exception != null) {
continuation.resumeWithException(exception)
} else {
//1. 被包装的续体的resume方法,真正的开始出发其协程状态机代码。
continuation.resume(getSuccessfulResult(state))
}
}
}
} catch (e: Throwable) {
// This instead of runCatching to have nicer stacktrace and debug experience
fatalException = e
} finally {
val result = runCatching { taskContext.afterTask() }
handleFatalException(fatalException, result.exceptionOrNull())
}
}
}
将delegate转为DispatchedContinuation,应该注意1.4 小节中DispatchedContinuation继承Dispatchtask时,便对此delegate进行了复写:override val delegate: Continuation
get() = this
而此delegate.continucation便是当初newDispatchedContinuation(this)时传入的this,此this就是Kotlin编译器一开始为协程体生成的SuspendLambda类型对象。具体可以回看1.3小节。
continuation.resume()方法触发了协程的状态机进而开始执行协程业务逻辑代码,结合之前1.5.2的分析可以知道,这个方法的调用已经是被dispatch到特定线程,完成线程切换后执行的。所以协程状态机的代码也是跑在新线程上的。1.7 总结
至此,协程的线程调度分析结束,关键有如下几个要点:
创建SuspendLambda时,他的协程上下文对象来自于comletion.context,默认就是Dispatcher.Default。SuspendLambda启动时调用了intercept()进行一层包装,得到DispatchedContinuation,后续协程启动是启动的DispatchedContinuation协程。DispatchedContinuation继承于Runnable接口,协程启动时将自己投递到分发器dispatcher执行run方法,从而达到了线程切换效果。在
DispatchedContinuation的run方法中,调用SuspendLambda.resume()启动状态机。在新线程执行协程状态机代码。这一小节中,介绍了如何将协程调度到目的线程执行,接下来分析如何做到随意切换线程后,然后再恢复到原来线程的。
二、协程中的线程切换
在第一小节中,我们搞清楚了协程启动时,协程调度器是如何在其中起作用的。这一小节旨在剖析在协程用分发器切换线程执行新的挂起函数后,是如何切换会原来线程继续执行剩下的逻辑的。
为此,我们需要将1.1的测试代码反编译出来实际代码进而分析。
2.1 反编译代码
2.1.1 MainActivityonCreateonCreateonCreate1
final class MainActivity$onCreate$1 extends SuspendLambda implements Function2<CoroutineScope, Continuation<? super Unit>, Object> {
...
@Override // kotlin.coroutines.jvm.internal.BaseContinuationImpl
public final Object invokeSuspend(Object $result) {
Object coroutine_suspended = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch (this.label) {
case 0:
ResultKt.throwOnFailure($result);
Log.d(MainActivity.TAG, LiveLiterals$MainActivityKt.INSTANCE.m4147xf96cab04());
this.label = 1;
//1. 新建编译器自动生成的继承于SuspendLambda的类型。
AnonymousClass1 anonymousClass1 = new AnonymousClass1(null);
//2. 调用withContext
Object res = BuildersKt.withContext(Dispatchers.getIO(), anonymousClass1, this);
if (res != coroutine_suspended) {
break;
} else {
//挂起
return coroutine_suspended;
}
case 1:
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
Log.d(MainActivity.TAG, LiveLiterals$MainActivityKt.INSTANCE.m4148xe0c1b328());
return Unit.INSTANCE;
}
}
根据之前的文章分析,这里suspend lambda 的类型都自动生成继承于SuspendLambda的类型。详见2.1.2。
将anonymousClass1传入withContext,而且注意这里传入了this==MainActivity$onCreate$1,详见2.2。
2.1.2 AnonymousClass1
/* compiled from: MainActivity.kt */
public static final class AnonymousClass1 extends SuspendLambda implements Function2<CoroutineScope, Continuation<? super Integer>, Object> {
int label
...
@Override // kotlin.coroutines.jvm.internal.BaseContinuationImpl
public final Object invokeSuspend(Object obj) {
IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch (this.label) {
case 0:
ResultKt.throwOnFailure(obj);
return Boxing.boxInt(Log.d(MainActivity.TAG, LiveLiterals$MainActivityKt.INSTANCE.m4146x7c0f011f()));
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
}
}
2.2 withContext
public suspend fun <T> withContext(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T
): T {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
//1. 获取当前协程, 注意这里的uCont就是当前续体,也就是MainActivity$onCreate$1
return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
//2. 计算获的新的协程上下文
val oldContext = uCont.context
val newContext = oldContext + context
//3. 快速判断:新上下文和旧上下文一致的情况快速处理。
// always check for cancellation of new context
newContext.ensureActive()
// FAST PATH #1 -- new context is the same as the old one
if (newContext === oldContext) {
val coroutine = ScopeCoroutine(newContext, uCont)
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
}
// FAST PATH #2 -- the new dispatcher is the same as the old one (something else changed)
// `equals` is used by design (see equals implementation is wrapper context like ExecutorCoroutineDispatcher)
if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
val coroutine = UndispatchedCoroutine(newContext, uCont)
// There are changes in the context, so this thread needs to be updated
withCoroutineContext(newContext, null) {
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
}
}
// SLOW PATH -- use new dispatcher
//4. 新建一个DispatchedCoroutine
val coroutine = DispatchedCoroutine(newContext, uCont)
//5. 启动协程
block.startCoroutineCancellable(coroutine, coroutine)
coroutine.getResult()
}
}
suspendCoroutineUninterceptedOrReturn这个函数直接步进是看不到实现的,它的实现是由Kotlin编译器生成的,它的作用是用来获取当前续体的,并且通过uCont返回,这里就是MainActivity$onCreate$1。将旧协程上下文和新的上下文一起。计算得到最终的上下文。这里的
context==Dispatchers.getIO()。快速判断,不用看。
新建一个
DispatchedCoroutine,注意这里传入了新的协程上下文和当前续体对象。调用
startCoroutineCancellable()启动协程。这里的同1.3.2小节分析一样,详见 2.2.12.2.1 startCoroutineCancellable
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
receiver: R, completion: Continuation<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
) =
runSafely(completion) {
//1. 创建SuspendLambda协程体
createCoroutineUnintercepted(receiver, completion)
//2. 拦截:取出分发器,并构建方法器Continuation。详见1.3.1
.intercepted()
//3. 调用方法器Continuation的resume方法,详见1.4
.resumeCancellableWith(Result.success(Unit), onCancellation)
}
此方法在之前1.3小节已经分析过,针对此此次调用,其中的改变是协程上下文中的分发器已经被设置为Dispatchers.Main。
SuspendLambda对象,此对象的CoroutineContext为completion.context。而其中的ContinuationInterceptor类型Element就是我们之前传入的Dispatchers.Main。创建一个
DispatchedContinuation。将协程
SuspendLambda的状态机逻辑通过Dispatcher.Main调度到主线程执行,调度过程参考第一下节。分发逻辑详见2.7小节。当
SuspendLambda的状态机invokeSuspend()逻辑执行完成后,会返回到BaseContinuationImpl.resumeWith(),我们需要接此方法分析,来得到协程在切换到主线程执行后,又是怎么切回协程体1的执行线程的,详见2.3。2.3 resumeWith
public final override fun resumeWith(result: Result<Any?>) {
// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
var current = this
var param = result
while (true) {
// Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
// can precisely track what part of suspended callstack was already resumed
probeCoroutineResumed(current)
with(current) {
val completion = completion!! // fail fast when trying to resume continuation without completion
val outcome: Result<Any?> =
try {
val outcome = invokeSuspend(param)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted() // this state MAChine instance is terminating
if (completion is BaseContinuationImpl) {
// unrolling recursion via loop
current = completion
param = outcome
} else {
//1. 进入此判断
// top-level completion reached -- invoke and return
completion.resumeWith(outcome)
return
}
}
}
}
当状态机执行完后, 后进入到completion的类型判断,由2.2和2.2.1可以知道,当初传入的completion是DispatchedCoroutine类型,所以加入到else分支,调用了DispatchedCoroutine.resumeWith(),接下来分析此方法。
在此之前,我们需要看下DispatchedCoroutine的继承关系,详见2.4.1。如果想直接跟踪流程,可以直接看2.4.2。
2.4 DispatchedCoroutine
2.4.1 DispatchedCoroutine 的继承关系
internal class DispatchedCoroutine<in T>(
context: CoroutineContext,
uCont: Continuation<T>
) : ScopeCoroutine<T>(context, uCont) {
}
继承于ScopeCoroutine
internal open class ScopeCoroutine<in T>(
context: CoroutineContext,
@JvmField val uCont: Continuation<T> // unintercepted continuation
) : AbstractCoroutine<T>(context, true, true), CoroutineStackFrame {
}
继承于AbstractCoroutine
public abstract class AbstractCoroutine<in T>(
parentContext: CoroutineContext,
initParentJob: Boolean,
active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
}
2.5 协程线程的恢复
2.5.1 AbstractCoroutine.resumeWith()
public final override fun resumeWith(result: Result<T>) {
val state = makeCompletingOnce(result.toState())
if (state === COMPLETING_WAITING_CHILDREN) return
afterResume(state)
}
调用了afterResume方法,此方法在DispatchedCoroutine类型有具体实现。见2.5.2
2.5.2 afterResume
//DispatchedCoroutine
override fun afterResume(state: Any?) {
if (tryResume()) return // completed before getResult invocation -- bail out
// Resume in a cancellable way because we have to switch back to the original dispatcher
uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont))
}
取出当前续体uCont,这个续体根据之前的分析:2.2小节,可以知道它等于MainActivity$onCreate$1。intercepted():取出其分发拦截器resumeCancellableWith:使用方法拦截器协程体,将uCont续体的状态机逻辑调度到相对应的线程环境执行,这里就是之前的Dispatcher.Default。注意其注释:“将其切换到原先的分发器”。2⃣而这一过程其实和1.3小节的过程一致。恢复到
Dispatcher.Default继续执行状态机时,由于label已经被更新,所以会往下继续执行,打印最后一句log。2.6 总结
withContext(Dispatcher.Main)启动的协程时,取得当前协程续体uCount也就是MainActivity$onCreate$1,会计算出新的协程context,然后用它们创建一个DispatchedCoroutine。
AnonymousClass1协程启动时,用DispatchedCoroutine作为completion参数,然后启动,此时会调度主线程执行协程。
当协程执行完成后,AnonymousClass1.resumeWith()方法会调用completion.resumeWith()。
DispatchedCoroutine.resumeWith()方法会调用uCount.intercepted().resumeCancellableWith(),使得父协程进行调度并接着执行状态机逻辑。
2.7 Dispatchers.Main
@JvmStatic
public actual val Main: MainCoroutineDispatcher get()
= MainDispatcherLoader.dispatcher
直接详见2.7.1
2.7.1 MainDispatcherLoader
internal object MainDispatcherLoader {
private val FAST_SERVICE_LOADER_ENABLED = systemProp(FAST_SERVICE_LOADER_PROPERTY_NAME, true)
@JvmField
val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()
private fun loadMainDispatcher(): MainCoroutineDispatcher {
return try {
val factories = if (FAST_SERVICE_LOADER_ENABLED) {
FastServiceLoader.loadMainDispatcherFactory()
} else {
// We are explicitly using the
// `ServiceLoader.load(MyClass::class.Java, MyClass::class.java.classLoader).iterator()`
// form of the ServiceLoader call to enable R8 optimization when compiled on Android.
// 1.获得MainDispatcherFactory的实现类
ServiceLoader.load(
MainDispatcherFactory::class.java,
MainDispatcherFactory::class.java.classLoader
).iterator().asSequence().toList()
}
@Suppress("ConstantConditionIf")
factories.maxByOrNull { it.loadPriority }?.tryCreateDispatcher(factories)
?: createMissingDispatcher()
} catch (e: Throwable) {
// Service loader can throw an exception as well
createMissingDispatcher(e)
}
}
}
通过ServiceLoad机制获取MainDispatcherFactory的实现类,而在源码里面,其实现类为AndroidDispatcherFactory调用
tryCreateDispatcher()创建分发器,详见2.7.2。2.7.2 AndroidDispatcherFactory
internal class AndroidDispatcherFactory : MainDispatcherFactory {
override fun createDispatcher(allFactories: List<MainDispatcherFactory>) =
HandlerContext(Looper.getMainLooper().asHandler(async = true))
override fun hintOnError(): String = "For tests Dispatchers.setMain from kotlinx-coroutines-test module can be used"
override val loadPriority: Int
get() = Int.MAX_VALUE / 2
}
根据createDispatcher分发,主线程分发器的实现类为HandlerContext类型,传入用MainLooper构建的Handler。详见2.7.3。
2.7.3 HandlerContext
internal class HandlerContext private constructor(
private val handler: Handler,
private val name: String?,
private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
/**
* Creates [CoroutineDispatcher] for the given Android [handler].
*
* @param handler a handler.
* @param name an optional name for debugging.
*/
constructor(
handler: Handler,
name: String? = null
) : this(handler, name, false)
@Volatile
private var _immediate: HandlerContext? = if (invokeImmediately) this else null
override val immediate: HandlerContext = _immediate ?:
HandlerContext(handler, name, true).also { _immediate = it }
override fun isDispatchNeeded(context: CoroutineContext): Boolean {
return !invokeImmediately || Looper.myLooper() != handler.looper
}
override fun dispatch(context: CoroutineContext, block: Runnable) {
if (!handler.post(block)) {
cancelOnRejection(context, block)
}
}
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val block = Runnable {
with(continuation) { resumeUndispatched(Unit) }
}
if (handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))) {
continuation.invokeOnCancellation { handler.removeCallbacks(block) }
} else {
cancelOnRejection(continuation.context, block)
}
}
...
}
HandlerContext继承于HandlerDispatcher,而他的dispatch方法,可以看到,就是将block丢到设置MainLooper的handler执行。所以续体将会在主线程执行状态机,达到切换到主线程执行协程的目的。
以上就是Kotlin协程的线程调度示例详解的详细内容,更多关于Kotlin协程的线程调度的资料请关注我们其它相关文章!










