LayerX エンジニアブログ

LayerX の エンジニアブログです。

Vertex AI Pipelinesを用いて爆速ML開発の仕組みを構築する #LayerXテックアドカレ

こんにちは。LayerXのバクラク事業部で機械学習エンジニアをしている@shimacosです。 最近、体重が増える一方で危機感を感じ始めたので、ダイエットを始めました。 ダイエットを始めて早3ヶ月ほどですが、一向に痩せません。何故でしょう?

この記事はLayerXアドベントカレンダー11日目の記事です。 昨日は@upamune「Slack × Zapier × MiroでKPTでの振り返りをラクにする」という記事を書いてくれました。 明日は@itkqが、楽しい話を書いてくれる予定です。


はじめに

バクラクの機械学習チームでは、AI-OCRという請求書や領収書などの帳票から、仕訳などの経理業務や電子帳簿保存法の要件を満たすための項目を抽出する機能を構築しています。 AI-OCR機能については、以下の資料などで詳しく述べられています。

バクラクでは、ありがたい事に導入社数が順調に伸びており、それに伴いさまざまなフォーマットの帳票がデータとして蓄積されています。

機械学習チームとしては、今までにない帳票がアップロードされることも多いため、そのような帳票にいかに早く対応できるようにしていくかが重要になってきます。

もちろん、大量に蓄積されているデータからどのようにデータセットを作成するか、どのように評価を行うのか、など課題はたくさんあるのですが、新しいデータに自動で対応できるようにするためには、直近のデータを用いて再学習を効率的に行う定期実行パイプラインが目の前必要になっていました。

さらに、機械学習チームではこの半期のOKRで「AI-OCR以外で新たに5つの機械学習による価値を作る」という目標を掲げており、できれば使いまわせるutilsやvertexのcomponentなどはプロジェクト間で使い回せたほうが開発速度も上がり、OKR達成に近づくだろうと思っていました。

そこで、今回はチーム内で運用されている改善day (普段の優先度の高いプロジェクト以外のことの改善を行う日)を用いて、上記のような課題を解決する方法を実装したので、その方法を紹介させていただきます。

概要としては、以下のような内容になります。少し長くなってしまいましたが、目を通していただけると幸いです。

  • ワークフローツールとしてVertex AI Pipelinesを用いて、いい感じに定期実行できる仕組みを考えた。
  • モノレポ構成にし、共通ツールを他のプロジェクトでも取り回せる形を整えた。

以下のような内容に従って説明していきます。

Vertex AI Pipelinesについて

今回はワークフローツールとしてGoogle Cloud Platformで提供されているVertex AI Pipelinesを選択しました。 Vertex AI Pipelinesは、Kubeflow Pipelines SDKやTFX pipeline DSLを用いて、GCP上でサーバレスにpipelineを構築することができるサービスです。裏では、Vertex AI CustomJob *1を連結させて動かしているのですが、自動でGCS上にアーティファクトを保存してくれたりと、便利な機能が多くあります。

主な選択理由としては以下です。

  • Kubernetesなどのクラスタを持たなくてよく、管理コストが低い。
  • Kubeflow Pipelines (kfp)2.0の初期ではでコンポーネント定義をyamlで書く必要があり、可読性が低く、パイプライン定義するのに一苦労する傾向にあったが、@dsl.container_component*2CustomTrainingJobOp*3などが使えるようになったことで、従来より簡素化され、可読性が高く、負荷が低くパイプライン定義を書けるようになった。
  • 以前まで定期実行を行うには、別でCloud FunctionとCloud Schedulerを持つ必要がありterraform構成が複雑になることがあったが、最近Vertex AI Pipelines内でScheduler API*4が使えるようになり、手軽に定期実行パイプラインを作れるようになった。

上記の通り、自分が以前にVertex AI Pipelinesを使ったリリース初期の時から比べてとても使いやすくなりました。 この辺りの便利になった部分について、まだ世の中にドキュメントが少ないので、その部分について少し説明させていただきます。

