Analysis Platform 部の松﨑です。 これはキャディ株式会社のアドベントカレンダー 25 日目の記事です。
CADDi では、機械学習ワークロードにおける複雑な後処理(Pythonスクリプト等)の実行基盤として、ワークフローエンジンの Kestra を採用しました。 様々なエンジンと比較検討した結果、我々のユースケースにとってベストな選択だったと思います。
しかし、実際に開発を始めてみると「開発元である Kestra 社の思想」と「我々がやりたい運用」の間に、少しだけ溝があることに気づきました。今回は、我々がその溝をどうやって埋め、Kestra と付き合っていっているかをご紹介します。
方向性の違いを最も感じたのは、Flow(ワークフロー定義)や File の登録に関するインターフェースです。Kestra は非常に優れた UI を持っており、UI 上で YAML を意識せずに Flow を定義することができます。この利点は生かさない手はなく、API 設計もこの UI 操作に最適化されている印象を受けます。 一方で、我々開発チームには「可能な限りすべてをコードベースで管理したい」という思いがあります。これを具体的に書き表すと
- GitHub 上でコードを管理し、変更履歴を残したい
- Pull Request ベースでレビューを行いたい
- マージされたら CI/CD で自動デプロイしたい
ということになります。このアプローチをとろうとした時、Kestra 標準の API インターフェースでは一工夫が必要でした。
Kestra では一つの処理単位を Flow と呼び、YAML で記述します。 我々としては、この YAML ファイルを Git リポジトリで管理し、CI から API 経由で登録したいと考えました。しかし、Kestra の API は UI での操作を意識してか「1つの Flow を個別に登録する」ことが推奨されているような設計になっています。一括登録も用意されてはいますが、「複数の Flow 定義が 1 つのファイルにマージされている状態」を受け取るものでした。
リポジトリ上では可読性を高めるために「1 Flow = 1 ファイル」で管理したいのですが、API は「まとまった 1 つのファイル」であることを求められてしまっている状態です。地味なようですが、開発体験上悩ましい課題でした。そこで我々は、Kestra の仕様に合わせつつ、自分たちの開発体験も損なわないためのデプロイスクリプトを自作することにしました。
アプローチは以下の通りです。
- YAML ファイルは 1 Flow = 1 ファイル で作成し、それぞれレビューを行う。
- コミット時に Lint をかけ、構文エラーを防ぐ。
- デプロイ時にスクリプトで全 YAML を「Kestra が好む 1 つのファイル形式」に結合する。
- 結合したデータを Kestra の Bulk API で登録する。削除オプションもつけ、不要な Flow の削除も可能にする。
これにより、開発者は Kestra の内部仕様や API を意識することなく、普段通りの Git フローで開発を進めることができるようになりました。実際に CI/CD で動かしている Python スクリプトのロジックは、概ね以下のような形になっています。 (詳細は隠蔽していますが、処理の流れのイメージです)
def main(): # 1. コマンドライン引数の取得 # (対象ディレクトリ, Kestra URL, 対象Namespace, 削除フラグ, ファイル同期フラグなど) config = parse_arguments() # 2. フローの同期を実行 sync_flows(config) def sync_flows(config): """ フロー定義の同期処理 """ # 指定ディレクトリから対象となる名前空間(ディレクトリ)のリストを取得 target_namespaces = get_namespace_list(config.flows_dir, config.target_namespace) logger.info(f"{len(target_namespaces)} 個の名前空間についてフローを同期します") for namespace in target_namespaces: # A. その名前空間内のYAMLファイルを全て収集 yaml_files = collect_yaml_files(namespace) # B. 複数のYAMLを1つのリクエスト用にマージ (--- 区切りなど) merged_flow_content = merge_yamls(yaml_files) # C. Kestra API へ送信 (PUT/POST) # delete_flow=True の場合、ここに含まれない既存フローは削除される api_client.push_flows( namespace=namespace, content=merged_flow_content, delete_others=config.delete_flow ) if __name__ == "__main__": main()
このスクリプトを CI に組み込むことで、開発者は GitHub に Push するだけで Kestra 環境が同期される開発環境を実現できました。
昨今のソフトウェア開発において OSS の活用は不可欠ですが、ツールの思想と自分たちのやりたいことが 100% 一致することは稀です。もちろん、本家に Pull Request を送って改善できればベストですが、それが反映されるのを待っているとビジネスが止まってしまうこともあります。 そんな時は、「使いにくい」と諦めるのではなく、今回のように「自分たちの理想との差分をエンジニアリングで埋める」というアプローチも、一つの解だと思っています。
Kestra はクセもありますが、我々には必要不可欠な技術です。この記事が、Kestra や他のツール導入で悩んでいる方のヒントになれば幸いです。