はじめに
dbt(data build tool) Python modelとSnowflakeのSnowparkを活用することで、データ取得と変換の開発体験の向上を実現できます。SQLは宣言的な言語であり、複雑な手続き的な処理を書くには限界があります。しかし、dbt Python modelはそのSQLの弱点を補完してくれるので、非常に便利です。
さらに、Snowflakeの外部アクセス統合を使うことで、APIを叩いて外部のデータを取得することも簡単に実現できます。これにより、さまざまなデータソースをシームレスに取り込むことが可能になります。
本記事では、dbt Python modelとSnowparkを用いた具体的なデータ取り込みの方法や、その仕組みについて紹介します。開発体験の良さを感じていただけると嬉しいです!
Snowparkとは
Snowparkは、Snowflakeが提供するフレームワークです。データエンジニアやデータサイエンティスト、データアナリストがSnowflake上でデータ処理や分析を行うために利用されます。SQLベースの操作に加え、Snowparkを利用することで、Python、Scala、Javaといったプログラミング言語を使って、Snowflake内で直接データ操作や分析処理のロジックを記述できます。
これにより、ユーザーは馴染みのある言語で複雑なデータパイプラインを構築し、大規模なデータセットのクレンジングや変換、機械学習モデルの予測を行うことが可能になります。
dbt Python modelとは
dbt Python modelは、dbtが持つ機能の1つでdbt modelをPythonを用いて記述できます。dbt Python modelは、dbt sourceや他のdbt modelを読み変換をし、変換をしたデータセットを返す関数です。データの操作はDataFrameで行われます。Snowflakeでdbt Python modelを利用する場合、pandas DataFrameとSnowparkのpandas互換DataFrameの両方を扱えます。
# models/python/my_python_model.py def model(dbt, session): dbt.config( # Jinjaのconfigマクロと同等の設定を行える materialized="table", # tableもしくはincrementalのみがサポートされている database="...", schema="..." alias="..." python_version="3.11",# 使用するPythonのバージョン ) my_sql_model_df = dbt.ref("my_sql_model") final_df = ... # 変換する処理を加える return final_df # 結果を返す
Pythonで関数でデータの変換処理を定義することや、PyPIにある外部パッケージも利用できます。
# models/python/my_python_model.py import holidays def is_holiday(date_col): # Chez Jaffle french_holidays = holidays.France() is_holiday = (date_col in french_holidays) return is_holiday def model(dbt, session): dbt.config( # Jinjaのconfigマクロと同等の設定を行える materialized="table", # tableもしくはincrementalのみがサポートされている database="...", schema="..." alias="..." python_version="3.11", # 使用するPythonのバージョン packages = ["holidays"]# 使用するPyPIパッケージ ) orders_df = dbt.ref("stg_orders") df = orders_df.to_pandas() df["IS_HOLIDAY"] = df["ORDER_DATE"].apply(is_holiday) # is_holiday関数を適用する return df
もちろん、dbt Python modelで定義したdbt modelは、 ref()
を用いてSQL中から呼び出せます。
{{ config( ... ) }} SELECT * FROM {{ ref('my_python_model') }} -- Pythonのファイル名がdbt modelの名前になる
dbt Python modelでは特定のデータプラットフォームのみがサポートされているので、利用する際は注意が必要です。dbt Python modelがサポートされているプラットフォームかは、dbtの公式ドキュメントを参考にしてください。
https://docs.getdbt.com/docs/supported-data-platforms
dbt Python modelとSnowparkの組み合わせ
dbt Python modelはSnowflakeで動かす場合、Snowparkを利用する必要があります。Pythonを用いてSnowflake上のdbt sourceやdbt modelを柔軟に操作し、データの変換や取得を行うことが可能になります。この組み合わせにより、Pythonで記述したロジックをSnowflakeの計算環境で直接実行でき、SQLの標準機能を超えた複雑なデータ処理が実現します。
Snowparkにはユーザー定義関数(UDF)の機能があり、これを利用することで、Snowflake内でPythonコードを用いて高度なデータ処理を実行できます。UDFを使用することで、複雑なデータ変換や高度な数学的処理、さらには機械学習モデルの統合など、SQLクエリ内での多様な処理が可能になります。さらに、UDFはdbt Python modelからも利用できるため、データの変換だけでなく、Snowflake上に存在しないデータを取得し、必要に応じて複雑な関数をDataFrame操作することができます。
以下の例では、UDFとdbt Python modelを用いてランダムな数値を生成し、テーブルとしてマテリアライズします。
import snowflake.snowpark.types as T import snowflake.snowpark.functions as F import numpy # UDFを登録する。@udfデコレーターと同様の処理をする。注意: 登録してもSnowflake上に関数などはできない。 def register_udf_add_random(): # udfを定義する add_random = F.udf( lambda: numpy.random.normal(), # 入力を必要とせず、ランダムな値を生成 return_type=T.FloatType() ) return add_random def model(dbt, session): dbt.config( materialized = "table", database="workspace", schema="sample", alias="random_values_udf", packages = ["numpy"] ) add_random = register_udf_add_random() # 新しいDataFrameを作成 num_rows = 1000 # 生成する行数 df = session.range(num_rows).select( F.col("id").as_("row_id"), add_random().as_("random_value") ) return df
dbtを実行すると以下の画像のように、データが入ります。
この例を元に、dbt Python modelとUDFがどのようにSnowflake上で作用しているかを理解するために、実際にSnowflake上で実行されるクエリを見ます。クエリを見ると、dbt Python modelで定義した関数が、 PROCEDURE
として実行されていることがわかります。エントリーポイントはmain関数です。main関数では、dbtObjをインスタンス化されます。
dbtObjは、dbt Python model内で使用される特別なオブジェクトで、dbtの機能をPythonコードで利用できるようにするためのラッパークラスです。このオブジェクトは、以下のようなdbtの主要な機能にアクセスする手段を提供します。
- ref: 他のdbt modelを参照するために使用され、指定されたモデルからデータを取得します。
- source: dbt sourceを参照するために使用され、指定されたデータソースからデータを取得します。
- config: モデルの設定(マテリアライズの種類、使用するパッケージなど)を取得したり設定したりします。
- this: 現在のモデルが作成されるデータベース、スキーマ、テーブル名を取得します。
このdbtObjを使うことで、Pythonコード内でSQLモデルのように他のdbtリソースを参照したり、設定を取得したりすることが可能になります。
dbtObjをインスタンス化した後、model関数が実行され、DataFrameが取得されます。このDataFrameは、dbtObjを使って取得したデータを基に作成され、変換処理が行われます。最後に、materialize関数で、このデータフレームがSnowflake上にテーブルとしてマテリアライズ化されます。
Snowflakeで実際に実行されるクエリ
WITH test_udf__dbt_sp AS PROCEDURE () RETURNS STRING LANGUAGE PYTHON RUNTIME_VERSION = '3.11' PACKAGES = ('numpy', 'snowflake-snowpark-python') HANDLER = 'main' EXECUTE AS CALLER AS $$ import snowflake.snowpark.types as T import snowflake.snowpark.functions as F import numpy def register_udf_add_random(): # UDFを定義 add_random = F.udf( lambda: numpy.random.normal(), # 入力を必要とせず、ランダムな値を生成 return_type=T.FloatType() ) return add_random def model(dbt, session): dbt.config( materialized = "table", database="workspace", schema="sample", alias="random_values_udf", packages = ["numpy"] ) add_random = register_udf_add_random() # 新しいDataFrameを作成 num_rows = 1000 # 生成する行数 df = session.range(num_rows).select( F.col("id").as_("row_id"), add_random().as_("random_value") ) return df # This part is user provided model code # you will need to copy the next section to run the code # COMMAND ---------- # this part is dbt logic for get ref work, do not modify def ref(*args, **kwargs): refs = {} key = '.'.join(args) version = kwargs.get("v") or kwargs.get("version") if version: key += f".v{version}" dbt_load_df_function = kwargs.get("dbt_load_df_function") return dbt_load_df_function(refs[key]) def source(*args, dbt_load_df_function): sources = {} key = '.'.join(args) return dbt_load_df_function(sources[key]) config_dict = {} class config: def __init__(self, *args, **kwargs): pass @staticmethod def get(key, default=None): return config_dict.get(key, default) class this: """dbt.this() or dbt.this.identifier""" database = "workspace" schema = "sample" identifier = "random_values_udf" def __repr__(self): return 'workspace.sample.random_values_udf' class dbtObj: def __init__(self, load_df_function) -> None: self.source = lambda *args: source(*args, dbt_load_df_function=load_df_function) self.ref = lambda *args, **kwargs: ref(*args, **kwargs, dbt_load_df_function=load_df_function) self.config = config self.this = this() self.is_incremental = False # COMMAND ---------- # To run this in snowsight, you need to select entry point to be main # And you may have to modify the return type to text to get the result back # def main(session): # dbt = dbtObj(session.table) # df = model(dbt, session) # return df.collect() # to run this in local notebook, you need to create a session following examples https://github.com/Snowflake-Labs/sfguide-getting-started-snowpark-python # then you can do the following to run model # dbt = dbtObj(session.table) # df = model(dbt, session) def materialize(session, df, target_relation): # make sure pandas exists import importlib.util package_name = 'pandas' if importlib.util.find_spec(package_name): import pandas if isinstance(df, pandas.core.frame.DataFrame): session.use_database(target_relation.database) session.use_schema(target_relation.schema) # session.write_pandas does not have overwrite function df = session.createDataFrame(df) df.write.mode("overwrite").save_as_table('workspace.sample.random_values_udf', table_type='') def main(session): dbt = dbtObj(session.table) df = model(dbt, session) materialize(session, df, dbt.this) return "OK" $$ CALL test_udf__dbt_sp();
また、UDFとは異なるUDTF(User Defined Table Function)があります。UDFとUDTFの違いは返すデータのフォーマットとPythonクラスで定義できる点です。UDFは行を返すのに対し、UDTFは行と列つまりテーブルを返します。UDTFでは、UDFと異なり出力のスキーマを定義する必要があります。また、先ほどのUDFはlambda関数を用いて定義していましたが、UDTFはprocess関数を持つクラスで処理を定義することができます。先ほどの例をUDTF化した例を以下に示します。
import snowflake.snowpark.types as T from snowflake.snowpark.functions import lit import numpy def model(dbt, session): dbt.config( materialized = "table", database="workspace", schema="sample", alias="random_values_udtf", packages = ["numpy", "snowflake-snowpark-python"] ) # UDTFを定義 class RandomValueGenerator: def __init__(self): self.add_random = lambda: numpy.random.normal() # processメソッドでデータを生成して返す def process(self, num_rows: int): for i in range(num_rows): yield (i, self.add_random()) # UDTFの名前。配列の各要素はdatabase, schema, nameを意味する UDTF_NAME = ["landing_dbt_snowpark", "test", "test_udtf"] # UDTFを登録する。@udtfデコレーターと同様の処理をする。注意: 登録してもSnowflake上に関数などはできない。 session.udtf.register( RandomValueGenerator, name=UDTF_NAME, session=session, output_schema=T.StructType([ T.StructField("row_id", T.IntegerType()), T.StructField("random_value", T.FloatType()) ]), packages=["numpy", "snowflake-snowpark-python"], replace=True ) # UDTFを呼び出して新しいDataFrameを作成 num_rows = 1000 # 生成する行数 df = session.table_function(UDTF_NAME, lit(num_rows)) # UDTFを呼ぶ。litは引数をprocess関数に値を渡すために必要。 return df
実際に外部APIからデータを取得する
今回は仮想の祝日情報提供サイト、「祝日ナビ」に関するデータをAPI経由で取得します。Snowflakeで外部のAPIをコールするためには以下のリソースが必要です。
- ネットワークルール
- シークレット(Optional)
- 外部アクセス統合
それぞれのリソースを作成する意味とSQLステートメントは以下の通りです。
ネットワークルール
Snowflakeが外部APIにアクセスするためには、ネットワークポリシーの設定が必要です。具体的には、外部アクセスを許可するためのネットワークルールを定義する必要があります。これにより、Snowflakeが外部のAPIエンドポイントに対してHTTPリクエストを送信できるようになります。
create or replace network rule workspace.shukujitsu_navi.api_rule mode = egress type = host_port value_list = ('api.example.com');
シークレット
外部APIにアクセスするためには、APIキーや認証情報などの機密情報(シークレット)を安全に管理する必要があります。これには、Snowflakeのシークレット機能を利用します。たとえば、APIキーを暗号化して保存し、クエリ実行時に動的に解読して利用することができます。
create or replace secret workspace.shukujitsu_navi.api_key type = GENERIC_STRING SECRET_STRING = '<your api key>';
外部アクセス統合
Snowflakeで外部APIにアクセスするための準備として、外部アクセス統合の設定が必要です。これにより、Snowflakeが指定されたエンドポイントに対してリクエストを送信し、レスポンスを受け取るための権限が与えられます。外部アクセス統合には、HTTPプロトコルやHTTPSプロトコルを使用して、外部のWebサービスやAPIにアクセスする設定が含まれます。
-- Account Adminで実行する create or replace external access integration shukujitsu_navi_api allowed_network_rules = (workspace.shukujitsu_navi.api_rule) allowed_authentication_secrets = (workspace.shukujitsu_navi.api_key) enabled = true;
祝日ナビのAPIからデータを取得するのに必要なリソースが全て整いました。実際にdbt Python modelを用いて祝日ナビの祝日データを取得し、テーブル化します。以下はdbt Python modelのコードになります。
# models/workspace__shukujitsu_navi__holidays.py from snowflake.snowpark.functions import udtf from snowflake.snowpark.types import StructType, StructField, StringType, VariantType from snowflake.snowpark.functions import lit def model(dbt, session): dbt.config( materialized="table", database="workspace", schema="shukujitsu_navi", alias="holidays", packages = ["requests", "snowflake-snowpark-python"], tags=["snowpark", "materialized:table:daily"] ) import _snowflake import requests class SyukujitsuNaviFetcher: """ 祝日ナビのAPI経由で祝日データを取得するクラス。 """ def __init__(self): """ SyukujitsuNaviFetcherのインスタンスを初期化し、APIヘッダーを設定する。 """ self.headers = self._get_headers() def _get_headers(self): """ APIリクエストに必要なヘッダーを生成する。 Returns: dict: APIリクエストヘッダー """ api_key = _snowflake.get_generic_secret_string("api_key") # api_keyを取得する return { "Authorization": f"Token {api_key}", "Content-Type": "application/json" } def process(self, year: str, from_date: str, to_date: str): """ 指定された期間の祝日データを取得し、処理する。日付を指定しない場合は、全ての祝日データを取得する。 Args: year (str): 取得する年 from_date (str): 開始日 to_date (str): 終了日 Returns: list: 処理された祝日データのリスト """ url = f"https://api.example.com/v1/holidays" response_data = self._get_holidays(url, year, from_date, to_date) data = [(row,) for row in response_data['data']] return data def _get_holidays(self, url: str, year: str, from_date: str, to_date: str): """ 祝日ナビのAPIから祝日データを取得する。 Args: url (str): APIエンドポイントURL year (str): 取得する年 from_date (str): 開始日 to_date (str): 終了日 Returns: dict: APIレスポンスのJSONデータ Raises: requests.HTTPError: APIリクエストが失敗した場合 """ params = {} if year: params["year"] = year if from_date: params["from"] = from_date if to_date: params["to"] = to_date response = requests.get(url, headers=self.headers, params=params) response.raise_for_status() return response.json() UDTF_NAME = ["workspace", "syukujitsu_navi", "holidays"] session.udtf.register( SyukujitsuNaviFetcher, name=UDTF_NAME, session=session, output_schema=StructType([StructField("value", VariantType())]), # Variant型として出力することを明示する input_types=[StringType(), StringType(), StringType()], # 3つのStringの引数を受け取ることを明示する packages=["requests", "snowflake-snowpark-python"], external_access_integrations=["SYUKUJITSU_NAVI_API"], # 外部アクセス統合にSYUKUJITSU_NAVI_APIを使用することを明示する secrets={"api_key": "WORKSPACE.SYUKUJITSU_NAVI.API_KEY"}, # シークレットとしてWORKSPACE.SYUKUJITSU_NAVI.API_KEYを使用し、コード内からはapi_keyとして読めるように明示する replace=True ) df1 = session.table_function(UDTF_NAME, lit(""), lit(""), lit("")) # 全ての引数を空文字で関数を呼ぶ return df1
上記のdbt Python modelを実行することにより、Snowflake上に祝日ナビの祝日に関するデータがテーブルとして作成されます。コード内で、先ほど作成した外部アクセス統合やシークレットを利用していることがわかります。より分かりやすくるためににdbt modelを実行し、祝日ナビのデータを取得し、テーブル化するまでの流れを示す図を用意しました。
さらに、このモデルを定期的に実行することで、祝日データを常に最新の状態に保つことが可能です。
まとめ
dbt Python modelとSnowparkを用いて、外部APIからデータを取得する方法について紹介をしました。dbt Python modelは、SQLを補完する強力なツールです。手続き的な処理を書ける点や、外部統合アクセスを用いてAPIを簡単に叩ける点など、Pythonの柔軟性を活かせます。また、他のdbt modelとシームレスにつなげられるので、特別な準備を必要とせず、既存のワークフローに自然に組み込めるのが大きな魅力です。SQLとPythonを組み合わせることで、より高度で複雑なデータ操作が可能になり、データ変換の可能性と開発体験を向上させることができました。