OpenSearchで実現する画像検索とテスト追加で目指す安定運用【イベントレポート】

image

OpenSearchで実現する画像検索とテスト追加で目指す安定運用

こんにちは、CADDi AI Lab MLEの志水です。 8/19に10X,M3の両社と検索運用の勉強会#Search_C10Xm3 を開催いたしました。 おかげさまで当日までの登録者が254名 、当日の参加者は最大137名までお越しいただき大盛況でした。 勉強会中何度か紹介されたペンギン本Amazonで売り切れる ような反響もあったようです。

その中から、キャディ発表分を抜粋したイベントレポートをいたします! - 10Xさんの発表資料はこちらから - M3さんの発表資料はこちらから

AI Labでは図面管理SaaS CADDi DRAWER の検索サービスを開発/運用してきており、その経験からOpenSearchで実現する画像検索とテスト追加で目指す安定運用についてお話ししました。

目次

image

opensearch で knn を用いた画像検索

前半では、OpenSearchでkNNを用いた画像検索がなぜ必要だったかと、それを実現した方法について説明していきます。

CADDi DRAWER

image

CADDi DRAWER は今年7月にリリースした図面管理SaaSです。 AI Labではこの中で類似画面の検索に用いられている検索エンジンを開発しています。

サービスの詳細や開発に至った経緯についてはリリース時のブログTech Lead や PDM によるイベントレポート をご覧ください。

図面って何?

image

図面は製造業において作る対象のレシピのようなものです。

図面内の情報の読み取りは、アルゴリズムOCR、オペレーションの合わせ技になっています。

特に右下の枠内のテーブル情報には、図面のkeyとなる図番や部品名などが入っており、読み取ることがかなり重要です。

図面にまつわる pain

image

なぜあえてこれを管理するSaaSを作る必要があるか、そこには図面特有のペインや辛さがあるからです。

業務都合上、縮尺違いや勝手違い(左右反転)した図面を扱うことは多く、「この形に似た図面を探したい」と思う場面が多くあります。そのため図面内の情報の検索や形状をベースにした類似が求められますが、ここにもう1つ難しさがあります。 図面は元々は設計の部署でCADなどで書かれますが、部署を跨いだり(設計→調達)会社を跨いだり(発注会社→加工会社) するときにPDFに変換されます。なので多くの情報をPDFから読み取ることも必要になります。

検索システムのアーキテクチャ

一般的な検索システム

image

ペンギン本 の説明が的をえていてとてもおすすめの本です。 こちらの17ページから引用したもので、一般的な検索システムの構成としてUIと検索エンジンの間にあるクエリプロセッサが検索のやりとりを行い、ドキュメントDBとインデクサが検索エンジンへのデータの挿入を行っていることがわかります。

DRAWER の検索システム (Kaleido)

image

DRAWERの検索システム (Kaleido) も同じような構成になっています。 UIがクエリを発行し、Pythonで書かれたクエリプリプロセッサOpenSearchが理解できる形にクエリを変換し、検索します。 検索結果をPythonのクエリポストプロセッサがフロントエンドが理解できる形に変換して返します。

データのインデックスに関しては、図面に紐づくメタデータ等は直接OpenSearchにインデックスされます。 画像特徴のベクトルは、PyTorchのモデルを通して計算され、OpenSearchにインデキシングされます。

DRAWER チームと AI Lab チームの責任分界点

image

DRAWERチーム

  • フロントエンド、バックエンド、インフラ含めた全体を開発/運用している
  • 検索システムの中ではUI、データ、インデクサの部分

AI Lab

  • AI LabはKaleidoという水色の部分を開発/運用している
  • Kaleidoとの接点はAPIコールで行われる
    • /search: 検索
    • /preprocess: 図面の素材等のデータのインデキシング
    • /feature_extract_and_save: Deepモデルへの推論と、OpenSearchへのインデクシング

OpenSearch

ではこの検索エンジンOpenSearchについて少し説明していきます。

OpenSearchとは

image

OpenSearchはElasticsearchからforkしたプロジェクトです。 詳細はリンク先に譲りますが、ライセンス戦略等に発したAWSとElastic社の仲違いから発生したものです。

