ベータコンピューティングの活動や技術、開発のこだわりなどを紹介するブログです。



RxJavaとKotlin CoroutinesでAndroidのBLE制御をした話

はじめまして、新入社員のd-ariakeと申します。
主にAndroidアプリの開発を担当しています。
宜しくお願い致します。

今回扱う内容

私の初めてのお仕事は業務用Androidアプリの開発案件でした。
ものすごく簡単に言うと、組み込み機器とAndroid端末をOOBでペアリング・接続し、その機器から送られてくる商品のデータをAndroidアプリで受信・管理するといった内容です。

Bluetooth通信を行っているので、当然処理は非同期で行う必要が出てきます。
標準のBLEライブラリも非同期コールバックで書くようになっているのですが、なかなか取り扱いづらいです。
(触ったことがある人にはわかってもらえるかと思います。)

そのため、私は RxJavaKotlin Coroutines を使って非同期な処理を実装することにしました。
その結果、より直感的で分かりやすい実装を行うことができました。
今回はその知見を共有したいと思います。

RxJava2・RxAndroidBleの導入

AndroidのBluetoothまわりのAPIはとても扱いづらいです。
普通に書いているだけでコールバック地獄になり、とてもじゃないですが、メンテナンスできない状態になってしまいます。
そこで、本プロジェクトでは RxAndroidBle というライブラリを導入することにしました。
このライブラリはBLEの接続やread/wriet処理などをRxJavaの Observable<T>Single<T> のストリームとして提供してくれます。

以下、ライブラリの README.md からの引用です。

BLEのスキャン処理の例:

Disposable scanSubscription = rxBleClient.scanBleDevices(
        new ScanSettings.Builder()
            // .setScanMode(ScanSettings.SCAN_MODE_LOW_LATENCY) // change if needed
            // .setCallbackType(ScanSettings.CALLBACK_TYPE_ALL_MATCHES) // change if needed
            .build()
        // add filters if needed
)
    .subscribe(
        scanResult -> {
            // Process scan result here.
        },
        throwable -> {
            // Handle an error here.
        }
    );

// When done, just dispose.
scanSubscription.dispose();

BLEのwrite処理の例:

device.establishConnection(false)
    .flatMapSingle(rxBleConnection -> rxBleConnection.writeCharacteristic(characteristicUUID, bytesToWrite))
    .subscribe(
        characteristicValue -> {
            // Characteristic value confirmed.
        },
        throwable -> {
            // Handle an error here.
        }
    );

flatMapSingle()Disposablesubscirbe() など、Rxらしさのあるコードになっていますね。

本プロジェクトでも、タイムアウトやリトライ処理などをRxのオペレータを使って記述しています。
例えば、BLEのread/write処理に対して、遅延付きリトライを行う (疑似) オペレータは以下のようになります。

//  ワンショット系のBLE処理をリトライ付きで実行する。
//  リトライ時には遅延を掛けるようにする。
//  ただし、BLE接続が切断された場合はこれ以上リトライしない。
fun Single<ByteArray>.retryBleOperation(
    retryTimes: Long,
    delay: Long,
    unit: TimeUnit,
    scheduler: Scheduler = Schedulers.computation(),
    onRetry: (Throwable) -> Unit = {}
): Single<ByteArray> = this.retryWhen { errorStream: Flowable<Throwable> ->
    errorStream.zipWith(IntRange(0, Int.MAX_VALUE).asIterable()) { error, index ->
        //  指定回数までリトライを掛ける。
        //  ただし切断エラーの場合はどうあがいても絶対にコマンドは失敗するため、リトライを中止する。
        if (index < retryTimes && error !is BleDisconnectedException) error
        else throw error
    }.flatMap { error -> Flowable.timer(delay, unit, scheduler).doOnNext { onRetry(error) } }
}

Rxを使わずにリトライのような処理を実装しようとすると、コールバックのネストになったり、一時的な状態を保持するためのフィールドが乱立したりしてしまうと思います。
また、時間が絡んでくるような処理はテストが非常にやりづらいです。

そこで、上記のようにRxを使って実装することで、(呼び出し元に時間の管理やリトライ回数の処理などを意識させる必要のない) 非常にスッキリとしたコードにすることができたと思います。
さらに TestScheduler を使用することによって、時間を扱うような処理にも関わらず、簡単に単体テストを書くことができます。

