Kubeflow Pipelines の local 実行で開発効率を上げる

はじめに

AI Team MLOps エンジニアの西原です。2024 年 1 月にローカル環境で Kubeflow Pipelines を実行するドキュメントが公式から公開されました。今回はそのドキュメントを参考にローカル環境で Kubeflow Pipelines を実行する方法を紹介します。

Kubeflow Pipelines とは

今回取り扱う Kubeflow Pipelines とは何か?公式のドキュメントを引用します。

Kubeflow Pipelines(kfp)は、コンテナイメージを使ってポータブルでスケーラブルな機械学習(ML)ワークフローを構築し、デプロイするためのプラットフォームです。

CADDi AI Team では Google Cloud のマネージドなプラットフォームである Vertex AI Pipelines を使って機械学習パイプライン開発をしています。この裏で kfp が動いており、開発時に kfp の Python SDK を使ってパイプラインを定義しています。

kfp を使った開発の課題

機械学習用のコンテナイメージは比較的大きく、私たちのチームでは 1 つあたり 10~20GB になることが多いです。イメージサイズが大きくなる要因は GPU 環境でプログラムを動かすために必要なソフトウェアを setup するためです。これらの大きなコンテナイメージを push してリモートのパイプライン上で動作確認すると、Node の起動やコンテナの push と pull による待ち時間が長くなります。私たちのチームでは一番最初のコンポーネントが実行されるまでに 20 分弱かかることもありました。こういった状況では試行錯誤の回数が下がり開発効率が悪くなるため、コンテナイメージを 不必要に push せずにローカル環境で動作確認したいという話がありました。

この課題を解決するために、kfp の Python SDK を使ってローカル環境でパイプラインを実行する方法を調査し、検証したので紹介します。

kfp を手元の開発環境で実行する

ローカル環境でコンポーネント実行

サンプルコードを使ってローカル環境でコンポーネント実行する方法を紹介します。シンプルな足し算の例が次のコードになります。local.initがない状態だと実行できずエラーでプログラムが終わりますが、これを記述することでローカル環境で実行できます。

from kfp import local
from kfp import dsl

# 関数定義の後に実行しても良い
# 実行にはdockerが必要
local.init(runner=local.DockerRunner())

@dsl.component
def add(a: int, b: int) -> int:
    return a + b

task = add(a=1, b=2)
assert task.output == 3

このプログラムを実行するとログから入力と出力が確認でき、問題なく動作していることがわかります。

...省略
    {
        "inputs": {
            "parameterValues": {
                "a": 1,
                "b": 2
            }
        },
        "outputs": {
            "parameters": {
                "Output": {
                    "outputFile": "~/<PATH>/local_outputs/add-2024-01-15-18-45-51-383673/add/Output"
                }
            },
            "outputFile": "~/<PATH>/local_outputs/add-2024-01-15-18-45-51-383673/add/executor_output.json"
        }
    }
    [KFP Executor 2024-01-15 18:45:55,665 INFO]: Wrote executor output file to ~/<PATH>/local_outputs/add-2024-01-15-18-45-51-383673/add/executor_output.json.
18:45:55.877 - INFO - Task 'add' finished with status SUCCESS
18:45:55.878 - INFO - Task 'add' outputs:
    Output: 3

アーティファクトを出力

kfp にはアーティファクトというものがあります。詳しい説明はここでは省略しますが、パイプラインと紐づくもので、データセットやモデルなどがそれになります。kfp でアーティファクトの扱いはコアな部分になるため、サンプルコードで動作を確認します。足し算の結果をアーティファクトとしてファイル出力する例を次に示します。with句でファイルを開いて、書き込みと読み込みをするプログラムです。

from kfp import local
from kfp import dsl
from kfp.dsl import Output, Artifact
import json

local.init(runner=local.DockerRunner())

@dsl.component
def add(a: int, b: int, out_artifact: Output[Artifact]):
    import json

    result = json.dumps(a + b)

    with open(out_artifact.path, 'w') as f:
        f.write(result)

    out_artifact.metadata['operation'] = 'addition'


task = add(a=1, b=2)
with open(task.outputs['out_artifact'].path) as f:
    contents = f.read()
assert json.loads(contents) == 3
assert task.outputs['out_artifact'].metadata['operation'] == 'addition'

実行した際のログからアーティファクトの出力先が確認できます。

..
    [KFP Executor 2024-01-15 20:38:32,771 INFO]: Wrote executor output file to ~/<PATH>/local_outputs/add-2024-01-15-20-38-28-731045/add/executor_output.json.

      __import__(pkg_name)
20:38:32.975 - INFO - Task 'add' finished with status SUCCESS
20:38:32.975 - INFO - Task 'add' outputs:
    out_artifact: Artifact( name='out_artifact',
                            uri='~/<PATH>/local_outputs/add-2024-01-15-20-38-28-731045/add/out_artifact',
                            metadata={'operation': 'addition'} )

出力先のファイルを確認すると、json 形式で 3 が書き込まれていることが確認できます。

任意のコンテナイメージを使ったコンポーネント

ここまで Python の関数としてコンポーネントを実行してきましたが、dsl.ContainerSpecを使うと任意のコンテナイメージをコンポーネントとして実行できます。Hello Worldの文字列をファイルに書き込む例が次になります。

from kfp import dsl, local

local.init(runner=local.DockerRunner())

