スタンドアロンのストリーミング テーブル向けの REPLACE WHERE フロー

Important

スタンドアロン ストリーミング テーブルの REPLACE WHERE フローは ベータ版です

このページでは、REPLACE WHERE フローを使用して、テーブル履歴全体を再処理せずに、スタンドアロン ストリーミング テーブルの対象のサブセットを再計算および上書きする方法について説明します。 REPLACE WHERE フローは、到着が遅れたデータ、アップストリームの再処理、スキーマの進化、バックフィルを処理します。

REPLACE WHERE フローでは、ターゲット テーブルに述語を定義します。 述語に一致するすべての行が削除され、その同じ述語範囲のソース クエリを再評価することによって置き換えられます。 述語と一致しない行はそのまま残ります。

必要条件

REPLACE WHERE フローには、次の要件があります。

  • ストリーミング テーブルでは、 PREVIEW チャネルを使用する必要があります。 channelを参照してください。
  • Databricks では、Unity カタログとサーバーレス コンピューティングが推奨されます。 増分更新 は、サーバーレス コンピューティングでのみサポートされます。

REPLACE WHERE フローを使用する場合

次のシナリオでは、REPLACE WHERE フローを使用します。

  • ストリーミング セマンティクスを使用しない増分バッチ処理: ウォーターマークなどのストリーミングの概念を管理せずに、新しい行をバッチで処理します。
  • 選択的な再処理: 述語に一致する行のみを再計算し、他のすべての行はそのままにします。
  • 標準の具体化されたビュー機能を超えるシナリオ:
    • ソースよりも保持期間が長いターゲット テーブル
    • ディメンション テーブルが変更されたときの再計算の防止
    • 履歴全体を再計算しないスキーマの進化

REPLACE WHERE フローを作成する

CREATE OR REFRESH STREAMING TABLE と同じ行で FLOW REPLACE WHERE 句を使用します。

CREATE OR REFRESH STREAMING TABLE orders_enriched
TBLPROPERTIES (pipelines.channel = 'PREVIEW')
SCHEDULE EVERY 1 DAY
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT
  o.order_id,
  o.date,
  o.region,
  p.product_name,
  o.qty,
  o.price
FROM orders_fct o
JOIN product_dim p
  ON o.product_id = p.product_id;

更新中に、述語に一致するターゲット テーブル内のすべての行が削除され、同じ述語範囲に対してソース クエリが再計算され、新しい結果が挿入されます。 この例では、過去 7 日間のすべての行が orders_enriched から削除され、ソース クエリを使用して再計算されます。

ソース クエリに述語を追加する必要はありません。 ソースから読み取るときに、パイプライン エンジンによって自動的に適用されます。

Note

BY NAME は必須です。 これにより、列が位置ではなく名前で一致することが保証されます。

履歴データのバックフィル

バックフィルを実行するには、ターゲット テーブルで DML ステートメントを直接実行します。

INSERT INTO orders_enriched
SELECT *
FROM orders_enriched_legacy
WHERE date < '2025-01-01';

フル リフレッシュの動作

REPLACE WHERE フローの完全な更新では、現在の述語のみを使用してソース クエリが再実行されます。 現在の述語範囲外の DML ステートメントによって挿入された行は完全に削除されます。

Warning

完全更新では、既存のすべてのデータがクリアされ、定義された述語のみを使用してフローが再実行されます。 パイプラインが 7 日間の条件で 1 年間実行されている場合、フル リフレッシュを行うと、テーブルには直近 7 日分のデータのみが含まれることになります。 古い行はすべて完全に削除されます。

REFRESH STREAMING TABLE orders_enriched FULL;

テーブルの完全な更新を防ぐには、table プロパティの pipelines.reset.allowedfalseに設定します。

CREATE OR REFRESH STREAMING TABLE orders_enriched
  TBLPROPERTIES (pipelines.reset.allowed = 'false')
  FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
  ...

増分更新

REPLACE WHERE フローでは、可能な場合は増分更新が使用されます。置換ウィンドウ全体を再計算するのではなく、前回の更新以降に変更されたソース データのみを再処理します。 増分更新にはサーバーレス コンピューティングが必要です。

増分更新が適用される場合

次のすべてが当てはまる必要があります。

  • パイプラインはサーバーレス コンピューティングで実行されます。
  • クエリ図形がサポートされています。 サポートされている演算子セットの 増分更新 を参照してください。
  • 述語は、ソース テーブルの基本列を参照します。 集計関数やウィンドウ関数の出力などの派生値に対する述語は、ソースにプッシュダウンできないため、増分更新は無効になります。
  • 現在の置換ウィンドウで外部 DML で行が変更されていない。 現在のウィンドウの外側の行を変更する DML は影響を受けません。
  • 現在の置換ウィンドウには、前の述語で除外された行は含まれません。 以前に処理されていない範囲をカバーするように述語を拡大した場合、その 1 つの更新は完全な再計算にフォールバックします。 後続の更新は、再び増分更新の対象になります。
  • 述語は決定論的です。 rand() などの非決定論的関数を使用する述語は、増分更新を無効にします。 current_date()などのテンポラル関数を使用できます。

すべてのフローの最初の更新は、常に完全な計算です。 条件が満たされていない場合、その更新は現在の置換ウィンドウの完全な再計算にフォールバックします。

