Kotlin协程launch启动流程原理详解

2022-12-09 09:43:36
目录
1.launch启动流程反编译后的Java代码2.协程是如何被启动的

1.launch启动流程

已知协程的启动方式之一是Globalscope.launch,那么Globalscope.launch的流程是怎样的呢,直接进入launch的源码开始看起。

fun main() {
    coroutineTest()
    Thread.sleep(2000L)
}
val block = suspend {
    println("Hello")
    delay(1000L)
    println("Kotlin")
}
private fun coroutineTest() {
    CoroutineScope(Job()).launch {
        withContext(Dispatchers.IO) {
            block.invoke()
        }
    }
}

反编译后的Java代码

public final class CoroutineDemoKt {
   @NotNull
   private static final Function1 block;
   public static final void main() {
      coroutineTest();
      Thread.sleep(2000L);
   }
   // $FF: synthetic method
   public static void main(String[] var0) {
      main();
   }
   @NotNull
   public static final Function1 getBlock() {
      return block;
   }
   private static final void coroutineTest() {
      BuildersKt.launch$default(CoroutineScopeKt.CoroutineScope((CoroutineContext)JobKt.Job$default((Job)null, 1, (Object)null)), (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
         int label;
         @Nullable
         public final Object invokeSuspend(@NotNull Object $result) {
            Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
            switch(this.label) {
            case 0:
               ResultKt.throwOnFailure($result);
               CoroutineContext var10000 = (CoroutineContext)Dispatchers.getIO();
               Function2 var10001 = (Function2)(new Function2((Continuation)null) {
                  int label;
                  @Nullable
                  public final Object invokeSuspend(@NotNull Object $result) {
                     Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
                     switch(this.label) {
                     case 0:
                        ResultKt.throwOnFailure($result);
                        Function1 var10000 = CoroutineDemoKt.getBlock();
                        this.label = 1;
                        if (var10000.invoke(this) == var2) {
                           return var2;
                        }
                        break;
                     case 1:
                        ResultKt.throwOnFailure($result);
                        break;
                     default:
                        throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
                     }
                     return Unit.INSTANCE;
                  }
                  @NotNull
                  public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
                     Intrinsics.checkNotNullParameter(completion, "completion");
                     Function2 var3 = new <anonymous constructor>(completion);
                     return var3;
                  }
                  public final Object invoke(Object var1, Object var2) {
                     return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
                  }
               });
               this.label = 1;
               if (BuildersKt.withContext(var10000, var10001, this) == var2) {
                  return var2;
               }
               break;
            case 1:
               ResultKt.throwOnFailure($result);
               break;
            default:
               throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
            }
            return Unit.INSTANCE;
         }
         @NotNull
         public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
            Intrinsics.checkNotNullParameter(completion, "completion");
            Function2 var3 = new <anonymous constructor>(completion);
            return var3;
         }
         public final Object invoke(Object var1, Object var2) {
            return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
         }
      }), 3, (Object)null);
   }
   static {
      Function1 var0 = (Function1)(new Function1((Continuation)null) {
         int label;
         @Nullable
         public final Object invokeSuspend(@NotNull Object $result) {
            Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
            String var2;
            switch(this.label) {
            case 0:
               ResultKt.throwOnFailure($result);
               var2 = "Hello";
               System.out.println(var2);
               this.label = 1;
               if (DelayKt.delay(1000L, this) == var3) {
                  return var3;
               }
               break;
            case 1:
               ResultKt.throwOnFailure($result);
               break;
            default:
               throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
            }
            var2 = "Kotlin";
            System.out.println(var2);
            return Unit.INSTANCE;
         }
         @NotNull
         public final Continuation create(@NotNull Continuation completion) {
            Intrinsics.checkNotNullParameter(completion, "completion");
            Function1 var2 = new <anonymous constructor>(completion);
            return var2;
         }
         public final Object invoke(Object var1) {
            return ((<undefinedtype>)this.create((Continuation)var1)).invokeSuspend(Unit.INSTANCE);
         }
      });
      block = var0;
   }
}

