こんにちは。バクラク事業部 機械学習・データ部 データグループの@civitaspoです。もうすぐクリスマスですね。最近、何度か娘に「サンタさんへお願いしたもの」をヒアリングしているのですが、私には秘密みたいで、サンタさんから委託された業務を全うできないかもしれず、ヒヤヒヤする日々を過ごしています。
さて、本記事では、Amazon AuroraのCluster Export機能を用いてAmazon S3へ出力したデータを、dbtを使用してSnowflakeへimportする方法を書きます。以前、『BigQueryからSnowflakeへ移管して作る最強のデータ基盤 〜Data Ingestion編〜』の登壇にて、データベースからSnowflakeへのData Ingestionについて紹介しました。本記事は、その詳細な内容となります。
はじめにAurora Cluster Exportの説明をした後、SnowflakeでAmazon S3上のデータを読む方法の説明を行い、最後にdbtを使ってAurora Cluster Exportで出力したデータをSnowflakeへImportする方法を書きます。
Aurora Cluster Exportとは
Amazon Aurora(以下、Aurora)には、Aurora内に保有するデータをAmazon S3(以下、S3)にExportする機能を持っています。正式名称は「Exporting DB cluster data to Amazon S3」ですが、弊社内ではAurora Cluster Exportと呼称しています。
Exportされるデータは、テーブルごとにPrefixが分かれた状態で、Parquetファイルとして出力されます。ドキュメントにも書かれているとおり、このExport処理はAuroraのパフォーマンスに影響を与えないため、安全にデータを取得することができます。
You can export data from a live Amazon Aurora DB cluster to an Amazon S3 bucket. The export process runs in the background and doesn't affect the performance of your active DB cluster.
Aurora Cluster Exportの実行
以下のようなコマンド実行によって、Export処理の実行を行うことができます。
aws rds start-export-task \ --iam-role-arn 'arn:aws:iam::123456789012:role/test-civitaspo' \ --export-task-identifier my-cluster-export \ --s3-bucket-name test-civitaspo \ --s3-prefix my-aurora-cluster-20241221 \ --source-arn 'arn:aws:rds:ap-northeast-1:123456789012:cluster:my-aurora-cluster' \ --kms-key-id 'arn:aws:kms:ap-northeast-1:123456789012:key/b007b100-2aad-4f04-9d20-fb8f87fc133b'
各オプションについて少し詳しく説明します。
--iam-role-arn
: このオプションに指定するIAM RoleはExport処理を実行できる権限を保つ必要があります。具体的には、コードを載せたほうが理解が早いと思うので社内で利用している Aurora Cluster Export 用に IAM の Policy Document を生成する terraform module のコード片を記載しておきます。# variables.tf variable "db_cluster_identifiers" { type = list(string) default = null description = "The DB Cluster identifiers" } variable "destination_s3_bucket_arns" { type = list(string) description = "The S3 bucket ARNs" } variable "allow_delete_objects" { type = bool default = false description = "Whether to allow deleting objects" } variable "executor_role_arn" { type = string description = "The ARN of the executor role" } # output.tf output "json" { value = data.aws_iam_policy_document.this.json } # main.tf data "aws_caller_identity" "current" {} data "aws_region" "current" {} data "aws_iam_policy_document" "this" { statement { # NOTE: We need to check the status of all export tasks on the account to avoid the following error. # > An error occurred (ExportTaskLimitReachedFault) when calling the StartExportTask # > operation: You have reached the limit of 5 concurrent export tasks. # ref. https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/export-cluster-data.Considerations.html # > You can have up to five concurrent DB snapshot export tasks in progress per AWS account. effect = "Allow" actions = ["rds:DescribeExportTasks"] resources = ["*"] } statement { effect = "Allow" actions = [ "rds:StartExportTask", ] resources = concat( [ for id in var.db_cluster_identifiers : "arn:aws:rds:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:cluster:${id}" ], [ # NOTE: Cluster snapshots have no naming convention; for example, they may include a UUID. # So we need to allow all cluster snapshots. # e.g. arn:aws:rds:ap-northeast-1:123456789012:cluster-snapshot:awsbackup:job-3becc6de-e092-424f-80bb-e236e5cb956e "arn:aws:rds:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:cluster-snapshot:*:*", ], ) } statement { effect = "Allow" actions = ["iam:PassRole"] resources = [var.executor_role_arn] condition { test = "StringEquals" variable = "iam:PassedToService" # NOTE: not "export.rds.amazonaws.com" values = ["rds.amazonaws.com"] } } statement { # We customize the actions from the original policy document to reduce unintentional access. # ref. https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/export-cluster-data.Setup.html effect = "Allow" actions = [ "s3:PutObject", "s3:PutObjectTagging", "s3:GetObject", "s3:GetObjectTagging", "s3:GetObjectAttributes", "s3:ListBucket", "s3:GetBucketLocation", # NOTE: We don't want to allow deleting objects, but rds:StartExportTask needs this permission. # If we don't allow this, errors like the following occur. # > The IAM role arn:aws:iam::123456789012:role/aurora-cluster-exporter isn't authorized to call s3:DeleteObject on the S3 bucket snowflake-aurora-cluster-export-integration-bucket. (Service: Rds, Status Code: 400, Request ID: e21aca3c-1910-4da9-a5ba-3d0c17d03b57) "s3:DeleteObject", ] resources = concat( var.destination_s3_bucket_arns, [for arn in var.destination_s3_bucket_arns : "${arn}/*"], ) } }
--export-task-identifier
: このオプションは、Export処理のIDになると同時に、格納先であるS3のPrefixにもなります。そのため、データ取得側のロジックも考慮して命名規則を決めるべしです。なお、このIDは最大60文字までで、AWSアカウント内でユニークである必要がある点にも注意が必要です。--s3-bucket-name
: Export先のS3バケット名を指定します。--s3-prefix
: Export先のS3のprefixを指定します。Prefixとして使用する時にシステム側で末尾のスラッシュが付与されるため、このオプションの末尾にはスラッシュを書いてはいけません。書いてしまうと2重スラッシュのPrefixが定義されます。--source-arn
: Aurora Cluster の ARN または Aurora Cluster Snapshot の ARN を指定します。--kms-key-id
: 暗号化に使用するKMS Key の ARN を指定します。--export-only
: 必須ではないオプションです。出力対象のデータベースやテーブルを限定する場合に使用します。
Aurora Cluster Exportの同時実行数は5つ
このAurora Cluster ExportのExport処理はアカウント内で同時に5つまでしか実行することができないことに注意すべきです。
• You can have up to five concurrent DB snapshot export tasks in progress per AWS account.
以下のコマンドで実行中のExport処理一覧を取得できるので、他のExport処理が5つ起動している場合は、実行開始を待機するような制御を行う必要があります。
aws rds describe-export-tasks \ --filters Name=status,Values=starting,in_progress,canceling,
Aurora Cluster Exportの結果
Aurora Cluster Exportによって出力された結果は、最終的に s3://${--s3-bucket-name}/${--s3-prefix}/${--export-task-identifier}/${database}/${database or schema}.${table}/${partition}/part-${uuid}.gz.parquet
というPath構成でデータが出力されます。出力されるParquetファイルはGzip圧縮された状態で2~5MB程度のサイズになるように分割されて出力されます。
この実データの他に、2つのメタデータファイルが出力されます。1つは s3://${--s3-bucket-name}/${--s3-prefix}/${--export-task-identifier}/export_tables_info_${--export-task-identifier}.json
という名称のファイルです。このファイルには、以下のようなテーブルごとのスキーマ情報などが格納されています。
{ "perTableStatus": [ { "tableStatistics": { "extractionStartTime": "Dec 21, 2024, 12:03:27 PM", "extractionEndTime": "Dec 21, 2024, 12:03:51 PM", "partitioningInfo": { "numberOfPartitions": 1, "numberOfCompletedPartitions": 1 } }, "schemaMetadata": { "originalTypeMappings": [ { "columnName": "id", "originalType": "varchar", "expectedExportedType": "binary (UTF8)", "originalCharMaxLength": 36, "originalNumPrecision": 0, "originalCharsetName": "utf8mb4", "originalDateTimePrecision": 0, "ordinalPosition": 1 }, { "columnName": "tenant_id", "originalType": "varchar", "expectedExportedType": "binary (UTF8)", "originalCharMaxLength": 36, "originalNumPrecision": 0, "originalCharsetName": "utf8mb4", "originalDateTimePrecision": 0, "ordinalPosition": 2 }, { "columnName": "task_name", "originalType": "varchar", "expectedExportedType": "binary (UTF8)", "originalCharMaxLength": 191, "originalNumPrecision": 0, "originalCharsetName": "utf8mb4", "originalDateTimePrecision": 0, "ordinalPosition": 3 }, { "columnName": "task_args", "originalType": "json", "expectedExportedType": "binary (UTF8)", "originalCharMaxLength": 0, "originalNumPrecision": 0, "originalDateTimePrecision": 0, "ordinalPosition": 4 }, { "columnName": "task_result", "originalType": "json", "expectedExportedType": "binary (UTF8)", "originalCharMaxLength": 0, "originalNumPrecision": 0, "originalDateTimePrecision": 0, "ordinalPosition": 5 }, { "columnName": "status", "originalType": "enum", "expectedExportedType": "binary (UTF8)", "originalCharMaxLength": 10, "originalNumPrecision": 0, "originalCharsetName": "utf8mb4", "originalDateTimePrecision": 0, "ordinalPosition": 6 }, { "columnName": "error_log", "originalType": "varchar", "expectedExportedType": "binary (UTF8)", "originalCharMaxLength": 2048, "originalNumPrecision": 0, "originalCharsetName": "utf8mb4", "originalDateTimePrecision": 0, "ordinalPosition": 7 }, { "columnName": "failed_at", "originalType": "timestamp", "expectedExportedType": "int64 (TIMESTAMP_MICROS)", "originalCharMaxLength": 0, "originalNumPrecision": 0, "originalDateTimePrecision": 0, "ordinalPosition": 8 }, { "columnName": "deleted_at", "originalType": "timestamp", "expectedExportedType": "int64 (TIMESTAMP_MICROS)", "originalCharMaxLength": 0, "originalNumPrecision": 0, "originalDateTimePrecision": 0, "ordinalPosition": 9 }, { "columnName": "registered_at", "originalType": "timestamp", "expectedExportedType": "int64 (TIMESTAMP_MICROS)", "originalCharMaxLength": 0, "originalNumPrecision": 0, "originalDateTimePrecision": 0, "ordinalPosition": 10 }, { "columnName": "modified_at", "originalType": "timestamp", "expectedExportedType": "int64 (TIMESTAMP_MICROS)", "originalCharMaxLength": 0, "originalNumPrecision": 0, "originalDateTimePrecision": 0, "ordinalPosition": 11 }, { "columnName": "created_at", "originalType": "timestamp", "expectedExportedType": "int64 (TIMESTAMP_MICROS)", "originalCharMaxLength": 0, "originalNumPrecision": 0, "originalDateTimePrecision": 0, "ordinalPosition": 12 }, { "columnName": "updated_at", "originalType": "timestamp", "expectedExportedType": "int64 (TIMESTAMP_MICROS)", "originalCharMaxLength": 0, "originalNumPrecision": 0, "originalDateTimePrecision": 0, "ordinalPosition": 13 } ] }, "status": "COMPLETE", "sizeGB": 0.0000152587890625, "target": "mydatabase.mydatabase.mytasktable" }, .. ] }
もう一つのメタデータファイルには、Export処理事態のメタ情報が格納されています。 s3://${--s3-bucket-name}/${--s3-prefix}/${--export-task-identifier}/export_info_${--export-task-identifier}.json
という名称のファイルです。
{ "exportTaskIdentifier": "my-export", "sourceArn": "arn:aws:rds:ap-northeast-1:123456789012:cluster:my-aurora-cluster", "exportOnly": [], "snapshotTime": "Dec 21, 2024, 3:07:13 PM", "taskStartTime": "Dec 22, 2024, 11:51:01 AM", "taskEndTime": "Dec 22, 2024, 12:06:13 PM", "s3Bucket": "test-civitaspo", "s3Prefix": "my-aurora-cluster-20241221", "exportedFilesPath": "my-export", "iamRoleArn": "arn:aws:iam::123456789012:role/test-civitaspo", "kmsKeyId": "arn:aws:kms:ap-northeast-1:123456789012:key/b007b100-2aad-4f04-9d20-fb8f87fc133b", "status": "COMPLETE", "percentProgress": 100, "totalExportedDataInGB": 1.689056396484375 }
Aurora Cluster ExportをAWS Step Functionsで実行する
弊社ではAurora Cluster ExportをAWS Step Functionsで実行可能にしています。以下の図がAWS Step FunctionsのState Machineの図です。
ここまでの説明で分かる通り、Aurora Cluster Exportには同時実行数制御など、いくつか注意して処理実行する必要がありました。このState Machineにはそれら注意点を上手く処理できるように、待機処理などのStepを定義しています。本記事では、このState Machineの処理詳細については、記事内容の本筋から逸れるため詳細には説明しません。しかし、読者の方も簡単に Aurora Cluster Export を実行する State Machine を組んで詳細理解ができるように、このState Machineを定義するJSONを、以下に記載しておきます。 Input
というStepのパラメータをご自身の環境の値に設定すれば動くように作ってあるので、是非お試しください。
{ "StartAt": "Input", "States": { "Input": { "Type": "Pass", "Next": "Constants", "Parameters": { "IamRoleArn": "arn:aws:iam::123456789012:role/aurora-cluster-exporter", "KmsKeyId": "arn:aws:kms:ap-northeast-1:123456789012:key/b007b100-2aad-4f04-9d20-fb8f87fc133b", "S3BucketName": "snowflake-aurora-cluster-export-integration-bucket", "DBClusterIdentifier": "my-aurora-cluster", "SourceArn": "arn:aws:rds:ap-northeast-1:123456789012:cluster:my-aurora-cluster" } }, "Constants": { "Type": "Pass", "Next": "Variables", "Parameters": { "ExecutionStartTime.$": "$$.Execution.StartTime", "ExecutionStartDate.$": "States.ArrayGetItem(States.StringSplit($$.Execution.StartTime, 'T'), 0)", "ExecutionUuid.$": "States.UUID()", "S3RdsClusterExportPrefix": "aurora-cluster-export" }, "ResultPath": "$.Constants" }, "Variables": { "Type": "Pass", "Next": "DescribeRunningExportTasks", "Parameters": { "ExportTaskIdentifier.$": "States.Format('export-task-{}', $.Constants.ExecutionUuid)", "IamRoleArn.$": "$.IamRoleArn", "KmsKeyId.$": "$.KmsKeyId", "S3BucketName.$": "$.S3BucketName", "S3Prefix.$": "States.Format('{}/{}/d={}', $.Constants.S3RdsClusterExportPrefix, $.DBClusterIdentifier, $.Constants.ExecutionStartDate)", "SourceArn.$": "$.SourceArn", "S3MetadataJsonKey.$": "States.Format('{}/{}/d={}/metadata.json', $.Constants.S3RdsClusterExportPrefix, $.DBClusterIdentifier, $.Constants.ExecutionStartDate)" }, "ResultPath": "$.Variables" }, "DescribeRunningExportTasks": { "Type": "Task", "Parameters": { "Filters": [ { "Name": "status", "Values": [ "starting", "in_progress", "canceling" ] } ] }, "Resource": "arn:aws:states:::aws-sdk:rds:describeExportTasks", "ResultPath": "$.DescribeRunningExportTasks", "Next": "Is 3 or more tasks running?", "ResultSelector": { "LengthOfRunningExportTasks.$": "States.ArrayLength($.ExportTasks)" } }, "Is 3 or more tasks running?": { "Type": "Choice", "Choices": [ { "Variable": "$.DescribeRunningExportTasks.LengthOfRunningExportTasks", "NumericGreaterThanEquals": 3, "Next": "Wait until the number of running tasks become less than 3" } ], "Default": "StartExportTask" }, "Wait until the number of running tasks become less than 3": { "Type": "Wait", "Seconds": 600, "Next": "DescribeRunningExportTasks" }, "StartExportTask": { "Type": "Task", "Parameters": { "ExportTaskIdentifier.$": "$.Variables.ExportTaskIdentifier", "IamRoleArn.$": "$.Variables.IamRoleArn", "KmsKeyId.$": "$.Variables.KmsKeyId", "S3BucketName.$": "$.Variables.S3BucketName", "SourceArn.$": "$.Variables.SourceArn", "S3Prefix.$": "$.Variables.S3Prefix" }, "Resource": "arn:aws:states:::aws-sdk:rds:startExportTask", "ResultPath": "$.StartExportTask", "Next": "Wait until the task become complete status" }, "Wait until the task become complete status": { "Type": "Wait", "Seconds": 600, "Next": "DescribeTheExportTask" }, "DescribeTheExportTask": { "Type": "Task", "Parameters": { "Filters": [ { "Name": "export-task-identifier", "Values.$": "States.Array($.StartExportTask.ExportTaskIdentifier)" } ] }, "Resource": "arn:aws:states:::aws-sdk:rds:describeExportTasks", "ResultPath": "$.DescribeTheExportTask", "Next": "Is the export task complete?", "ResultSelector": { "Status.$": "$.ExportTasks[0].Status" } }, "Is the export task complete?": { "Type": "Choice", "Choices": [ { "Variable": "$.DescribeTheExportTask.Status", "StringEquals": "COMPLETE", "Comment": "COMPLETE", "Next": "PutObject" }, { "Or": [ { "Variable": "$.DescribeTheExportTask.Status", "StringEquals": "CANCELED" }, { "Variable": "$.DescribeTheExportTask.Status", "StringEquals": "FAILED" }, { "Variable": "$.DescribeTheExportTask.Status", "StringEquals": "CANCELING" } ], "Comment": "CANCELED OR FAILED", "Next": "Fail" }, { "Or": [ { "Variable": "$.DescribeTheExportTask.Status", "StringEquals": "STARTING" }, { "Variable": "$.DescribeTheExportTask.Status", "StringEquals": "IN_PROGRESS" } ], "Next": "Wait until the task become complete status", "Comment": "STARTING OR IN_PROGRESS" } ], "Default": "Fail" }, "PutObject": { "Type": "Task", "Parameters": { "Body": { "ExportTaskIdentifier.$": "$.Variables.ExportTaskIdentifier" }, "Bucket.$": "$.Variables.S3BucketName", "Key.$": "$.Variables.S3MetadataJsonKey" }, "Resource": "arn:aws:states:::aws-sdk:s3:putObject", "Next": "Success" }, "Success": { "Type": "Succeed" }, "Fail": { "Type": "Fail" } } }
SnowflakeからAmazon S3のデータを読む方法
Snowflakeには、Amazon S3(以下、S3)上のオブジェクトを直接参照する機能があります。
Storage Integration と External Stage
SnowflakeからS3へのアクセスは、Snowflake内のStorage IntegrationとExternal Stageという2つの構成要素によって実現されます。
まず、Storage Integrationというオブジェクトを作成することで払い出される External ID を使用して、AWS側のIAM Roleを使用してS3へのアクセスが可能な状態を作ります。
そして、作成したStorage Integrationを使用して実際にS3に対するアクセスを行うため、External Stageと呼ばれるオブジェクトを作成します。External Stageは、S3やAzure Blob Storage、Google Cloud Storageといった外部ストレージをSnowflake内で扱えるようにするために、Location Mappingを行うオブジェクトだと考えるとわかりやすいと思います。
たとえば、以下のようなExternal Stageを作成したとします。
CREATE STAGE my_ext_stage URL = 's3://load/files/' STORAGE_INTEGRATION = myint;
このExternal Stageに対して、Snowflake内で @my_ext_stage/path/to/object
として参照するようなSQLを書いた場合、内部的には「Storage Integration の myint に定義された認証情報を使用して s3//load/files/path/to/object
へアクセスする」処理が行われます。
SQLによるS3上のデータ参照
Snowflakeでは、External Stage上のデータをSQLで参照する際、format指定が必要です。*1 formatはJSON、XML、Parquet、ORCなどが用意されています。
また、file_patternを使用して特定のファイルのみを参照することも可能です。
以下に、JSONファイルを読み込む例を示します。
select $1:id::number as id, $1:name::varchar as name, $1:created_at::timestamp_tz as created_at FROM @my_ext_stage/data/ (FILE_FORMAT => 'my_json_format', PATTERN => '.*\.json');
上記の例では、my_ext_stage の data/ 配下にある .json で終わるファイルを対象に、JSONデータを構造化されたテーブル形式で読み込んでいます。 $1
は各JSONレコードを表しています。 $1
には variant 型でレコード全体が格納されているので、 :
を使ってJSONのフィールドにアクセスしています。
Aurora Cluster Exportで出力したParquetファイルをdbtを使ってSnowflakeへimportする
ここまでの説明で、Aurora Cluster Exportの結果がどのような形式でS3に出力されるのか、その結果に対してSnowflakeからアクセスする方法について書きました。ここからは本記事の主題である、Aurora Cluster Exportで出力したParquetファイルをdbtを使ってSnowflakeへImportする話題へ移ります。
処理の全体像
Aurora Cluster Exportの結果をSnowflakeへImportするためには、以下の処理を行います。
- ① Aurora Cluster Exportで取得したメタ情報をクエリ可能にする
- ② ①のメタ情報を元にParquetをロードするためのクエリを組み立てる
- ③ ② のクエリを実行する
これらの処理をdbtで実現します。各ステップの説明では、dbt で使用するSQLをベースに説明しますが、社内リポジトリで使用している dbt macros を展開して説明するため、冗長な表現が多くなります。ご容赦ください。
① Aurora Cluster Exportで取得したメタ情報をクエリ可能にする
先程の説明で Aurora Cluster Export の結果として、export_tables_info_${--export-task-identifier}.json
というオブジェクトにテーブルのメタ情報が含まれているという説明をしました。このオブジェクトの情報をParseし、テーブルのメタ情報をクエリ可能なテーブルを作成します。
{% set database = var(target.name)['database']['aurora_cluster_export'] %} {% set execution_timestamp = modules.datetime.datetime.now() %} {% set execution_timestamp_str = env_var('EXECUTION_TIMESTAMP', "none") %} {% if execution_timestamp_str != "none" %} {{ set execution_timestamp = modules.datetime.datetime.fromisoformat(execution_timestamp_str) }} {% endif %} {% set target_dates = [execution_timestamp.strftime("%Y-%m-%d")] %} {% set db_cluster_identifiers = [ 'my-aurora-cluster1', 'my-aurora-cluster2', ] %} with raw as ( {%- for db_cluster_identifier in db_cluster_identifiers %} {%- for target_date in target_dates %} select metadata$filename as filename, $1 as value from @{{ database }}.stages.aurora_cluster_export/{{ db_cluster_identifier }}/d={{ target_date }}/ (file_format => '{{ database }}.file_formats.json_file_format', pattern => '.*export_tables_info_.*\.json') {%- if not loop.last %} union all {%- endif %} {%- endfor %} {%- if not loop.last %} union all {%- endif %} {%- endfor %} ), extracted as ( select regexp_replace(r.filename, '/export_tables_info_[^/]+\.json$') as export_task_path_prefix, t.value:status::varchar as status, t.value:status::varchar = 'COMPLETE' as is_completed, t.value:sizeGB::float as size_gb, convert_timezone('UTC', to_timestamp_ntz(t.value:tableStatistics:extractionStartTime::varchar, 'MON DD, YYYY, HH12:MI:SS AM')) as extraction_start_time, convert_timezone('UTC', to_timestamp_ntz(t.value:tableStatistics:extractionEndTime::varchar, 'MON DD, YYYY, HH12:MI:SS AM')) as extraction_end_time, split_part(t.value:target::varchar, '.', 1) as database_name, split_part(t.value:target::varchar, '.', 2) as schema_name, split_part(t.value:target::varchar, '.', 3) as table_name, t.value as raw_table_status, from raw r, lateral flatten (r.value:perTableStatus) t, ), selected as ( select * from extracted where is_completed qualify 1 = row_number() over ( partition by database_name, schema_name, table_name order by extraction_start_time desc ) ) select s.* exclude (export_task_path_prefix), concat( '@{{ database }}.stages.aurora_cluster_export/', regexp_replace(s.export_task_path_prefix, '^aurora-cluster-export/'), '/', s.database_name, '/', s.schema_name, '.', s.table_name, '/' ) as table_data_stage_path, c.value:columnName::varchar as column_name, c.value:originalType::varchar as column_original_type, c.value:expectedExportedType::varchar as column_parquet_type, -- NOTE: Do not use these columns for now -- c.value:ordinalPosition::int as column_index, -- c.value:originalCharMaxLength::int as column_char_length, -- c.value:originalDateTimePrecision::int as column_datetime_precision, -- c.value:originalNumPrecision::int as column_decimal_precision, from selected s, lateral flatten (s.raw_table_status:schemaMetadata:originalTypeMappings) c
このクエリでは、各Aurora Cluster毎に出力されたAurora Cluster Exportの結果から最新のメタ情報を取得してテーブル化しています。最終成果物として
- Export Taskの結果(statusや処理時間など)
- Export元のデータベース名、スキーマ名、テーブル名
- Parquet ファイル群が格納されているS3のロケーションをExternal Stageに変換した文字列
- カラム名とExport元のデータベースにおける型、Parquetの型
を格納しています。
② ①のメタ情報を元にParquetをロードするためのカラム定義SQLを組み立てる
①で作成したテーブルのデータを元に、Parquetをロードするためのカラム定義SQLを組み立てます。 ...
には、取得元のテーブル情報を入力して、各テーブル毎にParquetロード用のクエリを生成します。
-- 取得元データベース・テーブルの情報 {% set database_name = '...' %} {% set schema_name = '...' %} {% set table_name = '...' %} -- ①で作ったメタデータテーブルの参照 {% set export_tables_info_ref = ref('...') %} with prep as ( select table_data_stage_path, -- NOTE: The following columns are used for building column expressions concat( 'get_ignore_case($1, ''', column_name, ''')' ) as column_value_expression, concat(' as ', case -- NOTE: Add suffix '__snowflake_reserved_word_suffix' to the reserved words when column_name in ('to', 'order', 'all') then concat(column_name, '__snowflake_reserved_word_suffix') else column_name end ) as column_alias_expression, column_original_type as o, column_parquet_type as p, from {{ export_tables_info_ref }} where database_name = '{{ database_name }}' and schema_name = '{{ schema_name }}' and table_name = '{{ table_name }}' ) select table_data_stage_path as TABLE_DATA_STAGE_PATH, -- TODO: Currently, This macro only supports MySQL. We will need to support PostgreSQL or others in the future. -- <mysql type> <parquet type> case -- bigint int64 -- bigint unsigned fixed_len_byte_array(9) (DECIMAL(20,0)) -- int int32 -- int unsigned int64 -- mediumint int32 -- mediumint unsigned int64 -- smallint int32 -- smallint unsigned int32 -- tinyint int32 -- tinyint unsigned int32 when startswith(o, 'bigint') then concat(column_value_expression, '::number', column_alias_expression) when startswith(o, 'int') then concat(column_value_expression, '::number', column_alias_expression) when startswith(o, 'mediumint') then concat(column_value_expression, '::number', column_alias_expression) when startswith(o, 'smallint') then concat(column_value_expression, '::number', column_alias_expression) when startswith(o, 'tinyint') then concat(column_value_expression, '::number', column_alias_expression) -- TODO: boolean? -- double double -- float double -- float unsigned double when startswith(o, 'double') then concat(column_value_expression, '::float', column_alias_expression) when startswith(o, 'float') then concat(column_value_expression, '::float', column_alias_expression) -- binary binary -- blob binary when o in ('blob', 'binary') then concat(column_value_expression, '::binary', column_alias_expression) -- char binary (UTF8) -- text binary (UTF8) -- varchar binary (UTF8) -- enum binary (UTF8) when o = 'char' then concat(column_value_expression, '::varchar', column_alias_expression) when o = 'text' then concat(column_value_expression, '::varchar', column_alias_expression) when o = 'varchar' then concat(column_value_expression, '::varchar', column_alias_expression) when o = 'enum' then concat(column_value_expression, '::varchar', column_alias_expression) -- date binary (UTF8) when o = 'date' then concat('nullif(', column_value_expression, '::varchar, ''0000-00-00'')::date', column_alias_expression) -- time binary (UTF8) when o = 'time' then concat('nullif(', column_value_expression, '::varchar, ''00:00:00'')::time', column_alias_expression) -- decimal fixed_len_byte_array(10) (DECIMAL(22,5)) -- decimal fixed_len_byte_array(11) (DECIMAL(24,4)) -- decimal fixed_len_byte_array(14) (DECIMAL(32,22)) -- decimal fixed_len_byte_array(14) (DECIMAL(32,5)) -- decimal fixed_len_byte_array(9) (DECIMAL(20,3)) -- decimal int32 (DECIMAL(8,4)) -- decimal int64 (DECIMAL(13,4)) -- decimal unsigned fixed_len_byte_array(9) (DECIMAL(20,4)) when startswith(o, 'decimal') then concat(column_value_expression, '::', regexp_substr(p, 'DECIMAL\\([0-9]+,[0-9]+\\)'), column_alias_expression) -- datetime int64 (TIMESTAMP_MICROS) -- NOTE: All our MySQL Servers is set to 'Asia/Tokyo' timezone. -- So, we need to convert 'TIMESTAMP_MICROS' to 'TIMESTAMP_TZ' -- and subtract 9 hours from the result. -- ref. https://dev.mysql.com/doc/refman/8.4/en/datetime.html -- > MySQL converts TIMESTAMP values from the current -- > time zone to UTC for storage, and back from UTC -- > to the current time zone for retrieval. (This -- > does not occur for other types such as DATETIME.) when o = 'datetime' then concat(column_value_expression, '::timestamp_tz - interval ''9 hours''', column_alias_expression) -- timestamp int64 (TIMESTAMP_MICROS) when o = 'timestamp' then concat(column_value_expression, '::timestamp_tz', column_alias_expression) -- json binary (UTF8) when o = 'json' then concat('parse_json(', column_value_expression, '::varchar)', column_alias_expression) else concat(column_value_expression, '::original_type:', o, '::parquet_type:', p, '::snowflake_type:unknown') -- NOTE: Syntax error if the type is unknown end as COLUMN_EXPRESSION, from prep
データを取得するためのSQLを組み立てるSQLなので非常に読みにくいですが、成果物としては
- 実データが格納されているExternal Stageの参照先
- 各カラムの定義情報(例:
get_ignore_case($1, ‘a')::number as a
,nullif(get_ignore_case($1, ‘b')::varchar, '0000-00-00')::date as b
,parse_json(get_ignore_case($1, ‘c')::varchar) as c
)
が、出力されます。
③ ② のクエリを実行する
ここまでのクエリでAurora Cluster Exportの結果をSnowflakeへImportするSQLの準備ができました。あとは、組み立てたSQLを実行するだけです。
{% set sql_statement %} with sql_elements as ( -- ②で作ったデータを参照するSELECT文 ) select TABLE_DATA_STAGE_PATH, COLUMN_EXPRESSION, from sql_elements {% endset %} {{ log('[' ~ model['alias'] ~ '] sql_statement: ' ~ sql_statement) }} {% set results = dbt_utils.get_query_results_as_dict(sql_statement) %} {{ log('[' ~ model['alias'] ~ '] results: ' ~ results) }} select {%- for column_expression in results['COLUMN_EXPRESSION'] %} {{ column_expression }}, {%- endfor %} from {{ results['TABLE_DATA_STAGE_PATH'] | first }} (file_format => 'my_parquet_format', pattern => '.*\.parquet')
このSQLでは dbt_utils.get_query_results_as_dict(...)
を用いて事前にクエリ実行した結果を用いてクエリを組み立てています。②の結果である External Stage の参照先情報が FROM 句に、各カラムの定義情報を for 文を用いて SELECT の対象として定義しています。
この①②③の手順で、Aurora Cluster Exportの結果をSnowflakeへImportするdbt modelを定義することができました。あとは dbt build
を実行するだけでSnowflakeにデータが import されます。
補足: Parquetファイルをロードするときに infer_schema を使っても良かったのでは?
Snowflakeには infer_schema(...)
関数と呼ばれる、ファイルからスキーマを生成してくれる関数があります。
Parquet ファイルはフォーマットの仕様上、ヘッダ情報が含まれるため infer_schema(...)
関数を使用することも検討しました。しかし、いくつかの理由で infer_schema(...)
関数を使用せず、自前でカラム定義情報を作り出すことにしました。理由は以下のとおりですが、つまるところ、「結局データソース側の型を考慮してカラム定義を作り出す必要があった」という理由です。
- データソース側でdate型であるカラムが
infer_schema(...)
関数では、文字列型になってしまうので、結局データソース側の型を参照して変換する必要がある - 弊社のMySQLはタイムゾーンが
Asia/Tokyo
で稼働しているため、データソース側でtimestamp型かdatetime型かによって、オフセット計算の要否が変わる。つまり、結局データソース側の型を参照して型変換する必要がある。*2 - MySQLでは慣習的に
tinyint(1)
をbooleanとして扱います。しかし、infer_schema(...)
関数を使うとtinyint(1)
もすべて number 型になってしまうため、結局データソース側の型を参照して型変換する必要がある。(後述しますが、この問題はまだ解決されていません)
infer_schema(...)
関数は便利ですが、今回のユースケースにおいて、正しいデータ変換を実現するためには、不適でした。
Future Works
今回紹介した仕組みは、まだまだ課題のある仕組みだと思っています。そのため、今後いくつかの施策によってブラッシュアップしようと考えています。
データソース側のInformation Schemaの取得
ここまでの説明で、データソース側のInformation Schemaに関しては全く触れていませんでした。Aurora Cluster Exportの機能では、Information Schemaのデータは取得することができません。Aurora Cluster Exportによって取得可能な情報は、あくまでParquetを正しくロードするための最低限の情報のみです。
そのため、正確な型情報やカラムコメントをSnowflake側に反映させることができていません。特に、先程も述べましたが、MySQLは慣習的に tinyint(1)
をbooleanとして扱いますが、Aurora Cluster Exportではdisplay widthは出力されません *3 *4。そのため、booleanの型変換を実現できていないのが現状です。
そこで今後、データソース側のInformation Schemaを取得して型変換やカラムコメントの生成に役立てようと思っています。この仕組みを作った当初は、データベースの接続時にはクライアントが必要でしたが、Data APIの登場によってAWS Step Functionsからの呼び出しも容易になったので、試してみようと思っています。
ParquetファイルをADD_FILES_ONLYでSnowflake Icebergテーブルとして取り込む
Snowflakeへデータを取り込む方法の1つとして、IcebergテーブルにCOPY文でデータ取り込みを行う方法があります。このとき ADD_FILES_ONLY
というオプションを設定することができます。
この機能は11月15日に出た新機能で、COPY文でデータ取り込みを行う際、Parquetファイルをデータとして読み込むのではなく、ファイルの移動を行うのみでデータ取り込みを完了させる機能です。
当初は、結局Snowflakeが一度ParquetファイルをロードすることになるのでIcebergテーブルとして保存することに魅力を感じていませんでしたが、Parquetファイルを移動させるだけでコピーが完了するのはデータロードのパフォーマンスが非常に上がるので、魅力的です。
試して公開できる状態になったら、「Aurora Cluster Exportで出力したデータをdbtを使ってSnowflakeへImportする(v2)」というブログを出そうと思います。
おわりに
本記事では、Aurora Cluster Exportを使用してS3に出力したデータをSnowflakeへインポートする方法について説明しました。非常に難解な仕組みだったとは思いますが、dbtを活用して、S3上のParquetファイルをデータ変換処理含めてデータロードできることを示しました。また、今後の課題として、Information Schemaの取得やSnowflake Icebergテーブルを活用した効率的なデータ取り込みなどについても触れました。より洗練された仕組みになったら、また新しい記事を書くのでご期待ください。
LayerXでは一緒にデータ基盤を作ってくれる仲間を募集しています。ちょっとでも興味のある方は一度ぜひお話しましょう!
*1:External Stageの定義時にformatの指定も可能ですが、External Stageの汎用性を高めるため、弊社では使用していません。
*2:MySQLはdatetimeとtimestampで時間の保持方法が異なります。 https://dev.mysql.com/doc/refman/8.4/en/datetime.html
*3:ちなみに display with を書くこと自体、MySQL 8 では非推奨となっています。 tinyint(1) を定義するケースと ZEROFILL とともに定義するケースのみが例外的に認められている状況です。 https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-19.html#:~:text=Display%20width%20specification%20for%20integer%20data%20types%20was%20deprecated%20in%20MySQL%208.0.17%2C%20and%20now%20statements%20that%20include%20data%20type%20definitions%20in%20their%20output%20no%20longer%20show%20the%20display%20width%20for%20integer%20types%2C%20with%20these%20exceptions%3A
*4:ちなみに unsigned に関しては、Information Schema にも残されていません。そのため、弊社では tinyint(1) unsigned の利用を運用上やめることにしています。 https://bugs.mysql.com/bug.php?id=98250