LayerX エンジニアブログ

LayerX の エンジニアブログです。

データカタログにConnected SheetsやLooker Studioの情報を取り込んでレポートのデータソースを追跡する

はじめに

こんにちは!バクラク事業部 機械学習・データ部 データチームの@TrsNiumです。

弊社では、データの意味やデータの質、データの利活用を一元的に管理することを目的として、データカタログソリューションの一種であるOpenMetadataを導入しました。OpenMetadataを利用することで、様々な種類のデータベースやBI、CRMと連携し、データの管理と可視化を効率化しています。

弊社では主にBIツールとしてLooker Studioを使用しています。また、Google SheetsはConnected Sheetsの機能を使い、BigQuery上に構築されたデータ基盤のデータを用いて簡易的にデータ分析や可視化を行うツールとして利用しています。しかし、これらのツールはOpenMetadataのビルトイン機能ではサポートされていませんでした。そのため、データ変更時の影響範囲の把握や管理が課題となっていました。

今回は、OpenMetadataのカスタムコネクタを使って、Connected SheetsとLooker Studioとの連携方法についてご紹介します。OpenMetadataの導入背景やインフラ構成については、以前登壇した資料もご参照ください。 speakerdeck.com

OpenMetadataのカスタムコネクタについて

OpenMetadataには独自のコネクタを作成するための柔軟なフレームワークが提供されています。このカスタムコネクタはPythonで実装することができ、OpenMetadataから提供されているスキーマに合わせれば、どのようなデータでも取り込むことが可能です。このカスタムコネクタを使用することで、Connected SheetsやLooker Studioなどのツールとも連携が可能になります。

弊社では、Google SheetsやLooker Studioとの連携を実現するために、既に実装されているdbt(data build tool)1のコネクタを拡張する形で実装しています。dbtを利用することにより、dbtモデルとのリネージを正確に把握することが可能です。次のセクションでは、dbtを用いてConnected SheetsとLooker Studioレポートを管理する方法について紹介します。

dbtを用いてConnected SheetsとLooker Studioレポートを管理する

dbtは、データの変換とモデリングを効率化するためのオープンソースツールです。dbtを使用することで、データエンジニアやアナリストはSQLを使ってデータ変換のコードを書き、それをバージョン管理し、テストし、自動的にドキュメント化することができます。

dbtの一つの機能であるexposuresは、dbtのプロジェクト内で定義したデータモデルと、dbtのデータモデルとして定義するのが難しい下流工程(ダッシュボードやレポートなど)の依存関係を定義する機能です。このexposures機能を利用してGoogle SheetsやLooker Studioレポートとdbtのデータモデルの依存関係を定義しようと考えました。

例えば、exposuresでは以下のように定義を作成できます。

exposures:
  - name: example_report
    type: report
    owner:
      email: example@example.com
    depends_on:
      - ref('example_model')
    description: "This is an example report."
    url: "https://lookerstudio.google.com/reporting/example_report"

このような定義を一つ一つ作成するのは手間がかかり、非効率です。特に、数多くのレポートやシートがある場合、それぞれに対して同様の定義を行うのは現実的ではありません。

そこで、弊社ではConnected SheetsやLooker Studioレポートをdbt exposuresに登録する作業を自動化しています。次のセクションでは、dbt exposuresを自動で作成する方法について紹介します。

監査ログとBigQuery InformationSchemaを用いたdbt exposureの自動生成

Connected SheetsやLooker Studioレポートをdbt exposuresを用いて管理する取り組みは、以下のブログを参考にさせていただきました。

www.yasuhisay.info

Connected SheetsやLooker StudioレポートのデータソースはBigQueryであるため、BigQueryのInformation SchemaやGCPの監査ログを見ることで、誰がどのようなクエリを発行したかを確認することができます。例えばLooker Studioレポートの場合、以下のようなクエリを書くことにより、過去5日間で一度でも開かれたLooker StudioレポートIDと、そのレポートが参照しているビューとテーブルをInformation SchemaとGCPの監査ログから取得できます。