コンポーネント定義の書き方

以前のVertex AI Pipelines (kfp v2.0β)では、yamlでコンポーネント定義を書くのが主流でした。

例えば、以前だと訓練するdatasetのパスを受け取って、訓練済みのモデルを出力するのに以下のようなyamlファイルを書く必要がありました。

パッと見ただけで正直ちょっと書きたくなくなるような感じですよね。。笑

このようなファイルを1コンポーネントあたり1ファイル作る必要があり、ファイル管理が煩雑になる傾向がありました。 ちなみに、typeとしてDatasetやModelなどのArtifact型*5を使うことで、自動的にGCSなどに中間生成物を生成してくれます。例えば、以下の例だと引数に持ったmodel_pathに訓練済みモデルを保存するコードを書くと自動的にGCSにファイルが保存されます。

name: train
description: Train XGBoost model

inputs:
  - name: image
    type: String
  - name: dataset_path
    type: Dataset
  - name: learning_rate
    type: Float
    default: 0.001

outputs:
  - name: model_path
    type: Model

implementation:
  container:
    image: "{{$.inputs.parameters['image']}}"
    command:
      - python3
      - ./components/train.py
    args:
      - --dataset-path, {inputPath: dataset_path}
      - --learning-rate, {inputValue: learning_rate}
      - --model-path {outputPath: mdodel_path}

現在だと以下のようなPythonコードに書き換えられるようになりました。 kfpでは2020年中盤でdsl.ContainerOpというコンテナを使ったcomponentが廃止され*6、β版の途中までyamlによるコンポーネント定義が推し進められていたのですが、2.0.0リリース手前で以下のような書き方ができるようになりました。

from kfp import dsl

@dsl.container_component
def train(
    dataset_path: dsl.Input[dsl.Dataset],
    learning_rate: float,
    model_path: dsl.Output[dsl.Model],
) -> dsl.ContainerSpec:
    """
    Train XGBoost model
    """
    return dsl.ContainerSpec(
        image=self.image_uri,
        command=["python3", "components/train.py"],
        args=[
            "--dataset-path",
            dataset_path.path,
            "--learning-rate",
            learning_rate,
            "--model-path",
            model_path.path,
        ],
    )

Python上だと補完が走りますし、定義がどのようにされているかEditorのGo to Definitionなどを使うことで参照しやすくなり、可読性も高まります。

ネット上のVertex AI Pipelinesを使った記事を見ると、Vertex AI Pipelinesリリース当初はdsl.container_componentがなかったこともありyamlで定義された例が多く見られるのですが、今後は公式としてもこちらの形を推していくようです。

Scheduler API

Vertex AI Pipelinesで定期実行の仕組みを作るには、今までCloud Functionでパイプライン実行のトリガーを作り、更にCloud SchedulerでCloud Functionを叩きに行くスケジューラを構築する必要がありました。 しかし、今年の5月にプレビュー機能として実装されたScheduler APIを使うことでVertex AI Pipelinesの設定だけで定期実行部分までを完結できるようになり、非常に便利になりました。 こちらもプレビュー機能の際はREST APIでPOSTするしかなかったのですが、2023年11月現在だとSDKが整備されており、以下のように非常に使いやすくなっています。

