KEDA を使って Pub/Sub メッセージ数に応じたスケーリングを行う

本記事はキャディ株式会社のアドベントカレンダーに寄稿しています。

こんにちは。キャディ株式会社の Analysis Platform Group で MLOps エンジニアを務めている廣岡です。普段はキャディの図面活用クラウドサービス CADDi Drawer のエンジニアとして、図面に対する機械学習解析のインフラやバックエンドの開発を行っています。

今回は KEDA を使って、Google Cloud Pub/Sub のメッセージ数に応じて Kubernetes ワークロードのスケーリングを行う方法を紹介します。

背景

Drawer では図面データの解析や格納など、サービスの様々な箇所で非同期処理が使われています。特に機械学習による図面解析は時間がかかるものが多いため、適したパフォーマンスで解析を実行するために一部で非同期実行を扱っています。

Google Cloud で非同期処理を実現するマネージドサービスの一つとしては、Cloud Pub/Sub があります。Pub/Sub を使うことでメッセージの送信側と受信・処理側を分離することができ、図面のアップロードに応じた非同期の機械学習解析の実行などにも利用できます。

非同期処理を運用する上では、動的に変化するメッセージ数に応じて適切に処理をスケーリングさせる必要があります。そこで今回は Pub/Sub などのイベントに応じたスケーリングを管理するツールとして KEDA および使い方のサンプルを紹介します。

KEDA

KEDA (Kubernetes Event-driven Autoscaling) は Kubernetes ワークロードに対してイベント駆動型のスケーリングを可能にするオープンソースツールです。 今回紹介する Google Cloud Pub/Sub だけでなく、Azure, AWS などの代表的なパブリッククラウドや、Apache Kafka, PostgreSQL などさまざまなイベントソースに対応しています。また KEDA のスケーリングは k8s の HPA (Horizontal Pod Autoscaling) を拡張する形で動作するため追加で必要な知識が少なく済むことや、対象の pod を最小ゼロに設定できるなどのメリットもあります。

Drawer ではいくつかの非同期処理のスケーリングにすでに KEDA を導入しており、図面に基づく処理の効率的な実行に貢献しています。

KEDA によるスケーリングの制御

Pub/Sub ベースの KEDA によるスケーリングは、Google Cloud からすでにドキュメントとサンプルが公開されています。ただしいくつかつまづいた点があったので、こちらを踏襲・補足しながら導入を試してみます。実行の際は適宜課金や各種 API が有効な Google Cloud プロジェクトを用意してください。

cloud.google.com

github.com

セットアップ

クラスタ構築とクラスタへの KEDA のインストールはドキュメント通りに実行できます。サンプルでは GKE 用 Workload Identity Federation を有効にしており、これによってサービスアカウントキーを発行せずに IAM による権限付与と利用ができます。私が実行した際の各環境は以下でした。

  • GKE クラスタバージョン: 1.30.5-gke.1443001
  • keda (helm chart): 2.16.0

クラスタに KEDA をインストールすると、keda-operator という pod および k8s サービスアカウントが作成されます。これによって外部のイベントに基づいた Deployments のスケール管理が可能になります。

サンプルでは Workload Identity 連携を通じて直接 keda-operator の k8s サービスアカウントに権限付与していますが、私の場合はこれだと今後の Pub/Sub メッセージ数のスケーリング(おそらくメトリクスの参照部分)がうまくいかなかったので、Google Cloud サービスアカウントと k8s サービスアカウントを紐つける形で権限を付与します。これによって、keda-operator が Pub/Sub メッセージ数を含めた Google Cloud のメトリクスを参照できるようになります。

# サンプルの↓だとうまくいかなかった
# gcloud projects add-iam-policy-binding projects/${PROJECT_ID} \
#      --role roles/monitoring.viewer \
#      --member=principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${PROJECT_ID}.svc.id.goog/subject/ns/keda/sa/keda-operator

# KSA に紐つける GSA を作り権限付与する
gcloud iam service-accounts create keda-operator \
--project=${PROJECT_ID}

