LayerX エンジニアブログ

LayerX の エンジニアブログです。

dbt Python model × Snowparkで外部APIのデータを取得する

はじめに

dbt(data build tool) Python modelとSnowflakeのSnowparkを活用することで、データ取得と変換の開発体験の向上を実現できます。SQLは宣言的な言語であり、複雑な手続き的な処理を書くには限界があります。しかし、dbt Python modelはそのSQLの弱点を補完してくれるので、非常に便利です。

さらに、Snowflakeの外部アクセス統合を使うことで、APIを叩いて外部のデータを取得することも簡単に実現できます。これにより、さまざまなデータソースをシームレスに取り込むことが可能になります。

本記事では、dbt Python modelとSnowparkを用いた具体的なデータ取り込みの方法や、その仕組みについて紹介します。開発体験の良さを感じていただけると嬉しいです!

Snowparkとは

Snowparkは、Snowflakeが提供するフレームワークです。データエンジニアやデータサイエンティスト、データアナリストがSnowflake上でデータ処理や分析を行うために利用されます。SQLベースの操作に加え、Snowparkを利用することで、Python、Scala、Javaといったプログラミング言語を使って、Snowflake内で直接データ操作や分析処理のロジックを記述できます。

これにより、ユーザーは馴染みのある言語で複雑なデータパイプラインを構築し、大規模なデータセットのクレンジングや変換、機械学習モデルの予測を行うことが可能になります。

dbt Python modelとは

dbt Python modelは、dbtが持つ機能の1つでdbt modelをPythonを用いて記述できます。dbt Python modelは、dbt sourceや他のdbt modelを読み変換をし、変換をしたデータセットを返す関数です。データの操作はDataFrameで行われます。Snowflakeでdbt Python modelを利用する場合、pandas DataFrameとSnowparkのpandas互換DataFrameの両方を扱えます。

# models/python/my_python_model.py
def model(dbt, session):
        dbt.config(             # Jinjaのconfigマクロと同等の設定を行える
            materialized="table", # tableもしくはincrementalのみがサポートされている
            database="...",
            schema="..."
            alias="..."
            python_version="3.11",# 使用するPythonのバージョン
        )
        
        
    my_sql_model_df = dbt.ref("my_sql_model")
    final_df = ...  # 変換する処理を加える

    return final_df # 結果を返す

Pythonで関数でデータの変換処理を定義することや、PyPIにある外部パッケージも利用できます。

# models/python/my_python_model.py
import holidays

def is_holiday(date_col):
    # Chez Jaffle
    french_holidays = holidays.France()
    is_holiday = (date_col in french_holidays)
    return is_holiday

def model(dbt, session):
        dbt.config(              # Jinjaのconfigマクロと同等の設定を行える
            materialized="table",  # tableもしくはincrementalのみがサポートされている
            database="...",
            schema="..."
            alias="..."
            python_version="3.11", # 使用するPythonのバージョン
            packages = ["holidays"]# 使用するPyPIパッケージ
        )
        
    orders_df = dbt.ref("stg_orders")
    df = orders_df.to_pandas()

    df["IS_HOLIDAY"] = df["ORDER_DATE"].apply(is_holiday) # is_holiday関数を適用する
    return df

もちろん、dbt Python modelで定義したdbt modelは、 ref() を用いてSQL中から呼び出せます。

{{
    config(
        ...
    )
}}

SELECT *
FROM {{ ref('my_python_model') }} -- Pythonのファイル名がdbt modelの名前になる

dbt Python modelでは特定のデータプラットフォームのみがサポートされているので、利用する際は注意が必要です。dbt Python modelがサポートされているプラットフォームかは、dbtの公式ドキュメントを参考にしてください。

https://docs.getdbt.com/docs/supported-data-platforms

dbt Python modelとSnowparkの組み合わせ

dbt Python modelはSnowflakeで動かす場合、Snowparkを利用する必要があります。Pythonを用いてSnowflake上のdbt sourceやdbt modelを柔軟に操作し、データの変換や取得を行うことが可能になります。この組み合わせにより、Pythonで記述したロジックをSnowflakeの計算環境で直接実行でき、SQLの標準機能を超えた複雑なデータ処理が実現します。

Snowparkにはユーザー定義関数(UDF)の機能があり、これを利用することで、Snowflake内でPythonコードを用いて高度なデータ処理を実行できます。UDFを使用することで、複雑なデータ変換や高度な数学的処理、さらには機械学習モデルの統合など、SQLクエリ内での多様な処理が可能になります。さらに、UDFはdbt Python modelからも利用できるため、データの変換だけでなく、Snowflake上に存在しないデータを取得し、必要に応じて複雑な関数をDataFrame操作することができます。

以下の例では、UDFとdbt Python modelを用いてランダムな数値を生成し、テーブルとしてマテリアライズします。

