White Box技術部

WEB開発のあれこれ(と何か)

【kotlin】Coroutinesを使って、Spring WebFluxでJDBC処理を行う

ちょっと記事を寝かせすぎてしまったのですが、今回は1.3でKotlin本体に入ることが決まったコルーチンのお話です。


WebFluxでJDBCを使うには

以前の発表資料にも記載したのですが、

Spring Boot 2から使えるSpring WebFluxは、Spring MVCと違い、ノンブロッキングなWebフレームワークなので、 ブロッキング処理として作られているJDBCを使うには一工夫必要です。

Schedulers#elasticで対応

解決策としては、(この言い回しで合っているのかは自信がないのですが)JDBC処理をSchedulerのelasticでサブスクライブさせ、適時Workerに処理して貰うようにすることで、処理不整合を起こすことなく動かすことができます。

実際、プロダクトでWebFluxを使ったときはこの方法で実装し、アプリケーションに対しGatlingで負荷をかけても、この部分ではKOにならなかったため、期待した動作をしているようでした。

この方法の問題点

しかしこの方法で実装すると、Handlerで応答レスポンスを作るまでは、データをMonoかFluxで持ち回る必要が出てきます。

戻り値の型がMonoなどになっていると、メソッドの使い回しがしづらく、呼び出す際もブロッキングを考慮しないといけないため、そこそこの実装難易度となってしまい、結果、kotlinを使っていても理解しづらいコードになってしまいました。

これがどうにも気に入らずもやもやしていたのですが、JJUGのナイトセミナーでコルーチンの存在を知り、「コルーチンを使えば良い感じに書けるのでは?」と光明を見出し、コルーチンを使った実装を試してみました。

コルーチン

コルーチンは軽量スレッドとよく説明されますが、私はスレッド内でJavaScriptの非同期処理を実現する仕組みと理解しています。なのでブロッキング処理を非同期処理として実行するのにコルーチンが使えるのであれば、これで先ほどの問題を解決できるのではないかと考えた訳です。

その他の説明としては、ここの中断可能な計算インスタンスという説明がイメージしやすかったです
https://qiita.com/k-kagurazaka@github/items/8595ca60a5c8d31bbe37

コルーチンの導入

コルーチンはまだKotlin自体には組み込まれていないので、利用する場合はライブラリのkotlinx-coroutines-coreを追加する必要があります。

Gradleプロジェクトの場合は、以下のように追加します。

dependencies {
    compile('org.jetbrains.kotlinx:kotlinx-coroutines-core:0.22.5')
    //...
}

WebFluxのリクエストをコルーチンで処理

例えば、リクエストでIDを受け取り、それをキーにRDBからデータを取得して、必要なデータを返すAPIをWebFluxとコルーチンで実装すると、以下のようになります。

package box.white.seriwb.api

import kotlinx.coroutines.experimental.async
import kotlinx.coroutines.experimental.runBlocking
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.http.MediaType
import org.springframework.stereotype.Component
import org.springframework.web.reactive.function.server.ServerRequest
import org.springframework.web.reactive.function.server.ServerResponse
import org.springframework.web.reactive.function.server.ServerResponse.ok
import reactor.core.publisher.Mono

@Component
class KotlinApiHandler {

    // ----- コルーチンのサンプル -----
    @Autowired
    lateinit var repository: KotlinApiRepository

    fun coroutineSelect(req: ServerRequest): Mono<ServerResponse> = runBlocking {

        val id = req.pathVariable("id").toLong()

        val responseData = async {
            repository.getSimpleResponseData(id)
        }

        ok().contentType(MediaType.APPLICATION_JSON_UTF8)
                .syncBody("{\"result\":\"${responseData.await().envelope}\"}")
    }
}

HandlerのcoroutineSelect関数は、JDBC処理(ブロッキング処理)がある以上、どこかで当該処理との待ち合わせ(ブロッキング)を行わないといけないのですが、 一方でこの関数全体がノンブロッキングである必要があります。 このため、Handlerの処理全体をコルーチンの処理として扱い、呼び出し元から見れば一つの処理となるようにしてあります。

具体的には、runBlockingで囲まれた部分がコルーチンで動作するので、

  1. Handler全体をrunBlockingで囲い、
  2. ブロッキング処理となるJDBCアクセスの処理は、asyncブロック内で呼び出し、
  3. その結果を利用するときにawaitメソッドを使うようにします。

今回はasyncが1つですが、2つ以上の場合は、asyncブロック内の処理結果が実際に必要になるところでawaitをまとめて呼び出すと、asyncの処理が並列で実行されます。

次はJDBC側の処理を見てみましょう。

Suspending関数を使う関数もSuspendingに

asyncブロック内で呼び出していたJDBC処理のgetSimpleResponseData関数は、Suspending関数とするため、suspend修飾子を付与して宣言します。

package box.white.seriwb.api

import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Repository
import org.springframework.transaction.annotation.Transactional

@Repository
class KotlinApiRepository {

    @Autowired
    lateinit var dao: KotlinApiDao

    @Transactional
    suspend fun getSimpleResponseData(id: Long): SimpleResponseData {
        val sample = dao.findSample(id)

        return SimpleResponseData(sample.value)
    }
}

data class SimpleResponseData(
    val envelope: String
)

どうしてSuspendingにするかというと、この処理は実際のJDBC処理となるfindSample関数を呼び出しているのですが、 このfindSample関数がSuspending関数だからです。 更に言うとSuspending関数を呼び出すのはSuspending関数内か、asyncブロック内である必要があるからです。

しかしSuspending関数にしたとはいえ、見て貰うとわかるように、 実際のJDBC処理となるDAOのfindSample関数を呼び出しているのにも関わらず、 suspend修飾子が付いている以外は普通の関数呼び出しの記述になっています。

つまりDAOを使うRepositoryの関数内では、Suspending関数も通常の関数呼び出しのように書けるのです。

ブロッキング処理はSuspending関数で

ではブロッキングが必要となる肝心要のDAOである、findSample関数はどうなるかというと、 こちらもsuspend修飾子を付けてSuspending関数にするだけで対応が終わります。

package box.white.seriwb.api

import box.white.seriwb.api.jooq.public_.Tables.SAMPLE
import box.white.seriwb.api.jooq.public_.tables.records.SampleRecord
import org.jooq.DSLContext
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Repository

@Repository
class KotlinApiDao {

    @Autowired
    lateinit var create: DSLContext

    suspend fun findSample(id: Long): SampleRecord {
        val result = create
                .selectFrom(SAMPLE)
                .where(SAMPLE.ID.eq(id))
                .fetch()
        return result.first()
    }
}

どうでしょうか、これであればKotlinのスッキリさを残したまま、WebFluxを利用できる気がしないでしょうか。

そしてコルーチンで利用する関数はSuspending関数として定義しているので、 通常の呼び出し方ができないことをコードとしても制約を課しているので、誤って呼び出されることを防止しています。

なんて便利なんでしょうか・・・

コルーチンとWebFluxでどうなったか

さて、これまでの話でコルーチンをWebFluxで使うことの利点を感じてもらえたかと思いますが、個人的に利点を上げると

ここらへんが大きなところかと思っています。

このようにコルーチンを利用するとWebFluxの勘所であるブロッキング処理を(そこそこ)簡単に扱えるので、 ここがWebFlux導入の障害となっていたのであれば、Kotlinとコルーチンを利用してみてはどうでしょうか?