gcloud projects add-iam-policy-binding ${PROJECT_ID} \
--member "serviceAccount:keda-operator@${PROJECT_ID}.iam.gserviceaccount.com" \
--role "roles/monitoring.viewer"

gcloud iam service-accounts add-iam-policy-binding keda-operator@${PROJECT_ID}.iam.gserviceaccount.com \
--role roles/iam.workloadIdentityUser \
--member "serviceAccount:${PROJECT_ID}.svc.id.goog[keda/keda-operator]"

kubectl annotate serviceaccount keda-operator \
  --namespace keda \
  iam.gke.io/gcp-service-account=keda-operator@${PROJECT_ID}.iam.gserviceaccount.com

ワークロードとスケーリングの構成

Pub/Sub ワークロードのデプロイ

Pub/Sub トピックとサブスクリプション、ワークロードのサービスアカウントへの権限付与はドキュメント通りに実行します。マニフェストの適用については、サンプルに記載されている Docker イメージのプルに失敗するようなので、別途メッセージをサブスクライブするだけのコンテナイメージを作成しておきます。

# main.py
from google.cloud import pubsub_v1
import os
import logging

logger = logging.getLogger(name=__name__)

PROJECT_ID = os.getenv('PROJECT_ID')
SUBSCRIPTION_ID = os.getenv('SUBSCRIPTION_ID')

def callback(message):
    logger.info(f"Received message: {message.data}")
    message.ack()

def main():
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(PROJECT_ID, SUBSCRIPTION_ID)
    streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
    logger.info(f"Listening for messages on {subscription_path}...")

    try:
        streaming_pull_future.result()
    except KeyboardInterrupt:
        streaming_pull_future.cancel()

if __name__ == "__main__":
    main()
# requirements.txt
google-cloud-pubsub==2.26.1
# Dockerfile
FROM python:3.11-slim

WORKDIR /app

COPY main.py /app
COPY requirements.txt /app

RUN pip install --no-cache-dir -r requirements.txt

CMD ["python", "main.py"]

コンテナイメージのビルドとプッシュ。コンテナレジストリは適宜作成してください。

docker build --platform linux/amd64 -t asia-northeast1-docker.pkg.dev/${PROJECT_ID}/<repository>/keda-pubsub-sample:latest .                 
gcloud auth configure-docker asia-northeast1-docker.pkg.dev
docker push asia-northeast1-docker.pkg.dev/${PROJECT_ID}/<repository>/keda-pubsub-sample:latest

マニフェストを適用することで、ワークロードをデプロイします。Google Cloud のプロジェクト ID や Docker イメージの指定は適宜修正してください。

apiVersion: v1
kind: Namespace
metadata:
  name: keda-pubsub
---
apiVersion: v1
kind: ServiceAccount
metadata:
  namespace: keda-pubsub
  name: keda-pubsub-sa
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: keda-pubsub
  namespace: keda-pubsub
spec:
  selector:
    matchLabels:
      app: keda-pubsub
  template:
    metadata:
      labels:
        app: keda-pubsub
    spec:
      serviceAccountName: keda-pubsub-sa
      containers:
      - name: subscriber
        image: asia-northeast1-docker.pkg.dev/<PROJECT_ID>/<repository>/keda-pubsub-sample:latest
        env:
        - name: PROJECT_ID
          value: <PROJECT_ID>
        - name: SUBSCRIPTION_ID
          value: "keda-echo-read"
 ---

スケーリングの設定

KEDA のScaledObjectをデプロイすることで、Pub/Sub メッセージ数に基づいたスケーリングを構成します。下記マニフェストは Google Cloud のサンプルを元に、最小レプリカ数や Pub/Sub に関するスケーリング設定を明示的に記載しています。

apiVersion: keda.sh/v1alpha1
kind: TriggerAuthentication
metadata:
  name: keda-auth
  namespace: keda-pubsub