@dsl.container_component
def say_hello(name: str, greeting: dsl.OutputPath(str)):
    """Log a greeting and return it as an output."""

    return dsl.ContainerSpec(
        image="alpine",
        command=[
            "sh",
            "-c",
            """RESPONSE="Hello, $0!"\
                            && echo $RESPONSE\
                            && mkdir -p $(dirname $1)\
                            && echo $RESPONSE > $1
                            """,
        ],
        args=[name, greeting],
    )


task = say_hello(name="World")
print(task.outputs)

上記のプログラムを実行すると次のようなログが出力され、Hello World という文字列が見えます。実際に出力されたファイルを確認すると、Hello World という文字列が書き込まれていることが確認できます。

   Found image 'alpine:latest'

    Hello, World!
06:39:37.953 - INFO - Task 'say-hello' finished with status SUCCESS
06:39:37.953 - INFO - Task 'say-hello' outputs:
    greeting: 'Hello, World!
'


{'greeting': 'Hello, World!\n'}

GPU を使ったコンポーネント

機械学習では GPU を使って学習や推論を行うことがあります。先述した通り、GPU を使ってプログラム実行するには依存するソフトウェアが増え、コンテナイメージのサイズが大きくなります。大きなコンテナイメージを使ってリモート環境で動作確認すると待機時間が長くなります。GPU を使ったコンポーネントがローカル環境で実行できると不必要にリモートのパイプライン上で動作確認することがなくなり、待機時間を減らすことができます。これにより開発効率が大きく改善できるため、今回のローカル環境の検証の核となる部分です。

結論として、今回紹介している kfp local で GPU を使ったコンポーネントをローカル環境で実行できます。GPU を使ったサンプルのプログラムが次になります。次のプログラムは、CUDA(GPU)がない環境で実行すると失敗しますが、CUDA がある環境では成功するようになっています。

from kfp import dsl, local

local.init(runner=local.DockerRunner())


@dsl.container_component
def gpu_processing():
    return dsl.ContainerSpec(
        image="gcr.io/google_containers/cuda-vector-add:v0.1",
    )


task = gpu_processing()
print(task.outputs)

上記のコンポーネントを実行した結果が次になります。ログから GPU を使ったコンポーネントが問題なく実行できていることが確認できます。

16:21:16.599 - INFO - Executing task 'gpu-processing'
16:21:16.600 - INFO - Streamed logs:

    Found image 'gcr.io/google_containers/cuda-vector-add:v0.1'

    [Vector addition of 50000 elements]
    Copy input data from the host memory to the CUDA device
    CUDA kernel launch with 196 blocks of 256 threads
    Copy output data from the CUDA device to the host memory
    Test PASSED
    Done
16:21:18.040 - INFO - Task 'gpu-processing' finished with status SUCCESS
16:21:18.040 - INFO - Task 'gpu-processing' has no outputs

pipeline 実行

これまでコンポーネントの実行について紹介してきましたが、パイプライン実行についても紹介します。

pipeline とは何か?

kfp におけるパイプラインとは何か?公式のドキュメントを引用します。

パイプラインとは、1 つまたは複数のコンポーネントを組み合わせて計算有向 非循環グラフ(DAG)を形成するワークフローの定義です。実行時、各コンポーネント実行は 1 つのコンテナ実行に対応し、コンテナは ML のアーティファクトを作成します。パイプラインは制御フローを含むことがあります。

pipeline 実行

ローカル環境でのパイプライン実行を実際にやってみます。対象の関数に@dsl.pipelineをつけることでパイプラインとして定義できます。下記はコンポーネントを組み合わせて三平方の定理を計算するパイプラインの例です。

from kfp import dsl, local

local.init(runner=local.DockerRunner())

@dsl.component
def square(x: float) -> float:
    return x ** 2

@dsl.component
def add(x: float, y: float) -> float:
    return x + y

@dsl.component
def square_root(x: float) -> float:
    return x ** .5

@dsl.pipeline
def pythagorean(a: float, b: float) -> float:
    a_sq_task = square(x=a)
    b_sq_task = square(x=b)
    sum_task = add(x=a_sq_task.output, y=b_sq_task.output)
    return square_root(x=sum_task.output).output

result = pythagorean(a=3.0, b=4.0)
print(result)

これを実行すると次のようなログが出力され、ローカル環境だとパイプラインの実行はサポートされていないことが分かります。 (追記:v2.7.0でパイプライン実行がサポートされました。)

...
    raise NotImplementedError(
NotImplementedError: Local pipeline execution is not currently supported.

ローカル環境でパイプライン実行はサポートされてませんが、コンポーネントの実行はサポートされているのでコンポーネントを組み合わせてパイプラインっぽく実行することはできます。具体的にどうするのかというと、サンプルコードの@dsl.pipelineを消して実行するだけです。

パイプライン関数のデコレータを消して実行した結果が次になります。32 + 42 の平方根は 5 なので正しく動いていることが確認できます。

...
06:59:08.912 - INFO - Task 'square-root' finished with status SUCCESS
06:59:08.912 - INFO - Task 'square-root' outputs:
    Output: 5.0

まとめ

ここまで kfp のローカル環境での実行について紹介しました。 機械学習では GPU を使ったプログラムを実行することもありますが、その場合はコンテナイメージのサイズが大きくなります。大きなコンテナイメージを使ってリモート環境で検証すると、待機時間が長くなります。今回紹介したローカル環境での実行によって、リモート環境以外で動作確認できるようになり、不必要な待機時間を減らすことができます。今回紹介した kfp local によって開発業務の待機時間を減らせるため、うまく取り入れることで開発効率の改善が期待できます。

参考