増分更新のベスト プラクティス

REPLACE WHERE フローが増分更新の対象のままになるように、次のガイドラインに従ってください。

移動する下限を使用する

下限が変動する述語は、無期限に増分更新の対象であり続けます。

FLOW REPLACE WHERE date >= date_add(current_date(), -7)

date BETWEEN date_add(current_date(), -7) AND current_date()などの移動する上限は、ウィンドウを移動して以前に除外された行を含めることができます。これにより、1 回限りのフォールバックが完全な再計算にトリガーされます。

GROUP BY に述語列を含める

集計する場合は、 GROUP BY に述語列を含めると、エンジンは集計の下に述語をプッシュできます。

FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT date, region, SUM(amount) AS total
FROM sales
GROUP BY date, region;

述語列が GROUP BYに存在しない場合、述語を集計の下にプッシュできず、ソースが完全にスキャンされます。

結合キーに述語列を含める

エンジンが結合されたすべてのソースを排除できるように、結合条件に述語列を含めます。

FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT f.date, f.user_id, d.region, f.revenue
FROM fact f
JOIN dim d ON f.date = d.date AND f.user_id = d.user_id;

結合されたテーブルで述語列が公開されていない場合、そのテーブルは更新のたびに完全にスキャンされます。

完全な再計算へのフォールバックを診断する

更新が完全な再計算にフォールバックすると、その理由はフローの planning_information イベントで報告されます。 パイプライン イベント ログの監視を参照してください。 次の表に、イベントで報告される理由を示します。

理由 Meaning
EXTERNAL_CHANGE_IN_REPLACE_WINDOW 現在の置換ウィンドウ内の行が外部 DML によって変更されました。
REPLACE_WHERE_NOT_DETERMINISTIC 述語では、非決定論的な式が使用されます。
PRIOR_REPLACE_WHERE_NOT_DETERMINISTIC 前の更新では、非決定論的述語が使用されています。
UNSUPPORTED_REPLACE_WHERE_PREDICATE 述語はどのソースにもプッシュできません。現在のウィンドウには、前の述語で処理されていない行が含まれているか、実行で述語のオーバーライドが使用されます。

Examples

次の例は、一般的な REPLACE WHERE フロー パターンを示しています。

例 1: 制限付き保持ソースからの履歴集計を保持する

この例では、生データがソース テーブルから経過した後 (3 日間のリテンション期間) であっても、毎日の集計を無期限に保持します。

CREATE OR REFRESH STREAMING TABLE events_agg
FLOW REPLACE WHERE date >= date_add(current_date(), -3) BY NAME
SELECT
  date,
  key,
  SUM(val) AS agg
FROM events_raw
GROUP BY ALL;

例 2: ディメンション テーブルが変更されたときに再計算を禁止する

この例では、ディメンション属性が変更されたときに履歴ファクト行を変更せずに保持します。

CREATE OR REFRESH STREAMING TABLE fact_dim_join
FLOW REPLACE WHERE f.date >= date_add(current_date(), -1) BY NAME
SELECT
  f.date,
  f.user_id,
  d.region,
  f.revenue
FROM fact_table f
JOIN dim_users d
  ON f.user_id = d.user_id;

ユーザーのリージョンが変更された場合、最新の行のみが再計算されます。 履歴行は、書き込まれた時点でリージョンの値を保持します。

例 3: 完全な履歴を再計算せずに新しいメトリックを追加する

この例では、テーブル定義を進化させ、ターゲット範囲のみをバックフィルする方法を示します。

  1. 初期テーブルを定義します。

    CREATE OR REFRESH STREAMING TABLE clickstream_daily
    FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
    SELECT
      event_date,
      page_id,
      COUNT(*) AS clicks
    FROM clickstream_raw
    GROUP BY ALL;
    
  2. クエリを更新して、 uniq_usersを追加します。

    CREATE OR REFRESH STREAMING TABLE clickstream_daily
    FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
    SELECT
      event_date,
      page_id,
      COUNT(*) AS clicks,
      COUNT(DISTINCT user_id) AS uniq_users
    FROM clickstream_raw
    GROUP BY ALL;
    

    7日間の範囲より古い行には、uniq_usersに対するNULLが含まれています。

例 4: 完全な履歴をバックフィルする前に小さなウィンドウを反復処理する

この例では、履歴範囲全体を処理する前に、小さなデータ ウィンドウでクエリ ロジックを検証する方法を示します。

まず、短いウィンドウからメトリックを検証し、コンピューティング コストを低くしてビジネス ロジックを反復処理します。

CREATE OR REFRESH STREAMING TABLE revenue_attribution
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
  event_date,
  campaign_id,
  SUM(revenue) AS total_revenue
FROM marketing_events
GROUP BY ALL;

短いウィンドウでは、更新ごとに過去 7 日間しか再計算されないため、完全な履歴実行にコミットする前に、必要な回数だけクエリを修正します。

クエリが終了したら、DML を使用して履歴範囲全体をバックフィルします。

INSERT INTO revenue_attribution
SELECT
  event_date,
  campaign_id,
  SUM(revenue) AS total_revenue
FROM marketing_events
WHERE event_date < date_add(current_date(), -7)
GROUP BY ALL;