spec:
  podIdentity:
    provider: gcp
---
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: keda-pubsub
  namespace: keda-pubsub
spec:
  pollingInterval: 5
  cooldownPeriod:  10
  minReplicaCount: 0
  maxReplicaCount: 5
  scaleTargetRef:
    name: keda-pubsub
  triggers:
  - type: gcp-pubsub
    authenticationRef:
      name: keda-auth
    metadata:
      mode: "SubscriptionSize"
      value: "3"
      activationValue: "0"
      subscriptionName: "keda-echo-read"
---

ちなみにtype:gcp-pubsubでなく、type:gcp-stackdriverとして下記のように設定しても同様にスケーリングを構成できます。この設定の場合は Pub/Sub 以外にも多くのメトリクスに基づいたスケーリングができます。

...
  - type: gcp-stackdriver # also OK
    metadata:
      projectId: <PROJECT_ID>
      filter: 'metric.type="pubsub.googleapis.com/subscription/num_undelivered_messages" AND resource.type="pubsub_subscription" AND resource.label."subscription_id"="keda-echo-read"'
      targetValue: "3"

スケーリングを構成すると、自動的に Deployment に対する HPA が構成されます。最初にスケーリングを構成する際は、HPA のステータスが✅になるまでに数分程度時間がかかるようなので少し待ちましょう。

スケーリングを構成した状態でkubectl get hpa,scaledobject -n keda-pubsubコマンドを実行すると、作成された HPA と ScaledObject リソースを確認できます。Pub/Sub メッセージがない間は ScaledObject によって Pod のレプリカ数がゼロになります。

サンプルリクエストによる動作確認

試しに Pub/Sub にメッセージを送信して、スケーリングの振る舞いを確認してみます。

for num in {1..20}
do
  gcloud pubsub topics publish keda-echo --project=${PROJECT_ID} --message="Test"
done

watch "kubectl get pod -n keda-pubsub"コマンドを実行すると、Pod の数が増減する様子を確認できます。また Google Cloud コンソールの Deployment のイベントからも、スケーリングが制御されていることを確認できます。

KEDA によって Pod の数が増減している様子

おまけ

Google Cloud のサンプルでは、KEDA-HTTP アドオンを使った LLM のスケーリングも紹介されています。HTTP アドオンを使うことで、Cloud Run のようにリクエストをバッファリングすることができ、最小レプリカ数を0としたスケーリングができます。

このサンプルも概ねドキュメント通りの手順で動作しますが、otwld/ollama-helm の仕様に合わせて下記フォーマットでの記載が必要でした。試す際はご注意ください。

# helm-values-ollama.yaml
ollama:
  ...
  models:
    pull: # この行の追加が必要
      - gemma:7b
      - llama2:7b

HTTP リクエストに関しては Cloud Run でも似たようなスケーリングができますが、2024/12 の時点では Cloud Run での GPU 利用は限定的なリージョンでのみプレビュー提供されており、日本リージョンで一般提供されるのはまだ先のようです。

KEDA-HTTP アドオンを用いることで GPU ワークロードのゼロスケーリングが可能になるので、こういった用途が必要な際は検討するのも良いかもしれません。

クリーンアップ

検証が終わったら不要なリソースは消しましょう。特に LLM のサンプルで利用する GPU ノードプールは高価になりやすいため、不要なコストが発生しないように注意してください。

まとめ

この記事では K8s ワークロードのスケーリングに KEDA を導入するサンプルを検証しました。導入時にはドキュメントなどを通じて仕様を理解する必要がありますが、スケーリングに特化したツールのため比較的キャッチアップしやすいようには感じます。イベント駆動の柔軟なスケーリングを考える際にはぜひ検討してみてください。

最後に、キャディ株式会社では「製造業AIデータプラットフォーム」という構想へ向けて、ソフトウェア開発に取り組む仲間を募集しています。もちろん機械学習開発チームも絶賛募集中ですので、ご興味がある方はお気軽にご連絡ください。

www.youtube.com