LayerX エンジニアブログ

LayerX の エンジニアブログです。

Vertex AI PIpelinesでの実験を加速させるためのTIPS #LayerXテックアドカレ

この記事は、LayerX Tech Advent Calendar 2024 の 5 日目の記事です。

tech.layerx.co.jp

こんにちは。バクラク事業部のAI-OCRグループでTech Leadをしている島越 (@nt_4o54)です。 今回は、Vertex AI Pipelinesを用いてチームで実験を行う際のTIPS的なものを紹介しようと思います。 なお、Vertex AI Pipelines自体の詳細な説明については割愛します。 過去にもVertex AI Pipelinesについての記事をいくつか公開してるので、(ちょっと情報が古いですが)興味があればそちらもご覧ください。

tech.layerx.co.jp

tech.layerx.co.jp

はじめに

最近我々のグループでは、パーソナライズドAI-OCRという新しいAI-OCRをリリースしました。 このパーソナライズドAI-OCRは今までにないAI-OCRで、従来のAI-OCRが苦手とされるような、同一帳票に対して「お客様によって欲しい値が異なる」という状況に対してのソリューションを提供した機能になります。 詳しくは以下のプレスリリースをご覧ください。

bakuraku.jp

今回は、パーソナライズドAI-OCRの技術的な部分には触れませんが、これを開発するにあたって役に立ったVertex AI PIpelinesの使い方についていくつか紹介できたらと思います。

自作キャッシュ機能

Vertex AI Pipelinesでデフォルトで提供されているキャッシュ機能*1は、Artifact Registoryに置いてあるイメージのuriが変わったり、環境変数、コマンドの順序などのArtifactに関係ないものの変更に対してもキャッシュが効かなくなってしまいます。 そのため、「誰かがやった予測結果を用いて、後処理だけ修正して実行したい」といったことをやりたい時に、コードやユーザ環境変数などを書き換えることでDockerイメージのハッシュ値が書き換わり、キャッシュが効かなくなるといったことが発生します。 このような場合に、毎回データセット作成、モデルの訓練から再度実行するのは、改善のイテレーションを素早く回す上で非常に手間です。パーソナライズドAI-OCRのモデル作成は全部(事前学習以外)を実行しようとすると数時間かかるパイプラインになっているので、この部分を効率化することが重要でした。

このような問題に対して、今回は以下のような最低限の自作キャッシュ機能を用いることで解決しました。

  1. 各コンポーネントで生成されるArtifactのパスをyamlなどで管理しておき、キャッシュする際はyamlのpathからArtifactを取得する。
  2. どこまでキャッシュするかは、コマンド引数で渡して人手で制御できるようにする。

具体的には、以下のように実現しました。 簡単な例のために、preprocessとtrainとpredictだけある以下のようなpipelineを考えます。

from typing import Any

from kfp import dsl

class TrainPipelineRunner:
    def __init__(self, pipeline_root: str, pipeline_name: str, image_uri: str) -> None:
        self.pipeline_root = pipeline_root
        self.pipeline_name = pipeline_name
        self.image_uri = image_uri

    def get_pipeline(self) -> Any:
        @dsl.container_component
        def preprocess_component(dataset_dir: dsl.Output[dsl.Artifact]) -> dsl.ContainerSpec:
            return dsl.ContainerSpec(
                image=self.image_uri,
                command=["python", "preprocess.py"],
                args=["--output-dir", dataset_dir.path],
            )

        @dsl.container_component
        def train_component(
            dataset_dir: dsl.Input[dsl.Artifact], model_dir: dsl.Output[dsl.Artifact]
        ) -> dsl.ContainerSpec:
            return dsl.ContainerSpec(
                image=self.image_uri,
                command=["python", "train.py"],
                args=["--dataset-dir", dataset_dir.path, "--model-dir", model_dir.path],
            )

        @dsl.container_component
        def predict_component(
            dataset_dir: dsl.Input[dsl.Artifact],
            model_dir: dsl.Input[dsl.Artifact],
            predict_dir: dsl.Output[dsl.Artifact],
        ) -> dsl.ContainerSpec:
            return dsl.ContainerSpec(
                image=self.image_uri,
                command=["python", "predict.py"],
                args=[
                    "--dataset-dir",
                    dataset_dir.path,
                    "--model-dir",
                    model_dir.path,
                    "--predict-output-dir",
                    predict_dir.path,
                ],
            )

        @dsl.pipeline(pipeline_root=self.pipeline_root, name=self.pipeline_name)
        def pipeline() -> None:
            preprocess_op = preprocess_component()
            train_op = train_component(dataset_dir=preprocess_op.output)
            predict_op = predict_component(
                dataset_dir=preprocess_op.output, model_dir=train_op.output
            )

        return pipeline