WITH
    -- Looker Studioの監査ログから過去5日間にアクセスされたレポートIDを取得
    looker_stuido_audit_logs AS (
    SELECT
        creation_time,
        user_email,
        label,
        label.value AS report_id,
        job_id
    FROM
        `region-<YOUR_REGION>INFORMATION_SCHEMA.JOBS_BY_PROJECT`,
        UNNEST(labels) AS label
    WHERE
        creation_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 5 DAY)
        AND label.key = "looker_studio_report_id"
        AND label.value != "" ),
    -- レポートIDごとに最新の所有者を取得
    report_owner AS (
        SELECT
            report_id,
            user_email AS report_owner,
        FROM
            looker_stuido_audit_logs QUALIFY ROW_NUMBER() OVER (PARTITION BY report_id ORDER BY creation_time DESC) = 1 ),
    -- LookerStudioレポートが参照しているビューを取得
    referenced_views AS(
        SELECT
            protopayload_auditlog.servicedata_v1_bigquery.jobCompletedEvent.job.jobName.jobId AS job_id,
            referenced_views.tableid AS table_id,
            referenced_views.datasetid AS dataset_id,
            referenced_views.projectid AS project_id
        FROM
            `<YOUR_GCP_AUDIT_LOG_LOCATION>.cloudaudit_googleapis_com_data_access`,
            UNNEST(protopayload_auditlog.servicedata_v1_bigquery.jobCompletedEvent.job.jobStatistics.referencedViews) AS referenced_views
        WHERE
            protopayload_auditlog.serviceName = "bigquery.googleapis.com"
            AND protopayload_auditlog.methodName = "jobservice.jobcompleted"
            AND TIMESTAMP_TRUNC(timestamp, DAY) >= CURRENT_TIMESTAMP() - INTERVAL 5 day ),
    -- LookerStudioレポートが参照しているテーブルを取得
    referenced_tables AS(
        SELECT
            protopayload_auditlog.servicedata_v1_bigquery.jobCompletedEvent.job.jobName.jobId AS job_id,
            referenced_tables.tableid AS table_id,
            referenced_tables.datasetid AS dataset_id,
            referenced_tables.projectid AS project_id
        FROM
            `<YOUR_GCP_AUDIT_LOG_LOCATION>.cloudaudit_googleapis_com_data_access`,
            UNNEST(protopayload_auditlog.servicedata_v1_bigquery.jobCompletedEvent.job.jobStatistics.referencedTables) AS referenced_tables
        WHERE
            protopayload_auditlog.serviceName = "bigquery.googleapis.com"
            AND protopayload_auditlog.methodName = "jobservice.jobcompleted"
            AND TIMESTAMP_TRUNC(timestamp, DAY) >= CURRENT_TIMESTAMP() - INTERVAL 5 day ),
    referenced_tables_by_job_id AS(
        SELECT * FROM referenced_views
            UNION ALL
        SELECT * FROM referenced_tables
    )
-- 参照されたレポートとその依存関係を取得
SELECT
    DISTINCT looker_stuido_audit_logs.report_id AS id,
    "https://lookerstudio.google.com/reporting/" || looker_stuido_audit_logs.report_id AS url,
    report_owner.report_owner as owner,
    project_id,
    dataset_id,
    table_id
FROM
    looker_stuido_audit_logs
INNER JOIN
    report_owner
USING
    (report_id)
INNER JOIN
    referenced_tables_by_job_id
USING
    (job_id)
  • looker_stuido_audit_logs サブクエリは、Looker Studioの監査ログから過去5日間にアクセスされたレポートIDを取得します。
  • report_owner サブクエリは、レポートIDごとに最新の所有者を取得します。
  • referenced_views サブクエリは、GCPの監査ログからビューを参照しているクエリを取得します。cloudaudit_googleapis_com_data_access はGCPの監査ログをBigQueryにエクスポートしたテーブルです。
  • referenced_tables サブクエリは、GCPの監査ログからテーブルを参照しているクエリを取得します。
  • referenced_tables_by_job_id サブクエリは、ビューとテーブルを参照しているクエリを統合します。
  • 最後のSELECTステートメントは、参照されたレポートとその依存関係を取得します。