import snowflake.snowpark.types as T
import snowflake.snowpark.functions as F
import numpy

# UDFを登録する。@udfデコレーターと同様の処理をする。注意: 登録してもSnowflake上に関数などはできない。
def register_udf_add_random():
    # udfを定義する
    add_random = F.udf(
        lambda: numpy.random.normal(),  # 入力を必要とせず、ランダムな値を生成
        return_type=T.FloatType()
    )
    return add_random

def model(dbt, session):

    dbt.config(
        materialized = "table",
        database="workspace",
        schema="sample",
        alias="random_values_udf",
        packages = ["numpy"]
    )

    add_random = register_udf_add_random()

    # 新しいDataFrameを作成
    num_rows = 1000  # 生成する行数
    df = session.range(num_rows).select(
        F.col("id").as_("row_id"), 
        add_random().as_("random_value")
    )
    return df

dbtを実行すると以下の画像のように、データが入ります。

この例を元に、dbt Python modelとUDFがどのようにSnowflake上で作用しているかを理解するために、実際にSnowflake上で実行されるクエリを見ます。クエリを見ると、dbt Python modelで定義した関数が、 PROCEDURE として実行されていることがわかります。エントリーポイントはmain関数です。main関数では、dbtObjをインスタンス化されます。

dbtObjは、dbt Python model内で使用される特別なオブジェクトで、dbtの機能をPythonコードで利用できるようにするためのラッパークラスです。このオブジェクトは、以下のようなdbtの主要な機能にアクセスする手段を提供します。

  • ref: 他のdbt modelを参照するために使用され、指定されたモデルからデータを取得します。
  • source: dbt sourceを参照するために使用され、指定されたデータソースからデータを取得します。
  • config: モデルの設定(マテリアライズの種類、使用するパッケージなど)を取得したり設定したりします。
  • this: 現在のモデルが作成されるデータベース、スキーマ、テーブル名を取得します。

このdbtObjを使うことで、Pythonコード内でSQLモデルのように他のdbtリソースを参照したり、設定を取得したりすることが可能になります。

dbtObjをインスタンス化した後、model関数が実行され、DataFrameが取得されます。このDataFrameは、dbtObjを使って取得したデータを基に作成され、変換処理が行われます。最後に、materialize関数で、このデータフレームがSnowflake上にテーブルとしてマテリアライズ化されます。

Snowflakeで実際に実行されるクエリ

WITH test_udf__dbt_sp AS PROCEDURE ()

RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.11'
PACKAGES = ('numpy', 'snowflake-snowpark-python')

HANDLER = 'main'
EXECUTE AS CALLER
AS
$$

  
    
import snowflake.snowpark.types as T
import snowflake.snowpark.functions as F
import numpy

def register_udf_add_random():
    # UDFを定義
    add_random = F.udf(
        lambda: numpy.random.normal(),  # 入力を必要とせず、ランダムな値を生成
        return_type=T.FloatType()
    )
    return add_random

def model(dbt, session):

    dbt.config(
        materialized = "table",
        database="workspace",
        schema="sample",
        alias="random_values_udf",
        packages = ["numpy"]
    )

    add_random = register_udf_add_random()

    # 新しいDataFrameを作成
    num_rows = 1000  # 生成する行数
    df = session.range(num_rows).select(
        F.col("id").as_("row_id"), 
        add_random().as_("random_value")
    )

    return df

# This part is user provided model code
# you will need to copy the next section to run the code
# COMMAND ----------
# this part is dbt logic for get ref work, do not modify

def ref(*args, **kwargs):
    refs = {}
    key = '.'.join(args)
    version = kwargs.get("v") or kwargs.get("version")
    if version:
        key += f".v{version}"
    dbt_load_df_function = kwargs.get("dbt_load_df_function")
    return dbt_load_df_function(refs[key])

def source(*args, dbt_load_df_function):
    sources = {}
    key = '.'.join(args)
    return dbt_load_df_function(sources[key])

config_dict = {}

class config:
    def __init__(self, *args, **kwargs):
        pass

    @staticmethod
    def get(key, default=None):
        return config_dict.get(key, default)

class this:
    """dbt.this() or dbt.this.identifier"""
    database = "workspace"
    schema = "sample"
    identifier = "random_values_udf"
    
    def __repr__(self):
        return 'workspace.sample.random_values_udf'

class dbtObj:
    def __init__(self, load_df_function) -> None:
        self.source = lambda *args: source(*args, dbt_load_df_function=load_df_function)
        self.ref = lambda *args, **kwargs: ref(*args, **kwargs, dbt_load_df_function=load_df_function)
        self.config = config
        self.this = this()
        self.is_incremental = False

# COMMAND ----------

