ikuo’s blog

育児やエンジニアリングについて

ソースからKotlinのcoroutineを完全に理解する

Disclaimer: タイトルは釣りです。読んでも理解できません。。

Server Side Kotlin はじめました。

f:id:martin_lover_se:20210919232224p:plain

Kotlinといえばcoroutineですね。公式ドキュメントがすごく良くできていて、ここを読めば使い方は大体理解できます。

なんですが使っているうちにだんだんこれどうなっとんねん、という気持ちが強くなってきたので、 コードを追い始めました。

思考の垂れ流しなので、ほとんど自分が後で見直すためのメモです。

環境は

  • org.jetbrains.kotlin:kotlin-stdlib:1.5.30
  • org.jetbrains.kotlinx:kotlinx-coroutines-core:1.5.2
  • jdk correto-11.0.9

です。

coroutine を巡る旅へ

KotlinでいうCoroutineの実体って何?

ざっくりこのクラス図が頭に入っていると追いやすいです。

https://github.com/takahirom/kotlin-coroutines-class-diagram

だいたい、suspend functionの実行をContextとScopeで階層的に管理するやつ、位に思っとけば良いかなーという感想です。

coroutineの起点は CoroutineScope.launch/async ですが、この関数を開けてみると SnandaaloneCoroutineとDefferdCoroutineというやつを作っていることがわかります。

DefferdCoruotine(async)     --> AbstractCoroutine -> CoroutineScope
StandaloneCoroutine(launch) -┘

というような継承関係となっていて、CoroutineScope has a CoroutineContext です。 実行時の環境情報はCoroutineContextが保持しています。

なんですが、同じく

AbstractCoroutine -> Job -> Element -> CorutineContext

という継承関係もあって、Corutine自体がCoroutineConetxtでもある... という構造なので大変話しがややこしいです。

CoroutineScopeがCoroutineContextと結合可能だったりします。

public operator fun CoroutineScope.plus(context: CoroutineContext): CoroutineScope =
    ContextScope(coroutineContext + context)

これはおそらく根本的にはCoroutineはScopeで厳密に親子関係を管理したいという要求からきていて、おそらくkotlinのcoroutineにおける根本の設計思想の一つだと思うんですが、 この親子関係を表現するためのComposite-patternの変形のようなかんじなのかなーなどと思いつつ。

Deep Dive into kotlinx-coroutines-core

もう少し深掘っていきます。 launch は

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

となっていて、新しいContextを作ってCoroutineを起動するんですが、 CoroutineScopeでもありCoroutineContextでもあるStandaloneCoroutineをJobとして返すことでユーザーに開示するAPIを制限するんですね、 大変面白いです。

また、blockはsuspend functionとして定義されているのですが、このsuspend functionというやつが曲者であとで出てきます。

ここで余談ですが

ところで、

launch {
}

これは最後のパラメタが関数のとき、()の外にラムダを書けるというkotlinの機能によって成立していると思うんですが、suspendの記述がないですね。 自動的に変換されるのでしょうか? suspend lambda というやつらしいですね。

このlambdaを()の外に出せる機能は大変良いですね、死ぬほどコードの見通しが良くなります。 https://kotlinlang.org/docs/lambdas.html#passing-trailing-lambdas

あと, launch関数が拡張関数を用いてBuilders.common.ktにまとめられてるのも大変わかりやすいです。モジュール構造自体もたいへん勉強になりました。

本題に戻りましょう

更に遡ると、 CoroutineContextはkotlinx-coroutines-core-jvmではなくて、stdlib-common のほうに組み込まれていることに気が付きます。 suspend はkotlin予約語のビルドイン機能になっているので、 中断可能なデータ構造としての広義のcoroutineは言語ビルドインになっていて、それを階層的に扱うためのライブラリとしてkotlinx-coroutines-core-jvmが提供されている、という位置づけのようですね。

ではcoroutine.startが何をやっているかと言うと、 ここが引数の順番を変えてしまっていて少しトリッキーなんですが、

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    :
    coroutine.start(start, coroutine, block)
:
AbstractCoroutine.kt

    public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
        start(block, receiver, this)
    }

block は launch で指定された処理、 receiver は launchで作成したStandaloneCoroutine、 そして this もまた、実行したCoroutineなのでStandaloneCoroutineです。

CoroutineStartはEnumなんですが、それをそのまま実行していて面食らいました。 これは kotlin の機能で、Enumに関数を実装できるうえに operator invoke で自身を実行できるようにしています。

    @InternalCoroutinesApi
    public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
        when (this) {
            DEFAULT -> block.startCoroutineCancellable(receiver, completion)
            ATOMIC -> block.startCoroutine(receiver, completion)
            UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
            LAZY -> Unit // will start lazily
        }

