こんにちは。 バクラク事業部、機械学習チームでテックリードをしている島越@shimacosです。 今年になってギックリ腰になってしまい、整体とパーソナルジムに通い始めました。 チームメンバーには島腰といじられています。健康体になりたい。
はじめに
機械学習チームでは、機械学習パイプラインの作成と定期実行サービスにGoogle Cloud Platform(GCP)のVertex AI Pipelinesを採用しています。 前回までの軌跡はこちらで紹介しています。
今回はscheduler APIを用いる際に、どのようにしてpipelineをコードベースに追従させるかというCI/CDの部分での試行錯誤についての記事になります。
Vertex AI Pipelinesについて
Vertex AI Pipelinesは、Kubeflow Pipelines SDKやTFX pipeline DSLを用いて、GCP上でサーバレスにpipelineを構築することができるサービスです。 Vertex AI Pipelinesを使用する際には以下のような3つの登場人物が存在します。
- KubeFlow Pipeline (KFP) Template:pipelineの中身をCompileしたjsonやyamlファイル。Artifact Registryで管理することができる。
- Docker image:pipelineの
container_component
で使われるimage。Artifact Registryで管理することができる。 - Scheduler API:KFP Templateを参照して定期実行をトリガーするAPI。cronや同時実行可能数などを管理している。
関係性としては以下の図のような感じになっています。
これら3つを、localでの実験とコンフリクトしないようにどのようにしてCIを通じてデプロイするのかが今回の記事になります。
デプロイ方針
結論から話すとデプロイされているSchedulerからはKFP TemplateもImageもlatestのtagを見るという運用にしました。 以下に、なぜそのような方針にしたのかの詳細を説明します。
まず考えられるのが、CI上でgitのcommit hashなどで発行したtagを用いる運用方法です。 この場合だと、Scheduler APIが参照するKFP TemplateがCI実行のたびに変化することになります。 そうすると、Scheduler APIが参照するKFP Templateをupdateする仕組みが現在導入されていないので、画像のようにtagごとに新しいSchedulerが作成されてしまうことになります。 また、同じバージョンを指定したからといって、よしなにSchedulerをupdateしてくれるわけではなく、新しく全く同じSchedulerが生成されてしまいます。
このようにしてしまうと、バージョンを跨いだパイプライン実行の実行時間の監視などがUI上でできなくなってしまったり、CIが実行されるたびにSchedulerが生成されてしまうので、なかなかカオスになってしまうという問題が発生します。 そのため、Schedulerとしては、常にKFP Templateのlatest tagを見にいくという方法を取ることにしました。
次に、Docker Imageをどのようにして運用するかです。Docker ImageはCIから発行したtagを用いてuploadしてもいいのですが、KFP Templateではlatestが使われ、Docker Imageではlatestが使われていないとなると混乱が発生するため、今回の場合はDocker Imageもlatestを使用するという方針にしました。
この場合、発生するのが実験などを手元で行う際に意図せずlatest tagにアップロードしてしまうという問題です。 今回の場合は、yamlのパラメータを見てDocker ImageやKFP Templateのpushを行うようにすることで、latestにpushできないような仕組みにするようにしました。 以下のようなOmegaConf形式のyamlを用いていて、CIでTAGに環境変数が設定されたときは、そのtagを使うようにし、localで開発や実験を行う際は、versionの値をいじって実験を行うようにしています。
version: baseline env: name: dev tag: ${oc.env:TAG, ${version}} gcp: project_id: gcp-sample-project-${env.name} location: asia-northeast1 image_uri: ${env.gcp.location}-docker.pkg.dev/${env.gcp.project_id}/sample-project/pipeline-runner:${tag}
最後に、Schedulerとしては、cronや同時実行可能数、参照するKFP Templateなどを管理しているだけで頻繁に変更するものではないため、Terraformで管理することとしました。
最終的な全体感としては以下のようになります。
落とし穴
こちらで報告されているように、GitHub ActionsでKFP TemplateをArtifact Registryにアップロードする際にkfpのRegistryClient
を使っていると、通常のWorkload Identityを用いた認証だけではエラーでアップロードすることができません。
その場合、CIの中で以下のようにaccess_tokenを発行した上で、RegistryClient
に渡す必要があります。
- id: gcp-auth uses: google-github-actions/auth@v2 with: workload_identity_provider: "${{ env.WORKLOAD_IDENTITY_PROVIDER }}" service_account: "${{ env.GCP_SERVICE_ACOUNT }}" token_format: access_token
今回の場合は、以下のように上記のstepのアウトプットを環境変数として渡すことで、pipelineのuploadを行う関数内でRegistryClient
に渡しています。
- name: Build Image and push pipeline template env: # wandb api key ACCESS_TOKEN: ${{ steps.gcp-auth.outputs.access_token }} run: inv deploy-pipeline # docker imageのpushとKFP Templateのuploadを行うコマンド
def upload(self, package_path: str, tag: str) -> None: """ Upload pipeline YAML. Args: package_path (str): pipelineをbuildした結果のyamlの保存先 tag (str): kfpのtemplateに対して付与されるtag """ from kfp.client import RegistryClient, ApiAuth access_token = os.getenv("ACCESS_TOKEN") auth = ApiAuth(access_token) if access_token else None compiler.Compiler().compile( pipeline_func=self.get_pipeline(), package_path=package_path ) client = RegistryClient(host=self.kfp_uri, auth=auth) client.upload_pipeline(file_name=package_path, tags=[tag]) logger.info(f"Uploaded pipeline to {self.kfp_uri}/{self.pipeline_name}/{tag}")
辛い部分
まだまだリリースされたばっかりの機能ということもあって、未実装?バグ?がいくつかあるので、今後のリリースに期待です。
- 全てのスケジュール実行がlatestを見ているため、UI上でどこからモデルが切り替わったか、などが分からない。
- 今回のlatest運用をすることになってしまった原因ですが、Deploy済みのSchedulerに対して参照するKFP Templateをupdateする機能が欲しい。
- (2024年12月追記): 以下のようにちょっとハッキーなことをすることで、schedulerで管理されているkfpのversionを更新することができることが分かりました。これを用いることで上記のようにkfpに関してlatest運用する必要がなくなり、またUI上でもどこからTAGが切り替わったのかわかるようになります。
def update_schedule_template_version(self, schedule_id: str, version: str) -> None: """ 最新のtemplate versionにscheduleを更新 Args: schedule_id (str): Schedulerのid version (str): kfpのtemplateのversion """ from google.protobuf import field_mask_pb2 access_token = os.getenv("ACCESS_TOKEN") credentials = Credentials(token=access_token) if access_token else None job_schedule = aiplatform.PipelineJobSchedule.get( project=self.project_id, location=self.location, schedule_id=schedule_id, credentials=credentials, ) # FIXME:公式でtemplateが更新できるようになったら対応する # job_schedule.update(**kwargs) schedule = job_schedule.gca_resource schedule.create_pipeline_job_request.pipeline_job.template_uri = ( f"https://{self.kfp_uri}/{self.pipeline_name}/{version}" ) update_mask = field_mask_pb2.FieldMask(paths=["create_pipeline_job_request"]) job_schedule.api_client.update_schedule(schedule=schedule, update_mask=update_mask)
- Schedulerからトリガーされたパイプライン実行では、ExitHandlerが上手く動かずSlack通知が行われない。
- Schedulerからトリガーされたパイプライン実行にVertex AI Experimentを渡すことができず、同じSchedule内で実行されているpipelineのMetricを横軸監視できない。
- これができるようになると、Experimentのdisplay_nameにバージョン名などを設定することで、一つ目の問題であったどこからバージョンが切り替わったのかという情報がUI上で分かるようになるので、実装に期待です。
- (従来のCloud Pub/Subを使った方法であれば可能ですが、UI上で上の図のようにスケジュール単位で見ることはできません)
最後に
今回はVertex AI Pipelinesを運用するにあたってのデプロイ方法について紹介しました。 今はスモールチームなため、このような運用方法でも問題は起きにくいですが、今後もVertex AI Pipelinesのアップデートを追いながら常にupdateしていこうと思います!
最後になりましたが、バクラクの機械学習チームでは機械学習エンジニアやMLOpsエンジニア、ソフトウェアエンジニア、インターン生を積極採用中です! 少しでも機械学習チームに興味を持ってくださった方はまずはカジュアルにでもお話しさせてください!