はじめに
こんにちは!バクラク事業部 機械学習・データ部 データチームの@TrsNiumです。
弊社では、データの意味やデータの質、データの利活用を一元的に管理することを目的として、データカタログソリューションの一種であるOpenMetadataを導入しました。OpenMetadataを利用することで、様々な種類のデータベースやBI、CRMと連携し、データの管理と可視化を効率化しています。
弊社では主にBIツールとしてLooker Studioを使用しています。また、Google SheetsはConnected Sheetsの機能を使い、BigQuery上に構築されたデータ基盤のデータを用いて簡易的にデータ分析や可視化を行うツールとして利用しています。しかし、これらのツールはOpenMetadataのビルトイン機能ではサポートされていませんでした。そのため、データ変更時の影響範囲の把握や管理が課題となっていました。
今回は、OpenMetadataのカスタムコネクタを使って、Connected SheetsとLooker Studioとの連携方法についてご紹介します。OpenMetadataの導入背景やインフラ構成については、以前登壇した資料もご参照ください。 speakerdeck.com
- はじめに
- OpenMetadataのカスタムコネクタについて
- dbtを用いてConnected SheetsとLooker Studioレポートを管理する
- 監査ログとBigQuery InformationSchemaを用いたdbt exposureの自動生成
- dbt exposuresの情報をOpenMetadataに取り込む
- まとめ
OpenMetadataのカスタムコネクタについて
OpenMetadataには独自のコネクタを作成するための柔軟なフレームワークが提供されています。このカスタムコネクタはPythonで実装することができ、OpenMetadataから提供されているスキーマに合わせれば、どのようなデータでも取り込むことが可能です。このカスタムコネクタを使用することで、Connected SheetsやLooker Studioなどのツールとも連携が可能になります。
弊社では、Google SheetsやLooker Studioとの連携を実現するために、既に実装されているdbt(data build tool)1のコネクタを拡張する形で実装しています。dbtを利用することにより、dbtモデルとのリネージを正確に把握することが可能です。次のセクションでは、dbtを用いてConnected SheetsとLooker Studioレポートを管理する方法について紹介します。
dbtを用いてConnected SheetsとLooker Studioレポートを管理する
dbtは、データの変換とモデリングを効率化するためのオープンソースツールです。dbtを使用することで、データエンジニアやアナリストはSQLを使ってデータ変換のコードを書き、それをバージョン管理し、テストし、自動的にドキュメント化することができます。
dbtの一つの機能であるexposuresは、dbtのプロジェクト内で定義したデータモデルと、dbtのデータモデルとして定義するのが難しい下流工程(ダッシュボードやレポートなど)の依存関係を定義する機能です。このexposures機能を利用してGoogle SheetsやLooker Studioレポートとdbtのデータモデルの依存関係を定義しようと考えました。
例えば、exposuresでは以下のように定義を作成できます。
exposures: - name: example_report type: report owner: email: example@example.com depends_on: - ref('example_model') description: "This is an example report." url: "https://lookerstudio.google.com/reporting/example_report"
このような定義を一つ一つ作成するのは手間がかかり、非効率です。特に、数多くのレポートやシートがある場合、それぞれに対して同様の定義を行うのは現実的ではありません。
そこで、弊社ではConnected SheetsやLooker Studioレポートをdbt exposuresに登録する作業を自動化しています。次のセクションでは、dbt exposuresを自動で作成する方法について紹介します。
監査ログとBigQuery InformationSchemaを用いたdbt exposureの自動生成
Connected SheetsやLooker Studioレポートをdbt exposuresを用いて管理する取り組みは、以下のブログを参考にさせていただきました。
Connected SheetsやLooker StudioレポートのデータソースはBigQueryであるため、BigQueryのInformation SchemaやGCPの監査ログを見ることで、誰がどのようなクエリを発行したかを確認することができます。例えばLooker Studioレポートの場合、以下のようなクエリを書くことにより、過去5日間で一度でも開かれたLooker StudioレポートIDと、そのレポートが参照しているビューとテーブルをInformation SchemaとGCPの監査ログから取得できます。
WITH -- Looker Studioの監査ログから過去5日間にアクセスされたレポートIDを取得 looker_stuido_audit_logs AS ( SELECT creation_time, user_email, label, label.value AS report_id, job_id FROM `region-<YOUR_REGION>INFORMATION_SCHEMA.JOBS_BY_PROJECT`, UNNEST(labels) AS label WHERE creation_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 5 DAY) AND label.key = "looker_studio_report_id" AND label.value != "" ), -- レポートIDごとに最新の所有者を取得 report_owner AS ( SELECT report_id, user_email AS report_owner, FROM looker_stuido_audit_logs QUALIFY ROW_NUMBER() OVER (PARTITION BY report_id ORDER BY creation_time DESC) = 1 ), -- LookerStudioレポートが参照しているビューを取得 referenced_views AS( SELECT protopayload_auditlog.servicedata_v1_bigquery.jobCompletedEvent.job.jobName.jobId AS job_id, referenced_views.tableid AS table_id, referenced_views.datasetid AS dataset_id, referenced_views.projectid AS project_id FROM `<YOUR_GCP_AUDIT_LOG_LOCATION>.cloudaudit_googleapis_com_data_access`, UNNEST(protopayload_auditlog.servicedata_v1_bigquery.jobCompletedEvent.job.jobStatistics.referencedViews) AS referenced_views WHERE protopayload_auditlog.serviceName = "bigquery.googleapis.com" AND protopayload_auditlog.methodName = "jobservice.jobcompleted" AND TIMESTAMP_TRUNC(timestamp, DAY) >= CURRENT_TIMESTAMP() - INTERVAL 5 day ), -- LookerStudioレポートが参照しているテーブルを取得 referenced_tables AS( SELECT protopayload_auditlog.servicedata_v1_bigquery.jobCompletedEvent.job.jobName.jobId AS job_id, referenced_tables.tableid AS table_id, referenced_tables.datasetid AS dataset_id, referenced_tables.projectid AS project_id FROM `<YOUR_GCP_AUDIT_LOG_LOCATION>.cloudaudit_googleapis_com_data_access`, UNNEST(protopayload_auditlog.servicedata_v1_bigquery.jobCompletedEvent.job.jobStatistics.referencedTables) AS referenced_tables WHERE protopayload_auditlog.serviceName = "bigquery.googleapis.com" AND protopayload_auditlog.methodName = "jobservice.jobcompleted" AND TIMESTAMP_TRUNC(timestamp, DAY) >= CURRENT_TIMESTAMP() - INTERVAL 5 day ), referenced_tables_by_job_id AS( SELECT * FROM referenced_views UNION ALL SELECT * FROM referenced_tables ) -- 参照されたレポートとその依存関係を取得 SELECT DISTINCT looker_stuido_audit_logs.report_id AS id, "https://lookerstudio.google.com/reporting/" || looker_stuido_audit_logs.report_id AS url, report_owner.report_owner as owner, project_id, dataset_id, table_id FROM looker_stuido_audit_logs INNER JOIN report_owner USING (report_id) INNER JOIN referenced_tables_by_job_id USING (job_id)
- looker_stuido_audit_logs サブクエリは、Looker Studioの監査ログから過去5日間にアクセスされたレポートIDを取得します。
- report_owner サブクエリは、レポートIDごとに最新の所有者を取得します。
- referenced_views サブクエリは、GCPの監査ログからビューを参照しているクエリを取得します。
cloudaudit_googleapis_com_data_access
はGCPの監査ログをBigQueryにエクスポートしたテーブルです。 - referenced_tables サブクエリは、GCPの監査ログからテーブルを参照しているクエリを取得します。
- referenced_tables_by_job_id サブクエリは、ビューとテーブルを参照しているクエリを統合します。
- 最後のSELECTステートメントは、参照されたレポートとその依存関係を取得します。
このクエリからLooker StudioレポートIDと、そのレポートが参照するビューおよびテーブルを取得できます。
id | url | owner | project_id | dataset_id | table_id |
---|---|---|---|---|---|
b05977a7-6f09-4906-a732-a1a0d84700cd | https://lookerstudio.google.com/reporting/b05977a7-6f09-4906-a732-a1a0d84700cd | owner@example.com | project_id_1 | dataset_id_1 | table_id_1 |
b05977a7-6f09-4906-a732-a1a0d84700cd | https://lookerstudio.google.com/reporting/b05977a7-6f09-4906-a732-a1a0d84700cd | owner@example.com | project_id_1 | dataset_id_1 | table_id_2 |
b05977a7-6f09-4906-a732-a1a0d84700cd | https://lookerstudio.google.com/reporting/b05977a7-6f09-4906-a732-a1a0d84700cd | owner@example.com | project_id_1 | dataset_id_1 | table_id_3 |
0a908902-935b-45fb-b526-a3d75f21fc11 | https://lookerstudio.google.com/reporting/0a908902-935b-45fb-b526-a3d75f21fc11 | owner@example.com | project_id_2 | dataset_id_2 | table_id_2 |
このクエリ結果を、dbt exposuresのスキーマに合うようにYAMLをPythonで生成します。Connected Sheetsも同様に監査ログなどを使用して同様の結果を取得することができます。
exposures: - name: b05977a7-6f09-4906-a732-a1a0d84700cd type: report owner: email: owner@example.com depends_on: - ref('table_id_1') - ref('table_id_2') - ref('table_id_3') description: "" url: "https://lookerstudio.google.com/reporting/b05977a7-6f09-4906-a732-a1a0d84700cd" - name: 0a908902-935b-45fb-b526-a3d75f21fc11 type: report owner: email: owner@example.com depends_on: - ref('table_id_2') description: "" url: "https://lookerstudio.google.com/reporting/0a908902-935b-45fb-b526-a3d75f21fc11"
ただし、このままだと各種IDしか取得できないため、実際にURLを開くまでこのシートやレポートが何の目的で利用されているのかがわかりません。そこで弊社では、Google Workspaceの監査ログ2を用いて人間に優しいフォーマットにしています。Google Workspaceの監査ログはメールやカレンダーなどのプライベートな情報が含まれるため、BigQuery上で監査ログの対象を絞ったビューを使いクエリをしています。以下の例ではConnected SheetsとLooker Studioの監査ログは activity
というテーブルにある前提で書きます。
Connected Sheets: Google Driveの監査ログを見ることで、シートのIDとシート名を取得します。例えば、以下のクエリを使用してシートIDとシート名を取得できます。
sheets_name AS ( SELECT drive.doc_id as sheets_id, drive.doc_title as name, FROM `<GOOGLE_WORKSPACE_AUDIOT_LOGS_ACTIVITY>.activity` WHERE event_timestamp > current_timestamp() - interval 5 day and record_type = 'drive' and drive.doc_type = 'spreadsheet'),
Looker Studio: Looker Studioの監査ログを見ることで、レポートIDとレポート名を取得します。以下はそのクエリ例です。
report_name AS( SELECT data_studio.asset_id as report_id, data_studio.asset_name as name, FROM `<YOUR_GOOGLE_WORKSPACE_AUDIT_LOG>.activity` WHERE event_timestamp > current_timestamp() - interval 5 day and record_type = 'data_studio' and event_Type = 'ACCESS'),
これらの情報を前述したクエリと突合することにより、各種のリソース名を取得できます。
id | url | owner | project_id | dataset_id | table_id | name |
---|---|---|---|---|---|---|
b05977a7-6f09-4906-a732-a1a0d84700cd | https://lookerstudio.google.com/reporting/b05977a7-6f09-4906-a732-a1a0d84700cd | owner@example.com | project_id_1 | dataset_id_1 | table_id_1 | Sales Dashboard |
b05977a7-6f09-4906-a732-a1a0d84700cd | https://lookerstudio.google.com/reporting/b05977a7-6f09-4906-a732-a1a0d84700cd | owner@example.com | project_id_1 | dataset_id_1 | table_id_2 | Sales Dashboard |
b05977a7-6f09-4906-a732-a1a0d84700cd | https://lookerstudio.google.com/reporting/b05977a7-6f09-4906-a732-a1a0d84700cd | owner@example.com | project_id_1 | dataset_id_1 | table_id_3 | Sales Dashboard |
0a908902-935b-45fb-b526-a3d75f21fc11 | https://lookerstudio.google.com/reporting/0a908902-935b-45fb-b526-a3d75f21fc11 | owner@example.com | project_id_2 | dataset_id_2 | table_id_2 | Marketing Report |
上記のクエリ結果と合わせて以下のようなフォーマットに整えることができます。
exposures: - name: b05977a7-6f09-4906-a732-a1a0d84700cd label: "Sales Dashboard" type: report owner: email: owner@example.com depends_on: - ref('table_id_1') - ref('table_id_2') - ref('table_id_3') url: "https://lookerstudio.google.com/reporting/b05977a7-6f09-4906-a732-a1a0d84700cd" - name: 0a908902-935b-45fb-b526-a3d75f21fc11 label: "Marketing Report" type: report owner: email: owner@example.com depends_on: - ref('table_id_2') url: "https://lookerstudio.google.com/reporting/0a908902-935b-45fb-b526-a3d75f21fc11"
ここまででdbt exposuresの生成について紹介しました。次のセクションではdbt exposuresをOpenMetadataに取り込む方法について紹介します。
dbt exposuresの情報をOpenMetadataに取り込む
OpenMetadataのカスタムコネクタの実装は以下のドキュメントを参考にしています。 docs.open-metadata.org
また、カスタムコネクタのサンプル実装もあるので、そちらも参考にしてみてください。 github.com
はじめにOpenMetadata上でサービスを作成します。今回のケースではダッシュボードとして、dbt exposuresを取り込みます。OpenMetadataのUIからSettings → Services -> Dashboardからカスタムダッシュボードを作成します。
次に、dbt exposuresの情報をOpenMetadataに取り込むために、カスタムコネクタをPythonで記述していきます。ここで、用語の定義と処理の流れを説明します。
用語の定義
ServiceTopology クラス: サービスのトポロジーを定義するクラス。このクラスは、データと処理の流れを定義する。
DbtExposureConnector クラス: dbt exposuresをOpenMetadataに取り込むためのカスタムコネクタクラス。このクラスは、ServiceTopology クラスを拡張し、dbt exposuresの処理を実装する。
一般的な処理の流れ
OpenMetadataのカスタムコネクタは、一般的に以下の流れで処理が行われます。
- 初期準備 (prepare) - コネクタの準備を行い、必要な設定を確認します。サービスや必要な設定の検証を行います。
- データの取得 (_iter) -
_iter
メソッドは、データを生成する関数です。この関数では、データの読み取りや変換を行い、CreateEntityRequest
を生成してSink
に送ります。CreateEntityRequest
は、新しいエンティティ(データの構造やプロパティを定義するオブジェクト)を作成するためのリクエストです。具体的には、データソース(データベースやファイルなど)からデータを読み取り、そのデータを基にエンティティリクエストを作成します。このエンティティリクエストは、後でシステムに送信され、実際のデータエンティティとして登録されます。 - エンティティの生成 - データを基にエンティティを生成します。 生成されたエンティティは
Either
クラスを使用します。
今回の処理の流れ
今回のカスタムコネクタでは、上記の一般的な処理の流れに基づき、以下のように処理が行われます。データの取得とエンティティの生成を同時に行なっています。
- prepare - dbtの準備を行います。dbtのmanifest.jsonなどをフェッチします。(中の実装は既存のdbtコネクタのものを利用しています。)
- get_dbt_files - dbtファイルを取得します。この段階でdbtファイルの検証も行います。(中の実装は既存のdbtコネクタのものを利用しています。)
- process_dbt_data_model - dbtデータモデルを処理します。この段階では、dbtデータモデルの解析とダッシュボードエンティティの作成が行われます。
- mark_dashboards_as_deleted - 不要なダッシュボードエンティを削除します。実行時以前で取り込んだダッシュボードエンティティのうち、不要なものを削除します。
前述した通りdbtコネクタを拡張し実装しています。既存の実装を利用することで、実装コストを下げ少ないコード量でdbt exposuresを取り込むことができます。
コードの詳細は長くなるため、以下に折りたたんで載せています。ご興味のある方は確認してみてください。
dbt exposures用のカスタムコネクタの実装
# connector/dbt_exposure_connector.py # OpenMetadata Version 1.4.x import traceback from typing import Iterable, Optional, List, Set from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.ingestion.api.models import Either, StackTraceError from metadata.generated.schema.api.services.createDashboardService import ( CreateDashboardServiceRequest, ) from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.data.dashboard import Dashboard from metadata.generated.schema.entity.data.dashboardDataModel import DashboardDataModel from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest from metadata.generated.schema.entity.services.dashboardService import ( DashboardService, ) from metadata.generated.schema.entity.data.dashboard import ( Dashboard ) from metadata.utils.elasticsearch import get_entity_from_es_result from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) from metadata.utils import fqn from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.utils.logger import ingestion_logger from metadata.ingestion.source.database.dbt.metadata import DbtSource from metadata.generated.schema.metadataIngestion.dbtPipeline import DbtPipeline from metadata.ingestion.models.topology import ( NodeStage, ServiceTopology, TopologyNode, TopologyContextManager ) from metadata.ingestion.source.database.database_service import DataModelLink from metadata.ingestion.source.database.dbt.models import ( DbtFiles, DbtObjects, ) from metadata.ingestion.source.database.dbt.metadata import ( DbtSource, ) from metadata.ingestion.source.database.dbt.dbt_service import ( DbtFiles, DbtObjects, ) from metadata.generated.schema.entity.data.dashboard import ( Dashboard, ) from metadata.ingestion.source.database.dbt.dbt_utils import ( check_ephemeral_node, ) from metadata.generated.schema.entity.services.databaseService import DatabaseService from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails from metadata.generated.schema.type.entityReference import EntityReference from metadata.generated.schema.type.entityLineage import Source as LineageSource from metadata.ingestion.models.delete_entity import DeleteEntity from metadata.ingestion.api.delete import delete_entity_from_source class DbtServiceTopology(ServiceTopology): """ Defines the hierarchy in dbt Services. """ root = TopologyNode( producer="get_services", stages=[ NodeStage( type_=DashboardService, context="dashboard_service", processor="yield_create_request_dashboard_service", overwrite=False, must_return=True, cache_entities=True, ), ], children=["get_dbt_files"], post_process=["mark_dashboards_as_deleted"], ) get_dbt_files = TopologyNode( producer="get_dbt_files", stages=[ NodeStage( type_=DbtFiles, processor="validate_dbt_files", nullable=True, ) ], children=["process_dbt_data_model"], ) process_dbt_data_model = TopologyNode( producer="get_dbt_objects", stages=[ NodeStage( type_=Dashboard, context="dashboard", processor="yield_dashboard", consumer=["dashboard_service"], use_cache=False, ), NodeStage( type_=AddLineageRequest, processor="yield_dashboard_lineage", consumer=["dashboard_service"], nullable=False, ), ], ) logger = ingestion_logger() class InvalidServiceException(Exception): """ The service passed in config is not found """ class DbtExposureConnector(DbtSource): topology = DbtServiceTopology() context = TopologyContextManager(topology) source_config: DbtPipeline dashboard_source_state: Set = set() """ Class defines method to extract metadata from DBT """ @classmethod def create(cls, config_dict, metadata: OpenMetadata, pipeline_name: Optional[str] = None): config: WorkflowSource = WorkflowSource.parse_obj(config_dict) return cls(config, metadata) def prepare(self): """ By default for DBT nothing is required to be prepared """ dashboard_service = self.metadata.get_by_name( entity=DashboardService, fqn=self.config.serviceName ) if not dashboard_service: raise InvalidServiceException( f"Service with name {self.config.serviceName} not found" ) def get_services(self) -> Iterable[WorkflowSource]: yield self.config def register_record(self, dashboard_request: CreateDashboardRequest) -> None: """ Mark the dashboard record as scanned and update the dashboard_source_state """ dashboard_fqn = fqn.build( self.metadata, entity_type=Dashboard, service_name=dashboard_request.service.__root__, dashboard_name=dashboard_request.name.__root__, ) self.dashboard_source_state.add(dashboard_fqn) def remove_manifest_non_required_keys(self, manifest_dict: dict): pass def yield_create_request_dashboard_service( self, config: WorkflowSource ) -> Iterable[Either[CreateDashboardServiceRequest]]: yield Either( right=self.metadata.get_create_service_from_source( entity=DashboardService, config=config ) ) def yield_dashboard( self, dbt_objects: DbtObjects ) -> Iterable[Either[CreateDashboardRequest]]: """ Yield the data models """ if self.source_config.dbtConfigSource and dbt_objects.dbt_manifest: logger.debug("Parsing DBT Data Models") for key, manifest_node in dbt_objects.dbt_manifest.exposures.items(): try: if ( manifest_node.resource_type.value != "exposure" ): continue # Skip the ephemeral nodes since it is not materialized if check_ephemeral_node(manifest_node): logger.debug(f"Skipping ephemeral DBT node: {key}.") continue name = manifest_node.name label = manifest_node.label description = manifest_node.description url = manifest_node.url dashboard_request = CreateDashboardRequest( name=name, displayName=label, description=description, project="DBTExposure", charts=[], dataModels=[], tags=[], sourceUrl=url, service=self.context.get().dashboard_service, ) yield Either(right=dashboard_request) self.register_record(dashboard_request=dashboard_request) except Exception as exc: yield Either( left=StackTraceError( name=name, error=f"Error to yield dashboard for {name}: {exc}", stack_trace=traceback.format_exc(), ) ) def _get_dashbaord(self, dashboard_name: str) -> Optional[Dashboard]: """ Get the datamodel entity for lineage """ dashboard_fqn = fqn.build( self.metadata, entity_type=Dashboard, service_name=self.context.get().dashboard_service, dashboard_name=dashboard_name, ) if dashboard_fqn: return self.metadata.get_by_name( entity=Dashboard, fqn=dashboard_fqn, ) return None def yield_dashboard_lineage( self, dbt_objects: DbtObjects, db_service_name: str = "layerone" ) -> Iterable[Either[AddLineageRequest]]: """ Lineage request between Data Models and Database table """ db_service_entity = self.metadata.get_by_name( entity=DatabaseService, fqn=db_service_name ) if self.source_config.dbtConfigSource and dbt_objects.dbt_manifest: logger.debug("Parsing DBT Data Models") manifest_entities = { **dbt_objects.dbt_manifest.sources, **dbt_objects.dbt_manifest.nodes, } for key, manifest_node in dbt_objects.dbt_manifest.exposures.items(): if ( manifest_node.resource_type.value != "exposure" ): continue name = manifest_node.name to_entity = self._get_dashbaord(name) upstream = self.parse_upstream_nodes(manifest_entities, manifest_node) for upstream_node in upstream: try: from_es_result = self.metadata.es_search_from_fqn( entity_type=Table, fqn_search_string=upstream_node, ) from_entity: Optional[ Union[Table, List[Table]] ] = get_entity_from_es_result( entity_list=from_es_result, fetch_multiple_entities=False ) if from_entity and to_entity: yield Either( right=AddLineageRequest( edge=EntitiesEdge( fromEntity=EntityReference( id=from_entity.id.__root__, type="table", ), toEntity=EntityReference( id=to_entity.id.__root__, type="dashboard", ), lineageDetails=LineageDetails( source=LineageSource.DashboardLineage ), ) ) ) except Exception as exc: yield Either( left=StackTraceError( name="dbtExposure", error=( "Error to yield dashboard lineage details for DB " f"service name [dbtExposure]: {exc}" ), stack_trace=traceback.format_exc(), ) ) def mark_dashboards_as_deleted(self) -> Iterable[Either[DeleteEntity]]: """ Method to mark the dashboards as deleted """ logger.info("Mark Deleted Dashboards set to True") yield from delete_entity_from_source( metadata=self.metadata, entity_type=Dashboard, entity_source_state=self.dashboard_source_state, mark_deleted_entity=True, params={"service": self.context.get().dashboard_service}, ) def test_connection(self) -> None: pass def close(self): pass
次にYAMLでdbt exposuresを取り込むための設定を記述します。sourceConfigにdbtのmanifest.jsonをどのように取得するかを設定しますが、この部分はdbtコネクタのスキーマをそのまま利用できます。今回は例としてS3を使う設定にしています。
# ingestion_dbt_exposure.yaml source: type: customDashboard serviceName: ${OM_DASHBOARD_SERVICE_NAME} serviceConnection: config: type: CustomDashboard sourcePythonClass: connector.dbt_exposure_connector.DbtExposureConnector sourceConfig: config: type: DBT dbtConfigSource: dbtConfigType: s3 dbtSecurityConfig: awsRegion: ${AWS_REGION} dbtPrefixConfig: dbtBucketName: ${AWS_S3_BUCKET_NAME} dbtObjectPrefix: "${AWS_S3_OBJECT_PREFIX}" sink: type: metadata-rest config: {} workflowConfig: loggerLevel: ${LOG_LEVEL} openMetadataServerConfig: hostPort: ${OPENMETADATA_INGESTION_URL} authProvider: openmetadata securityConfig: jwtToken: ${OPENMETADATA_INGESTION_JWT_TOKEN}
最後にmetadataコマンドを用いてdbt exposuresをOpenMetadataへ取り込みます。
$ metadata ingest -c ingestion_dbt_exposure.yaml
取り込みが完了すると、次のようにダッシュボードとしてdbt exposuresの一覧が表示されます。 dbt exposuresを取り込む際に依存関係も取り込んでいるため、ダッシュボードが依存するテーブルをLineageとして確認できます。
まとめ
今回のブログでは、OpenMetadataのカスタムコネクタを利用してdbt exposuresをConnected SheetsやLooker Studioレポートと連携させる方法について説明しました。連携をすることにより、データの管理と可視化が一元化され、データの利用状況を効率的に追跡できるようになりました。他にも弊社では、カスタムコネクタをSalesforce及びMySQLとBigQueryとのLineageの作成などにも利用しています。大変便利な機能なので、ぜひ利用してみてください!
弊社では、社員全員がアナリストになれるようなデータ分析環境を整備しています。もしご興味を持たれた方は、カジュアル面談からでも是非お話ししましょう! www.notion.so