こうして指定されたblockを、生成したCoroutineをreceiverとして起動していることがわかります。また、completionもおなじCoroutineです。 ここで見慣れない Continuation というやつが出てくるのですが、これはAbstractCoroutineが継承しています。

public abstract class AbstractCoroutine<in T>(
    parentContext: CoroutineContext,
    initParentJob: Boolean,
    active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {

Continuationの定義はこんな感じで、

public interface Continuation<in T> {
    /**
     * The context of the coroutine that corresponds to this continuation.
     */
    public val context: CoroutineContext

    /**
     * Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
     * return value of the last suspension point.
     */
    public fun resumeWith(result: Result<T>)
}

contextとresumeWithをインターフェースとして持ちます。こいつは stdlib の方で定義されていますね。 CoroutineScopeも has a CoroutineContext なのでまた紛らわしいですが、このContinuationがここから先のキモになっています。

block.startCoroutineCancellable は, Cancellable.ktでsuspend funに生やしていて

Cancellable.kt

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
    receiver: R, completion: Continuation<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
) =
    runSafely(completion) {
        createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
    }

runSafelyはtry-catchでFailureをdispachするだけのものなので、実体はcreateCoroutineUninterceptedということになります。 で、このcreateCoroutineUniterceptedというのは、stdlibに定義された suspend fun の関数で

@kotlin.SinceKotlin public fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(receiver: R, completion: kotlin.coroutines.Continuation<T>): kotlin.coroutines.Continuation<kotlin.Unit> { /* compiled code */ }

とこうなっているんですが、IntrinsicsKt.class ... classファイルなんですよねぇ。

さらに深淵へ

ちょっと行き詰まったので、ドキュメントを調べていきます。 https://github.com/Kotlin/KEEP/blob/master/proposals/coroutines.md

Continuation passing style

Every suspending function and suspending lambda has an additional Continuation parameter that is implicitly passed to it when it is invoked.

hmhm, これが completion: Continuation で、まあ要するにCallbackですね。CallbackHellからasync/awaitスタイルの書き下しに変換する仕組みはこのあたりにありそうです。 ここまでのコードリーディングと合わせて、だいたいsuspend fun が状態を保持したまま処理をsuspend/resumeできる機構を備えていて、CoroutineScopeはそれをScope/階層化して管理しながら実行するもの、くらいのことがわかりました。

ググっているとデコンパイルして内容を確認してみよう、みたいな記事がいくつか出てくるので、やってみます。 今回はJava Decompiler を使いました。なんかjava11で起動するとエラーになりますね。java8で起動します。

さっき躓いたIntrinsicsKtを覗いてみます。

IntrinsicsKt extends IntrinsicsKt__IntrinsicsKt extends IntrinsicsKt__IntrinsicsJvmKt で、ほとんどの処理がこのJvmKtに書いてあります。

createCoroutineUninterceptedはこんな雰囲気で、

@SinceKotlin(version = "1.3")
  @NotNull
  public static final <R, T> Continuation<Unit> createCoroutineUnintercepted(@NotNull Function2 $this$createCoroutineUnintercepted, Object receiver, @NotNull Continuation completion) {
    :
    Continuation probeCompletion = DebugProbesKt.probeCoroutineCreated(completion);
    int $i$f$createCoroutineFromSuspendFunction$IntrinsicsKt__IntrinsicsJvmKt = 0;
    CoroutineContext context$iv = probeCompletion.getContext();
    :
    return ($this$createCoroutineUnintercepted instanceof BaseContinuationImpl) ?
    :
      (Continuation<Unit>)new IntrinsicsKt__IntrinsicsJvmKt$createCoroutineUnintercepted$$inlined$createCoroutineFromSuspendFunction$IntrinsicsKt__IntrinsicsJvmKt$4(probeCompletion, context$iv, probeCompletion, context$iv, $this$createCoroutineUnintercepted, receiver));
  }

パラメータチェックのあと IntrinsicsKt__IntrinsicsJvmKt$createCoroutineUnintercepted$$inlined$createCoroutineFromSuspendFunction$IntrinsicsKt__IntrinsicsJvmKt というクソ長いクラスのインスタンスを作っていることがわかります。 この createCoroutineUnintercepted を呼び出す suspend fun, つまりもともとの block は、 $this$createCoroutineUnintercepted となっていて、ここでは単なるFunction2として渡されています。