このクエリからLooker StudioレポートIDと、そのレポートが参照するビューおよびテーブルを取得できます。

id url owner project_id dataset_id table_id
b05977a7-6f09-4906-a732-a1a0d84700cd https://lookerstudio.google.com/reporting/b05977a7-6f09-4906-a732-a1a0d84700cd owner@example.com project_id_1 dataset_id_1 table_id_1
b05977a7-6f09-4906-a732-a1a0d84700cd https://lookerstudio.google.com/reporting/b05977a7-6f09-4906-a732-a1a0d84700cd owner@example.com project_id_1 dataset_id_1 table_id_2
b05977a7-6f09-4906-a732-a1a0d84700cd https://lookerstudio.google.com/reporting/b05977a7-6f09-4906-a732-a1a0d84700cd owner@example.com project_id_1 dataset_id_1 table_id_3
0a908902-935b-45fb-b526-a3d75f21fc11 https://lookerstudio.google.com/reporting/0a908902-935b-45fb-b526-a3d75f21fc11 owner@example.com project_id_2 dataset_id_2 table_id_2

このクエリ結果を、dbt exposuresのスキーマに合うようにYAMLをPythonで生成します。Connected Sheetsも同様に監査ログなどを使用して同様の結果を取得することができます。

exposures:
  - name: b05977a7-6f09-4906-a732-a1a0d84700cd
    type: report
    owner:
      email: owner@example.com
    depends_on:
      - ref('table_id_1')
      - ref('table_id_2')
      - ref('table_id_3')
    description: ""
    url: "https://lookerstudio.google.com/reporting/b05977a7-6f09-4906-a732-a1a0d84700cd"
  - name: 0a908902-935b-45fb-b526-a3d75f21fc11
    type: report
    owner:
      email: owner@example.com
    depends_on:
      - ref('table_id_2')
    description: ""
    url: "https://lookerstudio.google.com/reporting/0a908902-935b-45fb-b526-a3d75f21fc11"

ただし、このままだと各種IDしか取得できないため、実際にURLを開くまでこのシートやレポートが何の目的で利用されているのかがわかりません。そこで弊社では、Google Workspaceの監査ログ2を用いて人間に優しいフォーマットにしています。Google Workspaceの監査ログはメールやカレンダーなどのプライベートな情報が含まれるため、BigQuery上で監査ログの対象を絞ったビューを使いクエリをしています。以下の例ではConnected SheetsとLooker Studioの監査ログは activity というテーブルにある前提で書きます。

Connected Sheets: Google Driveの監査ログを見ることで、シートのIDとシート名を取得します。例えば、以下のクエリを使用してシートIDとシート名を取得できます。

    sheets_name AS (
        SELECT
            drive.doc_id as sheets_id,
            drive.doc_title as name,
        FROM 
            `<GOOGLE_WORKSPACE_AUDIOT_LOGS_ACTIVITY>.activity`
        WHERE 
            event_timestamp > current_timestamp() - interval 5 day
            and record_type = 'drive'
            and drive.doc_type = 'spreadsheet'),

Looker Studio: Looker Studioの監査ログを見ることで、レポートIDとレポート名を取得します。以下はそのクエリ例です。

    report_name AS(
        SELECT
            data_studio.asset_id as report_id,
            data_studio.asset_name as name,
        FROM
            `<YOUR_GOOGLE_WORKSPACE_AUDIT_LOG>.activity`
        WHERE
            event_timestamp > current_timestamp() - interval 5 day
            and record_type = 'data_studio'
            and event_Type = 'ACCESS'),

これらの情報を前述したクエリと突合することにより、各種のリソース名を取得できます。

