こんにちは!ヘンリーでソフトウェアエンジニアをしている @agatan です。
今日は小ネタで、サーバーサイド Java / Kotlin エコシステムで意外と使われている ThreadLocal と、それを Coroutine と安全に組み合わせる方法について紹介します!
TL; DR
ThreadContextElementを使おう!
ThreadLocal とは
java.lang.ThreadLocal<T>
は、その名の通り、スレッドローカルな(= スレッドごとに独立した値を持つ)変数を定義するための機構です。
ある Thread で値を書き換えたとしても、他の Thread から見た ThreadLocal 変数の中身は書き換わらない、という性質があります。
import kotlin.concurrent.thread
val tls: ThreadLocal<Int> = ThreadLocal.withInitial { -1 }
fun printTls() {
println("${Thread.currentThread().name}: ${tls.get()}")
}
fun main() {
val th1 = thread {
printTls()
tls.set(0)
printTls()
}
val th2 = thread {
printTls()
tls.set(1)
printTls()
}
th1.join()
th2.join()
printTls()
}
サーバーサイド Java / Kotlin エコシステムでの ThreadLocal
ThreadLocal は暗黙の状態であり、グローバル変数的な性質を持っています。スレッドローカルなので、データ競合こそ起きませんが、一般にグローバル変数は避けたいものですよね。
ところが、サーバーサイド Java / Kotlin エコシステムでは、この ThreadLocal が思ったより頻繁に登場しています。
gRPC-Java
gRPC には Context
という概念があります。リクエストごとのコンテキスト情報を保持する概念で、典型的なユースケースとして、認証情報を詰めたり OpenTelemetry の Trace ID の伝搬に使われたりします。
gRPC-Java での Context
は以下のようにして使います。
val current = Context.current()
val newCtx = current.withValue(key, value)
newCtx.run {
Context.current()
}
Context.current
を呼び出すと、現在のコンテキストを取得できます。上の例でいえば、引数として引き回したりしていないのに、 (1) の部分で newCtx
が取得できるのですが、それを実現する方法として ThreadLocal が内部で利用されています。
Exposed
Exposed は Jetbrains 社謹製の ORM です。以下のようなコードが書けます。
val db = Database.connect()
transaction(db) {
Users.selectAll().where { Users.id.eq(1) }.toList()
}
このコードでは、 Users.selectAll()
の部分で実際のデータベースアクセスが行われるのですが、データベースへのコネクションを握っているのは db
オブジェクトです。
明示的に引数として渡したりしていないのに、どうやってデータベースへのコネクションを取得するかというと、やっぱり ThreadLocal を使っています。(Spring と併用している場合など、ThreadLocal に直接依存しない機構も提供されていますが、Henry では Spring を使っていないので ThreadLocal に依存した使い方になっています。)
OpenTelemetry
opentelemetry-java には、現在の Span の情報を取得する方法として、以下のような API が生えています。
Span.current()
これも、いくつかのクラス(Context, ContextStorage, LazyContext など)を経て、最終的に ThreadLocal依存の実装 にたどり着きます。
このように、サーバーサイド Java / Kotlin でよく使われるインフラ的なフレームワークたちの内部では、ThreadLocal が頻繁に使われています。
これらのフレームワークでは共通して、ThreadLocalに依存する “ContextStorage” 的なクラスが提供されていますが、API としては ThreadLocal 非依存な Interface になっていて、実装を差し替えることも可能になっています。
しかし、引数として持ち回さずに “Context” っぽいものを伝搬する機能を提供しようと思うと、JVM では ThreadLocal に依存しないことは難しく、自前実装に差し替えるとしても ThreadLocal を回避するのは困難です。
Coroutine と ThreadLocal
Henry はサーバーサイド API を Kotlin を使って記述していますが、Kotlin には強力な並行処理の道具として “Coroutine” というものがあります。
Coroutine には、「ある一つのCoroutineの実行が複数のスレッドにまたがる可能性がある」という性質があります。これは、この記事の主題に大きく影響する性質です。
ある Coroutine (launch
, async
などで起動する一つの Coroutine) の処理が、そもそも別スレッドで開始される可能性があり、さらに処理の途中で別のスレッドに移動することもあるのです。
以下に具体的な挙動を示すサンプルを記載します。
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield
import java.util.concurrent.Executors
suspend fun f(tag: String) {
println("$tag @ ${Thread.currentThread().name} (before yield)")
yield()
println("$tag @ ${Thread.currentThread().name} (after yield)")
}
fun main() {
runBlocking(Executors.newFixedThreadPool(2).asCoroutineDispatcher()) {
for (n in 1..3) {
launch {
f("launch-$n")
}
}
}
}
このサンプルでは、2 threads のスレッドプールの上で 3 つの coroutine を起動しています。
それぞれの coroutine の中では、自分自身が動いている Thread の名前 (= Thread.currentThread().name
) と Coroutine の名前を 2 回 print していますが、1 回目と 2 回目のあいだで yield()
を挟んで処理を suspend させています。
実行結果は例えば以下のようになるはずです。(環境依存)
launch-1 @ pool-1-thread-2 (before yield)
launch-2 @ pool-1-thread-1 (before yield)
launch-3 @ pool-1-thread-2 (before yield)
launch-1 @ pool-1-thread-1 (after yield)
launch-2 @ pool-1-thread-2 (after yield)
launch-3 @ pool-1-thread-2 (after yield)
ここから次のことがわかります。
- すべての coroutine は、スレッドプール上のスレッドで動いており、main スレッドでは動いていない
- suspend の前後で別のスレッドに移動することがある
- 例えば launch-1 に相当する coroutine は、yield 前は pool-1-thread-2 で動いているが、yield 後は pool-1-thread-1 で動いている
Coroutine の動くスレッドが固定されないということは、ThreadLocal との併用がうまくいかないことを意味します。
ThreadLocal に値を set した後、処理が suspend して別スレッドにうつってしまった場合、さっき set した値を get することはできなくなります。
次に示すコードでは、Coroutine の中から ThreadLocal に値を set し、yield 前後で ThreadLocal の値を print しています。
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield
import java.util.concurrent.Executors
val tls = ThreadLocal<String?>()
suspend fun f(tag: String) {
println("$tag @ ${Thread.currentThread().name} (before yield): ${tls.get()}")
yield()
println("$tag @ ${Thread.currentThread().name} (after yield): ${tls.get()}")
}
fun main() {
runBlocking(Executors.newFixedThreadPool(2).asCoroutineDispatcher()) {
for (n in 1..3) {
launch {
tls.set("launch-$n")
f("launch-$n")
}
}
}
}
実行結果は以下のようになりました。
launch-1 @ pool-1-thread-2 (before yield): launch-1
launch-2 @ pool-1-thread-1 (before yield): launch-2
launch-3 @ pool-1-thread-2 (before yield): launch-3
launch-1 @ pool-1-thread-1 (after yield): launch-2
launch-2 @ pool-1-thread-2 (after yield): launch-3
launch-3 @ pool-1-thread-2 (after yield): launch-3
launch-1 に相当する coroutine に注目すると、yield 前は tls.get()
の結果が launch-1
になっていて期待通りですが、yield 後は tls.get() == "launch-2"
になってしまっています。
これは launch-1 に相当する coroutine を実行するスレッドが yield 前後で別のスレッドになっていることと、一つのスレッドで複数の coroutine (ここでは launch-2) が実行されていることが原因です。
というわけで、ThreadLocal を利用するコードと Coroutine は、何も考えずに併用するとバグる、ということが確認できました。
このままだと、gRPC のコンテキストにアクセスできなくなったり、意図せず Exposed のトランザクションが分離してしまったり、OpenTelemetry の Trace が繋がらなくなったりしてしまいます。
ThreadContextElement で Thread と Coroutine の仲を取り持つ
kotlinx.coroutine には ThreadContextElementというクラスが提供されています。これをつかうことで、「Thread と Coroutine のミスマッチを補完する」機会を得ることが出来ます。
先にコードを示します。以下のように記述することで、ThreadLocal と Coroutine を安全に併用することができるようになります。
import kotlinx.coroutines.ThreadContextElement
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield
import java.util.concurrent.Executors
import kotlin.coroutines.CoroutineContext
val tls = ThreadLocal<String?>()
class ThreadLocalContext(val value: String?) : ThreadContextElement<String?> {
companion object Key : CoroutineContext.Key<ThreadLocalContext>
override val key: CoroutineContext.Key<*>
get() = Key
override fun updateThreadContext(context: CoroutineContext): String? {
val previous = tls.get()
tls.set(value)
return previous
}
override fun restoreThreadContext(context: CoroutineContext, oldState: String?) {
tls.set(oldState)
}
}
suspend fun f(tag: String) {
println("$tag @ ${Thread.currentThread().name} (before yield): ${tls.get()}")
yield()
println("$tag @ ${Thread.currentThread().name} (after yield): ${tls.get()}")
}
fun main() {
runBlocking(Executors.newFixedThreadPool(2).asCoroutineDispatcher()) {
for (n in 1..3) {
launch(ThreadLocalContext("launch-$n")) {
f("launch-$n")
}
}
}
}
肝は ThreadLocalContext
クラスです。さきほど紹介した ThreadContextElement
を継承したクラスです。これを launch
するときに Context として指定することで、実行結果が以下のように期待通りになります。
launch-1 @ pool-1-thread-2 (before yield): launch-1
launch-2 @ pool-1-thread-1 (before yield): launch-2
launch-3 @ pool-1-thread-2 (before yield): launch-3
launch-1 @ pool-1-thread-1 (after yield): launch-1
launch-2 @ pool-1-thread-2 (after yield): launch-2
launch-3 @ pool-1-thread-2 (after yield): launch-3
Coroutine の名前と ThreadLocal に格納された値の整合性が(スレッドをまたいでも)一貫していることがわかります。
肝となる ThreadLocalContext
の実装を再掲します。
class ThreadLocalContext(val value: String?) : ThreadContextElement<String?> {
companion object Key : CoroutineContext.Key<ThreadLocalContext>
override val key: CoroutineContext.Key<*>
get() = Key
override fun updateThreadContext(context: CoroutineContext): String? {
val previous = tls.get()
tls.set(value)
return previous
}
override fun restoreThreadContext(context: CoroutineContext, oldState: String?) {
tls.set(oldState)
}
}
Key
, key
に関しては、 ThreadContextElement
というよりはその更に親である CoroutineContext
を定義するときのボイラープレートみたいなものなので、ここでは無視します。
key
以外に2つのメソッドを override しており、これらが今回の主題です。
1つ目のメソッドである updateThreadContext
は、 「Coroutineの実行が開始・再開するときに、その Coroutine を実行しようとしているスレッド上で呼び出される hook」 です。
「その Coroutine を実行しようとしているスレッド上で呼び出される」というのが重要で、このメソッドの中で Thread.currentThread()
を呼んで取得できる Thread は、その Coroutine が次に suspend するまでの間の実行スレッドと一致します。
したがって、 updateThreadContext
の中で ThreadLocal の更新を行えば、Coroutine の実行時には必ず ThreadLocal の中身が期待通りになっていることが保証されます。
2つ目のメソッドである restoreThreadContext
は、さっきの逆で、 「Coroutine の実行が終了・中断するときに、その Coroutine を実行していたスレッド上で呼び出される hook」 です。
updateThreadContext
で Coroutine の実行前に現在のスレッドの状態を変更したあと、Coroutine から抜けるときにその状態を復元してあげることができます。
引数に渡される oldState
は、 updateThreadContext
の返り値です。
全体を通した流れとしては以下のようになります。
- スケジューラによって、あるスレッド X 上で Coroutine 1 を実行することが決まる
ThreadContextElement.updateThreadContext
がスレッド X 上で呼び出される
- スレッド X の現在の状態(ThreadLocal の中身など)を取り出す ☆
- Coroutine 1 を実行するために、スレッド X の状態を書き換える
- Coroutine 1 の実行が始まる
- Coroutine 1 の処理が suspend する
ThreadContextElement.restoreThreadContext
がスレッド X 上で呼び出される
- ☆ で取り出しておいた状態が引数にわたってくるので、それを元にスレッド X の状態を復元する
このように、どんなにスレッドが使い回されても、Coroutine の出入りのタイミングで状態を復元するので、安全にスレッドと Coroutine を組み合わせることができます。
(また Experimental ですが、子 Coroutine が作られるたびにコンテキストをコピーすることで独立性を更に高める CopyableThreadContextElement という API もあります。)
実例
さきほどサーバーサイド Java / Kotlin エコシステムに潜む ThreadLocal の例として Exposed , gRPC-Java, OpenTelemetry を挙げました。
実はこのうち Exposed, gRPC については、まさにいま紹介した ThreadContextElement
を使ったブリッジの機構が提供されています。
Exposed
Exposed には suspendTransactionAsync
や newSuspendedTransaction
, withSuspendedTransaction
といった API がはえており、これらを使うことで安全に Coroutine を使うことができるようになっています。(あまり目立たないのですが、公式ドキュメントに Coroutine についてのセクションがあります。JDBC 依存なので、同期的実行が前提になっており、Coroutine によるパフォーマンスゲインは限定的で、それもあってあまり大々的に Coroutine を使うことを想定していない印象です。)
これらの実装の内部を探っていくと、 ThreadContextElement
を継承したクラスが使われていることがわかります。(実装)
gRPC
gRPC については、gRPC-Java ではなく、gRPC-Kotlin からブリッジ機構が提供されています。(gRPC-Kotlin は gRPC-Java に依存しており、Context そのものは gRPC-Java の実装が使われています。)
GrpcContextElement
というクラスが提供されており、その実装は ThreadContextElement
をつかっています。
(Henry では gRPC-Kotlin をつかっていないので、自前でこれに相当する処理を記述する必要がありました。)
OpenTelemetry
OpenTelemetry については、僕の調べた限りはこの手のブリッジが存在しないので、手で書く必要があります。
こんな感じの ThreadContextElement
を定義すれば OK です。
class OTelSpanContext(private val span: Span) : ThreadContextElement<Scope> {
companion object Key : CoroutineContext.Key<OTelSpanContext>
override val key: CoroutineContext.Key<OTelSpanContext>
get() = Key
override fun updateThreadContext(context: CoroutineContext): Scope {
return span.makeCurrent()
}
override fun restoreThreadContext(context: CoroutineContext, oldState: Scope) {
oldState.close()
}
}
実際に使う側では
tracer.spanBuilder("foo").startAndCall {
launch(Dispatchers.IO + OTelSpanContext(Span.current()) {
...
}
}
という感じで呼び出します。
余談: この問題は Coroutine 固有の問題なのか?
実は ThreadLocal に依存した “Context” 伝搬を正しく扱う難しさというのは、Coroutine 固有の問題ではありません。
普通に Java の Thread を使っていても、なにもケアしなければ容易に Context の連続性が失われます。
たとえば、Java で並行処理をする場合、典型的には java.util.concurrent.ExecutorService
を使うことが多いと思いますが、この場合も結局新しい Thread に処理が移るので、ThreadLocal の中身は引き継がれません。
InheritableThreadLocal は使えないか?
Thread の場合は Coroutine と違って、 java.lang.InheritableThreadLocal<T>
という道具が提供されています。これをつかうと、「ThreadLocal の初期値として親スレッドでの値を引き継ぐ」ということが可能になります。
import kotlin.concurrent.thread
val inheritableTls = InheritableThreadLocal<String?>()
val tls = ThreadLocal<String?>()
fun main() {
inheritableTls.set(Thread.currentThread().name)
tls.set(Thread.currentThread().name)
thread {
println("thread@${Thread.currentThread().name}: inheritable=${inheritableTls.get()}, normal=${tls.get()}")
}.join()
}
しかし、 InheritableThreadLocal
にはいくつかの問題があり、暗黙の Context 伝搬には使えません。
- そもそも gRPC-Java などのライブラリの内部で
InheritableThreadLocal
を使ってもらう必要がある
- スレッド作成時の親スレッドでの状態に依存するので、スレッドプールのように一度用意したスレッドを使い回すタイプの処理に対応できない
スレッドなら普通に初期化と終了処理を手書きすればいいのでは?
スレッドの場合は Coroutine と違って実行スレッドがぴょんぴょん飛び回ったりしませんから、処理の先頭と末尾で初期化・終了処理を手書きすれば問題なく動きます。
try-with-resources や Closable.use
を使えば、安全かつそれなりに手軽に初期化・終了処理を記述できます。
実際、それで十分なケースは多いかと思います。が、せっかく Kotlin を使っている以上、Coroutine は(安全に正しく使えるなら)積極的に使うべきだと思います。
Coroutine はスレッドより効率が良いだけでなく、kotlinx.coroutine が提供する Structured Concurrency のための仕組みは、普通にプログラミングをするにあたって便利な機能を備えています。(キャンセルとか Deferred とか Context とか Dispatchers とか)
Henry ではまだまだ Coroutine を自ら使い倒すことはできていませんが、Ktor が Coroutine を使った API を提供しており、外部サービスの公式 API Client ライブラリが Ktor に依存しているなど、間接的に Coroutine が登場するシーンもあるため、Coroutine から逃げ切ることは難しくなっています。
また、どうせ初期化・終了処理を記述するなら、kotlinx.coroutine の提供する仕組みに乗っかれるほうが readability / maintainability の観点からも有利です。こういうのは自前の仕組みを作るより、すでにある仕組みに乗っかったほうが、ドキュメンテーションのコストを節約できたり、新しく入ってきた開発者からも見通しがよかったりと嬉しい事が多いです。
Javaにも Virtual Threads が登場して、少し事情が変わってきそうな見込みもありますが、Kotlin をつかう限り Coroutine に賭けておいて損はないんじゃないかというのが僕のいまの見解です。
まとめ
この記事では
- ThreadLocal はスレッドごとに独立した値をもつグローバル変数を定義できる機能
- サーバーサイド Java / Kotlin のエコシステムでは、ThreadLocal を利用して暗黙の Context 伝搬を実現しているケースがある
- Coroutine は複数のスレッドにまたがって実行されるため、ThreadLocal と組み合わせるとバグる
- ThreadContextElement を使うことで Coroutine とスレッドの状態を同期できる
- エコシステム側で提供されていることもあるし、手書きでも簡単に書けるので便利
ということを紹介しました。
Coroutine は結構 API が充実していてふつうに便利なうえに、パフォーマンスも良くなりやすいので、積極的に使っていきたいですね!