Lakeflow Connect を使用して SQL Server から Azure Databricks にデータを取り込む方法について説明します。
SQL Server コネクタは、Azure SQL Database、Azure SQL Managed Instance、および Amazon RDS SQL データベースをサポートしています。 これには、Azure 仮想マシン (VM) と Amazon EC2 で実行されている SQL Server が含まれます。 このコネクタでは、Azure ExpressRoute と AWS Direct Connect ネットワークを使用したオンプレミスの SQL Server もサポートされています。
必要条件
インジェスト ゲートウェイとインジェスト パイプラインを作成するには、まず次の要件を満たす必要があります。
ワークスペースが Unity Catalog に対して有効になっている。
ワークスペースに対してサーバーレス コンピューティングが有効になっています。 サーバーレス コンピューティング要件を参照してください。
接続を作成する場合: メタストアに対する
CREATE CONNECTION特権があります。 「Unity Catalog の特権の管理」を参照してください。コネクタで UI ベースのパイプライン作成がサポートされている場合は、このページの手順を完了することで、接続とパイプラインを同時に作成できます。 ただし、API ベースのパイプライン作成を使用する場合は、このページの手順を完了する前に、カタログ エクスプローラーで接続を作成する必要があります。 「マネージド インジェスト ソースへの接続」を参照してください。
既存の接続を使用する場合: 接続に
USE CONNECTION特権またはALL PRIVILEGESがあります。ターゲット カタログに対する
USE CATALOG権限があります。既存のスキーマに対する
USE SCHEMA、CREATE TABLE、およびCREATE VOLUME特権、またはターゲット カタログに対するCREATE SCHEMA特権があります。
プライマリ SQL Server インスタンスにアクセスできます。 変更の追跡と変更データ キャプチャ機能は、読み取りレプリカまたはセカンダリ インスタンスではサポートされていません。
クラスターを作成するための無制限のアクセス許可、またはカスタム ポリシー (API のみ)。 ゲートウェイのカスタム ポリシーは、次の要件を満たしている必要があります。
ファミリ: ジョブ コンピューティング
ポリシー ファミリのオーバーライド:
{ "cluster_type": { "type": "fixed", "value": "dlt" }, "num_workers": { "type": "unlimited", "defaultValue": 1, "isOptional": true }, "runtime_engine": { "type": "fixed", "value": "STANDARD", "hidden": true } }Databricks では、インジェスト ゲートウェイのワーカー ノードはゲートウェイのパフォーマンスに影響しないため、可能な限り最小のワーカー ノードを指定することをお勧めします。 次のコンピューティング ポリシーを使用すると、Azure Databricks は、ワークロードのニーズに合わせてインジェスト ゲートウェイをスケーリングできます。 ソース データベースからの効率的でパフォーマンスの高いデータ抽出を可能にする最小要件は 8 コアです。
{ "driver_node_type_id": { "type": "fixed", "value": "Standard_E64d_v4" }, "node_type_id": { "type": "fixed", "value": "Standard_F4s" } }
クラスター ポリシーの詳細については、「 コンピューティング ポリシーの選択」を参照してください。
SQL Server から取り込むには、「 Azure Databricks への取り込み用に Microsoft SQL Server を構成する」の手順を最初に完了する必要があります。
ゲートウェイとインジェスト パイプラインを作成する
Databricks ユーザーインターフェース
Azure Databricks ワークスペースのサイドバーで、[ Data Ingestion をクリックします。
[ データの追加 ] ページの [ Databricks コネクタ] で、[ SQL Server] をクリックします。
インジェスト ウィザードの [ 接続 ] ページで、[ Azure Databricks への取り込み用に Microsoft SQL Server を構成する] から SQL Server アクセス資格情報を格納する接続を選択します。 メタストアに対する
CREATE CONNECTION権限がある場合は、[プラス] アイコンをクリックできます接続を作成 して、 SQL Server の認証の詳細を含む新しい接続を作成します。
[次へ] をクリックします。
[ インジェストのセットアップ ] ページで、インジェスト パイプラインの一意の名前を入力します。 このパイプラインは、ステージングの場所から宛先にデータを移動します。
イベント ログを書き込むカタログとスキーマを選択します。 イベント ログには、監査ログ、データ品質チェック、パイプラインの進行状況、エラーが含まれます。 カタログに対する
USE CATALOG権限とCREATE SCHEMA権限がある場合は、[プラス] アイコンをクリックできますドロップダウン メニューでスキーマを作成し、新しいスキーマを作成します。
(省略可能) すべてのテーブルの自動更新を[オン] に設定します。 自動更新がオンの場合、パイプラインは、影響を受けるテーブルを完全に更新することで、ログ クリーンアップ イベントや特定の種類のスキーマの進化などの問題を自動的に修正しようとします。 履歴の追跡が有効になっている場合、完全な更新でその履歴が消去されます。
インジェスト ゲートウェイの一意の名前を入力します。 ゲートウェイは、ソースから変更を抽出し、インジェスト パイプラインが読み込まれるようステージングするパイプラインです。
ステージング場所のカタログとスキーマを選択します。 この場所にボリュームが作成され、抽出されたデータがステージングされます。 カタログに対する
USE CATALOG権限とCREATE SCHEMA権限がある場合は、[プラス] アイコンをクリックできますドロップダウン メニューでスキーマを作成し、新しいスキーマを作成します。
[パイプラインの作成] をクリックして続行します。
[ ソース ] ページで、取り込むテーブルを選択します。 特定のテーブルを選択した場合は、テーブル設定を構成できます。
a. (省略可能)[ 設定] タブで、取り込まれた各テーブルの 宛先名 を指定します。 これは、オブジェクトを同じスキーマに複数回取り込むときに、変換先テーブルを区別するのに役立ちます。 「 宛先テーブルに名前を付けます」を参照してください。
a. (省略可能)既定の 履歴追跡 設定を変更します。 「 履歴追跡の有効化 (SCD タイプ 2)」を参照してください。
[ 次へ] をクリックし、[ 保存] をクリックして続行します。
[ 宛先 ] ページで、データを読み込むカタログとスキーマを選択します。 カタログに対する
USE CATALOG権限とCREATE SCHEMA権限がある場合は、[プラス] アイコンをクリックできますドロップダウン メニューでスキーマを作成し、新しいスキーマを作成します。
[ 保存] をクリックして続行します。
[ データベースのセットアップ ] ページで、[ 検証 ] をクリックして、ソースが Azure Databricks インジェスト用に正しく構成されていることを確認します。 不足している構成が返されます。 解決する手順については、[ 構成の完了] をクリックします。 続けて、 [次へ] をクリックします。 または、[検証の スキップ] をクリックします。
(省略可能)[ スケジュールと通知 ] ページで、[プラス] アイコンをクリック
スケジュールを作成します。 変換先テーブルを更新する頻度を設定します。
(省略可能)[
パイプライン 操作の成功または失敗の電子メール通知を設定する通知を追加し、[ パイプラインの保存と実行] をクリックします。
宣言型オートメーション バンドル
宣言型オートメーション バンドルを使用して取り込む前に、既存の接続にアクセスできる必要があります。 手順については、「 マネージド インジェスト ソースへの接続」を参照してください。
ステージング カタログとスキーマは、移行先のカタログとスキーマと同じにすることができます。 ステージング カタログを外部カタログにすることはできません。 バンドル パイプライン YAML ファイルの gateway_definition セクションでステージングの場所を指定します。
インジェスト ゲートウェイは、ソース データベースからスナップショットと変更データを抽出し、Unity カタログステージング ボリュームに格納します。 ゲートウェイを継続的なパイプラインとして実行する必要があります。 これは、ソース データベース上にある変更ログ保持ポリシーに対応するのに役立ちます。
インジェスト パイプラインは、スナップショットを適用し、ステージング ボリュームから宛先ストリーミング テーブルにデータを変更します。
バンドルには、ジョブとタスクの YAML 定義を含め、Databricks CLI を使用して管理できます。また、さまざまなターゲット ワークスペース (開発、ステージング、運用など) で共有および実行できます。 詳細については、「 宣言型オートメーション バンドルとは」を参照してください。
Databricks CLI を使用してバンドルを作成します。
databricks bundle initバンドルに 2 つの新しいリソース ファイルを追加します:
- パイプライン定義ファイル (たとえば、
resources/sqlserver_pipeline.yml)。 「pipeline.ingestion_definitionと例」を参照してください。 - データ インジェストの頻度を制御するジョブ定義ファイル (たとえば、
resources/sqlserver_job.yml)。
- パイプライン定義ファイル (たとえば、
Databricks CLI を使用してパイプラインをデプロイします:
databricks bundle deploy
Databricks ノートブック
ソース接続、ターゲット カタログ、ターゲット スキーマ、およびソースから取り込むテーブルを使用して、次のノートブックの Configuration セルを更新します。
Terraform
Terraform を使用して、SQL Server インジェスト パイプラインをデプロイおよび管理できます。 ゲートウェイとインジェスト パイプラインを作成するための Terraform 構成を含む完全なサンプル フレームワークについては、GitHub の Lakeflow Connect Terraform サンプル リポジトリを参照してください。
データ インジェストが正常に行われたことを確認する
パイプラインの詳細ページのリスト ビューには、データが取り込まれると処理されたレコードの数が表示されます。 これらの数値は自動的に更新されます。
Upserted records 列と Deleted records 列は、既定では表示されません。 有効にするには、[列の構成]
ボタンをクリックして選択します。
例示
これらの例を使用して、パイプラインを構成します。
パイプラインの構成
宣言型オートメーション バンドル
次のパイプライン定義ファイル:
variables:
# Common variables used multiple places in the DAB definition.
gateway_name:
default: sqlserver-gateway
dest_catalog:
default: main
dest_schema:
default: ingest-destination-schema
resources:
pipelines:
gateway:
name: ${var.gateway_name}
gateway_definition:
connection_name: <sqlserver-connection>
gateway_storage_catalog: main
gateway_storage_schema: ${var.dest_schema}
gateway_storage_name: ${var.gateway_name}
catalog: ${var.dest_catalog}
schema: ${var.dest_schema}
pipeline_sqlserver:
name: sqlserver-ingestion-pipeline
ingestion_definition:
ingestion_gateway_id: ${resources.pipelines.gateway.id}
objects:
# Modify this with your tables!
- table:
# Ingest the table test.ingestion_demo_lineitem to dest_catalog.dest_schema.ingestion_demo_line_item.
source_catalog: test
source_schema: ingestion_demo
source_table: lineitem
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
- schema:
# Ingest all tables in the test.ingestion_whole_schema schema to dest_catalog.dest_schema. The destination
# table name will be the same as it is on the source.
source_catalog: test
source_schema: ingestion_whole_schema
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
catalog: ${var.dest_catalog}
schema: ${var.dest_schema}
Databricks ノートブック
パイプライン仕様の Configuration セクションの例を次に示します。
# The name of the UC connection with the credentials to access the source database
connection_name = "my_connection"
# The name of the UC catalog and schema to store the replicated tables
target_catalog_name = "main"
target_schema_name = "lakeflow_sqlserver_connector_cdc"
# The name of the UC catalog and schema to store the staging volume with intermediate
# CDC and snapshot data. Use the destination catalog/schema by default.
stg_catalog_name = target_catalog_name
stg_schema_name = target_schema_name
# The name of the Gateway pipeline to create
gateway_pipeline_name = "cdc_gateway"
# The name of the Ingestion pipeline to create
ingestion_pipeline_name = "cdc_ingestion"
# Construct the full list of tables to replicate.
# IMPORTANT: The letter case of catalog, schema, and table names must match exactly
# the case used in the source database system tables.
tables_to_replicate = replicate_full_db_schema("MY_DB", ["MY_DB_SCHEMA"])
# Append tables from additional schemas as needed:
# + replicate_tables_from_db_schema("MY_DB", "MY_SCHEMA_2", ["table3", "table4"])
バンドル ジョブ定義ファイル
宣言型オートメーション バンドルで使用するジョブ定義ファイルの例を次に示します。 ジョブは、最後の実行から正確に 1 日後に毎日実行されます。
resources:
jobs:
sqlserver_dab_job:
name: sqlserver_dab_job
trigger:
periodic:
interval: 1
unit: DAYS
email_notifications:
on_failure:
- <email-address>
tasks:
- task_key: refresh_pipeline
pipeline_task:
pipeline_id: ${resources.pipelines.pipeline_sqlserver.id}
一般的なパターン
高度なパイプライン構成については、「 マネージド インジェスト パイプラインの一般的なパターン」を参照してください。
次のステップ
パイプラインを開始し、スケジュールを設定し、アラートを設定します。 一般的なパイプライン メンテナンス タスクを参照してください。