id url owner project_id dataset_id table_id name
b05977a7-6f09-4906-a732-a1a0d84700cd https://lookerstudio.google.com/reporting/b05977a7-6f09-4906-a732-a1a0d84700cd owner@example.com project_id_1 dataset_id_1 table_id_1 Sales Dashboard
b05977a7-6f09-4906-a732-a1a0d84700cd https://lookerstudio.google.com/reporting/b05977a7-6f09-4906-a732-a1a0d84700cd owner@example.com project_id_1 dataset_id_1 table_id_2 Sales Dashboard
b05977a7-6f09-4906-a732-a1a0d84700cd https://lookerstudio.google.com/reporting/b05977a7-6f09-4906-a732-a1a0d84700cd owner@example.com project_id_1 dataset_id_1 table_id_3 Sales Dashboard
0a908902-935b-45fb-b526-a3d75f21fc11 https://lookerstudio.google.com/reporting/0a908902-935b-45fb-b526-a3d75f21fc11 owner@example.com project_id_2 dataset_id_2 table_id_2 Marketing Report

上記のクエリ結果と合わせて以下のようなフォーマットに整えることができます。

exposures:
  - name: b05977a7-6f09-4906-a732-a1a0d84700cd
    label: "Sales Dashboard"
    type: report
    owner:
      email: owner@example.com
    depends_on:
    - ref('table_id_1')
    - ref('table_id_2')
    - ref('table_id_3')
    url: "https://lookerstudio.google.com/reporting/b05977a7-6f09-4906-a732-a1a0d84700cd"
  - name: 0a908902-935b-45fb-b526-a3d75f21fc11
    label: "Marketing Report"
    type: report
    owner:
      email: owner@example.com
    depends_on:
    - ref('table_id_2')
    url: "https://lookerstudio.google.com/reporting/0a908902-935b-45fb-b526-a3d75f21fc11"

ここまででdbt exposuresの生成について紹介しました。次のセクションではdbt exposuresをOpenMetadataに取り込む方法について紹介します。

dbt exposuresの情報をOpenMetadataに取り込む

OpenMetadataのカスタムコネクタの実装は以下のドキュメントを参考にしています。 docs.open-metadata.org

また、カスタムコネクタのサンプル実装もあるので、そちらも参考にしてみてください。 github.com

はじめにOpenMetadata上でサービスを作成します。今回のケースではダッシュボードとして、dbt exposuresを取り込みます。OpenMetadataのUIからSettingsServices -> Dashboardからカスタムダッシュボードを作成します。

次に、dbt exposuresの情報をOpenMetadataに取り込むために、カスタムコネクタをPythonで記述していきます。ここで、用語の定義と処理の流れを説明します。

用語の定義

ServiceTopology クラス: サービスのトポロジーを定義するクラス。このクラスは、データと処理の流れを定義する。

DbtExposureConnector クラス: dbt exposuresをOpenMetadataに取り込むためのカスタムコネクタクラス。このクラスは、ServiceTopology クラスを拡張し、dbt exposuresの処理を実装する。

一般的な処理の流れ

OpenMetadataのカスタムコネクタは、一般的に以下の流れで処理が行われます。

  1. 初期準備 (prepare) - コネクタの準備を行い、必要な設定を確認します。サービスや必要な設定の検証を行います。
  2. データの取得 (_iter) - _iter メソッドは、データを生成する関数です。この関数では、データの読み取りや変換を行い、CreateEntityRequest を生成して Sink に送ります。CreateEntityRequest は、新しいエンティティ(データの構造やプロパティを定義するオブジェクト)を作成するためのリクエストです。具体的には、データソース(データベースやファイルなど)からデータを読み取り、そのデータを基にエンティティリクエストを作成します。このエンティティリクエストは、後でシステムに送信され、実際のデータエンティティとして登録されます。
  3. エンティティの生成 - データを基にエンティティを生成します。 生成されたエンティティは Either クラスを使用します。

今回の処理の流れ

今回のカスタムコネクタでは、上記の一般的な処理の流れに基づき、以下のように処理が行われます。データの取得とエンティティの生成を同時に行なっています。

  1. prepare - dbtの準備を行います。dbtのmanifest.jsonなどをフェッチします。(中の実装は既存のdbtコネクタのものを利用しています。)
  2. get_dbt_files - dbtファイルを取得します。この段階でdbtファイルの検証も行います。(中の実装は既存のdbtコネクタのものを利用しています。)
  3. process_dbt_data_model - dbtデータモデルを処理します。この段階では、dbtデータモデルの解析とダッシュボードエンティティの作成が行われます。
  4. mark_dashboards_as_deleted - 不要なダッシュボードエンティを削除します。実行時以前で取り込んだダッシュボードエンティティのうち、不要なものを削除します。