このようなパイプラインで、train自体はやり直さずpredictだけを修正して評価したいといった場面を考えます。 上記のpipelineだとpreprocessとtrainのArtifactの出力に依存しているため、ここにキャッシュ機能を効かせたいのですが、ちょっとした修正でimage_uriなどが書き換わってしまったりしてキャッシュが効かなくなってしまいます。 そこで、今回は以下のような関数を用意して、自作キャッシュ機能を作成しました。

@dsl.component(base_image="python:3.10.11-slim")
def create_artifact_dir(display_name: str, artifact: dsl.Output[dsl.Artifact]) -> None:
    """
    Args:
        display_name (str): UI上での表示名
        artifact (dsl.Output[dsl.Artifact]): 生成されるArtifactのpath
    """
    import re

    # 生成されるArtifactのpathをdisplay_nameに追従させて見やすくするため
    artifact.uri = re.sub(
        r"create-artifact-dir(-\\d+)?_-?\\d+", display_name.replace("_", "-"), artifact.uri
    )

def create_artifact_dir_if_not_cached(display_name: str, uri: str, cache: bool) -> PipelineChannel:
    """
    cacheしない場合はアーティファクトディレクトリを作成し、する場合はimportしたArtifactを返す
    Args:
        display_name (str): UI上での表示名
        uri (str): cacheしない場合に使用するuri
        cache (bool): cacheするかどうか

    Returns:
        artifact (PipelineChannel): 生成されたArtifactのpath
    """
    if cache:
        import_artifact_op = dsl.importer(
            artifact_uri=uri, artifact_class=dsl.Artifact, reimport=False
        ).set_display_name(display_name)
        artifact = import_artifact_op.output
    else:
        create_artifact_op = (
            create_artifact_dir(display_name=display_name)
            .set_caching_options(False)
            .set_display_name(display_name)
        )
        artifact = create_artifact_op.output
    return artifact

@dsl.component(base_image="python:3.10.11-slim")
def identity_op() -> None:
    pass

まず、create_artifact_dirは単純にArtifactを保存するGCS上のpathを作成する関数です。今まで各コンポーネントに対してdsl.Outputを渡してpathを生成していた部分を外出しした形になります。 また、create_artifact_dir_if_not_cachedという関数では、cacheさせる時は渡されたuriのpathをそのままdsl.importerでpipeline上で認識できる形に変換させます。また、cacheさせない時には先ほどのcreate_artifact_dirを用いてuriを生成します。 identity_opは何もしない関数で、cacheを効かせた時の元の依存関係を壊さないようにするためだけに使います。 これらを用いると最初のpipelineは以下のように書き直せます。