このクソ長いクラスは ContinuationImpl を実装していて、ここで $super_call_param$3 - $this$createCoroutineUnintercepted - つまりもともとの block が ContinuationImpl の completion として新たな Continuation のインスタンスが生成されることがわかります。

  public static final class IntrinsicsKt__IntrinsicsJvmKt$createCoroutineFromSuspendFunction$2 extends ContinuationImpl {
    private int label;
    
    public IntrinsicsKt__IntrinsicsJvmKt$createCoroutineFromSuspendFunction$2(Function1 $captured_local_variable$0, Continuation $captured_local_variable$1, CoroutineContext $captured_local_variable$2, Continuation $super_call_param$3, CoroutineContext $super_call_param$4) {
      super($super_call_param$3, $super_call_param$4);
    }
  }

これでやっと suspend fun から Continuation が生成できました。

再びkotlinの世界へ

startCoroutineCancellableに戻ります。

createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)

こうでした。 intercepted() は状態を見て自分を返すので、結局 resumeCancellabeWith が呼ばれます。 この関数はDispatchedContinuationでContinuationに生えていて

DispatchedContinuation.kt

public fun <T> Continuation<T>.resumeCancellableWith(
    result: Result<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
    is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
    else -> resumeWith(result)
}

resumeWithを呼びますが、いまContinuationはContinuationImplで ContinuationImpl -> BaseContinuationImpl ですから、ここのresumeWithが呼ばれて

    public final override fun resumeWith(result: Result<Any?>) {
        // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
        var current = this
        var param = result
        while (true) {
            // Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
            // can precisely track what part of suspended callstack was already resumed
            probeCoroutineResumed(current)
            with(current) {
                val completion = completion!! // fail fast when trying to resume continuation without completion
                val outcome: Result<Any?> =
                    try {
                        val outcome = invokeSuspend(param)
                        :
            }
        }
    }

と、invokeSuspend() を呼びます。

BaseContinuationImpl.kt
    protected abstract fun invokeSuspend(result: Result<Any?>): Any?

abstract 関数になってまして、じゃあこれ誰が実装するのというと、さっきのドキュメントのこのあたりの説明を読むとsuspend fun あるいは suspend lambdaを実装するとコンパイラが対応するinvokeSuspendを実装した匿名クラスを吐くらしいんですね。

ふたたび深淵へ

せっかくなのでかんたんなテスト用クラスを作ってこれも確認してみます。

import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

suspend fun say() {
    delay(100)
    print("World!")
}

fun main() {
    runBlocking {
        launch {
            print("Hello, ")
            say()
        }
    }
}

このコードはこんなクラスファイルを吐きます。

SuspendKt$main$1$1.class
SuspendKt$main$1.class
SuspendKt$say$1.class
SuspendKt.class

main$1が launch に渡した suspend lambda, say$1 が suspend fun say() ですかね。

  static final class SuspendKt$say$1 extends ContinuationImpl {
    int label;
    
    SuspendKt$say$1(Continuation $completion) {
      super($completion);
    }
    
    @Nullable
    public final Object invokeSuspend(@NotNull Object $result) {
      this.result = $result;
      this.label |= Integer.MIN_VALUE;
      return SuspendKt.say((Continuation<? super Unit>)this);
    }
  }
  static final class SuspendKt$main$1 extends SuspendLambda implements Function2<CoroutineScope, Continuation<? super Job>, Object> {
    int label;
    
    SuspendKt$main$1(Continuation $completion) {
      super(2, $completion);
    }

    @Nullable
    public final Object invokeSuspend(@NotNull Object $result) {
      CoroutineScope $this$runBlocking;
      Object object = IntrinsicsKt.getCOROUTINE_SUSPENDED();
      switch (this.label) {
        case 0:
          ResultKt.throwOnFailure(SYNTHETIC_LOCAL_VARIABLE_1);
          $this$runBlocking = (CoroutineScope)this.L$0;
          return BuildersKt.launch$default($this$runBlocking, null, null, new Function2<CoroutineScope, Continuation<? super Unit>, Object>(null) {
                int label;
                
                @Nullable
                public final Object invokeSuspend(@NotNull Object $result) {
                  String str;
                  boolean bool;
                  Object object = IntrinsicsKt.getCOROUTINE_SUSPENDED();
                  switch (this.label) {
                    case 0:
                      ResultKt.throwOnFailure(SYNTHETIC_LOCAL_VARIABLE_1);
                      str = "Hello, ";
                      bool = false;
                      System.out.print(str);
                      this.label = 1;
:

こんな感じで、invokeSuspendが実装されたContinuationImplを吐いてくれたことがわかりました。 ※ SuspendLambda extends ContinuationImpl です。

また、invokeSuspendがstateMachineとして実装されている様子も垣間見えます。 当面、どのように実行されるかがわかって満足したのでこのあたりで。

Appendix