以下がそのテストコードの例となります。

    @Test
    fun BLE接続が切断されていればリトライせずにエラーを通知する() {
        val testScheduler = TestScheduler()

        var counter = 0
        val source: Single<ByteArray> = Single.create { emitter ->
            when (++counter) {
                
                //  1回目・2回目の発火時は通常のエラー
                1, 2 -> emitter.onError(Exception())
                
                //  3回目の発火時は切断エラー
                3 -> emitter.onError(mockk<BleDisconnectedException>(relaxed = true))
                
                else -> emitter.onSuccess(byteArrayOf())
            }
        }
        val testObserver = source.retryBleOperation(
            retryTimes = 5L,
            delay = 100L,
            unit = TimeUnit.MILLISECONDS,
            scheduler = testScheduler
        ).test()

        //  1回目の発火
        //  はじめは失敗しているが、まだエラーは通知されていない。
        testObserver.assertNoValues()
        testObserver.assertNoErrors()

        //  2回目の発火
        //  最初のリトライを掛けて失敗する。
        testScheduler.advanceTimeBy(100L, TimeUnit.MILLISECONDS)
        testScheduler.triggerActions()
        testObserver.assertNoValues()
        testObserver.assertNoErrors()

        //  3回目の発火
        //  再びリトライを掛けて失敗する。
        //  BleDisconnectedExceptionによって失敗するため、これ以上リトライは行わない。
        testScheduler.advanceTimeBy(100L, TimeUnit.MILLISECONDS)
        testScheduler.triggerActions()
        testObserver.assertNoValues()

        //  切断エラーによってストリームが異常終了しているはず。
        testObserver.assertError(BleDisconnectedException::class.java)
    }

Rxの経験が十分にあり、AndroidでBLEを扱う機会がある際には、ぜひ導入を検討してみることをおすすめいたします。
(Rxの購読やHot/Coldに関する知識がないとちょっと厳しいかもしれません。)

kotlinx-coroutines-rx2の導入

本プロジェクトの非同期処理では全般的にKotlin Coroutinesを利用しています。
非同期の部分をsuspend関数にすることによって、非同期処理を同期処理のような呼び出しで扱うことができます。
特に非同期処理を連続して呼び出す際に、通常ならばコールバックのネストが必要な部分を通常の同期処理のような呼び出しとして記述できるため、とても直感的で分かりやすいコードにすることができます。

通常のコールバック形式で記述した際はこのようなコードになりますが、

//  通常のコールバック形式でのコード
fun doSomethingAsync() {
    doAsync1(
        onSuccess =  { result1 ->
            doAsync2(
                result1,
                onSuccess = { /* 処理1と処理2が完了した際の処理 */ },
                onFailure = { /* 処理2のエラー処理 */ }
            )
        },
        onFailure = { /* 処理1のエラー処理 */ }
    )
}

Kotlin Coroutinesを使うことにより、このような同期処理のようなコードになります。

//  Kotlin Coroutinesを利用したときのコード
suspend fun doSomethingAsync() {

    //  try-catchを使用した例
    val result1 = try {
        doAsync1()
    } catch (e: SomethingThrowable) {
        //  処理1のエラー処理
    }

    val result2 = try {
        doAsync2(result1)
    } catch (e: SomethingThrowable) {
        //  処理2のエラー処理
    }


    //  runCatchingを使用した例
    runCatching {
        doAsync1()
    }.onFailure {
        //  処理1のエラー処理
    }.mapCatching { result1 ->
        doAsync2(result1)
    }.onFailure {
        //  処理2のエラー処理
    }
}

本プロジェクトで、BLEの制御に関しては非同期をRxで扱っていましたが、ワンショットなストリームに関してはsuspend関数への変換を行い、同期処理的な書き方をできるようにしました。

そのために kotlinx-coroutines-rx2 という公式のサポートライブラリを使います。
このライブラリではRx (Observable<T>, Single<T>, ...) とKotlin Coroutines (suspend関数, Flow<T>) の相互変換用の関数を提供してくれています。

とあるBLEのwrite処理をsuspend関数化すると以下のようになります。

//  商品データの送信をリクエストする。
//  (といったような処理が実際に扱った機器にはありました。)
//  内部でタイムアウトとリトライの処理を掛ける。
suspend fun RxBleConnection.request(): ByteArray {
    val command = byteArrayOf(/* ... */)
    val uuid = UUID.fromString("12345678-1234-1234-1234-1234567890AB")

    return this.writeCharacteristic(uuid, command)
        .timeout(REQUEST_COMMAND_TIMEOUT_IN_MS, TimeUnit.MILLISECONDS)
        .retryBleOperation(
            retryTimes = REQUEST_COMMAND_RETRY_TIMES,
            delay = REQUEST_COMMAND_RETRY_DELAY_IN_MS,
            unit = TimeUnit.MILLISECONDS,
        )
        .await()
}

このようなsuspend関数を作っておき、処理の呼び出し側で通常の同期処理のように呼び出すだけです。
エラーハンドリングも普通にtry-catch (runCatching) で行うことができます。

private suspend fun executeRequestCommand() {
    val handle = this.connectionHandle ?: return

    //  BLEのコマンドを実行して結果を待機する。
    val isSuccess = runCatching { 
        handle.request()
    }.isSuccess

    if (isSuccess) { /* 成功時の処理 */ }
    else { /* 失敗時の処理 */ }
}

このようにRxのオペレータでリトライとタイムアウトの処理を行い、待機をKotlin Coroutinesで行うという連携ができました。
RxもKotlin Coroutinesも非常に便利ですね。
AndroidでBLEを扱うことがあれば、ぜひ参考にしてみてください。