def create_schecule(
    project_id: str,
    location: str,
    pipeline_root: str,
    pipeline_name: str,
    template_path: str,
    cron: str,
    enable_caching: bool = False,
    max_concurrent_run_count: int = 1,
    max_run_count: int = 1,
    **kwargs: ParamSpec.kwargs,
) -> None:
    """
    Schedule pipeline
    Args:
        project_id (str): gcp project
        location (str): pipelineを動かすlocation
        pipeline_root (str): pipelineのArtifactを置くgcs pathのroot
        pipeline_name (str): pipeline名
        template_path (str): pipelineのtemplate yamlを保存したlocalのpathがGCSのpath
        cron (str): cronの設定 (東京時間で設定すること)
        enable_caching (bool, optional):
            kwargsの値に対してpipelineの結果をcacheするか.
            Trueの場合、同じパラメータを使っているところは実行されない。Defaults to False.
        service_account (str, optional): pipelineを実行するservice account. Defaults to None.
        max_concurrent_run_count (int, optional): このscheduleで同時に実行できるjobの数. Defaults to 1.
        max_run_count (int, optional): scheduleが完了した後に作成されるpipelineの最大数. Defaults to 1.
    """
    job = aiplatform.PipelineJob(
        template_path=template_path,
        pipeline_root=pipeline_root,
        display_name=pipeline_name,
        project=project_id,
        location=location,
        parameter_values=kwargs,
        enable_caching=enable_caching,
    )
    job.create_schedule(
        display_name=pipeline_name,
        cron="TZ=Asia/Tokyo " + cron,
        max_concurrent_run_count=max_concurrent_run_count,
        max_run_count=max_run_count,
    )

ただし、ここで注意なのですが、scheduleされたpipelineの引数は固定値のものしか使えないようになっており、日付によって動的に引数を変えたいといったpipelineの場合は少し工夫する必要があります。

例えば、週次で再訓練を行う時系列予測問題のようなケースだと、パイプライン実行日時の最新のデータまで用いて訓練したいという状況が発生します。

そのような時は実行日時を引数として持つようなpipelineを作りたくなるのですが、Scheduler APIの場合は引数を動的に変えられないので、以下のように別で実行日時を取得するようなcomponentを用意する必要があります。

@dsl.component(base_image="python:3.10.11-slim")
def create_execution_date_op() -> str:
    from datetime import datetime

    executiion_date = datetime.now().date().isoformat()
    return executiion_date

このexecutuion_dateを使えば、一旦は定期実行の際は大丈夫そうです。 しかし、手元で実験をする際や過去のpipeline実行を再現する時など、常に最新の日付を参照されると困る場合があります。

そのような場合は、次のようにlocal引数などを渡して手元からpipelineを実行する際は外部から渡されたexecution_dateを使うことによって回避可能です。

def get_pipeline(pipeline_root: str, pipeline_name str, local: bool = False) -> Callable[[str], None]:
    @dsl.pipeline(pipeline_root=pipeline_root, name=pipeline_name)
    def pipeline(execution_date: str) -> None:
        if not local:
            execution_date = create_execution_date_op().output

    return pipeline

リソースの設定

Vertex AI Pipelinesでは、デフォルトでコンポーネントは 4コアのCPUと16 GBのメモリを搭載したe2-standard-4マシンを使用して、Vertex AI CustomJobとして動作します。 デフォルトからCPUやメモリを増やしたり、GPUを追加しようとすると以下のように変更する必要があります。 一つ一つのコンポーネントに対してこのようにset_*とするのは中々骨が折れる作業です。

from kfp.v2 import dsl

@dsl.pipeline(name="custom-container-pipeline")
def pipeline():
    SELECTOR_CONSTRAINT = {
        "label_name": "cloud.google.com/gke-accelerator",
        "value": "NVIDIA_TESLA_V100",
    }
    preprocess_component = (
        preprocess(execution_date=create_execution_date_op())
        .set_memory_limit("52G")
    )
    train_component = (
        train(
            dataset_path=preprocess_component.output,
            learning_rate=0.001,
        )
        .set_cpu_limit(16)
        .set_memory_limit("52G")
        .add_node_selector_constraint(SELECTOR_CONSTRAINT)
        .set_gpu_limit(1)
    )

最近だと、google-cloud-pipeline-componentsのライブラリが整備されていて、その機能を使うことでプリセットのmachine_typeを選べるようになりました。