class TrainPipelineRunner:
    def __init__(
        self,
        pipeline_root: str,
        pipeline_name: str,
        image_uri: str,
        preprocess_cache: bool,
        train_cache: bool,
    ) -> None:
        self.pipeline_root = pipeline_root
        self.pipeline_name = pipeline_name
        self.image_uri = image_uri
        # 子componentのキャッシュが有効な場合、親componentのキャッシュも有効にする
        # preprocess -> train -> predict
        self.train_cache = train_cache
        self.preprocess_cache = self.train_cache or preprocess_cache

    def get_pipeline(self) -> Any:
        @dsl.container_component
        def preprocess_component(dataset_dir: dsl.Input[dsl.Artifact]) -> dsl.ContainerSpec:
            return dsl.ContainerSpec(
                image=self.image_uri,
                command=["python", "preprocess.py"],
                args=["--output-dir", dataset_dir.path],
            )

        @dsl.container_component
        def train_component(
            dataset_dir: dsl.Input[dsl.Artifact], model_dir: dsl.Input[dsl.Artifact]
        ) -> dsl.ContainerSpec:
            return dsl.ContainerSpec(
                image=self.image_uri,
                command=["python", "train.py"],
                args=["--dataset-dir", dataset_dir.path, "--model-dir", model_dir.path],
            )

        @dsl.container_component
        def predict_component(
            dataset_dir: dsl.Input[dsl.Artifact],
            model_dir: dsl.Input[dsl.Artifact],
            predict_dir: dsl.Input[dsl.Artifact],
        ) -> dsl.ContainerSpec:
            return dsl.ContainerSpec(
                image=self.image_uri,
                command=["python", "predict.py"],
                args=[
                    "--dataset-dir",
                    dataset_dir.path,
                    "--model-dir",
                    model_dir.path,
                    "--predict-output-dir",
                    predict_dir.path,
                ],
            )

        @dsl.pipeline(pipeline_root=self.pipeline_root, name=self.pipeline_name)
        def pipeline(dataset_dir: str, model_dir: str) -> None:
            dataset_dir = create_artifact_dir_if_not_cached(
                display_name="dataset_dir",
                uri=dataset_dir,
                cache=self.preprocess_cache,
            )
            if self.preprocess_cache:
                preprocess_op = identity_op().set_display_name(preprocess_component.name)
            else:
                preprocess_op = preprocess_component(dataset_dir=dataset_dir)
            model_dir = create_artifact_dir_if_not_cached(
                display_name="model_dir",
                uri=model_dir,
                cache=self.train_cache,
            )
            if self.train_cache:
                train_op = identity_op().set_display_name(train_component.name).after(preprocess_op)
            else:
                train_op = train_component(dataset_dir=dataset_dir, model_dir=model_dir).after(
                    preprocess_op
                )
            # predictは必ず実行される
            predict_dir = create_artifact_dir(display_name="predict_dir")
            predict_op = predict_component(
                dataset_dir=preprocess_op.output,
                model_dir=train_op.output,
                predict_dir=predict_dir.output,
            ).after(train_op)

        return pipeline

違いとしては、以下のようになります。

  • 各Componentはcreate_artifact_dir_if_not_cachedで生成されたArtifactを受け取るように、引数をdsl.Output -> dsl.Inputに変更。
  • 依存関係はArtifactの繋がりで作れなくなったので、.afterで定義。
  • cacheを効かせたい場合は、各実行コンポーネントはidentity_opで代替させる。identity_opがないと、キャッシュを効かせた時に.after(train_op)などの依存関係を作れないため必要。
  • cacheを効かせない場合は、通常通り実行される。
  • 上流のコンポーネントのcacheが有効な場合は、下流のコンポーネントのcacheも有効化させる。

このようにすることで、yamlなどでdataset_dirmodel_dirを管理しておき、引数でpreprocess_cachetrain_cacheを実行引数に渡すようにしておけばキャッシュを効かせることができます。我々のチームでは、Hydra*2をパラメータ管理に使用しているので、相性の良さからyamlで管理することにしています。 yamlの管理は、実験が上手くいった際はPRで更新するという運用で今のところは上手くいっています。この部分の管理が煩雑になることは想像できるので、時間がある時にCIなどで更新できるようにしたいと考えています。

データ読み込みは.uri、書き込みは.pathを用いる

Vertex AI Pipelinesではdsl.Inputdsl.Outputで渡されたArtifactのpathはGCS FUSE*3を用いてgs://部分が/gcs/部分に置き換わった場所にマウントされます。 dsl.Input[dsl.Artifact]でコンポーネントに渡されたArtifactクラスの使い方は二通りあり、artifact.uriを使うとgs://パターンのpathが、artifact.pathを使うと/gcs/パターンのpathが用いられます。 そのため、.pathを使ってしまうとGCS FUSEでマウントされたディレクトリからデータを読み込もうとするため、非常に遅くなる場合があります。 弊チームではpolarsを採用しているため、pl.read_parquetなどにgs://から始まるpathを渡しても自動でgcsfs*4を用いて読み込みをしてくれます。このgcsfsを用いた場合の方が読み込みが早いことが多く、実際のケースだと20分から2分まで読み込みが短縮されました。そのため、以下のコードのように読み込み部分のpathには.uriで渡した方が効率が良くなります。一方で、書き込みにはpolarsなどで直接gcsに書き込む機能が提供されていないため、.pathを渡さなければいけません。(丁度昨日にCloudサポートのPR*5がマージされているのを観測したので、次回のリリースから可能になるかもしれません)