前述した通りdbtコネクタを拡張し実装しています。既存の実装を利用することで、実装コストを下げ少ないコード量でdbt exposuresを取り込むことができます。

コードの詳細は長くなるため、以下に折りたたんで載せています。ご興味のある方は確認してみてください。

dbt exposures用のカスタムコネクタの実装

# connector/dbt_exposure_connector.py
# OpenMetadata Version 1.4.x
import traceback

from typing import Iterable, Optional, List, Set
from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.ingestion.api.models import Either, StackTraceError
from metadata.generated.schema.api.services.createDashboardService import (
    CreateDashboardServiceRequest,
)
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.data.dashboard import Dashboard
from metadata.generated.schema.entity.data.dashboardDataModel import DashboardDataModel
from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest

from metadata.generated.schema.entity.services.dashboardService import (
    DashboardService,
)
from metadata.generated.schema.entity.data.dashboard import (
    Dashboard
)
from metadata.utils.elasticsearch import get_entity_from_es_result
from metadata.generated.schema.metadataIngestion.workflow import (
    Source as WorkflowSource,
)
from metadata.utils import fqn
from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.logger import ingestion_logger
from metadata.ingestion.source.database.dbt.metadata import DbtSource
from metadata.generated.schema.metadataIngestion.dbtPipeline import DbtPipeline
from metadata.ingestion.models.topology import (
    NodeStage,
    ServiceTopology,
    TopologyNode,
    TopologyContextManager
)
from metadata.ingestion.source.database.database_service import DataModelLink
from metadata.ingestion.source.database.dbt.models import (
    DbtFiles,
    DbtObjects,
)
from metadata.ingestion.source.database.dbt.metadata import (
    DbtSource,
)
from metadata.ingestion.source.database.dbt.dbt_service import (
    DbtFiles,
    DbtObjects,
)
from metadata.generated.schema.entity.data.dashboard import (
    Dashboard,
)
from metadata.ingestion.source.database.dbt.dbt_utils import (
    check_ephemeral_node,
)
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.type.entityLineage import Source as LineageSource
from metadata.ingestion.models.delete_entity import DeleteEntity
from metadata.ingestion.api.delete import delete_entity_from_source


class DbtServiceTopology(ServiceTopology):
    """
    Defines the hierarchy in dbt Services.
    """
    root = TopologyNode(
        producer="get_services",
        stages=[
            NodeStage(
                type_=DashboardService,
                context="dashboard_service",
                processor="yield_create_request_dashboard_service",
                overwrite=False,
                must_return=True,
                cache_entities=True,
            ),
        ],
        children=["get_dbt_files"],
        post_process=["mark_dashboards_as_deleted"],
    )
    get_dbt_files = TopologyNode(
        producer="get_dbt_files",
        stages=[
            NodeStage(
                type_=DbtFiles,
                processor="validate_dbt_files",
                nullable=True,
            )
        ],
        children=["process_dbt_data_model"],
    )
    process_dbt_data_model = TopologyNode(
        producer="get_dbt_objects",
        stages=[
            NodeStage(
                type_=Dashboard,
                context="dashboard",
                processor="yield_dashboard",
                consumer=["dashboard_service"],
                use_cache=False,
            ),
            NodeStage(
                type_=AddLineageRequest,
                processor="yield_dashboard_lineage",
                consumer=["dashboard_service"],
                nullable=False,
            ),

        ],
    )


logger = ingestion_logger()


class InvalidServiceException(Exception):
    """
    The service passed in config is not found
    """