この辺りは好みによりますが、個人的には一つ一つのコンポーネントに対してset_*をしていくより、この方法のほうが可読性や複雑性がなくなり、timeoutの設定も作れたりするので、嬉しい部分が多いのではないかなと感じます。

from google_cloud_pipeline_components.v1.custom_job import (
    create_custom_training_job_from_component,
)
from kfp.v2 import dsl


@dsl.pipeline(name="custom-container-pipeline")
def pipeline():
    preprocess_component = create_custom_training_job_from_component(
        preprocess(execution_date=create_execution_date_op()),
        machine_type="n1-highmem-8",
    )
    train_component = create_custom_training_job_from_component(
        train(dataset_path=preprocess_component.output, learning_rate=0.001),
        machine_type="n1-highmem-8",
        accelerator_type="NVIDIA_TESLA_V100",
    )

その他にもVetex Endpointを立てるコンポーネントやBigQuery関連のコンポーネントなど便利なものが多くあるので、気になる方はドキュメント*7をご覧ください。

辛いポイント

もちろんいい部分だけではなくて、AirFlowやprefectなどに比べて辛い部分もあります。以下にいくつか列挙したので、Vertex AI Pipelinesを使用するか迷ってる際に参考にしてください。

  • 同じpipelineの時系列方向のMetricが見れない。
    • 月曜日だけ何故かパイプライン実行時間が伸びてるなどに気付きにくい。
  • 階層構造を持てないので、同じGCP Projectで複数のpipelineを管理していると、GCPのコンソールがカオスになってくる。
    • 開発プロジェクト毎にGCP Projectを分ける。pipelineごとにラベルをつけてフィルターできるようにするなどの対応が必要。
  • 一つ前の定期実行が失敗している時に次の定期実行は行わないといったことができない。
  • 引数などをうまく設計したキャッシュ戦略を構築しなければ、失敗した部分からの再実行が効率よく行えない。
  • ガントチャートなどのMetricを可視化できない。

実際の運用方針

前章で説明したように、Vertex AI Pipelinesはリリース当初に比べて大分かゆいところに手が届くようになりました。 そこで、最初に説明したようにこのVertex AI Pipelinesを色々なプロジェクトで取り回せるように、こちらの記事*8を参考にモノレポの構成を考えました。 構成としては以下のようなPoetryを利用した形で、言語としては今のところPythonのみを使用する想定です。

├── poetry.lock
├── pyproject.toml
├── tasks.py
├── {{ project-name-a }}
│   ├── invoke.yaml
│   ├── invoke_prd.yaml
│   ├── tasks.py
│   ├── yamls
│   │   ├── preprocess.yaml
│   │   ├── train.yaml
│   │   ├── pipelines.yaml
│       └── ...
│   ├── pipelines
│   │   ├── train_pipeline.py
│   │   └── ...
│   ├── components
│       ├── preprocess
│       ├── train
│       └── ...
├── {{ project-name-b }}
├── libs
    └── layerx-ml-utils
        ├── pyproject.toml
        └── layerx_ml_utils
          ├── vertex_pipeline.py
          ├── bq.py
          ├── invoke.py
          └── ...

ライブラリ管理

まず、すべてのプロジェクトで使われるような共通モジュールをlibs以下にライブラリとして作成します。これに対し、以下のコマンドで各プロジェクトからライブラリの中身を利用できるようにします。

この中身で、共通で使うようなVertex PipelinsのSlack通知コンポーネントや、定期実行などをしやすいようなVertex AI Pipelines SDKをラッパーしたクラスなどを作成します。

poetry add --editable libs/layerx-ml-utils

各プロジェクトで利用するライブラリ管理はPoetryのgroup dependencies機能を使います。

poetry add hoge --group project-name-a

パラメータ管理

詳細は省きますが、invoke*9というPythonのタスクランナーを用いる前提でレポジトリ構成とパラメータ管理方法を考えています。

invokeでは以下のような形でtaskを定義することができ、またデフォルトでrootディレクトリのinvoke.yamlの中身のパラメータにdotアクセスできるようにしてくれます。

