LayerX エンジニアブログ

LayerX の エンジニアブログです。

Temporalによるナレッジ更新の同時実行制御

機械学習エンジニアの吉田です。バクラクヘルプデスクエージェントの開発を担当しています。この記事では、バクラクヘルプデスクエージェントにおけるナレッジ更新の同時実行制御を Temporal を活用してどのように実現したか紹介します。

背景

バクラクヘルプデスクエージェントとは

バクラクヘルプデスクエージェントは、社内の問い合わせ対応を自動化するAIエージェントで、社内規程や業務マニュアルを元にSlackで多様な質問に自動で回答します。AIで解決できない場合は適切な担当者へシームレスに取り次ぎ、問い合わせをきっかけとしたマニュアルの更新も支援します。

bakuraku.jp

AIが正確に回答するには、根拠となる社内ドキュメント群 (ナレッジ) の品質が重要です。ナレッジの元になるのは Notion や Google Drive 上の社内規程・業務マニュアルで、これらのソースと同期し、常に最新の状態に保つ必要があります。

ナレッジ更新の同時実行制御が必要になる理由

ナレッジの更新処理には大きく2つの処理があります。ソースから最新のコンテンツを取得して反映する「同期」と、不要なナレッジの「削除」です。なお、ナレッジは管理画面や Slack から直接作成することも可能ですが、これらはソースとの同期を伴わないため、今回の同時実行制御の対象外とします。

これらの更新処理は、管理画面・Slack・日次バッチなど複数の経路から実行されます。異なる経路からの同時更新はもちろん、同一経路であっても複数のユーザーが同時に更新するケースがあり得るため、あらゆる経路・タイミングで同時実行制御が必要になります。

また、ナレッジはフォルダ (親) ・ページ (子) という階層構造を持っています。フォルダ配下の全ページを一括更新するケースと、特定のページだけを個別に更新するケースがあり、フォルダ全体の更新中に同じフォルダ内の個別ページが更新されると、同じページに対して2つの処理が同時に走ることになります。

加えて、更新処理は外部APIからのコンテンツ取得、Markdown変換、ベクトル化、LLMによる要約生成といった複数ステップからなり、完了までに時間がかかります。その間にユーザーがナレッジを削除すると、削除と同期の順序次第では、削除したはずのデータが復活したり中途半端な状態のデータが残るといった不整合が生じる恐れがあります。

開発初期はナレッジのソースも更新経路も少なかったため、DBの排他ロックで同時実行を防いでいました。しかし、ソースが増えていくにつれてエントリポイントも増加し、すべての箇所でロックの取得・解放を正しく実装する負荷が大きくなり、実装漏れや不整合、デッドロックのリスクが高まりました。

こうした課題を踏まえ、既に採用していた Temporal の機能を活かして、同時実行制御の仕組みを見直すことにしました。

Temporalとは

Temporal は、耐久性のあるワークフロー実行を提供するオープンソースのワークフローエンジンです。一度開始されたワークフローは、プロセスのクラッシュやネットワーク障害が発生しても、中断した箇所から自動的に再開されます。リトライ・タイムアウト・状態管理といった分散システム特有の課題をプラットフォーム側が吸収するため、アプリケーション層では業務ロジックに集中できます。

以下では、検討した2つの設計アプローチを順に紹介します。

設計案①:Entity Workflow と Signal キューによるシリアライズ

DBロック方式の課題は、同時実行制御の責任が各エントリポイントに分散していることでした。そこで、同一ナレッジへの操作をひとつのワークフローに集約する Entity Workflow パターンを検討しました。Entity Workflow は、エンティティごとに長寿命のワークフローを常駐させ、そのライフサイクルを一元管理するパターンです。

Signal によるキューイング

今回のケースでは、すべての操作を Signal 経由でキューに追加し、順次処理することでシリアライズを実現する設計を考えました。

呼び出し側は signalWithStart を呼ぶだけです。ワークフローが未起動なら起動してから Signal を送り、起動済みなら Signal だけを送信する処理がアトミックに行われるため、DBロックが不要になります。

以下は TypeScript による簡略化した実装例です。

export async function entityWorkflow() {
  const queue: Operation[] = [];

  // シグナルハンドラ: キューに追加するだけ
  setHandler(syncSignal, () => { // 同期リクエスト
    queue.push({ type: "sync" });
  });
  setHandler(deleteSignal, () => { // 削除リクエスト
    queue.push({ type: "delete" });
  });

  // メインループ: Signal が来るまで待機し、順次処理
  while (true) {
    await condition(() => queue.length > 0);
    const op = queue.shift()!;
    switch (op.type) {
      case "sync":
        await syncKnowledge(); // 同期処理
        break;
      case "delete":
        await deleteKnowledge(); // 削除処理
        return; // 削除後はワークフローを終了させる
    }
  }
}