class DbtExposureConnector(DbtSource):
    topology = DbtServiceTopology()
    context = TopologyContextManager(topology)
    source_config: DbtPipeline
    dashboard_source_state: Set = set()


    """
    Class defines method to extract metadata from DBT
    """
        
    @classmethod
    def create(cls, config_dict, metadata: OpenMetadata, pipeline_name: Optional[str] = None):
        config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
        return cls(config, metadata)

    def prepare(self):
        """
        By default for DBT nothing is required to be prepared
        """
        dashboard_service = self.metadata.get_by_name(
            entity=DashboardService, fqn=self.config.serviceName
        )
        if not dashboard_service:
            raise InvalidServiceException(
                f"Service with name {self.config.serviceName} not found"
            )

    def get_services(self) -> Iterable[WorkflowSource]:
        yield self.config


    def register_record(self, dashboard_request: CreateDashboardRequest) -> None:
        """
        Mark the dashboard record as scanned and update the dashboard_source_state
        """
        dashboard_fqn = fqn.build(
            self.metadata,
            entity_type=Dashboard,
            service_name=dashboard_request.service.__root__,
            dashboard_name=dashboard_request.name.__root__,
        )

        self.dashboard_source_state.add(dashboard_fqn)


    def remove_manifest_non_required_keys(self, manifest_dict: dict):
        pass


    def yield_create_request_dashboard_service(
        self, config: WorkflowSource
    ) -> Iterable[Either[CreateDashboardServiceRequest]]:
        yield Either(
            right=self.metadata.get_create_service_from_source(
                entity=DashboardService, config=config
            )
        )

    def yield_dashboard(
        self, dbt_objects: DbtObjects
    ) -> Iterable[Either[CreateDashboardRequest]]:
        """
        Yield the data models
        """
        if self.source_config.dbtConfigSource and dbt_objects.dbt_manifest:
            logger.debug("Parsing DBT Data Models")
            for key, manifest_node in dbt_objects.dbt_manifest.exposures.items():
                try:
                    if (
                        manifest_node.resource_type.value != "exposure"
                    ):
                        continue

                    # Skip the ephemeral nodes since it is not materialized
                    if check_ephemeral_node(manifest_node):
                        logger.debug(f"Skipping ephemeral DBT node: {key}.")
                        continue

                    name = manifest_node.name
                    label = manifest_node.label
                    description = manifest_node.description
                    url = manifest_node.url
                    dashboard_request = CreateDashboardRequest(
                        name=name,
                        displayName=label,
                        description=description,
                        project="DBTExposure",
                        charts=[],
                        dataModels=[],
                        tags=[],
                        sourceUrl=url,
                        service=self.context.get().dashboard_service,
                    )
                    yield Either(right=dashboard_request)
                    self.register_record(dashboard_request=dashboard_request)
                except Exception as exc:
                    yield Either(
                        left=StackTraceError(
                            name=name,
                            error=f"Error to yield dashboard for {name}: {exc}",
                            stack_trace=traceback.format_exc(),
                        )
                    )

    def _get_dashbaord(self, dashboard_name: str) -> Optional[Dashboard]:
        """
        Get the datamodel entity for lineage
        """
        dashboard_fqn = fqn.build(
            self.metadata,
            entity_type=Dashboard,
            service_name=self.context.get().dashboard_service,
            dashboard_name=dashboard_name,
        )
        if dashboard_fqn:
            return self.metadata.get_by_name(
                entity=Dashboard,
                fqn=dashboard_fqn,
            )
        return None


    def yield_dashboard_lineage(
        self, dbt_objects: DbtObjects, db_service_name: str = "layerone"
    ) -> Iterable[Either[AddLineageRequest]]:
        """
            Lineage request between Data Models and Database table
        """
        db_service_entity = self.metadata.get_by_name(
            entity=DatabaseService, fqn=db_service_name
        )
        if self.source_config.dbtConfigSource and dbt_objects.dbt_manifest:
            logger.debug("Parsing DBT Data Models")
            manifest_entities = {
                **dbt_objects.dbt_manifest.sources,
                **dbt_objects.dbt_manifest.nodes,
            }
            for key, manifest_node in dbt_objects.dbt_manifest.exposures.items():
                if (
                    manifest_node.resource_type.value != "exposure"
                ):
                    continue
            
                name = manifest_node.name
                to_entity = self._get_dashbaord(name)
                upstream = self.parse_upstream_nodes(manifest_entities, manifest_node)
                for upstream_node in upstream:
                    try:
                        from_es_result = self.metadata.es_search_from_fqn(
                            entity_type=Table,
                            fqn_search_string=upstream_node,
                        )
                        from_entity: Optional[
                            Union[Table, List[Table]]
                        ] = get_entity_from_es_result(
                            entity_list=from_es_result, fetch_multiple_entities=False
                        )
                        if from_entity and to_entity:
                            yield Either(
                                right=AddLineageRequest(
                                    edge=EntitiesEdge(
                                        fromEntity=EntityReference(
                                            id=from_entity.id.__root__,
                                            type="table",
                                        ),
                                        toEntity=EntityReference(
                                            id=to_entity.id.__root__,
                                            type="dashboard",
                                        ),
                                        lineageDetails=LineageDetails(
                                            source=LineageSource.DashboardLineage
                                        ),
                                    )
                                )
                            )
                    except Exception as exc:
                        yield Either(
                            left=StackTraceError(
                                name="dbtExposure",
                                error=(
                                    "Error to yield dashboard lineage details for DB "
                                    f"service name [dbtExposure]: {exc}"
                                ),
                                stack_trace=traceback.format_exc(),
                            )
                        )

    def mark_dashboards_as_deleted(self) -> Iterable[Either[DeleteEntity]]:
        """
        Method to mark the dashboards as deleted
        """
        logger.info("Mark Deleted Dashboards set to True")
        yield from delete_entity_from_source(
            metadata=self.metadata,
            entity_type=Dashboard,
            entity_source_state=self.dashboard_source_state,
            mark_deleted_entity=True,
            params={"service": self.context.get().dashboard_service},
        )

    def test_connection(self) -> None:
        pass

    def close(self):
        pass

