次の方法で共有


CREATE STREAMING TABLE ...FLOW AUTO CDC

適用対象: チェック マークあり Databricks SQL

Important

この機能は ベータ版です。 Databricks Runtime 17.3 以降が必要です。

ソースからストリーミング テーブルに変更データ キャプチャ (CDC) レコードを処理するには、CREATE STREAMING TABLEと共に FLOW AUTO CDC 句を使用します。

以前は、 MERGE INTO ステートメントは、Azure Databricks で CDC レコードを処理するために一般的に使用されていました。 ただし、 MERGE INTO は、レコードの順序が正しくないために正しくない結果が生成される場合や、レコードの順序を再作成するための複雑なロジックが必要になる場合があります。

AUTO CDC は、順不同のレコードを自動的に処理することで CDC を簡略化します。 レコードを識別するキー、順序付けのシーケンス列、および結果を SCD タイプ 1 (直接更新) または SCD タイプ 2 (履歴追跡) として格納するかどうかを指定します。

構文

CREATE OR REFRESH STREAMING TABLE table_name
FLOW AUTO CDC
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

INSERTイベントとUPDATE イベントの既定の動作は、ソースから CDC イベントをアップサートすることです。指定したキーと一致するターゲット テーブル内の行を更新するか、一致するレコードがターゲット テーブルに存在しない場合は新しい行を挿入します。 DELETEイベントの処理は、APPLY AS DELETE WHEN条件で指定できます。

パラメーター

  • source

    データのソース。 ソースはストリーミング ソースである必要があります。 ストリーミング セマンティクスを使用してソースから読み取る場合は、 STREAM キーワードを使用します。 読み取りで既存のレコードの変更または削除が発生した場合は、エラーがスローされます。 静的ソースまたは追加専用ソースから読み取るのが最も安全です。

    ストリーミング データの詳細については、「パイプラインを使用してデータを変換する」を参照してください。

  • KEYS

    ソース データ内の行を一意に識別する列または列の組み合わせ。 これらの列の値は、ターゲット テーブル内の特定のレコードに適用される CDC イベントを識別するために使用されます。

    列の組み合わせを定義するには、列のコンマ区切りのリストを使用します。

    この句は必須です。

  • IGNORE NULL UPDATES

    ターゲット列のサブセットを含む更新プログラムの取り込みを許可します。 CDC イベントが既存の行と一致し、 IGNORE NULL UPDATES が指定されている場合、 null 値を持つ列はターゲット内の既存の値を保持します。 これは、 null 値を持つ入れ子になった列にも適用されます。

    この句は省略可能です。

    既定値は、既存の列を null 値で上書きすることです。

  • APPLY AS DELETE WHEN

    CDC イベントをアップサートではなく DELETE として扱うタイミングを指定します。

    SCD タイプ 2 のソースでは、順序が正しく指定されていないデータを処理するために、削除された行は基になる Delta テーブルの廃棄石として一時的に保持され、これらの廃棄石を除外するビューがメタストアに作成されます。 保持間隔は、 pipelines.cdc.tombstoneGCThresholdInSecondstable プロパティを使用して構成できます。

    この句は省略可能です。

  • APPLY AS TRUNCATE WHEN

    CDC イベントを完全なテーブル TRUNCATEとして扱うタイミングを指定します。 この句はターゲット テーブルの完全な切り捨てをトリガーするため、この機能を必要とする特定のユース ケースにのみ使用する必要があります。

    APPLY AS TRUNCATE WHEN句は、SCD 型 1 でのみサポートされます。 SCD タイプ 2 では、切り捨て操作はサポートされていません。

    この句は省略可能です。

  • SEQUENCE BY

    ソース データ内の CDC イベントの論理順序を指定する列名。 パイプライン処理では、このシーケンス処理を使用して、順不同に到着した変更イベントを処理します。

    シーケンス処理に複数の列が必要な場合は、STRUCT 式を使用します。最初に最初の構造体フィールドで順序付けを行い、同じ値の場合は次に2番目のフィールドで並べ替えを行います。

    指定された列は、並べ替え可能なデータ型である必要があります。

    この句は必須です。

  • COLUMNS

    ターゲット テーブルに含める列のサブセットを指定します。 次のいずれかを実行できます。

    • 含める列の完全な一覧を指定します: COLUMNS (userId, name, city)
    • 除外する列の一覧を指定します。 COLUMNS * EXCEPT (operation, sequenceNum)

    この句は省略可能です。

    既定では、 COLUMNS 句が指定されていない場合は、ターゲット テーブル内のすべての列が含まれます。

  • STORED AS

    レコードを SCD タイプ 1 または SCD タイプ 2 として格納するかどうか。

    この句は省略可能です。

    既定値は SCD タイプ 1 です。

  • TRACK HISTORY ON

    指定した列に変更がある場合に履歴レコードを生成する出力列のサブセットを指定します。 次のいずれかを実行できます。

    • 追跡する列の完全な一覧を指定します: COLUMNS (userId, name, city)
    • 追跡から除外する列の一覧を指定します。 COLUMNS * EXCEPT (operation, sequenceNum)

    この句は省略可能です。 既定では、 TRACK HISTORY ON *と同等の変更がある場合に、すべての出力列の履歴を追跡します。

例示

-- SCD type 1: apply CDC changes with direct updates (no history)
> CREATE OR REFRESH STREAMING TABLE target
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  FLOW AUTO CDC
  FROM stream(cdc_data.users)
  KEYS (userId)
  SEQUENCE BY sequenceNum
  STORED AS SCD TYPE 1;

-- SCD type 2: retain a history of changes, with delete handling
> CREATE OR REFRESH STREAMING TABLE target
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  FLOW AUTO CDC
  FROM stream(cdc_data.users)
  KEYS (userId)
  APPLY AS DELETE WHEN operation = "DELETE"
  SEQUENCE BY sequenceNum
  COLUMNS * EXCEPT (operation, sequenceNum)
  STORED AS SCD TYPE 2;

-- SCD type 2 with history tracking on specific columns
> CREATE OR REFRESH STREAMING TABLE target
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  FLOW AUTO CDC
  FROM stream(cdc_data.users)
  KEYS (userId)
  APPLY AS DELETE WHEN operation = "DELETE"
  SEQUENCE BY sequenceNum
  COLUMNS * EXCEPT (operation, sequenceNum)
  STORED AS SCD TYPE 2
  TRACK HISTORY ON * EXCEPT (city);