先分析一下上面代码的流程:

    首先声明了一个Function1类型的block变量,这个变量就是demo中的block,然后会在static函数中会被赋值。接下来就是coroutineTest函数的调用。这个函数中的第一行代码就是CoroutineScope的传参和一些默认值然后通过89行的invoke进入到了外层状态机流转的过程95行的static表示的是内部的挂起函数就是demo中的block.invoke,它是以匿名内部类的方式实现,然后执行内部的状态机流转过程,最后给block赋值。block被赋值后最终在Function1 var10000 = CoroutineDemoKt.getBlock();被调用

    那么这个过程又是如何实现的,进入launch源码进行查看:

    public fun CoroutineScope.launch(
        context: CoroutineContext = EmptyCoroutineContext,
        start: CoroutineStart = CoroutineStart.DEFAULT,
        block: suspend CoroutineScope.() -> Unit
    ): Job {
        val newContext = newCoroutineContext(context)
        val coroutine = if (start.isLazy)
            LazyStandaloneCoroutine(newContext, block) else
            StandaloneCoroutine(newContext, active = true)
        coroutine.start(start, coroutine, block)
        return coroutine
    }
    

    这里的block指的就是demo中的block代码段

    再来看一下里面的几行代码的含义:

      newCoroutineContext: 通过默认的或者传入的context创建一个新的Context;coroutine: launch 会根据传入的启动模式来创建对应的协程对象。这里有两种,一种是标准的,一种是懒加载的。coroutine.start: 尝试启动协程

      2.协程是如何被启动的

      通过launch的源码可知协程的启动是通过coroutine.start启动的,那么协程的启动流程又是怎样的?

      public abstract class AbstractCoroutine<in T>(
          parentContext: CoroutineContext,
          initParentJob: Boolean,
          active: Boolean
      ) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
          ...
          /**
           * 用给定的代码块启动这个协程并启动策略。这个函数在这个协程上最多调用一次。
           */
          public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
              start(block, receiver, this)
          }
      }
      

      start函数中传入了三个参数,只需要关注第一个参数即可。

      public enum class CoroutineStart {
          ...
          /**
           * 用这个协程的启动策略启动相应的块作为协程。
           */
          public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit =
          when (this) {
              DEFAULT -> block.startCoroutineCancellable(completion)
              ATOMIC -> block.startCoroutine(completion)
              UNDISPATCHED -> block.startCoroutineUndispatched(completion)
              LAZY -> Unit // will start lazily
          }
      }
      

      启动策略的具体实现有三种方式,这里只需要分析startCoroutine,另外两个其实就是它的基础上增加了一些功能,其中前者代表启动协程以后可以在等待调度时取消,后者表示协程启动后不会被分发。

      /**
       * 创建没有接收方且结果类型为T的协程,这个函数每次调用时都会创建一个新的可挂起的实例。
       */ 
      public fun <T> (suspend () -> T).startCoroutine(
          completion: Continuation<T>
      ) {
          createCoroutineUnintercepted(completion).intercepted().resume(Unit)
      }
      

      createCoroutineUnintercepted在源代码中只是一个声明,它的具体实现是在IntrinsicsJvm.kt文件中。

      //IntrinsicsJvm.kt#createCoroutineUnintercepted
      /**
       * 创建没有接收方且结果类型为T的非拦截协程。这个函数每次调用时都会创建一个新的可挂起的实例。
       */
      public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(
          completion: Continuation<T>
      ): Continuation<Unit> {
          val probeCompletion = probeCoroutineCreated(completion)
          return if (this is BaseContinuationImpl)
              create(probeCompletion)
          else
              createCoroutineFromSuspendFunction(probeCompletion) {
                  (this as Function1<Continuation<T>, Any?>).invoke(it)
              }
      }
      

      actual代表了 createCoroutineUnintercepted() 在 JVM 平台的实现。

      createCoroutineUnintercepted是一个扩展函数,接收者类型是一个无参数,返回值为 T 的挂起函数或者 Lambda。

      第9行代码中的this代表的是(suspend () -> T)也就是invoke函数中的block变量,这个block变量就是demo中的block代码段。

      第9行的BaseContinuationImpl是一个抽象类它实现了Continuation

      关于if (this is BaseContinuationImpl)的结果暂且不分析,先分析两种情况下的create函数:

        create(probeCompletion):
        //ContinuationImpl.kt#create
        public open fun create(completion: Continuation<*>): Continuation<Unit> {
            throw UnsupportedOperationException("create(Continuation) has not been overridden")
        }
        public open fun create(value: Any?, completion: Continuation<*>): Continuation<Unit> {
            throw UnsupportedOperationException("create(Any?;Continuation) has not been overridden")
        }
        

        这个create函数抛出一个异常,意思就是这个create()没有被重写,而这个create()的重写就是在反编译后的Java代码中的create函数

        @NotNull
        public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
            Intrinsics.checkNotNullParameter(completion, "completion");
            Function2 var3 = new <anonymous constructor>(completion);
            return var3;
        }
        
          createCoroutineFromSuspendFunction(probeCompletion):
          //IntrinsicsJvm.kt#createCoroutineFromSuspendFunction
          /**
           * 当一个被suspend修饰的lambda表达式没有继承BaseContinuationImpl类时,则通过此方法创建协程。
           *
           * 它发生在两种情况下:
           * 1.lambda表达式中调用了其他的挂起方法
           * 2.挂起方法是通过Java实现的
           *
           * 必须将它封装到一个扩展[BaseContinuationImpl]的实例中,因为这是所有协程机制的期望。 
           */
          private inline fun <T> createCoroutineFromSuspendFunction(
          	completion: Continuation<T>,
          	crossinline block: (Continuation<T>) -> Any?
          		): Continuation<Unit> {
          	val context = completion.context
          	// context为空创建一个受限协程
          	return if (context === EmptyCoroutineContext)
          	//受限协程:只能调用协程作用域中提供的挂起方式挂起,其他挂起方法不能调用
          	object : RestrictedContinuationImpl(completion as Continuation<Any?>) {
          		private var label = 0
          		override fun invokeSuspend(result: Result<Any?>): Any? =
          		when (label) {
          			0 -> {
          				label = 1
          				result.getOrThrow() // 如果试图以异常开始,则重新抛出异常(将被BaseContinuationImpl.resumeWith捕获)
          				block(this) // 运行块,可以返回或挂起
          			}
          			1 -> {
          				label = 2
          				result.getOrThrow() // 这是block挂起的结果
          			}
          			else -> error("This coroutine had already completed")
          		}
          	}
          	else
          	//创建一个正常的协程
          	object : ContinuationImpl(completion as Continuation<Any?>, context) {
          		private var label = 0
          		override fun invokeSuspend(result: Result<Any?>): Any? =
          		when (label) {
          			0 -> {
          				label = 1
          				result.getOrThrow() // 如果试图以异常开始,则重新抛出异常(将被BaseContinuationImpl.resumeWith捕获)
          				block(this) // 运行块,可以返回或挂起
          			}
          			1 -> {
          				label = 2
          				result.getOrThrow() // 这是block挂起的结果
          			}
          			else -> error("This coroutine had already completed")
          		}
          	}
          }
          

          createCoroutineFromSuspendFunction就是当一个被suspend修饰的Lambda表达式没有继承BaseContinuationImpl是才会被调用,然后根据上下文是否为空创建不同类型的协程。

          两种情况都已经分析完了,那么现在if (this is BaseContinuationImpl)会执行哪一个呢,首先这里的this所指的就是demo中的block代码段,Kotlin编译器编译后会自动生成一个类就是上面的static,它会继承SuspendLambda类,而这个SuspendLambda类继承自ContinuationImpl,ContinuationImpl继承自BaseContinuationImpl,因此可以得到判断结果为true,

          createCoroutineUnintercepted的过程就是协程创建的过程。

          然后就是intercepted函数,这个函数的具体实现也在IntrinsicsJvm.kt中,那么intercepted又做了什么呢

          public expect fun <T> Continuation<T>.intercepted(): Continuation<T>
          //具体实现
          //IntrinsicsJvm.kt#intercepted
          public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
              (this as? ContinuationImpl)?.intercepted() ?: this
          

          首先有个强转,通过上面的分析这个强转是一定会成功的,到这里intercepted就进入到了ContinuationImpl中了

          internal abstract class ContinuationImpl(
              completion: Continuation<Any?>?,
              private val _context: CoroutineContext?
          ) : BaseContinuationImpl(completion) {
          	...
              @Transient
              private var intercepted: Continuation<Any?>? = null
          	//如果没有缓存,则从上下文获取拦截器,调用interceptContinuation进行拦截
          	//将获取到的内容保存到全局变量
              public fun intercepted(): Continuation<Any?> =
                  intercepted
                      ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                          .also { intercepted = it }
          }
          

          这里的ContinuationInterceptor指的就是Demo中传输的Dispatcher.IO,默认值时Dispatcher.Default

          再回到startContinue中还剩最后一个resume

          /**
           * 恢复执行相应的协程传递值作为最后一个挂起点的返回值。
           */
          public inline fun <T> Continuation<T>.resume(value: T): Unit =
          	resumeWith(Result.success(value))
          public interface Continuation<in T> {
          	/**
               * 与此延续相对应的协程的上下文。
               */
          	public val context: CoroutineContext
          	/**
               * 恢复执行相应的协程传递值作为最后一个挂起点的返回值。
               */
          	public fun resumeWith(result: Result<T>)
          }
          

          这里的resume(Unit)作用就相当与启动了一个协程。

          上面的启动流程中为了方便分析的是CoroutineStart.ATOMIC,而默认的是CoroutineStart.DEFAULT,下面分析一下DEFAULT的流程

          //Cancellable.kt#startCoroutineCancellable
          public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) {
              createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))
          }
          

          startCoroutineCancellable对于协程的创建和拦截与ATOMIC是一样的,区别就在于resumeCancellableWith

          //DispatchedContinuation#resumeCancellableWith
          public fun <T> Continuation<T>.resumeCancellableWith(
          	result: Result<T>,
          	onCancellation: ((cause: Throwable) -> Unit)? = null
          ): Unit = when (this) {
          	is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
          	else -> resumeWith(result)
          }
          // 我们内联它来保存堆栈上的一个条目,在它显示的情况下(无限制调度程序)
          // 它只在Continuation<T>.resumeCancellableWith中使用
          @Suppress("NOTHING_TO_INLINE")
          inline fun resumeCancellableWith(
          	result: Result<T>,
          	noinline onCancellation: ((cause: Throwable) -> Unit)?
          		) {
          	val state = result.toState(onCancellation)
          	//是否需要分发
          	if (dispatcher.isDispatchNeeded(context)) {
          		_state = state
          		resumeMode = MODE_CANCELLABLE
          		//将可运行块的执行分派给给定上下文中的另一个线程
          		dispatcher.dispatch(context, this)
          	} else {
          		executeUnconfined(state, MODE_CANCELLABLE) {
          			//协程未被取消
          			if (!resumeCancelled(state)) {
          				// 恢复执行
          				resumeUndispatchedWith(result)
          			}
          		}
          	}
          }
          //恢复执行前判断协程是否已经取消执行
          inline fun resumeCancelled(state: Any?): Boolean {
          	//获取当前协程任务
          	val job = context[Job]
          	//如果不为空且不活跃
          	if (job != null && !job.isActive) {
          		val cause = job.getCancellationException()
          		cancelCompletedResult(state, cause)
          		//抛出异常
          		resumeWithException(cause)
          		return true
          	}
          	return false
          }
          //我们需要内联它来在堆栈中保存一个条目
          inline fun resumeUndispatchedWith(result: Result<T>) {
          	withContinuationContext(continuation, countOrElement) {
          		continuation.resumeWith(result)
          	}
          }

          以上就是Kotlin协程launch启动流程原理详解的详细内容,更多关于Kotlin协程launch启动流程的资料请关注易采站长站其它相关文章!