長寿命ワークフローの運用課題

しかし、この設計にはいくつかの懸念がありました。Entity Workflow は一度起動すると長期間にわたって稼働し続けます。ワークフロー内部の処理やインターフェースを変更する場合、すでに稼働中のワークフローとの互換性を維持する必要があり、新旧のコードを共存させるバージョニングが求められます。

また、長寿命のステートフルワークフローには運用上の課題もあります。Temporal はワークフローの全実行履歴を Event History として記録しますが、Event History にはサイズ上限があるため、上限に達する前に continueAsNew でワークフローを新しい実行として再起動し、履歴をリセットしなければなりません。しかし再起動の際にはキューなどの状態を引き継ぐ必要があり、実装が複雑化します。

また、削除時にはキューに溜まった未処理の同期リクエストをすべて消化してからでないと削除処理に到達しないため、ユーザーが削除を要求しても、先にキューイングされた同期処理がすべて完了するまで待たされることになります。

設計案②:Workflow ID の命名規則による同時実行制御

「何を防ぐべきか」を整理する

Entity Workflow の課題を踏まえ、そもそもすべての操作を厳密にシリアライズする必要があるのか考え直しました。

同期 × 同期:重複しても問題ない

同期はソースの最新状態をそのまま反映する冪等な処理です。同じページの同期が2つ同時に走っても、最終的なデータは同じです。既に同期中であれば、後続のリクエストをスキップすれば十分です。

厳密には、同期の実行中にソース側が更新されると古い内容で上書きされる可能性があります。しかし、社内規程や業務マニュアルが短時間に何度も更新されることは稀ですし、仮に起きても日次バッチで最新状態に収束します。

同期 × 削除:データ復活はDB制約で防げるが、クリーンアップが必要

削除済みのナレッジに同期ワークフローがデータを書き戻そうとしても、DB制約で書き込みは失敗します。つまり、削除したナレッジが復活することはありません。 しかし、2つの問題が残ります。

  1. DB制約違反のエラーが発生すること自体が不健全
  2. ベクトルインデックスなどDB外には孤立したデータが作られてしまう

したがって、削除時には進行中の同期をキャンセルする仕組みが必要です。

整理すると、必要な制御は以下の2つに絞られます。すべての操作をキューでシリアライズする必要はありません。

  1. 同期・削除の二重実行を防ぐWorkflow ID の一意性制約 を活用し、WorkflowIdConflictPolicy.USE_EXISTING を指定することで、同じIDのワークフローが実行中なら新たに起動せず既存のワークフローを使う
  2. 削除時に進行中の同期を止めるVisibility API で実行中の関連ワークフローを検索し一括キャンセルすることで、孤立データやエラーを防ぐ

Workflow IDに階層パス構造を持たせる

これらを機能させるための鍵が、Workflow ID の命名規則の設計です。ナレッジはフォルダ (親) とページ (子) の階層構造を持っています。この親子関係を、ファイルシステムのパスのように Workflow ID に反映します。

操作 Workflow ID
フォルダ同期 folder/{folderId}/sync/{uniqueId}
ページ同期 folder/{folderId}/page/{pageId}/sync
ページ削除 folder/{folderId}/page/{pageId}/delete
フォルダ削除 folder/{folderId}/delete

ポイントは、すべてのIDが共通のプレフィックスを持つことです。folder/{folderId}/ でプレフィックス検索すれば、そのフォルダに関連するすべてのワークフローを一括取得できます。folder/{folderId}/page/{pageId}/ まで指定すれば特定ページに絞り込めます。

同期:重複排除と並列実行を Workflow ID の命名で両立する

フォルダ同期は配下の全ページを走査し、ページごとに同期ワークフローを起動します。ここでポイントになるのが、フォルダ同期とページ同期で Workflow ID の付け方を意図的に変えている点です。フォルダ同期は「毎回ユニークなID」で即座に開始できるようにし、ページ同期は「固定ID」で重複を自動排除する、という役割分担をしています。

ページ同期(固定ID → 重複排除)

ページ同期の Workflow ID は folder/{folderId}/page/{pageId}/sync で、pageId から一意に決まります。同じページへの同期リクエストが複数来ても Workflow ID が同一なので、Temporal が2つ目以降の起動を自動的にスキップします。つまり、1ページにつき同時に走る同期は常に1つだけです。

フォルダ同期(毎回ユニークID → 並列実行)

一方、フォルダ同期の Workflow ID は folder/{folderId}/sync/{uniqueId} で、リクエストごとに異なるIDが振られます。こうすることで、前回のフォルダ同期が完了していなくても新しい同期を開始できます。ソース側の更新を取りこぼさないために、先行する同期の完了を待たない設計にしています。