Tipsとして、OpenSearchはElasticsearchに比べて歴史が浅いため検索でヒットしずらいです. エラーメッセージ等もElasticsearchで検索し、その結果をそのままOpenSearchに適用することが多いです。

なぜOpenSearchを採用したか

image

  • 技術選定時点(2021年)ではkNN+filterを実現できる、ANNが使える、ということからOpenSearchを採用しました。
  • 現在ではどちらもElasticsearchに導入されているため、今から画像検索を始める場合はElasticsearchでも十分可能です!

OpenSearchの運用

image

当初はmanagedを採用せず、現在はk8s上で運用していますが、少しずつつらみも出てきているため、ElasticCloud on GCPをはじめとしてmanagedの採用も検討しています。

モデル

image

  • 画像検索が可能なのは、Deep Learningモデルが優秀な特徴量を作成してくれるからです。
  • 1280次元の特徴量を使っていますが、現実的な時間内で検索が可能です

テスト追加で目指す安定運用

後半では開発した検索システムを安定して運用するために、どのようにテストを追加したかや、監視の具体について説明していきます。

全体像

image

CI/CD内でカバーするunit test、Integration Testの話と、アプリケーションがデプロイされた後のUIテストや監視の全てが大事な要素です。

unit test

image

  • Functionやモジュールごとのテストのをするのがunit testになります。
  • 類似検索パッケージの中ではcode coverageは100%近くあり、PRのたびにテストがあるかを互いにチェックしています。
  • 工夫ポイント
    • クエリプロセッサは複雑なので、複数パターンの入力→複数パターンの出力をテストしています
    • Deep推論部分は変数が多いため、Configをクラス化するなど型づけを厳密にしています
    • endpointはOpenSearchClientを使っているので、テストではclientをmockしています
# PostProcessor コードの一部
class PostProcessor:
    """map the opensearch results to format in kvs
    (snake_case to camelCase)
    """

    @staticmethod
    @postprocessor_exception
    def search_postprocess(hits) -> list[SearchOut]:
        def _reformat(data) -> SearchOut:
            return SearchOut(
                id=data["_source"]["source_id"],
                score=data["_score"],
                index=data["_index"],
                foo=data["_source"]["foo"],
                bar=data["_source"]["bar"],
            )

        return [_reformat(x) for x in hits]
# PostProcessorのテストコードの一部
class TestPostprocessor(unittest.TestCase):
    def test_search_postprocess(self):
        hits = [
            {
                "_score": 0.1,
                "_index": "index_1",
                "_source": {
                    "source_id": "id_1",
                    "foo": "foo_1",
                    "bar": "bar_1",
                },
            }
        ]
        expected = [
            SearchOut(
                id="id_1",
                score=0.1,
                index="index_1",
                foo="foo_1",
                bar="bar_1",
            )
        ]
        actual = PostProcessor.search_postprocess(hits)
        self.assertEqual(expected, actual)

Integration test

unit testだけではカバーしきれない部分はIntegration testを書いていきます。(実際にOpenSearchと通信する部分など)

CI上ではdocker-compose.ymlを用いて検索のAPIOpenSearchを起動しています。

# docker-compose.yml
services:
  opensearch:
    build:
      context: ./
      dockerfile: ./docker/OpensearchForLocalDockerfile
    ports:
      - ...

  api:
    build:
      context: ./
      dockerfile: ./docker/ApiDockerfile
    ports:
      - ...

  batch:
    build:
      context: ./
      dockerfile: ./docker/BatchDockerfile

Makefileにテストコマンドを用意することで、ローカルでもCI上でもテストをキックしやすくしています。

# Makefile
## run test search
test-search:
    docker compose -f ../docker-compose.yml exec batch poetry run python -m src.pipeline.test_search_pipeline 

IO

image

  • indexの作成→データ挿入→検索までをintegration testを書くことで、DRAWERという外部サービスに対してのcontractに なっています

コード例

  • データ挿入
