ML推論結果の後処理基盤を開発している話

こんにちは、Analysis Platformチームの上野です。キャディ株式会社のアドベントカレンダー24日目の記事です。

この記事ではAnalysis Platformチームで実施した、機械学習モデルの複雑な後処理の実行基盤の技術選定について説明します。同様の技術選定をする際の参考になると幸いです。

既存のアーキテクチャとその課題

キャディでは一部の機械学習モデルの推論処理を以下のような非同期のアーキテクチャで行なっています。一部の推論ワーカーでは他のワーカーの結果と組み合わせて後処理をする必要があり、ワーカーごとの推論結果を格納したBigQueryのテーブルを一度経由する形になっています。

  1. 推論ワーカーが処理した結果をGoogle Cloud Pub/Sub(以降Pub/Subと呼ぶ)にパブリッシュする。
  2. Pub/SubからBigQueryに結果を格納する。
  3. BigQueryに格納されたデータを後処理したものを最終的な結果として別のテーブルに格納する。

詳細は省きますが、この仕組みには以下のような課題があったため、後処理を行うシステムを別の仕組みで置き換えることになりました。

  • 最終的なデータ品質の可視化の仕組みが不十分で、他のシステムからの利用にハードルがある。
  • 複雑な後処理ロジックがSQLで記述されており、新規開発・メンテナンスが複雑で開発や運用工数が高くなる。

技術選定

上記のような課題を解決するために、大まかに以下の要件を満たす設計を行うことにしました。

  • 要件1: データ品質を監視、可視化できること。
    • ここでは特に解析結果がエラー等でどの程度欠損しているかをデータ品質という言葉で表現しています。この記事では以降でも同様の意味でデータ品質という言葉を使用します。
  • 要件2: 機械学習エンジニアが後処理を容易に開発、保守できること。
    • 現在の仕組みを置き換えるためだけではなく、将来的により複雑な後処理や解析フローを実装したくなった際に、それをサポートできるとより良いだろう、という考えもありました。
  • 要件3: 複数の解析ワーカーの出力が到着するのを待機して後処理を開始できること。

二つ目の要件である複雑な後処理や解析フローの実装にはワークフローエンジンを採用するのが適切であると考え、以下のワークフローエンジンから選定することにしました。

  • Temporal
  • Kestra
  • Google Cloud Workflows(以降Cloud Workflowsと呼ぶ)
  • Argo Workflows

ただ、Cloud WorkflowsとArgo Workflowsはそれぞれ以下の点から早い段階で検討対象から外しました。

  • Cloud Workflows: 複雑な処理では基本的に外部APIを呼び出す形になるので、後処理APIを別で用意する必要があり、追加の工数がかかる。
  • Argo Workflows: すでに他チームで運用実績があるが、運用に専門チームが必要であり運用工数が高そう。

TemporalとKestraがそれぞれ要件を満たせるかどうかを実際に少し触ってみて比較してみました。結果の概要を以下に示します。機能的にはどちらも問題なかったのですが、TemporalはKestraと比較して少し学習コストが高いかなという印象を受けました。

要件1 要件2 要件3
Temporal ⭕️ 🔺 ⭕️
Kestra ⭕️ ⭕️ ⭕️

Temporal

Temporalは三つの要件のうち二つは対応可能でしたが、一つはKestraと比べて劣るという結果になりました。

まず、「データ品質を監視、可視化できること」についてですが、これはログなどを適切に仕込むことで問題なく要件を満たすことができると考えられます。また、実行中のWorkflowに対してメッセージを送ることのできるSignal機能を使用すれば以下のように複数の解析ワーカーの出力の到着を待って処理を開始できるので、こちらの要件も満たせそうです。

まずは結果を受け取る側のワークフローを定義します。以下のようにSignal機能を用いて結果を受け取るための関数と、実際にワークフローの実行中に結果を待機する処理を書くことで結果の待機ができます。

