Amazon S3 / Google Cloud Storage にある大量・巨大なファイルを COPY INTO で Snowflake へデータロードする際に押さえておくべきポイントを備忘録的に記しておきます。
前提となるデータロード
以下のようなユースケースにおけるCOPY INTO の利用を想定しています。
- Snowpipe で取り込むテーブルの過去データのロード
- 単一テーブルで replace (洗い替え) を行う巨大なテーブルのバルクロード
引用: https://docs.snowflake.com/ja/user-guide/data-load-s3
ロードするファイル形式は CSV が最も速く、次点で Parquet, ORC
こちらのコミュニティの記事が非常に参考になります。
CSV でのロードが最速ですが、データのファイル保管として JSON のような 半構造化データは有用かつ CSV 化は変換のコンピュートリソースもかかるため、変換するよりはそのまま JSON で取り込んだほうが楽です。 今後データレイクとしてファイルを保管する必要があるなら Parquet や対応が追加された Iceberg フォーマットを検討すると良いでしょう。
ロードするファイル数/サイズに応じて Warehouse のサイズを変更すると高速化が期待できる
Snowflake のデータロードはコンピュートリソースである Warehouse にて 1スレッド1ファイルで動きます。 またデータロード時のファイルサイズの推奨は圧縮サイズで 100〜250MB であり、『10MB 以下のファイルが大量にある場合は結合するように』とブログに紹介があります。 https://select.dev/posts/snowflake-batch-loading
Warehouse の XS サイズでは8スレッドで、S, M, ... とサイズが上がるごとにスレッド数が倍々になります。100GB の1ファイルをロードするとなると、XS サイズでも8スレッドのうち1スレッドしか使えず、コンピュートリソースも無駄になる上に時間もかかってしまいます。 https://select.dev/posts/snowflake-warehouse-sizing
100GB を 100MB * 1000ファイルに分割すると、XS の8スレッドがすべて使えるので8倍になります。さらに warehouse を128スレッドを持つ XL で動かせば1024倍速くロードできます。基本的にはファイル数に応じて無駄にならない Warehouse サイズを選択すると良いでしょう。
データロードではコンピュートリソースの従量課金
COPY INTO で動いた Warehouse 時間が課金されます。 例として 非圧縮4.7TB 相当の Parquet (Snappy comp/Structured) を、2-XL サイズの Warehouse で 3095sec かけてロードした場合(こちらのベンチマーク記事より拝借)を考えてみます。
ファイル数は十分に分割されており、2-XL Warehouse で動かすとすると
- 3095sec / 4.7TB * 32 credit(2XL)/h = 5.85 credit/TB
Enterprise + AWS (Tokyo) の単価では
- 5.85credit/TB * $4.3/credit = $25.1/TB
とそれなりにお金がかかることがわかります。過去データを一気にロードしてびっくりしないようにあらかじめ見積もることが重要です。
巨大なデータを COPY INTO 一発でロードすると良い感じにマイクロパーティショニングにならない
Snowflake が高速にクエリできる秘密は、マイクロパーティショニングという独自の機構を持っていることです。これはロード時にいくつかの列のまとまりを小さなファイルにまとめメタデータを内部的に保持する仕組みで、データ参照する際に不要なデータを読み飛ばす(プルーニング)ことで高速なクエリを実現します。これはマイクロバッチで逐次的にファイルを取り込む Snowpipe と相性が良く、大量のログデータを Snowpipe 等で逐次取り込みを行うと時系列的に近いデータが自動的に同じマイクロパーティションにまとまるため、クエリ効率が良くなります。
https://select.dev/posts/snowflake-micro-partitions
しかし、例えば過去1年分のデータを COPY INTO で一括ロードしてしまうと、1年という単位で処理され、処理されたファイル順にマイクロパーティションがランダムに生成されてしまうので、WHERE 句で特定の日付を抜き出すようなクエリを発行しても1年分のデータを参照してしまう可能性があります。このように、COPY INTOするテーブルで日付カラムでマイクロパーティショニングを構成する対処として3つのクラスタリング方法があります。
- Natural Clustering
- Auto Clustering
- Manual Sorting
Natural Clustering
COPY INTO を一発ではなく、クラスタリングしたいデータの範囲に小分けして COPY INTO する方法。 マイクロパーティショニングしたい単位で COPY INTO を実行します。
日付パスごとにファイルをロードする例
copy into log_table from @s3.log_archives.access_log/dt=20240601/ file_format = (type = 'json') match_by_column_name = case_insensitive; copy into log_table from @s3.log_archives.access_log/dt=20240602/ file_format = (type = 'json') match_by_column_name = case_insensitive; ...
Auto Clustering
テーブルにクラスタリングキーを指定することで、自動的にマイクロパーティショニングを再配置してくれる機能。テーブル作成時か作成後にクラスタリングキーを設定します。
alter table log_table cluster by (date_col);
https://docs.snowflake.com/ja/user-guide/tables-auto-reclustering
この方法はとても簡単ですが、自動クラスタリングのために裏側でコンピュートリソースが動くため、利用料金が高額になりやすいので注意が必要です。
Manual Sorting
取り込んだテーブルを CTAS で再作成する際に ORDER BY を指定する方法。ORDER BY に指定したカラムがマイクロパーティショニング構成時にヒント句としての役割を果たします。
以下の場合、date_col カラムでマイクロパーティショニングが構成されます。
create or replace table log_table as select * from log_table order by date_col;
マイクロパーティショニングが全く効かない状態でデータ取り込みしてしまった場合に簡単に修正可能なので、運用ではお世話になることになるでしょう。
VARIANT型の内部カラムではマイクロパーティショニングが効かない場合がある
VARIANT型のカラム内の値ではマイクロパーティショニングが効かない場合があるため、全データをスキャンすることがあります。 とりあえずログデータを VARIANT型 で取り込んで後で Snowflake 上で加工・構造化しようと思うと思うと痛い目にあいます。
NG なデータロード例
create or replace table log_table ( raw_data variant ); copy into log_table from @s3.log_archives.access_log/dt=20240601/ file_format = (type = 'json'); copy into log_table from @s3.log_archives.access_log/dt=20240602/ file_format = (type = 'json'); ... -- raw_data カラム内の timestamp カラムを WHERE 句に指定してもプルーニングが効かない select * from log_table where raw_data:timestamp::timestamp > to_timestamp('2024-06-01 00:00:00') and raw_data:timestamp::timestamp < to_timestamp('2024-06-01 01:00:00') ;
これを回避するには、VARIANT 型でロードするにしてもマイクロパーティショニングにしたいカラムはあらかじめ構造化しておくほうが良いでしょう。
ドキュメントにも記載があります。
For better pruning and less storage consumption, we recommend flattening your OBJECT and key data into separate relational columns if your semi-structured data includes:
- Dates and timestamps, especially non-ISO 8601 dates and timestamps, as string values
- Numbers within strings
- Arrays
またマイクロパーティショニングが効いてるかどうかを確認する関数も用意されていますが、VARIANT型には使えません。
select SYSTEM$CLUSTERING_INFORMATION('log_table', '(timestamp)');
https://docs.snowflake.com/ja/sql-reference/functions/system_clustering_information
他にも
これからも Snowflake でのデータ基盤開発・運用で Tips が出てきたら書き留めていきます。
参考文献
- https://docs.snowflake.com/ja/user-guide/data-load-s3
- https://select.dev/posts/snowflake-batch-loading
- https://select.dev/posts/snowflake-warehouse-sizing
- https://select.dev/posts/snowflake-micro-partitions
- https://docs.snowflake.com/ja/user-guide/tables-auto-reclustering
- https://docs.snowflake.com/en/user-guide/semistructured-considerations#storing-semi-structured-data-in-a-variant-column-vs-flattening-the-nested-structure
- https://docs.snowflake.com/ja/sql-reference/functions/system_clustering_information