from invoke import task, Context

@task
def build_docker(c: Context, push: bool = False) -> None:
   """
   Vertex AI Pipelinesで利用するdocker imageをbuild & pushする
   """
    with c.cd(".."):
        c.run(f"poetry install --with {c.project_name}")
        c.run(
            f"docker build -f {c.project_name}/Dockerfile -t {c.env.gcp.image_uri} ."
        )
        if push:
            c.run(f"docker push {c.env.gcp.image_uri}")

更に、上のように定義しているだけで、ターミナル上からinv build-docker --pushのような形でtaskを実行することができます。

実際には、invoke.taskの中身をラップした関数をlayerx-ml-utilsの中に作っていて、以下のようなHydra*10形式のyamlの書き方をできるようにしています。

defaults:
  - _self_
  - yamls@train: train
  - yamls@preprocess: preprocess
  - yamls@pipelines: pipelines

project_name: [project-name-a]
version: baseline
execution_date: 2023-11-16

env:
  output_dir: ./output/${version}
  gcp:
    project_id: [dev_gcp_project]
    location: asia-northeast1
    service_account: hoge@${env.gcp.project_id}.iam.gserviceaccount.com
    image_uri: asia-northeast1-docker.pkg.dev/${env.gcp.project_id}/${project_name}/runner:${version}

これに対して、以下のようなinvoke_prod.yamlというのを作っておくと、inv -f invoke_prd.yaml ~~と指定することで、既にinvoke.yaml内で定義されているパラメータを上書きしてくれます。devのprojectで動作確認したものをprd環境にdeployする時などに便利です。(localからはprdにデプロイできないように権限周りは調整する前提)

env:
  gcp:
    project_id: [prd_gcp_project]

pipeline開発

layerx-ml-utilsの中で以下のようなVertex AI Pipelines基本的な機能が備わっているラッパークラスを作成します。クラス構成にしているのは、container_componentを作成する時や実行時に共通で使いたい変数などが多く存在するためです。

import tempfile
from typing import Callable, ParamSpec

from google.cloud import aiplatform
from kfp import compiler, dsl