@dsl.container_component
def train_component(
    dataset_dir: dsl.Input[dsl.Artifact], model_dir: dsl.Input[dsl.Artifact]
) -> dsl.ContainerSpec:
    return dsl.ContainerSpec(
        image=self.image_uri,
        command=["python", "train.py"],
        args=["--dataset-dir", dataset_dir.uri, "--model-dir", model_dir.path],
    )

並列実行する際はコンポーネントではなくパイプライン化する

最後は高速化というよりは、コードをミニマムにするためのTIPSのような話です。 でかいデータセットに対してバッチ推論などを動かしたい場合、データセットを分割して並列に予測させたい場合があります。この場合、並列に予測 -> 予測したものを集約するといった作業が必要になります。 このような流れを組みたい際にVertex AI Pipelinesでは、dsl.ParallelFordsl.Collectedを用いて実装することが普通です。 しかし、今回のように.afterを用いて依存関係を組み立てたい場合、以下のように構成するとエラーが発生してしまいます。

with dsl.ParallelFor(
    items=list(range(10)),
    name="parallel_for_predict_component",
    parallelism=10,
) as shard_idx:
    predict_op = predict_component(
        dataset_dir=dataset_dir,
        model_dir=model_dir,
        predict_dir=predict_dir,
        shard_idx=shard_idx,
    ).after(train_op)
collect_op = collect_component(predict_dir=predict_dir).after(predict_op)
# エラーが発生する
# kfp.compiler.compiler_utils.InvalidTopologyException: Illegal task dependency across DSL context managers. A downstream task cannot depend on an upstream task within a dsl.ParallelFor context unless the downstream is within that context too or the outputs are begin fanned-in to a list using dsl.Collected.

この場合、predict_componentでは特にoutputを生成してないので、dsl.Collectedを使うためにoutputを生成する架空のコンポーネントを作って、それを集約する架空のコンポーネントを作って。。ということをしなければいけません。

@dsl.component
def vacant_component(args: list[str]) -> str:
    return ""

with dsl.ParallelFor(
    items=list(range(10)),
    name="parallel_for_predict_component",
    parallelism=10,
) as shard_idx:
    predict_op = predict_component(
        dataset_dir=dataset_dir,
        model_dir=model_dir,
        predict_dir=predict_dir,
        shard_idx=shard_idx,
    ).after(train_op)
    after_predict_op = vacant_component(args=[]).after(
        predict_op,
    )
after_parallel_predict_op = vacant_component(
    args=dsl.Collected(after_predict_op.output)
)
collect_op = collect_component(predict_dir=predict_dir).after(after_parallel_predict_op)

ちょっと煩雑ですよね。こんな時、この並列実行部分をdsl.pipelineでラップすれば解決します。以下のように書けば、pipeline終了後に単純に集約するという書き方ができるのでコード量的にだいぶシンプルになります。

@dsl.pipeline(pipeline_root=pipeline_root, name="parallel_predict_pipeline")
def parallel_predict_pipeline(
    dataset_dir: dsl.Input[dsl.Artifact],
    predict_dir: dsl.Input[dsl.Artifact],
    predict_artifact_dir: dsl.Input[dsl.Artifact],
) -> None:
    with dsl.ParallelFor(
        items=list(range(10)),
        name="parallel_for_predict_component",
        parallelism=10,
    ) as shard_idx:
        predict_op = predict_component(
            dataset_dir=dataset_dir,
            model_dir=model_dir,
            predict_dir=predict_dir,
            shard_idx=shard_idx,
        )

# =========== 元のpipeline内 ===========
parallel_predict_op = parallel_predict_pipeline(
    dataset_dir=dataset_dir,
    model_dir=model_dir,
    predict_dir=predict_dir,
).after(train_op)
collect_op = collect_component(predict_dir=predict_dir).after(parallel_predict_op)

最後に

今回はVertex AI Pipelinesを用いた実験パイプラインの効率化TIPSについていくつか紹介しました。 このようにLayerXでは日々の機械学習開発を効率化するために、色々な生産性向上施策を行なっています! まだまだ課題は多く、いかにチームでの機械学習開発を効率化するかに日々頭を悩ませております。 今後もお客様のもとへ最速で機械学習機能を届けるために邁進していきたいと思います!

明日はminako-phさんからありがたい話が掲載される予定です!お楽しみに!