Lakeflow Spark 宣言パイプラインは、 AUTO CDC API と AUTO CDC FROM SNAPSHOT API を使用して変更データ キャプチャ (CDC) を簡略化します。 これらの API は、CDC フィードまたはデータベース スナップショットから、緩やかに変化するディメンション (SCD) タイプ 1 とタイプ 2 の計算の複雑さを自動化します。 これらの概念の詳細については、「 データ キャプチャとスナップショットの変更」を参照してください。
注
AUTO CDC API は、APPLY CHANGES API を置き換え、同じ構文を持ちます。
APPLY CHANGES API は引き続き使用できますが、Databricks では、AUTO CDC API を代わりに使用することをお勧めします。
使用する API は、変更データのソースによって異なります。
-
AUTO CDC: ソース データベースで CDC フィードが有効になっている場合に使用します。AUTO CDCは、変更データ フィード (CDF) からの変更を処理します。 これは、パイプライン SQL インターフェイスと Python インターフェイスの両方でサポートされています。 -
AUTO CDC FROM SNAPSHOT: ソース データベースで CDC が有効ではなく、スナップショットのみを使用できる場合に使用します。 この API は、スナップショットを比較して変更を決定し、それらを処理します。 Python インターフェイスでのみサポートされます。
どちらの API でも、SCD タイプ 1 とタイプ 2 を使用したテーブルの更新がサポートされています。
- SCD タイプ 1 を使用して、レコードを直接更新します。 更新されたレコードの履歴は保持されません。
- SCD タイプ 2 を使用して、すべての更新または指定された列セットの更新時にレコードの履歴を保持します。
AUTO CDC API は、Apache Spark 宣言パイプラインではサポートされていません。
構文とその他の参照については、 AUTO CDC INTO (パイプライン)、 create_auto_cdc_flow、 およびcreate_auto_cdc_from_snapshot_flowに関するページを参照してください。
注
このページでは、ソース データの変更に基づいてパイプライン内のテーブルを更新する方法について説明します。 Delta テーブルの行レベルの変更情報を記録およびクエリする方法については、「 Azure Databricks での Delta Lake 変更データ フィードの使用」を参照してください。
Requirements
CDC API を使用するには、 サーバーレス SDP または SDPPro または Advanceditions を使用するようにパイプラインを構成する必要があります。
AUTO CDC のしくみ
AUTO CDCを使用して CDC 処理を実行するには、ストリーミング テーブルを作成し、SQL の AUTO CDC ... INTO ステートメントまたは Python の create_auto_cdc_flow() 関数を使用して、変更フィードのソース、キー、シーケンスを指定します。 シーケンス処理と SCD ロジックのしくみの詳細については、 データ キャプチャとスナップショットの変更に関する記事を参照してください。
AUTO CDC の例を参照してください。
変更フィードを含むソースからの初期ハイドレートの場合は、AUTO CDC フローでonceを使用し、変更フィードの処理を続行します。
AUTO CDC を使用した外部 RDBMS テーブルのレプリケートに関するページを参照してください。
構文の詳細については、 AUTO CDC INTO (パイプライン) またはcreate_auto_cdc_flowに関するページを参照 してください。
スナップショットからの自動 CDC のしくみ
AUTO CDC FROM SNAPSHOT は、順番にスナップショットを比較することで、ソース データの変更を決定します。 Python パイプライン インターフェイスでのみサポートされます。 差分テーブル、クラウド ストレージ ファイル、または JDBC からスナップショットを直接読み取ることができます。
AUTO CDC FROM SNAPSHOTを使用して CDC 処理を実行するには、ストリーミング テーブルを作成し、create_auto_cdc_from_snapshot_flow()関数を使用してスナップショット、キー、およびその他の引数を指定します。 2 つのインジェスト パターンの詳細と、それぞれを使用するタイミングについては、「 スナップショット処理パターン」を参照してください。
AUTO CDC FROM SNAPSHOT の例を参照してください。
構文の詳細については、 create_auto_cdc_from_snapshot_flowを参照してください。
シーケンス処理に複数の列を使用する
複数の列 (タイムスタンプと ID を使用してタイを解除する場合など) を順番に並べるには、 STRUCT を使用してそれらを組み合わせます。 API は最初のフィールドで並べ替え、同点が発生した場合は二番目のフィールドを基準とします。
SQL
SEQUENCE BY STRUCT(timestamp_col, id_col)
Python
sequence_by = struct("timestamp_col", "id_col")
AUTO CDC の例
次の例は、変更データ フィード ソースを使用した SCD タイプ 1 およびタイプ 2 の処理を示しています。 サンプル データは、新しいユーザー レコードを作成し、ユーザー レコードを削除して、ユーザー レコードを更新します。 SCD タイプ 1 の例では、最後の UPDATE 操作は到着が遅れ、ターゲット テーブルから削除され、順不同のイベント処理が示されています。
これらの例で使用される入力レコードを次に示します。 このデータは、「サンプル データの作成」セクションでクエリを実行することによって 作成 されます。
| userId | 名前 | city | 操作 | シーケンス番号 |
|---|---|---|---|---|
| 124 | ラウル | Oaxaca | INSERT | 1 |
| 123 | Isabel | モンテレー | INSERT | 1 |
| 125 | メルセデス | ティファナ | INSERT | 2 |
| 126 | リリー | カンクン | INSERT | 2 |
| 123 | null 値 | null 値 | DELETE | 6 |
| 125 | メルセデス | Guadalajara | UPDATE | 6 |
| 125 | メルセデス | メヒカリ | UPDATE | 5 |
| 123 | Isabel | Chihuahua | UPDATE | 5 |
サンプル データ生成クエリの最後の行のコメントを解除すると、テーブルを切り捨てる (テーブルをクリアする) ことを指定する次のレコードが sequenceNum=3に挿入されます。
| userId | 名前 | city | 操作 | シーケンス番号 |
|---|---|---|---|---|
| null 値 | null 値 | null 値 | 切り捨てる | 3 |
注
次の例には、 DELETE 操作と TRUNCATE 操作の両方を指定するオプションが含まれていますが、それぞれは省略可能です。
サンプル データを作成する
次のステートメントを実行して、サンプル データセットを作成します。 このコードは、パイプライン定義の一部として実行されるものではありません。 変換フォルダーではなく、パイプラインの探索フォルダーから実行します。
CREATE SCHEMA IF NOT EXISTS main.cdc_tutorial;
CREATE TABLE main.cdc_tutorial.users_cdf
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(124, "Raul", "Oaxaca", "INSERT", 1),
(123, "Isabel", "Monterrey", "INSERT", 1),
-- New users.
(125, "Mercedes", "Tijuana", "INSERT", 2),
(126, "Lily", "Cancun", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 6),
(125, "Mercedes", "Guadalajara", "UPDATE", 6),
-- This batch of updates arrived out of order. The batch at sequenceNum 6 is the final state.
(125, "Mercedes", "Mexicali", "UPDATE", 5),
(123, "Isabel", "Chihuahua", "UPDATE", 5)
-- Uncomment to test TRUNCATE.
-- ,(null, null, null, "TRUNCATE", 3)
);
SCD タイプ 1 の更新を処理する
SCD タイプ 1 では、各レコードの最新バージョンのみが保持されます。 次の例では、上記で作成した変更データ フィードから読み取り、変更をストリーミング テーブル ターゲットに適用します。 このコードを実行する Lakeflow Spark 宣言パイプラインを開発します。
Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("main.cdc_tutorial.users_cdf")
dp.create_streaming_table("users_current")
dp.create_auto_cdc_flow(
target = "users_current",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
SQL
CREATE OR REFRESH STREAMING TABLE users_current;
CREATE FLOW apply_cdc AS AUTO CDC INTO
users_current
FROM
stream(main.cdc_tutorial.users_cdf)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
APPLY AS TRUNCATE WHEN
operation = "TRUNCATE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 1;
SCD タイプ 1 の例を実行すると、ターゲット テーブルには次のレコードが含まれます。
| userId | 名前 | city |
|---|---|---|
| 124 | ラウル | Oaxaca |
| 125 | メルセデス | Guadalajara |
| 126 | リリー | カンクン |
ユーザー 123 (Isabel) が削除され、表示されません。 SCD Type 1 は以前の値を上書きするため、ユーザー 125 (メルセデス) には最新の市区町村 (グアダラハラ) のみが表示されます。
UPDATEでの以前のsequenceNum=5は、sequenceNum=6の後の更新プログラムが到着したため、削除されました。
TRUNCATE レコードをコメント解除してこの例を実行すると、テーブルは sequenceNum=3 でクリアされます。 つまり、レコードの 124 と 126 はテーブルに含まれていないので、最終的なターゲット テーブルには次のレコードのみが含まれます。
| userId | 名前 | city |
|---|---|---|
| 125 | メルセデス | Guadalajara |
SCD タイプ 2 の更新を処理する
SCD Type 2 では、レコードの各バージョンに対して新しい行を作成し、各バージョンがアクティブだったタイミングを示す __START_AT 列と __END_AT 列を使用して、変更の完全な履歴を保持します。
Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("main.cdc_tutorial.users_cdf")
dp.create_streaming_table("users_history")
dp.create_auto_cdc_flow(
target = "users_history",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)
SQL
CREATE OR REFRESH STREAMING TABLE users_history;
CREATE FLOW apply_cdc AS AUTO CDC INTO
users_history
FROM
stream(main.cdc_tutorial.users_cdf)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2;
SCD Type 2 の例を実行した後、ターゲット テーブルには次のレコードが含まれます。
| userId | 名前 | city | __START_AT | __END_AT |
|---|---|---|---|---|
| 123 | Isabel | モンテレー | 1 | 5 |
| 123 | Isabel | Chihuahua | 5 | 6 |
| 124 | ラウル | Oaxaca | 1 | null 値 |
| 125 | メルセデス | ティファナ | 2 | 5 |
| 125 | メルセデス | メヒカリ | 5 | 6 |
| 125 | メルセデス | Guadalajara | 6 | null 値 |
| 126 | リリー | カンクン | 2 | null 値 |
テーブルは完全な履歴を保持します。 ユーザー 123 には 2 つのバージョンがあります (削除されるとシーケンス 6 で終了)。 ユーザー 125 には、市区町村の変更を示す 3 つのバージョンがあります。
__END_AT = nullを含むレコードは現在アクティブです。
SCD タイプ 2 で列のサブセットを追跡する
既定では、SCD Type 2 では、列の値が変更されるたびに新しいバージョンが作成されます。 追跡する列のサブセットを指定すると、新しい履歴レコードを生成するのではなく、他の列に対する変更によって現在のバージョンが更新されます。
次の例では、履歴追跡から city 列を除外します。
Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("main.cdc_tutorial.users_cdf")
dp.create_streaming_table("users_history")
dp.create_auto_cdc_flow(
target = "users_history",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2",
track_history_except_column_list = ["city"]
)
SQL
CREATE OR REFRESH STREAMING TABLE users_history;
CREATE FLOW apply_cdc AS AUTO CDC INTO
users_history
FROM
stream(main.cdc_tutorial.users_cdf)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2
TRACK HISTORY ON * EXCEPT
(city)
cityの変更は追跡されないため、新しいバージョンを作成する代わりに、市区町村の更新によって現在の行が上書きされます。 ターゲット テーブルには、次のレコードが含まれています。
| userId | 名前 | city | __START_AT | __END_AT |
|---|---|---|---|---|
| 123 | Isabel | Chihuahua | 1 | 6 |
| 124 | ラウル | Oaxaca | 1 | null 値 |
| 125 | メルセデス | Guadalajara | 2 | null 値 |
| 126 | リリー | カンクン | 2 | null 値 |
スナップショットによるAUTO CDCの例
次のセクションでは、 AUTO CDC FROM SNAPSHOT を使用してスナップショットを SCD Type 1 または Type 2 ターゲット テーブルに処理する例を示します。 この API を使用する場合の背景については、「 データ キャプチャとスナップショットの変更」を参照してください。
例: パイプライン インジェスト時間を使用してスナップショットを処理する
この方法は、スナップショットが定期的かつ順番に到着し、バージョン管理のためにパイプライン実行タイムスタンプに依存できる場合に使用します。 パイプラインの更新ごとに新しいスナップショットが取り込まれます。
差分テーブル、クラウド ストレージ ファイル、JDBC 接続など、複数のソースの種類からスナップショットを読み取ることができます。
手順 1: サンプル データを作成する
スナップショット データを含むテーブルを作成します。 パイプラインの explorations フォルダーにあるノートブックまたは Databricks SQL から次のコードを実行します。
CREATE SCHEMA IF NOT EXISTS main.cdc_tutorial;
CREATE TABLE main.cdc_tutorial.snapshot (
userId INT,
city STRING
);
INSERT INTO main.cdc_tutorial.snapshot VALUES
(1, 'Oaxaca'),
(2, 'Monterrey'),
(3, 'Tijuana');
手順 2: スナップショットから AUTO CDC を実行する
この手順でコードを実行する Lakeflow Spark 宣言パイプラインを開発します。
スナップショット ビューのソースの種類を選択します (サンプル作成コードでは Delta テーブルが生成されます)。
オプション A: Delta テーブルから読み取る
from pyspark import pipelines as dp
@dp.view(name="source")
def source():
return spark.read.table("main.cdc_tutorial.snapshot")
オプション B: クラウド ストレージから読み取る
from pyspark import pipelines as dp
@dp.view(name="source")
def source():
return spark.read.format("csv").option("header", True).load("<snapshot-path>")
オプション C: JDBC から読み取る (クラシック コンピューティングのみ)
from pyspark import pipelines as dp
@dp.view(name="source")
def source():
return (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
)
すべてのオプション、ターゲットに書き込む
次に、ターゲット テーブルとフローを追加します。
dp.create_streaming_table("target")
dp.create_auto_cdc_from_snapshot_flow(
target = "target",
source = "source",
keys = ["userId"],
stored_as_scd_type = 2
)
最初のパイプラインの実行後、すべてのレコードがアクティブな行として挿入されます。
| userId | city | __START_AT | __END_AT |
|---|---|---|---|
| 1 | Oaxaca | 0 | null 値 |
| 2 | モンテレー | 0 | null 値 |
| 3 | ティファナ | 0 | null 値 |
注
代わりに SCD Type 1 を使用し、現在の状態のみを保持するには、 stored_as_scd_type=1設定します。 この場合、ターゲット テーブルには __START_AT 列と __END_AT 列は含まれません。
手順 3: 新しいスナップショットをシミュレートして再実行する
ソース テーブルを更新して、到着する新しいスナップショットをシミュレートします (pipline の explorations フォルダーにあるノートブックまたは SQL ファイルからこのコードを実行します)。
TRUNCATE TABLE main.cdc_tutorial.snapshot;
INSERT INTO main.cdc_tutorial.snapshot VALUES
(2, 'Carmel'),
(3, 'Los Angeles'),
(4, 'Death Valley'),
(6, 'Kings Canyon');
パイプラインを再実行します。
AUTO CDC FROM SNAPSHOT は、新しいスナップショットを前のスナップショットと比較し、ユーザー 1 が削除され、ユーザー 2 と 3 が更新され、ユーザー 4 と 6 が挿入されたことを検出します。 これにより変更フィードが生成され、 AUTO CDC を使用して出力テーブルが作成されます。
SCD タイプ 2 を使用した 2 回目の実行後、ターゲット テーブルには次のレコードが含まれます。
| userId | city | __START_AT | __END_AT |
|---|---|---|---|
| 1 | Oaxaca | 0 | 1 |
| 2 | モンテレー | 0 | 1 |
| 2 | カーメル | 1 | null 値 |
| 3 | ティファナ | 0 | 1 |
| 3 | ロサンゼルス | 1 | null 値 |
| 4 | デスバレー | 1 | null 値 |
| 6 | キングスキャニオン | 1 | null 値 |
ユーザー 1 は終了 (削除) されました。 ユーザー 2 と 3 にはそれぞれ、都市の変更を示す 2 つのバージョンがあります。 ユーザー 4 と 6 が新しく挿入されました。
SCD タイプ 1 で 2 回目の実行が行われた後、ターゲット テーブルには現在の状態のみが表示されます。
| userId | city |
|---|---|
| 2 | カーメル |
| 3 | ロサンゼルス |
| 4 | デスバレー |
| 6 | キングスキャニオン |
例: バージョン関数を使用してスナップショットを処理する
スナップショットの順序を明示的に制御する必要がある場合は、この方法を使用します。 たとえば、複数のスナップショットが同時に到着する場合や、スナップショットが順不同で到着する場合に、この方法を使用します。 次に処理するスナップショットとそのバージョン番号を指定する関数を記述します。 API は、バージョンの昇順でスナップショットを処理します。
- 複数のスナップショットがストレージ内にある場合、それらはすべて順番に処理されます。
- スナップショットが順序に誤って到着した場合 (たとえば、
snapshot_3後に到着snapshot_4)、スナップショットはスキップされます。 - 新しいスナップショットがない場合、関数は
Noneを返し、処理は行われません。
手順 1: スナップショット ファイルを準備する
スナップショット データを含む CSV ファイルを作成し、ボリュームまたはクラウドストレージの場所に追加します。 ファイルに時系列で名前を付けます (たとえば、 snapshot_1.csv、 snapshot_2.csv)。
各ファイルには、 userId と cityの列が含まれている必要があります。 例えば次が挙げられます。
snapshot_1.csv:
| userId | city |
|---|---|
| 1 | Oaxaca |
| 2 | モンテレー |
| 3 | ティファナ |
snapshot_2.csv:
| userId | city |
|---|---|
| 2 | カーメル |
| 3 | ロサンゼルス |
| 4 | デスバレー |
手順 2: バージョン関数を使用して AUTO CDC FROM SNAPSHOT を実行する
新しいノートブックを作成し、次のパイプライン コードを貼り付けます。 次に 、Lakeflow Spark 宣言パイプラインを開発します。
from pyspark import pipelines as dp
from typing import Optional, Tuple
from pyspark.sql import DataFrame
def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Optional[Tuple[DataFrame, int]]:
snapshot_dir = "/Volumes/main/cdc_tutorial/snapshots/" # or the location you created the sample data
files = dbutils.fs.ls(snapshot_dir)
snapshot_files = [f.name for f in files if f.name.startswith("snapshot_") and f.name.endswith(".csv")]
snapshot_versions = []
for filename in snapshot_files:
try:
version = int(filename.replace("snapshot_", "").replace(".csv", ""))
snapshot_versions.append(version)
except ValueError:
continue
snapshot_versions.sort()
if latest_snapshot_version is None:
if snapshot_versions:
next_version = snapshot_versions[0]
else:
return None
else:
next_versions = [v for v in snapshot_versions if v > latest_snapshot_version]
if next_versions:
next_version = next_versions[0]
else:
return None
snapshot_path = f"{snapshot_dir}snapshot_{next_version}.csv"
df = spark.read.format("csv").option("header", True).load(snapshot_path)
return (df, next_version)
dp.create_streaming_table("main.cdc_tutorial.target_versioned")
dp.create_auto_cdc_from_snapshot_flow(
target = "main.cdc_tutorial.target_versioned",
source = next_snapshot_and_version,
keys = ["userId"],
stored_as_scd_type = 2
)
注
代わりに SCD Type 1 を使用するには、 stored_as_scd_type=1設定します。
snapshot_1.csv処理後、ターゲット テーブルには次のレコードが含まれます。
| userId | city | __START_AT | __END_AT |
|---|---|---|---|
| 1 | Oaxaca | 1 | null 値 |
| 2 | モンテレー | 1 | null 値 |
| 3 | ティファナ | 1 | null 値 |
snapshot_2.csv処理後、ターゲット テーブルには次のレコードが含まれます。
| userId | city | __START_AT | __END_AT |
|---|---|---|---|
| 1 | Oaxaca | 1 | 2 |
| 2 | モンテレー | 1 | 2 |
| 2 | カーメル | 2 | null 値 |
| 3 | ティファナ | 1 | 2 |
| 3 | ロサンゼルス | 2 | null 値 |
| 4 | デスバレー | 2 | null 値 |
注
SCD タイプ 1 の場合、テーブルは最新のスナップショットとまったく同じになります。 違いは、ダウンストリーム クエリで変更フィードを使用して、変更されたレコードのみを処理できることです。
手順 3: 新しいスナップショットを追加する
変更されたデータ (変更された市区町村の値、新しい行、削除された行など) を含む新しい CSV ファイルを保存場所に追加します。 次に、パイプラインをもう一度実行して、新しいスナップショットを処理します。
制限事項
- シーケンス列は、並べ替え可能なデータ型である必要があります。
NULLシーケンス値はサポートされていません。 -
AUTO CDC FROM SNAPSHOTは Python パイプライン インターフェイスでのみサポートされます。SQL インターフェイスはサポートされていません。
その他のリソース
- 変更データキャプチャとスナップショット: CDC の概念、スナップショット、および SCD タイプについて学びます。
-
AUTO CDCを使用して外部 RDBMS テーブルをレプリケートする:onceフローで初期ハイドレートを実行し、変更の処理を続行する方法について説明します。 - AUTO CDC の高度なトピック: AUTO CDC ターゲットに対する変更操作、変更データ フィードの読み取り、メトリックの処理について説明します。
- チュートリアル: 変更データ キャプチャを使用して ETL パイプラインを構築する