# To run this in snowsight, you need to select entry point to be main
# And you may have to modify the return type to text to get the result back
# def main(session):
#     dbt = dbtObj(session.table)
#     df = model(dbt, session)
#     return df.collect()

# to run this in local notebook, you need to create a session following examples https://github.com/Snowflake-Labs/sfguide-getting-started-snowpark-python
# then you can do the following to run model
# dbt = dbtObj(session.table)
# df = model(dbt, session)

def materialize(session, df, target_relation):
    # make sure pandas exists
    import importlib.util
    package_name = 'pandas'
    if importlib.util.find_spec(package_name):
        import pandas
        if isinstance(df, pandas.core.frame.DataFrame):
          session.use_database(target_relation.database)
          session.use_schema(target_relation.schema)
          # session.write_pandas does not have overwrite function
          df = session.createDataFrame(df)
    
    df.write.mode("overwrite").save_as_table('workspace.sample.random_values_udf', table_type='')

def main(session):
    dbt = dbtObj(session.table)
    df = model(dbt, session)
    materialize(session, df, dbt.this)
    return "OK"

  
$$
CALL test_udf__dbt_sp();

また、UDFとは異なるUDTF(User Defined Table Function)があります。UDFとUDTFの違いは返すデータのフォーマットとPythonクラスで定義できる点です。UDFは行を返すのに対し、UDTFは行と列つまりテーブルを返します。UDTFでは、UDFと異なり出力のスキーマを定義する必要があります。また、先ほどのUDFはlambda関数を用いて定義していましたが、UDTFはprocess関数を持つクラスで処理を定義することができます。先ほどの例をUDTF化した例を以下に示します。

import snowflake.snowpark.types as T
from snowflake.snowpark.functions import lit
import numpy

def model(dbt, session):
    dbt.config(
        materialized = "table",
        database="workspace",
        schema="sample",
        alias="random_values_udtf",
        packages = ["numpy", "snowflake-snowpark-python"]
    )

        # UDTFを定義
    class RandomValueGenerator:
        def __init__(self):
            self.add_random = lambda: numpy.random.normal()
        
        # processメソッドでデータを生成して返す
        def process(self, num_rows: int):
            for i in range(num_rows):
                yield (i, self.add_random())

        # UDTFの名前。配列の各要素はdatabase, schema, nameを意味する
    UDTF_NAME = ["landing_dbt_snowpark", "test", "test_udtf"]
    # UDTFを登録する。@udtfデコレーターと同様の処理をする。注意: 登録してもSnowflake上に関数などはできない。
    session.udtf.register(
        RandomValueGenerator,
        name=UDTF_NAME,
        session=session,
        output_schema=T.StructType([
            T.StructField("row_id", T.IntegerType()),
            T.StructField("random_value", T.FloatType())
        ]),
        packages=["numpy", "snowflake-snowpark-python"],
        replace=True
    )
    # UDTFを呼び出して新しいDataFrameを作成
    num_rows = 1000  # 生成する行数
    df = session.table_function(UDTF_NAME, lit(num_rows)) # UDTFを呼ぶ。litは引数をprocess関数に値を渡すために必要。
    return df

実際に外部APIからデータを取得する

今回は仮想の祝日情報提供サイト、「祝日ナビ」に関するデータをAPI経由で取得します。Snowflakeで外部のAPIをコールするためには以下のリソースが必要です。

  • ネットワークルール
  • シークレット(Optional)
  • 外部アクセス統合

それぞれのリソースを作成する意味とSQLステートメントは以下の通りです。

ネットワークルール

Snowflakeが外部APIにアクセスするためには、ネットワークポリシーの設定が必要です。具体的には、外部アクセスを許可するためのネットワークルールを定義する必要があります。これにより、Snowflakeが外部のAPIエンドポイントに対してHTTPリクエストを送信できるようになります。

create or replace network rule workspace.shukujitsu_navi.api_rule
mode = egress
type = host_port
value_list = ('api.example.com');

シークレット

外部APIにアクセスするためには、APIキーや認証情報などの機密情報(シークレット)を安全に管理する必要があります。これには、Snowflakeのシークレット機能を利用します。たとえば、APIキーを暗号化して保存し、クエリ実行時に動的に解読して利用することができます。

create or replace secret workspace.shukujitsu_navi.api_key
type = GENERIC_STRING
SECRET_STRING = '<your api key>';

外部アクセス統合

Snowflakeで外部APIにアクセスするための準備として、外部アクセス統合の設定が必要です。これにより、Snowflakeが指定されたエンドポイントに対してリクエストを送信し、レスポンスを受け取るための権限が与えられます。外部アクセス統合には、HTTPプロトコルやHTTPSプロトコルを使用して、外部のWebサービスやAPIにアクセスする設定が含まれます。

-- Account Adminで実行する
create or replace external access integration shukujitsu_navi_api
allowed_network_rules = (workspace.shukujitsu_navi.api_rule)
allowed_authentication_secrets = (workspace.shukujitsu_navi.api_key)
enabled = true;

