適用対象:
Databricks SQL
Important
この機能は ベータ版です。 Databricks Runtime 17.3 以降が必要です。
ソースからストリーミング テーブルに変更データ キャプチャ (CDC) レコードを処理するには、CREATE STREAMING TABLEと共に FLOW AUTO CDC 句を使用します。
以前は、 MERGE INTO ステートメントは、Azure Databricks で CDC レコードを処理するために一般的に使用されていました。 ただし、 MERGE INTO は、レコードの順序が正しくないために正しくない結果が生成される場合や、レコードの順序を再作成するための複雑なロジックが必要になる場合があります。
AUTO CDC は、順不同のレコードを自動的に処理することで CDC を簡略化します。 レコードを識別するキー、順序付けのシーケンス列、および結果を SCD タイプ 1 (直接更新) または SCD タイプ 2 (履歴追跡) として格納するかどうかを指定します。
構文
CREATE OR REFRESH STREAMING TABLE table_name
FLOW AUTO CDC
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
INSERTイベントとUPDATE イベントの既定の動作は、ソースから CDC イベントをアップサートすることです。指定したキーと一致するターゲット テーブル内の行を更新するか、一致するレコードがターゲット テーブルに存在しない場合は新しい行を挿入します。
DELETEイベントの処理は、APPLY AS DELETE WHEN条件で指定できます。
パラメーター
sourceデータのソース。 ソースはストリーミング ソースである必要があります。 ストリーミング セマンティクスを使用してソースから読み取る場合は、
STREAMキーワードを使用します。 読み取りで既存のレコードの変更または削除が発生した場合は、エラーがスローされます。 静的ソースまたは追加専用ソースから読み取るのが最も安全です。ストリーミング データの詳細については、「パイプラインを使用してデータを変換する」を参照してください。
KEYSソース データ内の行を一意に識別する列または列の組み合わせ。 これらの列の値は、ターゲット テーブル内の特定のレコードに適用される CDC イベントを識別するために使用されます。
列の組み合わせを定義するには、列のコンマ区切りのリストを使用します。
この句は必須です。
IGNORE NULL UPDATESターゲット列のサブセットを含む更新プログラムの取り込みを許可します。 CDC イベントが既存の行と一致し、
IGNORE NULL UPDATESが指定されている場合、null値を持つ列はターゲット内の既存の値を保持します。 これは、null値を持つ入れ子になった列にも適用されます。この句は省略可能です。
既定値は、既存の列を
null値で上書きすることです。APPLY AS DELETE WHENCDC イベントをアップサートではなく
DELETEとして扱うタイミングを指定します。SCD タイプ 2 のソースでは、順序が正しく指定されていないデータを処理するために、削除された行は基になる Delta テーブルの廃棄石として一時的に保持され、これらの廃棄石を除外するビューがメタストアに作成されます。 保持間隔は、
pipelines.cdc.tombstoneGCThresholdInSecondstable プロパティを使用して構成できます。この句は省略可能です。
APPLY AS TRUNCATE WHENCDC イベントを完全なテーブル
TRUNCATEとして扱うタイミングを指定します。 この句はターゲット テーブルの完全な切り捨てをトリガーするため、この機能を必要とする特定のユース ケースにのみ使用する必要があります。APPLY AS TRUNCATE WHEN句は、SCD 型 1 でのみサポートされます。 SCD タイプ 2 では、切り捨て操作はサポートされていません。この句は省略可能です。
SEQUENCE BYソース データ内の CDC イベントの論理順序を指定する列名。 パイプライン処理では、このシーケンス処理を使用して、順不同に到着した変更イベントを処理します。
シーケンス処理に複数の列が必要な場合は、
STRUCT式を使用します。最初に最初の構造体フィールドで順序付けを行い、同じ値の場合は次に2番目のフィールドで並べ替えを行います。指定された列は、並べ替え可能なデータ型である必要があります。
この句は必須です。
COLUMNSターゲット テーブルに含める列のサブセットを指定します。 次のいずれかを実行できます。
- 含める列の完全な一覧を指定します:
COLUMNS (userId, name, city)。 - 除外する列の一覧を指定します。
COLUMNS * EXCEPT (operation, sequenceNum)
この句は省略可能です。
既定では、
COLUMNS句が指定されていない場合は、ターゲット テーブル内のすべての列が含まれます。- 含める列の完全な一覧を指定します:
STORED ASレコードを SCD タイプ 1 または SCD タイプ 2 として格納するかどうか。
この句は省略可能です。
既定値は SCD タイプ 1 です。
TRACK HISTORY ON指定した列に変更がある場合に履歴レコードを生成する出力列のサブセットを指定します。 次のいずれかを実行できます。
- 追跡する列の完全な一覧を指定します:
COLUMNS (userId, name, city)。 - 追跡から除外する列の一覧を指定します。
COLUMNS * EXCEPT (operation, sequenceNum)
この句は省略可能です。 既定では、
TRACK HISTORY ON *と同等の変更がある場合に、すべての出力列の履歴を追跡します。- 追跡する列の完全な一覧を指定します:
例示
-- SCD type 1: apply CDC changes with direct updates (no history)
> CREATE OR REFRESH STREAMING TABLE target
TBLPROPERTIES(pipelines.channel = "PREVIEW")
FLOW AUTO CDC
FROM stream(cdc_data.users)
KEYS (userId)
SEQUENCE BY sequenceNum
STORED AS SCD TYPE 1;
-- SCD type 2: retain a history of changes, with delete handling
> CREATE OR REFRESH STREAMING TABLE target
TBLPROPERTIES(pipelines.channel = "PREVIEW")
FLOW AUTO CDC
FROM stream(cdc_data.users)
KEYS (userId)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2;
-- SCD type 2 with history tracking on specific columns
> CREATE OR REFRESH STREAMING TABLE target
TBLPROPERTIES(pipelines.channel = "PREVIEW")
FLOW AUTO CDC
FROM stream(cdc_data.users)
KEYS (userId)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2
TRACK HISTORY ON * EXCEPT (city);