class BasePipelineRunner:
    def __init__(
        self, project_id: str, location: str, pipeline_root: str, pipeline_name: str
    ) -> None:
        """
        Args:
            project_id (str): gcp_project_id
            location (str): gcp_location
            pipeline_root (str): Vertex pipelineの中間生成物が保存されるGCSのパス
            pipeline_name (str): Vertex pipelineで実行されるパイプラインの名前
        """
        self.project_id = project_id
        self.location = location
        self.pipeline_root = pipeline_root
        self.pipeline_name = pipeline_name

    def get_pipeline(self, local: bool) -> Callable[..., None]:
        """pipelineの記述を行う

        Returns:
            pipelineを記述した関数を返す
        """

        @dsl.pipeline(pipeline_root=self.pipeline_root, name=self.pipeline_name)
        def pipeline(**kwargs: ParamSpec.kwargs) -> None:
            # =======================================
            # ここにpipelineの記述を行う
            # =======================================
            pass

        return pipeline

    def run(
        self,
        local: bool = False,
        enable_caching: bool = False,
        service_account: str | None = None,
        **kwargs: ParamSpec.kwargs,
    ) -> None:
        """
        Run pipeline once.

        Args:
            local (bool, optional): local実行かどうか。Defaults to False.
            enable_caching (bool, optional):
                kwargsの値に対してpipelineの結果をcacheするか.
                Trueの場合、同じパラメータを使っているところは実行されない。Defaults to False.
            service_account (str, optional): pipelineを実行するservice account. Defaults to None.
        """
        with tempfile.TemporaryDirectory() as td:
            package_path = f"{td}/pipeline.yaml"
            compiler.Compiler().compile(
                pipeline_func=self.get_pipeline(local), package_path=package_path
            )
            job = aiplatform.PipelineJob(
                template_path=package_path,
                pipeline_root=self.pipeline_root,
                display_name=self.pipeline_name,
                project=self.project_id,
                location=self.location,
                parameter_values=kwargs,
                enable_caching=enable_caching,
            )
            job.submit(service_account=service_account)

    def build(self, local: bool, package_path: str) -> None:
        """
        Generate pipeline YAML.
        Args:
            package_path (str): pipelineをbuildした結果のyamlの保存先
        """
        compiler.Compiler().compile(
            pipeline_func=self.get_pipeline(local), package_path=package_path
        )

    def create_schecule(
        self,
        template_path: str,
        cron: str,
        enable_caching: bool = False,
        service_account: str | None = None,
        max_concurrent_run_count: int = 1,
        max_run_count: int = 1,
        **kwargs: ParamSpec.kwargs,
    ) -> None:
        """
        Schedule pipeline
        Args:
            template_path (str): pipelineのtemplate yamlを保存したlocalのpathがGCSのpath
            cron (str): cronの設定 (東京時間で設定すること)
            enable_caching (bool, optional):
                kwargsの値に対してpipelineの結果をcacheするか.
                Trueの場合、同じパラメータを使っているところは実行されない。Defaults to False.
            service_account (str, optional): pipelineを実行するservice account. Defaults to None.
            max_concurrent_run_count (int, optional): このscheduleで同時に実行できるjobの数. Defaults to 1.
            max_run_count (int, optional): scheduleが完了した後に作成されるpipelineの最大数. Defaults to 1.
        """
        job = aiplatform.PipelineJob(
            template_path=template_path,
            pipeline_root=self.pipeline_root,
            display_name=self.pipeline_name,
            project=self.project_id,
            location=self.location,
            parameter_values=kwargs,
            enable_caching=enable_caching,
        )
        job.create_schedule(
            display_name=self.pipeline_name,
            cron="TZ=Asia/Tokyo " + cron,
            service_account=service_account,
            max_concurrent_run_count=max_concurrent_run_count,
            max_run_count=max_run_count,
        )

    def update_schedule(self, schedule_id: str, **kwargs: ParamSpec.kwargs) -> None:
        """
        Update schedule
        Args:
            schedule_id (str): scheduleのid
        """
        job_schedule = aiplatform.PipelineJobSchedule.get(schedule_id=schedule_id)
        job_schedule.update(**kwargs)

    def delete_schedule(self, schedule_id: str) -> None:
        """
        Delete schedule
        Args:
            schedule_id (str): scheduleのid
        """
        job_schedule = aiplatform.PipelineJobSchedule.get(schedule_id=schedule_id)
        job_schedule.delete()

実際にプロジェクトディレクトリの中で開発を行う際には、このクラスを継承しget_pipelineの部分だけを書き換えれば、基本的にパイプライン実行ができるというような形になっています。

例) データセットを作成し、XGBoostで訓練するpipelineを作成する。

from typing import Callable

from google_cloud_pipeline_components.v1.custom_job import (
    create_custom_training_job_from_component,
)
from kfp import dsl
from layerx_ml_utils.vertex_pipeline import BasePipelineRunner, create_execution_date_op, slack_notification_op