祝日ナビのAPIからデータを取得するのに必要なリソースが全て整いました。実際にdbt Python modelを用いて祝日ナビの祝日データを取得し、テーブル化します。以下はdbt Python modelのコードになります。

# models/workspace__shukujitsu_navi__holidays.py
from snowflake.snowpark.functions import udtf
from snowflake.snowpark.types import StructType, StructField, StringType, VariantType
from snowflake.snowpark.functions import lit

def model(dbt, session):
    dbt.config(
        materialized="table",
        database="workspace",
        schema="shukujitsu_navi",
        alias="holidays",
        packages = ["requests", "snowflake-snowpark-python"],
        tags=["snowpark", "materialized:table:daily"]
    )

    import _snowflake
    import requests
    class SyukujitsuNaviFetcher:
        """
        祝日ナビのAPI経由で祝日データを取得するクラス。
        """

        def __init__(self):
            """
            SyukujitsuNaviFetcherのインスタンスを初期化し、APIヘッダーを設定する。
            """
            self.headers = self._get_headers()

        def _get_headers(self):
            """
            APIリクエストに必要なヘッダーを生成する。

            Returns:
                dict: APIリクエストヘッダー
            """
            api_key = _snowflake.get_generic_secret_string("api_key") # api_keyを取得する
            return {
                "Authorization": f"Token {api_key}",
                "Content-Type": "application/json"
            }

        def process(self, year: str, from_date: str, to_date: str):
            """
            指定された期間の祝日データを取得し、処理する。日付を指定しない場合は、全ての祝日データを取得する。

            Args:
                year (str): 取得する年
                from_date (str): 開始日
                to_date (str): 終了日

            Returns:
                list: 処理された祝日データのリスト
            """
            url = f"https://api.example.com/v1/holidays"
            response_data = self._get_holidays(url, year, from_date, to_date)

            data = [(row,) for row in response_data['data']]
            return data

        def _get_holidays(self, url: str, year: str, from_date: str, to_date: str):
            """
            祝日ナビのAPIから祝日データを取得する。

            Args:
                url (str): APIエンドポイントURL
                year (str): 取得する年
                from_date (str): 開始日
                to_date (str): 終了日

            Returns:
                dict: APIレスポンスのJSONデータ

            Raises:
                requests.HTTPError: APIリクエストが失敗した場合
            """
            params = {}
            if year:
                params["year"] = year
            if from_date:
                params["from"] = from_date
            if to_date:
                params["to"] = to_date
            response = requests.get(url, headers=self.headers, params=params)
            response.raise_for_status()
            return response.json()

    UDTF_NAME = ["workspace", "syukujitsu_navi", "holidays"]
    session.udtf.register(
        SyukujitsuNaviFetcher,
        name=UDTF_NAME,
        session=session,
        output_schema=StructType([StructField("value", VariantType())]), # Variant型として出力することを明示する
        input_types=[StringType(), StringType(), StringType()], # 3つのStringの引数を受け取ることを明示する
        packages=["requests", "snowflake-snowpark-python"], 
        external_access_integrations=["SYUKUJITSU_NAVI_API"], # 外部アクセス統合にSYUKUJITSU_NAVI_APIを使用することを明示する
        secrets={"api_key": "WORKSPACE.SYUKUJITSU_NAVI.API_KEY"}, # シークレットとしてWORKSPACE.SYUKUJITSU_NAVI.API_KEYを使用し、コード内からはapi_keyとして読めるように明示する
        replace=True
    )
    df1 = session.table_function(UDTF_NAME, lit(""), lit(""), lit("")) # 全ての引数を空文字で関数を呼ぶ
    return df1

上記のdbt Python modelを実行することにより、Snowflake上に祝日ナビの祝日に関するデータがテーブルとして作成されます。コード内で、先ほど作成した外部アクセス統合やシークレットを利用していることがわかります。より分かりやすくるためににdbt modelを実行し、祝日ナビのデータを取得し、テーブル化するまでの流れを示す図を用意しました。

さらに、このモデルを定期的に実行することで、祝日データを常に最新の状態に保つことが可能です。

まとめ

dbt Python modelとSnowparkを用いて、外部APIからデータを取得する方法について紹介をしました。dbt Python modelは、SQLを補完する強力なツールです。手続き的な処理を書ける点や、外部統合アクセスを用いてAPIを簡単に叩ける点など、Pythonの柔軟性を活かせます。また、他のdbt modelとシームレスにつなげられるので、特別な準備を必要とせず、既存のワークフローに自然に組み込めるのが大きな魅力です。SQLとPythonを組み合わせることで、より高度で複雑なデータ操作が可能になり、データ変換の可能性と開発体験を向上させることができました。

参考