from itertools import cycle, islice
def post_bulk_preprocess_endpoint(data_length=10, index="example-tenant"):
    """apiのbulk preprocess endpointに対しsampleデータが投入出来ることを確認する"""
    # create test data
    foos = list(islice(cycle(['foo_1', 'foo_2']), data_length))
    bars = list(islice(cycle(['bar_1', 'bar_2', 'bar_3']), data_length)) 

    data = []
    for i in range(data_length):
        data.append(
            {
                "id": f"{index}-{i}",
                "index": index,
                "foo": foos[i],
                "bar": bars[i],
            }
        )
    response = requests.post("http://api:80/preprocess/bulk", data=json.dumps(data))
    response = response.json()
    assert response["message"] == "success bulk index", f"{response}"
    assert response["indexedDataCount"] == data_length, f"{response}"
    assert response["data"]["bulk_count"] == 1, f"{response}"
    requests.get("http://opensearch:9200/_refresh")
    logger.info("bulk preprocess endpoint [OK]")
  • 検索して結果を確認する
def _post_can_find(source_id: str, index: str, polling_time: int, sleep_time: int):
    # filter is successful in finding the correct item
    data = {
        "source_id": source_id,
        "index": index,
        "limit": 2,
        "filters": [{"field": "foo", "boolean_operator": "AND", "keywords": ["1"]}],
    }
    response = post_with_timeout(
        "http://api:80/search", data=json.dumps(data), timeout=polling_time, sleep_time=sleep_time
    ).json()
    assert type(response) == list, f"{response}"
    assert len(response) > 0

    actual = {res["foo"] for res in response}
    expected = {"foo_1"}
    assert actual == expected, f"{expected=}, {actual=}"

snapshot

image

  • 上記のテストだけでは意図せず壊すことがあります。暫定的な対応をした後に、今後同じようなミスを起こさないように対策をします。
  • この例では、dict の要素に対して []でアクセスしたため、データマイグレーションをしていなかった環境では検索が落ちました.
  • 暫定対応ののち、再発防止のためにSnapshotをとっておき、Snapshotで検索ができることをCIで保証しました。

コード例

  • snapshotを復元する
def restore_opensearch_snapshot():
    """opensearchのsnapshotからデータを復元する"""
    # add settings
    headers = {"content-type": "application/json"}
    data = {"type": "fs", "settings": {"location": "/snapshot"}}
    requests.put("http://opensearch:9200/_snapshot/snapshot", data=json.dumps(data), headers=headers)

    # restore index "snapshot", version "1"
    response = requests.post("http://opensearch:9200/_snapshot/snapshot/1/_restore")
    logger.info(f"restore snapshot: {response.json()}")
    response = requests.get("http://opensearch:9200/snapshot-tenant/_refresh")
    logger.info(f"refresh: {response.json()}")
    logger.info("restore opensearch snapshot [OK]")
  • 検索が動くことを保証する
def post_search_with_snapshot(polling_time: int, sleep_time: int):
    """
    index mappingsが変更された場合等、text diffでは確認出来ない破壊的変更のための回帰テスト
    """
    index = "snapshot-tenant"
    source_id = "snapshot-0"
    data = {"source_id": source_id, "index": index, "limit": 1}

    response = post_with_timeout(
        "http://api:80/search", data=json.dumps(data), timeout=polling_time, sleep_time=sleep_time
    ).json()
    assert type(response) == list, f"{response}"
    assert len(response) == 1, f"{response}"
    assert response[0]["source_id"] == "snapshot-1", f"{response}"
    assert response[0]["score"] > 0.0, f"{response}"
    logger.info(f"search snapshot tenant={tenant}, item_id={item_id} [OK]")

checkerboard features

image - UIテストでのバグ発生から、テストを追加して再発を防止した例です - kNN search + filterのあと、kNNの結果が反映されていないとUIテストからフィードバックがありました - 事象が判明した時点でPdMから全体に周知されています - 正しくはscore=replace をクエリに追加する必要がありました - 直した後、ポストモーテムで全員に根本原因と対策が記録、周知されます - テストデータのkNNベクトルを乱数ではなく、検索の結果に意味があるものに変更してテスト可能にしました

コード例

  • checkerboard(互い違い)の特徴量を作成する