class TrainPipelineRunner(BasePipelineRunner):
    def __init__(
        self,
        project_id: str,
        location: str,
        pipeline_root: str,
        pipeline_name: str,
        image_uri: str,
        prd: bool,
    ) -> None:
        super().__init__(project_id, location, pipeline_root, pipeline_name)
        self.image_uri = image_uri
        self.prd = prd

    def get_pipeline(self, local: bool = False) -> Callable[[str], None]:
        @dsl.container_component
        def preprocess(
            execution_date: str, output_dir: dsl.Output[dsl.Dataset]
        ) -> dsl.ContainerSpec:
            command = ["inv", "preprocess.load-data"]
            if self.prd:
                command += ["-f", "invoke_prd.yaml"]
            return dsl.ContainerSpec(
                image=self.image_uri,
                command=command,
                args=["--output-dir", output_dir.path, "--execution-date", execution_date],
            )

        @dsl.container_component
        def train(
            dataset_path: dsl.Input[dsl.Dataset],
            learning_rate: float,
            model_path: dsl.Output[dsl.Model],
        ) -> dsl.ContainerSpec:
            """
            Train XGBoost model
            """
            command = ["inv", "train.train"]
            if self.prd:
                command += ["-f", "invoke_prd.yaml"]
            return dsl.ContainerSpec(
                image=self.image_uri,
                command=command,
                args=[
                    "--dataset-path",
                    dataset_path.path,
                    "--learning-rate",
                    learning_rate,
                    "--model-path",
                    model_path.path,
                ],
            )

        @dsl.pipeline(pipeline_root=self.pipeline_root, name=self.pipeline_name)
        def pipeline(execution_date: str) -> None:
            if not local:
                execution_date = create_execution_date_op().output
            with dsl.ExitHandler(slack_notification_op):
                preprocesss_op = preprocess(execution_date=execution_date)
                train_op = create_custom_training_job_from_component(
                    train(dataset_path=preprocesss_op.output, learning_rate=0.01),
                    machine_type="n1-hoghmem-32",
                )

        return pipeline

これと、先ほどのinvokeのtask定義を利用して、tasks.py内に以下のような記述をしておけばinv run-pipelineinv schedule-pipelineなどで、pipelineをlocalで実行したり、定期実行パイプラインを作成できるようになります。非常に簡単ですね。

@task
def run_pipeline(c: Context, local: bool = False, prd: bool = False, cache: bool = False) -> None:
    from pipelines.train_pipeline import TrainPipelineRunner

    runner = TrainPipelineRunner(
        c.env.gcp.project_id,
        c.env.gcp.location,
        c.pipelines.pipeline_root,
        c.pipelines.pipeline_name,
        c.env.gcp.image_uri,
        prd,
    )
    runner.run(
        local=local,
        enabling_caching=cache,
        execution_date=c.execution_date,
        service_account=c.env.gcp.service_account,
    )


@task
def schedule_pipeline(c: Context, prd: bool = False) -> None:
    from pipelines.train_pipeline import TrainPipelineRunner

    runner = TrainPipelineRunner(
        c.env.gcp.project_id,
        c.env.gcp.location,
        c.pipelines.pipeline_root,
        c.pipelines.pipeline_name,
        c.env.gcp.image_uri,
        prd,
    )
    runner.build(local=False, package_path=c.pipelines.template_path)
    runner.create_schecule(
        template_path=c.pipelines.template_path,
        cron=c.pipelines.cron,
        enabling_caching=cache,
        service_account=c.env.gcp.service_account,
    )

他にもpipelineを実行せずに手元でデバックできるようにしたり、レポジトリ構造の対称性を意識したりと工夫した部分は多くあるのですが、その話はまた別の機会にしたいと思います。

最後に

今回は最近のVertex AI Pipelinesの概要とそれを用いたモノレポ開発方法について紹介しました。 今回の構成はとりあえず改善dayの一日で考えたものなので、もちろんまだまだ荒削りな部分があります。 今後は、爆速で開発していくためにチームメンバーと一緒により良い構成、開発方法を育てていきたいなと思っています。 とりあえず、デフォルトのプロジェクトのレポジトリ構成やinvoke.yamlの必須パラメータなどをコマンド一つで作れるようにして、開発初期のスピードを上げていきたいです。

最後になりましたが、バクラクの機械学習チームでは機械学習エンジニアやMLOpsエンジニア、ソフトウェアエンジニア、インターン生を積極採用中です! 少しでも機械学習チームに興味を持ってくださった方はカジュアルにお話ししましょう!

jobs.layerx.co.jp

youtrust.jp