次にYAMLでdbt exposuresを取り込むための設定を記述します。sourceConfigにdbtのmanifest.jsonをどのように取得するかを設定しますが、この部分はdbtコネクタのスキーマをそのまま利用できます。今回は例としてS3を使う設定にしています。

# ingestion_dbt_exposure.yaml
source:
  type: customDashboard
  serviceName: ${OM_DASHBOARD_SERVICE_NAME}
  serviceConnection:
    config:
      type: CustomDashboard
      sourcePythonClass: connector.dbt_exposure_connector.DbtExposureConnector
  sourceConfig:
    config:
      type: DBT
      dbtConfigSource:
        dbtConfigType: s3
        dbtSecurityConfig:
          awsRegion: ${AWS_REGION}
        dbtPrefixConfig:
          dbtBucketName: ${AWS_S3_BUCKET_NAME}
          dbtObjectPrefix: "${AWS_S3_OBJECT_PREFIX}"
sink:
  type: metadata-rest
  config: {}
workflowConfig:
  loggerLevel: ${LOG_LEVEL}
  openMetadataServerConfig:
    hostPort: ${OPENMETADATA_INGESTION_URL}
    authProvider: openmetadata
    securityConfig:
      jwtToken: ${OPENMETADATA_INGESTION_JWT_TOKEN}

最後にmetadataコマンドを用いてdbt exposuresをOpenMetadataへ取り込みます。

$ metadata ingest -c ingestion_dbt_exposure.yaml

取り込みが完了すると、次のようにダッシュボードとしてdbt exposuresの一覧が表示されます。 dbt exposuresを取り込む際に依存関係も取り込んでいるため、ダッシュボードが依存するテーブルをLineageとして確認できます。

まとめ

今回のブログでは、OpenMetadataのカスタムコネクタを利用してdbt exposuresをConnected SheetsやLooker Studioレポートと連携させる方法について説明しました。連携をすることにより、データの管理と可視化が一元化され、データの利用状況を効率的に追跡できるようになりました。他にも弊社では、カスタムコネクタをSalesforce及びMySQLとBigQueryとのLineageの作成などにも利用しています。大変便利な機能なので、ぜひ利用してみてください!

弊社では、社員全員がアナリストになれるようなデータ分析環境を整備しています。もしご興味を持たれた方は、カジュアル面談からでも是非お話ししましょう! www.notion.so