def checkerboard_array(i: int, length: int = 1280) -> list[float]:
    """A checkerboard feature to test similarities

    e.g.,
    0: 0, 1, 0, 1, ...
    1: 1, 0, 1, 0, ...
    2: 0, 1, 0, 1, ...

    Args:
        i: the index of the data
        length: length of the feature

    Returns: a list of `length` floats
    """
    np.random.seed(i)
    a = np.random.normal(loc=0.9, size=length)
    a = np.clip(a, 0.5, 1)
    b = np.zeros(length)

    # [num, 0, num, ...] for even, [0, num, 0, ...] for odd
    if i % 2 == 0:
        ab = (a, b)
    else:
        ab = (b, a)

    c = []
    for element in zip(*ab):
        c.extend(element)

    c = c[0:length]
    return c
  • 奇数番目のデータは奇数番目のデータが類似することを確認し(odd_comes_first),かつfilterが機能していることを確認する
def _find_one_on_amount(data_length: int, index: str, polling_time: int, sleep_time: int):
    data = {
        "source_id": f"{index}-3",
        "index": index,
        "limit": data_length,
        "filters": [{"field": "bar", "value": "3"}],
    }
    response = post_with_timeout(
        "http://api:80/search", data=json.dumps(data), timeout=polling_time, sleep_time=sleep_time
    ).json()
    assert type(response) == list, f"{type(response)=}"
    assert len(response) > 0, f"{len(response)=}"
    assert {r['bar'] for r in response} == {"bar_3"}

    # check that even comes first (because of checkerboard features)
    assert odd_comes_first(response)
def odd_comes_first(response) -> bool:
    ids = [int(r["id"].split("-")[-1]) for r in response]  # [2, 4, 6, 3, 1, ...]
    still_true = True
    for num in ids:
        if num % 2 == 1:
            if still_true:
                continue
            else:
              return False
        else:
            still_true = False
            continue

    return True

pruning tests

image

  • 基本的にテストはどんどん増えていきます
  • 不要なテストを削除したり、依存するものを解消したりすることも時に必要です(diffが大きくなりますが! )
  • 適切なものに絞ることで開発者経験をよく保ちます

監視

image

  • 上記のようにテストを追加していても、UIテストでバグが発覚したりアプリケーション上ではエラーが起きたりします。
    • 監視をして、適切に対応したりチーム全体にアラートしたりすることが重要です。
  • KaleidoではDatadogにログをためて、ダッシュボードでレスポンスタイムやリクエスト数を見ています.
  • またエラーはSentryで記録していて、監視用のSlackチャネル(#tech-kaleido-alerts)に通知来るようにしています。

  • エラーが上がったら開発しているチームで障害対応をしています。

精度問題

image

  • 精度問題は必ずどこかで出てきます
    • 仕組みで対応するために、改善要望リストを作成し、定量評価データセットを用意しています
    • Kaleidoでは定量評価データセットをPdM2名MLE2名で作成しています

運用の移譲

image - DRAWERもリリースし顧客からの改善要望が増えるにつれ、AI LabではなくDRAWERチームで運用できる体制に移行しようとしています - 運用の移譲のために、技術の整理やロードマップの整理をしています

もっと他のお話が聞きたい方へ

9/15にAI Labの中村さんがMLOpsのお話をしてくれます! (Datadogの運用やSLOの設定をしてくれたのは中村さんでした。) CADDi Tech Chat 〜MLOpsエンジニアの働き方紹介〜

中村さんは最近はMLモデルをデプロイするための基盤を作ってくれていたり、MLモデルのデモの基盤を作ってくれていたりと、MLエンジニアがより成果を出しやすくするための取り組みを色々やってくれています。面白い話になると思いますのでみなさんぜひ!

We are hiring!

キャディでは一緒に働いてくれる仲間を募集 しています。

最近のAI Labでは2D/3Dのデータに対するモデルや、API基盤を作ったり、デモ基盤を作ったり、アノテーション基盤を作ったり、モデルだけでなくそれらの周辺技術を自分達で整備していく仕事もしています。 今後もDRAWERの進化のための機能開発や、より広く製造業全体の進化のための製造拠点のデータを活用したプロダクトなどを作っていきたいと思っています。

AI Labやキャディでの仕事に少しでも興味をもたれた場合は、ぜひ面接やカジュアル面談を申し込んでいただければ幸いです。 また、 Tech Blogや勉強会等のイベントについてはSNSで随時発信しておりますので、Twitterのフォローやconnpassのメンバー登録をぜひよろしくお願いします。