@workflow.defn
class AnalysisWorkflow:
    ...
    # Signal機能を使って結果を受け取る
    @workflow.signal
      def analysis_data_available(self, request: AnalysisRequest) -> None:
        ...
        self._received_analysis_data[request.analysis_type] = request

    # ワークフローとして実行される処理
    @workflow.run
    async def run(self, workflow_input: AnalysisWorkflowInput) -> dict[str, Any]:
        ...
        # 必要なデータが揃うまで待機
        expected_types = set(AnalysisType.__members__.values())
        await workflow.wait_condition(
            lambda: set(self._received_analysis_data.keys()) == expected_types,
            timeout=timedelta(minutes=10),  # 10分のタイムアウト
        )
        ...

Signalを送信する側ではSignal-With-Startの機能を用いて以下のようにSignalを送信できます。ワークフローが起動している場合はSignalを送信し、起動していない場合は起動してSignalを送信できます。

await self.client.start_workflow(
    AnalysisWorkflow.run,
    AnalysisWorkflowInput(
        job_id=request.job_id, tenant_id=request.tenant_id
    ),
    id=workflow_id,
    task_queue=activity.info().task_queue,
    start_signal="analysis_data_available",
    start_signal_args=[request],
)

一方で、実装の際にはTemporal特有の概念を学ぶ必要があり、後述のKestraと比較すると学習コストが高いように感じられました。学習コストが高いと実装のサポートやレビューの工数が高くなる可能性があり、保守運用や機能開発の工数を圧迫する懸念がありました。

Kestra

Kestraは三つの要件全てに対応可能でした。

データ品質の監視・可視化はTemporalと同様に可能です。また、ワークフローの定義はYAMLで記載でき、GitHub Actions等に慣れていれば学習コストはそこまで高くなさそうです。さらに、複数ワーカーの出力の到着の待機もKV Storeを用いて以下のように擬似的に実装可能です。

まずはPub/Subから結果を受け取ってKV Storeに格納するワークフローを用意して、以下のようなステップでKV Storeに書き込みを行います。

...
  - id: set_kv
    type: io.kestra.plugin.core.kv.Set
    namespace: demo
    description: 解析結果をKVストアに保存
    key: "{{trigger.body.job_id}}.{{trigger.body.analysis_type}}"
    value: "{{trigger.body}}"
    overwrite: true
    kvType: JSON
    ttl: PT24H # 24時間の有効期限
...

次に、結果を利用するワークフローを別で用意し、そこでKV Storeから結果を読み取る形にします。 errorOnMissing: trueとすることで、KV Storeに対象のキーが存在しない場合はリトライされ、結果を一定時間待機するような挙動を実現できます。

...
  - id: get_analysis_results
    type: io.kestra.plugin.core.flow.ForEach
    values: "{{ inputs.required_analysis_types }}"
    tasks:
      - id: get_kv
        type: io.kestra.plugin.core.kv.Get
        key: "{{ trigger.body.job_id }}.{{ taskrun.value }}"
        errorOnMissing: true
        retry:
          type: exponential
          interval: PT10S
          maxInterval: PT10M
          maxDuration: PT12H
...

結論

最終的にKestraを採用することになりました。Temporalと比較すると学習コストが低く、開発や運用コストを抑えられそうな点を重視しました。結果を受け取ってKV Storeに格納する部分は実装が必要になりますが、その仕組みを我々のチームで実装することで後処理ロジックの実装の負担を増やすことなく要件を満たせそうです。

設計

余談ですが、Kestraを用いて後処理を置き換えた後のアーキテクチャは以下のようになりました。これにより、結果を取りまとめるために用いていた中間テーブルが不要になったり、Pythonで後処理ロジックを記載できるようになるなど、データやロジックの取り扱いがシンプルになります。

ちなみにKestraにはCloud版が存在しますが、開発開始時点ではAlpha版だったため、社内に存在するGKEクラスタにOSS版をセルフホストすることにしました。

さいごに

この記事ではキャディにおける機械学習モデルの後処理の現状と課題及びそれを解決するための技術選定について説明しました。この記事が同様の技術選定を行う際の参考になれば幸いです。