こちらはLayerX AI Agentブログリレー37日目の記事です。前回の記事は@po3rinによる『AI Agentのビジネス価値を計るバックテスト基盤の構築』でした。
こんにちは。バクラク事業部 BizOps部 データグループの@civitaspoです。前回の『AI Agentのビジネス価値を計るバックテスト基盤の構築』で以下の記載がありました。
LayerXではデータ基盤としてSnowflakeを採用しています。SnapshotデータにアクセスするためにはアプリケーションDBにアクセスしていたクエリを少し変えるだけです。例えば、2025/10/21 9時時点のSnapshotデータにアクセスする場合は次のようになります。
# 通常クエリ SELECT * FROM SAMPLE_TABLE; # Snapshotクエリ SELECT * FROM TABLE(SAMPLE_TABLE('2025-10-21 09:00:00.000 +0900'::timestamp_tz));テーブル名と同じ関数定義をしており、内部でSnapshotデータへのアクセスをしています。その結果をTABLE関数で変換しています。
この内部の仕組みの詳細に関しては明日のAI Agentブログリレーで紹介します。
本記事では、このスナップショット取得の仕組みと内部実装、データパイプラインについて紹介します。
Snowflakeに実装したスナップショット取得機能の紹介
先ほどの引用に書かれていますが、改めてSnowflakeに実装したスナップショット取得機能を紹介します。
弊社では、アプリケーションが使用するMySQLデータベースのテーブルをSnowflakeへ連携して利用しています。今回の記事で紹介するスナップショット取得機能は、この連携されたテーブルに対して、過去の任意時点の状態を取得できる機能となります。たとえば、MySQL の sample_db の sample_table を Snowflake に連携したとします。すると、最新連携データは以下のクエリで取得できます。
-- 補足: 弊社では生データを src という Snowflake データベースに格納している。 select * from src.mysql_sample_db.sample_table;
この連携されたテーブルの過去時点のデータを取得したい場合、同名で定義されたTabular SQL UDF (以下、UDTF)を使って取得することが出来ます。
たとえば、 2025-10-15 08:54:11:123 +09:00時点のデータを参照したい場合、以下のクエリを書くことで取得することが出来ます。
select * from table( src.mysql_sample_db.sample_table('2025-10-15 08:54:11:123 +09:00'::timestamp_tz) );
この機能で特徴的なのは、ミリ秒単位で過去の任意時点に遡って取得が出来る点です。MySQLのbinlogへ書き込まれた時間を基準として遡ることが出来ます。
Snowflakeを利用している人からすると、Time Travel機能と思ってもらえると理解しやすいと思います。SnowflakeのTime Travel機能は、Snowflakeのテーブルに格納された時間を基準として最大過去90日間まで、過去の任意時点に遡れる機能です。
なお、誤解がないように補足しておきますが、本機能はSnowflakeが持つSnapshot機能とは関係ない、弊社独自に実装した機能です。Snowflakeが持つSnapshot機能は、Snapshot Policyに則って、特定時点のテーブルをPoint-in-time Snapshotとして残しておく機能です。
スナップショット取得機能を実現するデータソース
このスナップショット取得機能の実装詳細を説明する前に、使用しているデータソースについて説明しておきます。この機能の実現のために3つのデータソースを使用しています。
- Aurora Cluster Export で S3 へ出力した Parquet ファイル群を参照する External Table
- External Table から部分的に Snowflake へロードした Standard Table
- Debezium によって取得した Change Data Capture を格納した Standard Table
それぞれ説明していきます。
Aurora Cluster Export で S3 へ出力した Parquet ファイル群を参照する External Table
過去の記事で紹介した通り、弊社では毎日 Aurora Cluster Export を用いて、S3 へデータベースのエクスポートを行っています。
各Aurora ClusterごとにDailyで実行される Aurora Cluster Export 結果を、External TableのPartitionとして追加しています。External Table の定義は以下の通りで、Aurora Cluster 全体を単一のExternal Tableとして扱えるようにしています。
-- 以下のような URI でデータが格納される。 -- s3://${bucket}/${path_prefix}/${db_cluster_identifier}/d=${date}/${aurora_cluster_export_task_id}/${mysql_database}/${mysql_database}.${mysql_table}/${partition_num}/data-XXXXX.gz.parquet create or replace external table aurora_cluster_sample( _metadata_aurora_cluster_export_date date as (to_date(split_part(split_part(metadata$filename, '/', 3), '=', 2), 'yyyy-mm-dd')), _metadata_aurora_cluster_export_id varchar as (split_part(metadata$filename, '/', 4)), _metadata_aurora_cluster_export_db_cluster_identifier varchar as (split_part(metadata$filename, '/', 2)), _metadata_aurora_cluster_export_database varchar as (lower(split_part(metadata$filename, '/', 5))), _metadata_aurora_cluster_export_table varchar as (lower(split_part(split_part(metadata$filename, '/', 6), '.', 2))), _raw variant as ($1), _metadata_filename varchar as (metadata$filename), _metadata_file_row_number number as (metadata$file_row_number), _metadata_file_content_key varchar as (metadata$file_content_key), _metadata_file_last_modified timestamp_tz as (convert_timezone('UTC', cast(metadata$file_last_modified as timestamp_tz))), _metadata_start_scan_time timestamp_tz as (convert_timezone('UTC', cast(metadata$start_scan_time as timestamp_tz))) ) partition by ( _metadata_aurora_cluster_export_date, _metadata_aurora_cluster_export_db_cluster_identifier, _metadata_aurora_cluster_export_database, _metadata_aurora_cluster_export_table ) location=@aurora_cluster_sample/ pattern='aurora-cluster-export/aurora-cluster-sample/d=[0-9-]+/export-task-[a-z0-9-]+/[a-zA-Z0-9-_]+/[a-zA-Z0-9-_]+.[a-zA-Z0-9-_]+/[0-9]+/[a-z0-9-]+.gz.parquet$' file_format=sample_db.sample_schema.parquet_file_format aws_sns_topic='arn:aws:sns:ap-northeast-1:123456789012:sample-snowpipe-sns' ;
このテーブルにアクセスするときは partition カラムが必ず指定されるように View 定義を行っています。
また、テーブルのレコードの実データは全て _raw に格納されています。当然ながらこの状態では使い勝手が非常に悪いので、各テーブルごとに別経路で取得した MySQL の information_schema.columns の情報を元に、 _raw からデータを型キャストして取得するView定義を行っています。以下に例を載せておきます。
create or replace view aurora_cluster_sample.sample_db.sample_table as with landing_s3_external_table as ( select * from landing_s3_external_table.aurora_cluster_export.aurora_cluster_sample ), filtered_table as ( select _metadata_aurora_cluster_export_date, _raw, from landing_s3_external_table_all where _metadata_aurora_cluster_export_db_cluster_identifier = 'aurora_cluster_sample' and _metadata_aurora_cluster_export_database = 'sample_db' and _metadata_aurora_cluster_export_table = 'sample_table' -- 日付はView利用時にwhere句に指定する -- and _metadata_aurora_cluster_export_date = 'YYYY-MM-DD' ), extracted_table as ( select ft._metadata_aurora_cluster_export_date, ft._raw, to_varchar(get_ignore_case(_raw, 'id')) as id, to_varchar(get_ignore_case(_raw, 'tenant_id')) as tenant_id, to_varchar(get_ignore_case(_raw, 'status')) as status, to_varchar(get_ignore_case(_raw, 'user_id')) as user_id, to_timestamp_tz(to_varchar(get_ignore_case(_raw, 'created_at'))) as created_at, to_timestamp_tz(to_varchar(get_ignore_case(_raw, 'updated_at'))) as updated_at, from filtered_table as ft ), final as ( select * from extracted_table ) select * from final
上記の定義を、MySQLのinformation_schemaやAurora Cluster Exportのメタ情報から、dbt Project の定義ファイルを自動生成して更新し続けています。
External Table から部分的に Snowflake へロードした Standard Table
先ほど紹介した External Table はクエリ実行ごとに S3 へのアクセスが発生してしまうため非常に遅いです。そのため、直近数日分のデータや、長期間のスナップショットロードが定常的に必要なデータに関しては、Snowflake の Standard Table としてデータを格納しています。
このテーブルの作成方法は、先程説明したViewを dbt の incremental model で Standard Table に格納しているだけです。 docs.getdbt.com
(補足)Snowflake の Iceberg Table Support に ADD_FILES_COPY ではなく、Iceberg の Spark Procedure における add_files procedure のようなファイルコピーの発生しないデータ追加方法が出てきたら、データアクセス高速化のために一分のデータを Snowflake Standard Table への再格納するのではなく、すべてのデータを Iceberg Table として管理するだけで済むのになぁ…と思うばかりです。
Debezium によって取得した Change Data Capture を格納した Standard Table
Daily実行されるAurora Cluster Exportのデータだけでは任意時点のスナップショットを再現することはできません。空白期間のデータを埋めるため Change Data Capture を使用しています。
Change Data Capture (以下、CDC)とは、データベースに発生した変更を変更セットごとに記録する仕組みを指します。CDC を取得する方法はいくつか存在するのですが、弊社では Debezium を利用しています。DebeziumはオープンソースのCDCプラットフォームで、MySQLに限らず、多くの種類のデータベースからCDCを取得することが出来ます。 debezium.io
最終的に、以下の図のようにDebeziumで取得したデータを、Managed Streaming for Apache Kafkaを経由し、SnowflakeへSnowpipe Streamingで格納しています。このパイプラインの設定や運用などの詳細は、本記事の主眼から外れるので、概要レベルで説明しておきます。1