フォルダ同期が複数走っても安全な理由

フォルダ同期が並列で走ると、内部では同じページに対する同期リクエストが重複して発行されます。しかし、ページ同期の Workflow ID は固定なので、Temporal が重複を排除してくれます。さらに、各ページの同期処理はソース側に変更がなければ更新をスキップする実装にしているため、無駄な処理も発生しません。

削除:プレフィックス検索で関連ワークフローを一括キャンセルする

プレフィックスでキャンセル対象を特定する

削除ワークフローは、自身の Workflow ID から末尾の /delete を除いた部分をプレフィックスとして使います。Temporal の Visibility Query で STARTS_WITH 検索をかけると、そのプレフィックスに一致する実行中のワークフローを一覧取得できます。

Workflow ID がパス構造になっているため、ページ削除ならページまで、フォルダ削除ならフォルダまでをプレフィックスにすることで、キャンセル範囲が削除の粒度に応じて自然に決まります。

以下は TypeScript による簡略化した実装例です。

export async function deleteWorkflow() {
  const prefix = `folder/${folderId}/page/${pageId}/`;
  // 関連する同期ワークフローを一括キャンセル
  await cancelWorkflows(prefix);

  // キャンセル完了後に削除
  await deleteKnowledge();
}

export async function cancelWorkflows(prefix: string) {
  const temporal = await getTemporalClient();

  // Visibility Query でプレフィックスに一致するワークフローを検索
  // 厳密には自分自身が含まれないように除外する必要がある
  const workflows = temporal.workflow.list({
    query: `WorkflowId STARTS_WITH '${prefix}'`,
  });

  for await (const wf of workflows) {
    const handle = temporal.workflow.getHandle(wf.workflowId);
    await handle.cancel();
  }
}

Entity Workflow ではキューに溜まった未処理の同期をすべて消化するまで削除が待たされましたが、この方式では進行中の同期を即座にキャンセルして削除を開始できます。

キャンセル時のデータ整合性を保つ

同期ワークフローがキャンセルされると内部の処理にもキャンセルが伝播します。後処理は CancellationScope.nonCancellable() で保護することで、キャンセル時もデータが不整合な状態のまま残ることを防げます。以下は同期ワークフロー側の簡略化した実装例です。

export async function syncWorkflow() {
  try {
    await syncKnowledge(); // 同期処理
  } finally {
    // キャンセルされても後処理は確実に実行する
    await CancellationScope.nonCancellable(async () => {
      await cleanup();
    });
  }
}

まとめ

最終的に設計案②の「Workflow ID の命名規則による同時実行制御」を採用しました。
本記事で紹介した3つのアプローチを振り返ると、同時実行制御の責任をどこに置くかが段階的に変化しています。

アプローチ 制御の責任 懸念
DBロック 各エントリポイントが個別にロックを管理 実装漏れ・デッドロック
Entity Workflow & Signal キュー ワークフロー内のキューで一元管理 バージョニング・履歴上限
Workflow ID の命名規則 Temporal の ID 一意性制約に委譲 命名規則の遵守が前提・階層構造の変化に弱い

採用した方式のポイントは、開発者がやるべきことを「命名規則に従って Workflow ID を生成するだけ」に絞ったことです。たとえばページ同期ワークフローは、フォルダ同期からの Child Workflow として起動されることもあれば、管理画面から直接起動されることもあります。どちらの経路でも Workflow ID は同じ値になるため、同時にトリガーされても Temporal が自動的に重複を排除します。エントリポイント側にロックやガード処理を書く必要はなく、新しい経路を追加するときも、命名規則どおりの Workflow ID を渡してワークフローを起動するだけです。

ただし、この方式にもトレードオフはあります。まず、同時実行制御の正しさが命名規則の遵守に依存しています。規則から外れた ID でワークフローを起動すると、重複排除もプレフィックス検索によるキャンセルも機能しません。
また、この設計はナレッジがフォルダとページの2階層であることを前提としています。たとえばページの下にサブページが加わるような階層変更が起きた場合、ID体系とキャンセルのプレフィックス設計を見直す必要があります。階層構造を ID に埋め込んでいる以上、構造の変化には弱いという制約があります。

最後に

AIエージェントの開発は、LLMだけでなく同時実行制御やデータ整合性といった従来のソフトウェアエンジニアリングの課題と向き合いながら、信頼性の高いプロダクトを作り上げていく必要があります。

LayerXでは、こうしたエンジニアリングの課題に本気で取り組める仲間を募集しています。興味のある方はぜひご応募ください!

jobs.layerx.co.jp