DebeziumはECS Fargate上に構築したKafka ConnectのSource Connectorとしてデプロイしています。Kafka Connectは、Kafkaと外部システム間で信頼性を担保しつつ、スケーラブルにデータ連携するためのフレームワークです。2 Apache Kafka の持つ Exactly-Once Semantics な動作を容易に実装できたり、fault tolerant な実装が可能となる。
Debeziumは主にKafka Connect上にデプロイする方法と、スタンドアロンなサーバーとしてデプロイする方法がありますが、可用性の担保のしやすさや拡張性の高さを理由に、Kafka Connect上へデプロイする選択を採っています。 debezium.io
Apache Kafkaは、AWSのマネージドサービスであるManaged Streaming for Apache Kafka(以下、MSK)を利用して構築しています。 aws.amazon.com
MSKには、MSK Connectと呼ばれるKafka Connectのマネージドサービスも存在するのですが、デプロイの遅さや可観測性の低さ(JMX Metricsを取得できない、など)が運用上クリティカルだと判断し、利用を見送っています。
MSKにデータを格納したあとは、Kafka Connect上にSink ConnectorとしてデプロイしたSnowflake Kafka Connectorを使用して、Snowflakeにデータを格納しています。 docs.snowflake.com
Snowflake Kafka Connectorはドキュメント上はSnowpipe Streaming High Performance Architectureをサポートしていないように見えますが、実装を読むとサポートされています。 github.com
snowflake.streaming.v2.enabled というパラメータに true を設定することでSnowpipe Streaming High Performance Architectureを使用したデータ送信になります。
最終的に、MySQL上のテーブルごとにSnowflake上にもテーブルが作成されます。全てのテーブルは全て以下のテーブル定義となります。
create or replace table ${kafka_topic_name} ( record_metadata variant comment 'created by automatic table creation from snowflake kafka connector', record_content variant comment 'created by automatic table creation from snowflake kafka connector' );
record_metadata には、Snowflake Kafka Connectorがテーブル書き込み時のメタ情報を格納します。 record_content には debezium が Topic に格納したデータが格納されます。格納されるデータは、DebeziumのドキュメントにあるJSONと同じ物が入ってきます。 before や after にレコードの実データが格納され、その他はDebeziumが取得したメタデータです。
{
"schema": {...},
"payload": {
"before": {
"id": 1004,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"after": {
"id": 1004,
"first_name": "Anne Marie",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"source": {
"name": "3.3.1.Final",
"name": "dbserver1",
"server_id": 223344,
"ts_sec": 1486501486,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 364,
"row": 0,
"snapshot": null,
"thread": 3,
"db": "inventory",
"table": "customers"
},
"op": "u",
"ts_ms": 1486501486308,
"ts_us": 1486501486308910,
"ts_ns": 1486501486308910814
}
}
このままだと使いにくいので、少しでも使いやすくする目的で、弊社では Snowflake Kafka Connector に snowflake.enable.schematization=true を定義し、Schema Evolution を有効にしています。その場合、以下のようなカラム定義となります。
create or replace table ${kafka_topic_name} ( record_metadata variant comment 'created by automatic table creation from snowflake kafka connector', schema variant comment 'column created by schema evolution from snowflake kafka connector', payload variant comment 'column created by schema evolution from snowflake kafka connector' );
ほとんど変わりませんが、 record_content が schema と payload に分解されます。先ほど説明したJSONの最上位階層がカラムになっています。
スナップショット取得のためのUDTF実装
ここまで紹介したデータソースを使って、冒頭のスナップショット取得機能を実現するUDTF実装の詳細を説明していきます。いきなりSQLの実装を載せられても理解が難しいと思うので、まずは先に処理フローを書いていこうと思います。
スナップショット取得UDTFの処理フロー
スナップショット取得機能を実現するためには以下の処理フローでデータを処理します。
- ユーザーからsnapshot_from、snapshot_to、intervalの3種類の変数を受け取る。
- snapshot_from 時点から snapshot_to 時点までの期間を interval ごとに指した時点のスナップショット群が取得できるインターフェースにしている
- AI Agentのバックテスト基盤用途だけではなく、たとえば「直近1年の月末時点でのアクティブユーザー数』といった集計のために複数のスナップショット群を取得できるようにしている。
- ユーザーから受け取った変数を基に対象となるスナップショットの時点を生成する。
- 生成したスナップショット時点に、それぞれ一番近い過去のAurora Cluster Exportのデータを取得する。
- この時、Standard Tableで保持しているSnowflake上のAurora Cluster Export のデータがあれば、そちらを優先的に使用する。
- 取得したAurora Cluster Exportが実際にExportを開始した時間から取得したいスナップショット時点までのCDCデータをそれぞれ取得する。
- 出力想定のスナップショット時点ごとにCDCデータを分割し、それぞれのタイムバケットでCDCのデータがプライマリーキーに対して最新のデータのみ取得されるような重複排除を行う。
- 前の過程で生成したCDCの最新データをinsert+updateとupdate+deleteの2種類のデータ集合に分解する。
- update+deleteのデータ集合をAurora Cluster Exportのデータに対してanti-joinする。
- Aurora Cluster ExportのデータとCDCのinsert+updateデータ集合をunionする。
上記の処理フローを図示すると以下のようになります。
flowchart TD
subgraph Inputs[入力パラメータ]
direction LR
A1[/snapshot_from/]
A2[/snapshot_to/]
A3[/interval/]
end
A1 & A2 & A3 --> B[1. スナップショット時点群<br>を生成]
subgraph S[Aurora Cluster Export の選択]
direction TB
B --> C[2. 各時点に最も近い過去の<br//>Aurora Cluster Exportを検索]
C --> D{Standard Table<br//>にデータがある?}
D -- Yes --> E[Standard Table を使用]
D -- No --> F[External Table を使用]
end
subgraph X[CDCデータの前処理]
direction TB
E --> G[3. Aurora Cluster Export取得開始時点からスナップショット時点までの CDC を取得]
F --> G
G --> H[4. CDC を時点ごとに<br//>分割 & 重複排除]
H --> I[5. CDC 最新レコードを<br//>2集合に分解]
I --> J[データ集合: insert + update]
I --> K[データ集合: update + delete]
end
%% ====== 差分の適用 ======
subgraph M[差分の適用]
direction TB
K --> L[6. update + delete を<br//>Aurora Cluster Exportに<br//> anti-join]
E --> L
F --> L
L --> U[Aurora Cluster Export<br> 残データ]
U --> V
J --> V[7. 残データと insert + update を union]
end
V --> N[スナップショットデータ出力]
%% ====== スタイル設定 ======
classDef input fill:#fff3cd,stroke:#d39e00,color:#333;
classDef decision fill:#fde2e2,stroke:#e3342f,color:#333;
classDef process fill:#e7f5ff,stroke:#1c7ed6,color:#333;
classDef output fill:#e9ffe6,stroke:#38a169,color:#333;
class A1,A2,A3 input
class D decision
class B,C,E,F,G,H,I,J,K,L,U,V process
class N output
以上の処理によりスナップショット取得の機能を実現しています。
スナップショット取得UDTFの処理フローをSQLで実装する
ここまで書いた処理を実装したものが以下のSQLになります。非常に長く難解なので、参考になると思う人だけ読んでください。理解を促すためのコメントは残しておきます。
-- SQLのわかりやすさを重視して、CREATE FUNCTION の中身のSQLのみ記載している。 -- 以下の3つの変数を入力値として受け取るUDTFの中身だと思って読んでください。 -- -- snapshot_timestamp_from: timestamp_tz -- snapshot_timestamp_to: timestamp_tz -- interval: varchar with recursive -- 出力対象のスナップショット時点群を生成するCTE -- Snowflakeにはinterval型がないため、文字列で受け取ったinterval情報から -- 時間計算を行って出力対象のスナップショット時点群を生成している generate_timeseries as ( select snapshot_timestamp_from as ts union all select reduce( transform( transform(split(interval, ','), v -> trim(v)), v -> iff( regexp_like( lower(v), '^([0-9]+(\\.[0-9]+)?|the\\s+last\\s+day\\s+of)\\s?+[a-z]+$' ), object_construct( 'amount', regexp_substr( lower(v), '^([0-9]+(\\.[0-9]+)?|the\\s+last\\s+day\\s+of)' ), 'unit', regexp_substr(lower(v), '[A-Za-z]+$') ), object_construct() ) ), ts, (acc, v) -> ( case when regexp_like(v['amount'], '^the\\s+last\\s+day\\s+of') then case when v['unit'] in ( 'y', 'yy', 'yyy', 'yyyy', 'yr', 'year', 'years', 'yrs' ) then iff( last_day(ts, 'year') != last_day(acc, 'year'), last_day(acc, 'year')::timestamp_tz, last_day( last_day(acc, 'year')::timestamp_tz + interval '1 day', 'year' ) )::timestamp_tz when v['unit'] in ('q', 'qtr', 'qtrs', 'quarter', 'quarters') then iff( last_day(ts, 'quarter') != last_day(acc, 'quarter'), last_day(acc, 'quarter')::timestamp_tz, last_day( last_day(acc, 'quarter')::timestamp_tz + interval '1 day', 'quarter' ) )::timestamp_tz when v['unit'] in ('month', 'mm', 'mon', 'mons', 'months') then iff( last_day(ts, 'month') != last_day(acc, 'month'), last_day(acc, 'month')::timestamp_tz, last_day( last_day(acc, 'month')::timestamp_tz + interval '1 day', 'month' ) )::timestamp_tz when v['unit'] in ( 'w', 'wk', 'week', 'weekofyear', 'woy', 'wy', 'weeks' ) then iff( last_day(ts, 'week') != last_day(acc, 'week'), last_day(acc, 'week')::timestamp_tz, last_day( last_day(acc, 'week')::timestamp_tz + interval '1 day', 'week' ) )::timestamp_tz else -- unsupported parse_json( concat( '[ERROR] last_day(<amount>, ''', v['unit'], ''') is an unsupported operation. ', '(Please ignore the message "Error parsing JSON: unknown keyword", ', 'as it appears because we intentionally trigger an error using parse_json, and it is not the root cause.)' ) ) end when v['unit'] in ('y', 'yy', 'yyy', 'yyyy', 'yr', 'year', 'years', 'yrs') then timestampadd('years', v['amount']::number, acc) when v['unit'] in ('q', 'qtr', 'qtrs', 'quarter', 'quarters') then timestampadd('quarters', v['amount']::number, acc) when v['unit'] in ('month', 'mm', 'mon', 'mons', 'months') then timestampadd('months', v['amount']::number, acc) when v['unit'] in ('w', 'wk', 'week', 'weekofyear', 'woy', 'wy', 'weeks') then timestampadd('weeks', v['amount']::number, acc) when v['unit'] in ('d', 'dd', 'day', 'days', 'dayofmonth') then timestampadd('days', v['amount']::number, acc) when v['unit'] in ('h', 'hh', 'hr', 'hrs', 'hour', 'hours') then timestampadd('hours', v['amount']::number, acc) when v['unit'] in ('m', 'mi', 'min', 'minutes', 'mins') then timestampadd('minutes', v['amount']::number, acc) when v['unit'] in ('s', 'sec', 'secs', 'second', 'seconds') then timestampadd('seconds', v['amount']::number, acc) when v['unit'] in ('ms', 'msec', 'milliseconds') then timestampadd('milliseconds', v['amount']::number, acc) when v['unit'] in ('us', 'usec', 'microseconds') then timestampadd('microseconds', v['amount']::number, acc) when v['unit'] in ( 'ns', 'nsec', 'nanosec', 'nsecond', 'nanoseconds', 'nanosecs', 'nseconds' ) then timestampadd('nanoseconds', v['amount']::number, acc) else -- unsupported parse_json( concat( '[ERROR] Unsupported <unit>: "', v['unit'], '". ', '(Please ignore the message "Error parsing JSON: unknown keyword", ', 'as it appears because we intentionally trigger an error using parse_json, and it is not the root cause.)' ) ) end ) ) as ts_next from generate_timeseries where ts_next <= ifnull(snapshot_timestamp_to, snapshot_timestamp_from) ), -- まず、出力対象のスナップショット時点群や Aurora Cluster Export のメタ情報など -- データ抽出に必要なデータセットを作る。 timestamps as ( select ts as snapshot_timestamp from generate_timeseries where ts <= ifnull(snapshot_timestamp_to, snapshot_timestamp_from) ), export_time_path_mappings_for_the_table as ( -- Aurora Cluster Exportのメタ情報が格納されているテーブル -- 出力先のS3 Pathや、Exportの実行開始時間などが格納されている。 select * from landing_aurora_cluster_export.metadata.export_time_path_mappings where database_name = 'sample_db' and table_name = 'sample_table' and extraction_start_time <= to_timestamp_tz(snapshot_timestamp_from) and extraction_start_time >= to_timestamp_tz(ifnull(snapshot_timestamp_to, snapshot_timestamp_from)) - interval '7 day' ), export_time_path_mappings_candidates as ( -- 出力対象のスナップショット時点群のそれぞれに対して、 -- スナップショット時点に一番近い過去の Aurora Cluster Export の出力情報を出力するため -- asof join で Aurora Cluster Exportのメタ情報と結合し、データ取得している。 select * from timestamps asof join export_time_path_mappings_for_the_table match_condition( timestamps.snapshot_timestamp >= export_time_path_mappings_for_the_table.extraction_start_time) order by partition_date ), partition_date_extraction_start_time_snapshot_timestamp_mappings as ( -- 出力対象のスナップショット時点群のそれぞれに対して、紐づく Aurora Cluster Export の出力情報から -- 異常値をフィルターしたもの。 -- partition_date は Aurora Cluster Export のデータを取り扱うときのデータ集合を管理するグルーピングキーとして使用される。 -- snapshot_timestamp は出力対象のスナップショット時点であり、出力対象のデータ集合を管理するグルーピングキーとして使用される。 -- extraction_start_date は Aurora Cluster Export の実行開始時点であるため、出力対象のスナップショット時点と比較し、CDCデータを取得するフィルタリングキーとして使用される。 select distinct partition_date, extraction_start_time, snapshot_timestamp from export_time_path_mappings_candidates where snapshot_timestamp <= current_timestamp() and partition_date is not null ), -- ここから実データの取得を開始する。 -- 最初は、Standard Tableに保存されたAurora Cluster Exportのデータ、つまり、 -- 「External Table から部分的に Snowflake へロードした Standard Table」のデータを取得する materialized_snap_table_all as ( -- 「External Table から部分的に Snowflake へロードした Standard Table」が -- landing_aurora_cluster_export.materialized_snap_... で始まるテーブルに格納されている。 select * from landing_aurora_cluster_export.materialized_snap_mysql_sample_db.sample_table ), materialized_snap_table_filtered as ( -- 出力対象のスナップショット時点に紐づく、対象のpartition_dateを含むデータが存在していれば抽出する select * from materialized_snap_table_all where _metadata_aurora_cluster_export_date in ( select partition_date from partition_date_extraction_start_time_snapshot_timestamp_mappings ) ), materialized_snap_aurora_cluster_exports as ( -- Aurora Cluster Export のメタ情報と結合し、後の工程でCDCデータと結合する準備をする。 select m.snapshot_timestamp, dat.id, dat.tenant_id, dat.status, dat.user_id, dat.created_at, dat.updated_at, dat._raw, dat._metadata_data_captured_at, dat._metadata_source_loader_type, from materialized_snap_table_filtered as dat left join partition_date_extraction_start_time_snapshot_timestamp_mappings as m on dat._metadata_aurora_cluster_export_date = m.partition_date ), -- 「External Table から部分的に Snowflake へロードした Standard Table」には存在しない、 -- 「Aurora Cluster Export で S3 へ出力した Parquet ファイル群を参照する External Table」のデータを取得する materialized_snap_table_clustering_keys as ( -- materialized_snap_ に存在していないパーティションのみを対象とするため、 -- materialized_snap_ に含まれるパーティションを事前抽出しておく select distinct _metadata_aurora_cluster_export_date from landing_aurora_cluster_export.materialized_snap_mysql_sample_db.sample_table ), partition_date_extraction_start_time_snapshot_timestamp_mappings_not_in_materialized_snap as ( -- External Tableが保有するパーティションのうち、出力対象であるパーティションから -- materialized_snap_ に含まれていたパーティションを除く select * from partition_date_extraction_start_time_snapshot_timestamp_mappings where partition_date not in ( select _metadata_aurora_cluster_export_date from materialized_snap_table_clustering_keys ) ), external_snap_table_all as ( -- External Tableを参照 select * from landing_s3_external_table.aurora_cluster_export.aurora_cluster_sample ), external_snap_table_filtered as ( -- 対象のパーティションのみをExternal Tableから取得 select _metadata_aurora_cluster_export_date, _raw, 'aurora_cluster_export' as _metadata_source_loader_type, from external_snap_table_all where _metadata_aurora_cluster_export_database = 'sample_db' and _metadata_aurora_cluster_export_table = 'sample_table' and _metadata_aurora_cluster_export_date in ( select partition_date from partition_date_extraction_start_time_snapshot_timestamp_mappings_not_in_materialized_snap ) ), external_snap_table_columns_extracted as ( select _metadata_aurora_cluster_export_date, to_varchar(get_ignore_case(_raw, 'id')) as id, to_varchar(get_ignore_case(_raw, 'tenant_id')) as tenant_id, to_varchar(get_ignore_case(_raw, 'status')) as status, to_varchar(get_ignore_case(_raw, 'user_id')) as user_id, to_timestamp_tz(to_varchar(get_ignore_case(_raw, 'created_at'))) as created_at, to_timestamp_tz(to_varchar(get_ignore_case(_raw, 'updated_at'))) as updated_at, _raw, _metadata_source_loader_type, from external_snap_table_filtered ), external_snap_aurora_cluster_exports as ( -- Aurora Cluster Export のメタ情報と結合し、後の工程でCDCデータと結合する準備をする。 select m.snapshot_timestamp, dat.id, dat.tenant_id, dat.status, dat.user_id, dat.created_at, dat.updated_at, dat._raw, m.extraction_start_time as _metadata_data_captured_at, dat._metadata_source_loader_type, from external_snap_table_columns_extracted as dat left join partition_date_extraction_start_time_snapshot_timestamp_mappings as m on dat._metadata_aurora_cluster_export_date = m.partition_date ), -- 結合対象のCDCデータを取得していく cdc_all as ( select * from landing_kafka_connector.debezium.debezium__mysql__aurora_cluster_sample__sample_db__sample_table ), cdc_extracted_and_rough_filtered as ( -- 出力対象となりうるCDCデータを、ユーザー入力値を元に荒くフィルタリング select coalesce(payload:after, payload:before) as _raw, payload:source:ts_ns::varchar::timestamp_tz as _metadata_data_captured_at, payload:op::varchar as _cdc_operation, payload:source:file::varchar as _mysql_binlog_file, payload:source:pos::int as _mysql_binlog_pos, from cdc_all where payload is not null -- exclude tombstone records and _metadata_data_captured_at >= to_timestamp_tz('2025-10-19T21:00:00+09:00') -- cdc started time and _metadata_data_captured_at between (select min(extraction_start_time) from partition_date_extraction_start_time_snapshot_timestamp_mappings) and (select max(snapshot_timestamp) from partition_date_extraction_start_time_snapshot_timestamp_mappings) ), cdc_filtered as ( -- 事前計算した出力対象のスナップショット時点と Aurora Cluster Export の実行開始時間の間に存在するCDCデータを -- 出力対象のスナップショット時点群のそれぞれに対して抽出 select m.snapshot_timestamp, dat._raw, dat._metadata_data_captured_at, dat._cdc_operation, dat._mysql_binlog_file, dat._mysql_binlog_pos, 'cdc' as _metadata_source_loader_type, from partition_date_extraction_start_time_snapshot_timestamp_mappings as m left join cdc_extracted_and_rough_filtered as dat on dat._metadata_data_captured_at between m.extraction_start_time and m.snapshot_timestamp ), cdc_latest_records as ( -- 出力対象のスナップショット時点ごとにプライマリーキーに対して最新のレコードのみを抽出 select snapshot_timestamp, to_varchar(get_ignore_case(_raw, 'id')) as id, _raw, _metadata_data_captured_at, _cdc_operation, _metadata_source_loader_type, from cdc_filtered qualify 1 = row_number() over ( partition by snapshot_timestamp, id order by _metadata_data_captured_at desc nulls last, _mysql_binlog_file desc nulls last, _mysql_binlog_pos desc nulls last ) ), -- https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-change-event-values cdc_latest_records_inserted_or_updated as ( -- 新規挿入と更新レコードを最後にUNIONするため抽出 select snapshot_timestamp, id, to_varchar(get_ignore_case(_raw, 'tenant_id')) as tenant_id, to_varchar(get_ignore_case(_raw, 'status')) as status, to_varchar(get_ignore_case(_raw, 'user_id')) as user_id, to_timestamp_tz(to_varchar(get_ignore_case(_raw, 'created_at'))) as created_at, to_timestamp_tz(to_varchar(get_ignore_case(_raw, 'updated_at'))) as updated_at, _raw, _metadata_data_captured_at, _metadata_source_loader_type, from cdc_latest_records where _cdc_operation in ('c', 'u') ), cdc_latest_records_deleted_or_updated as ( -- 最終成果物を作るときにCDCにおける更新と削除のレコードを -- Aurora Cluster Export のデータから事前に消しておく必要があるため -- 対象となるレコードを抽出 select snapshot_timestamp, job_id, from cdc_latest_records where _cdc_operation in ('d', 'u') ), materialized_snap_aurora_cluster_exports_anti_join_cdc_latest_records_deleted_or_updated as ( -- Aurora Cluster Export のデータからCDCにおける更新と削除のデータ集合を取り除く -- Anti-Join という手法で取り除いている。 select * from materialized_snap_aurora_cluster_exports where not exists ( select 1 from cdc_latest_records_deleted_or_updated where materialized_snap_aurora_cluster_exports.snapshot_timestamp = cdc_latest_records_deleted_or_updated.snapshot_timestamp and materialized_snap_aurora_cluster_exports.job_id = cdc_latest_records_deleted_or_updated.job_id ) ), external_snap_aurora_cluster_exports_anti_join_cdc_latest_records_deleted_or_updated as ( -- Aurora Cluster Export のデータからCDCにおける更新と削除のデータ集合を取り除く -- Anti-Join という手法で取り除いている。 select * from external_snap_aurora_cluster_exports where not exists ( select 1 from cdc_latest_records_deleted_or_updated where external_snap_aurora_cluster_exports.snapshot_timestamp = cdc_latest_records_deleted_or_updated.snapshot_timestamp and external_snap_aurora_cluster_exports.job_id = cdc_latest_records_deleted_or_updated.job_id ) ), final as ( -- Aurora Cluster Export のデータと -- CDCにおけるプライマリーキーに対する最新レコードのうち、 -- 新規と更新のレコードをUNIONする select * from materialized_snap_aurora_cluster_exports_anti_join_cdc_latest_records_deleted_or_updated union all by name select * from external_snap_aurora_cluster_exports_anti_join_cdc_latest_records_deleted_or_updated union all by name select * from cdc_latest_records_inserted_or_updated ) select * from final
全てを説明するには紙面と気力が足りないので、詳細な解説が欲しい人はカジュアル面談に応募してください。3 jobs.layerx.co.jp
おわりに
この記事では、Snowflake上でのスナップショット取得を実現するデータパイプラインの構築について紹介しました。MySQLデータベースのテーブルをSnowflakeに連携し、過去の任意時点のデータを取得できる仕組みを実装することで、AI Agentのビジネス価値を計るバックテスト基盤の構築を支えています。
Aurora Cluster ExportとChange Data Capture(CDC)を組み合わせることで、ミリ秒単位で過去に遡ることが可能なスナップショット取得を実現しました。実装の中核となるのは、UDTFを活用したスナップショット取得機能です。時系列データの生成、Aurora Cluster Exportデータとの結合、CDCデータによる差分適用といった一連の処理を、Snowflake上でSQLで実行できるようになっています。
このようなデータパイプラインは、AI Agentのバックテストだけでなく、データドリブンな意思決定や監査対応、時系列分析など、様々なユースケースにも応用可能です。過去の任意時点のデータにアクセスできる基盤は、Production-ReadyなAI開発を支える重要な要素となるでしょう。
LayerXでは、Snowflakeを活用したデータ基盤の構築と、その上でのAI/MLシステムの開発を進めています。Production-ReadyなAI開発をサポートするためのデータ基盤開発、時系列データ処理、リアルタイムデータパイプラインの構築などに興味がある方は、ぜひ一緒にチャレンジしましょう!
最後に改めて、この記事はLayerX AI Agentブログリレーの37日目の記事です。毎日AI Agentに関する知見をお届けします!!LayerXテック公式Xを是非フォローして見逃さないようにお願いします!
open.talentio.com open.talentio.com open.talentio.com
- 別記事として後日ブログにします。↩
- Kafka Connect上で動作するConnectorはDebezium以外にも多数存在しています。 Kafka Connectors | Confluent Documentation↩
- コードブロック内でINTERVAL型がないと表現していますが、このブログをリリースした直後に INTERVAL 型が Public Preview になりました。 9.35 Release Notes: Preview | Snowflake Documentation↩