Fluxos REPLACE WHERE para tabelas de streaming autônomas

Importante

Os fluxos REPLACE WHERE para tabelas de streaming independentes estão na versão Beta.

Esta página descreve como usar fluxos REPLACE WHERE para recomputar e sobrescrever um subconjunto específico de uma tabela de streaming independente sem reprocessar todo o histórico da sua tabela. Os fluxos REPLACE WHERE lidam com dados que chegam com atraso, reprocessamento a montante, evolução do esquema e preenchimentos retroativos.

Com um fluxo REPLACE WHERE , você define um predicado na tabela de destino. Todas as linhas correspondentes ao predicado são excluídas e substituídas pela reavaliação da consulta de origem para o mesmo intervalo de predicados. As linhas que não correspondem ao predicado são deixadas intocadas.

Requirements

Os fluxos REPLACE WHERE têm os seguintes requisitos:

  • Sua tabela de streaming deve usar o PREVIEW canal. Consulte as configurações de Pipeline em channel.
  • O Databricks recomenda o Catálogo do Unity e a computação sem servidor. A atualização incremental só tem suporte na computação sem servidor.

Quando usar os fluxos REPLACE WHERE

Use os fluxos REPLACE WHERE para os seguintes cenários:

  • Processamento em lote incremental sem semântica de streaming: processe novas linhas em lotes sem gerenciar conceitos de streaming, como marcas d'água.
  • Reprocessamento seletivo: recompute apenas as linhas que correspondem a um predicado, deixando todas as outras linhas intocadas.
  • Cenários além das capacidades padrão de visão materializada:
    • Tabelas de destino com retenção mais longa do que a da origem
    • Impedindo a recomputação quando uma tabela de dimensão é alterada
    • Evolução do esquema sem recompusar todo o histórico

Criar um fluxo REPLACE WHERE

Use a FLOW REPLACE WHERE cláusula embutida com CREATE OR REFRESH STREAMING TABLE:

CREATE OR REFRESH STREAMING TABLE orders_enriched
TBLPROPERTIES (pipelines.channel = 'PREVIEW')
SCHEDULE EVERY 1 DAY
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT
  o.order_id,
  o.date,
  o.region,
  p.product_name,
  o.qty,
  o.price
FROM orders_fct o
JOIN product_dim p
  ON o.product_id = p.product_id;

Durante a atualização, todas as linhas na tabela de destino que correspondem ao predicado são excluídas, a consulta de origem é recomputada para esse mesmo intervalo de predicado e os novos resultados são inseridos. Neste exemplo, todas as linhas dos últimos 7 dias são excluídas de orders_enriched e recalculadas usando a consulta de origem.

Você não precisa adicionar o predicado à consulta de origem. O mecanismo de pipeline o aplica automaticamente ao fazer a leitura da origem.

Observação

BY NAME é obrigatório. Garante que as colunas sejam associadas pelo nome em vez da posição.

Preencher dados históricos

Para executar backfills, execute instruções DML diretamente na tabela de destino:

INSERT INTO orders_enriched
SELECT *
FROM orders_enriched_legacy
WHERE date < '2025-01-01';

Comportamento de atualização completa

Uma atualização completa de um fluxo REPLACE WHERE executa novamente a consulta de origem usando apenas o predicado atual. As linhas inseridas por comandos DML fora do intervalo atual do predicado são excluídas permanentemente.

Aviso

Uma atualização completa limpa todos os dados existentes e executa novamente o fluxo usando apenas seu predicado definido. Se um pipeline estiver em execução há um ano com um predicado de 7 dias, uma atualização completa fará com que a tabela contenha somente os dados dos últimos 7 dias. Todas as linhas mais antigas são excluídas permanentemente.

REFRESH STREAMING TABLE orders_enriched FULL;

Para evitar atualizações completas em uma tabela, defina a propriedade pipelines.reset.allowed da tabela como false:

CREATE OR REFRESH STREAMING TABLE orders_enriched
  TBLPROPERTIES (pipelines.reset.allowed = 'false')
  FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
  ...

Atualização incremental

Os fluxos REPLACE WHERE usam a atualização incremental quando possível, reprocessando apenas os dados de origem que foram alterados desde a última atualização em vez de recompusar toda a janela de substituição. A atualização incremental requer computação sem servidor.

Quando a atualização incremental se aplica

Todos os seguintes devem ser verdadeiros:

  • O pipeline é executado na computação sem servidor.
  • Há suporte para a forma de consulta. Consulte Atualização incremental para ver o conjunto de operadores compatíveis.
  • O predicado faz referência a colunas base de uma tabela de origem. Predicados sobre valores derivados, como resultados de funções de agregação ou de janela, não podem ser aplicados a uma fonte de dados, o que desativa a atualização incremental.
  • Nenhum DML externo modificou linhas na janela de substituição atual. O DML que modifica linhas fora da janela atual não é afetado.
  • A janela de substituição atual não inclui linhas excluídas pelo predicado anterior. Se você ampliar o predicado para cobrir um intervalo não processado anteriormente, essa atualização retornará à recomputação completa. As atualizações subsequentes se qualificam para atualização incremental novamente.
  • O predicado é determinístico. Predicados que usam funções não determinísticas, como rand(), desativam a atualização incremental. Funções temporais, como current_date() são permitidas.

A primeira atualização de qualquer fluxo é sempre uma computação completa. Se qualquer condição não for atendida, essa atualização retornará à recomputação completa da janela de substituição atual.

Práticas recomendadas para atualização incremental

Siga estas diretrizes para que os fluxos REPLACE WHERE permaneçam aptos à atualização incremental.

Usar um limite inferior móvel

Predicados com um limite inferior móvel permanecem elegíveis para atualização incremental indefinidamente.

FLOW REPLACE WHERE date >= date_add(current_date(), -7)

Um limite superior móvel, como date BETWEEN date_add(current_date(), -7) AND current_date(), pode deslocar a janela para incluir linhas excluídas anteriormente, acionando um retorno pontual à recomputação completa.

Incluir a coluna de predicado em GROUP BY

Ao agregar, inclua a coluna de predicado em GROUP BY para que o mecanismo possa aplicar o predicado antes da agregação.

FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT date, region, SUM(amount) AS total
FROM sales
GROUP BY date, region;

Se a coluna de predicado estiver ausente em GROUP BY, o predicado não poderá ser deslocado para baixo da agregação, e a origem será lida por completo.

Incluir a coluna de predicados nas chaves de junção

Inclua a coluna de predicados na condição de junção para que o mecanismo possa podar todas as fontes associadas na junção.

FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT f.date, f.user_id, d.region, f.revenue
FROM fact f
JOIN dim d ON f.date = d.date AND f.user_id = d.user_id;

Se uma tabela associada não expuser a coluna de predicado, essa tabela será verificada por completo em cada atualização.

Diagnosticar o recurso de recomputação completa

Quando uma atualização recorre à recomputação completa, o motivo é informado no evento planning_information do fluxo. Consulte os logs de eventos de pipeline do Monitor. A tabela a seguir lista os motivos relatados no evento:

Motivo Meaning
EXTERNAL_CHANGE_IN_REPLACE_WINDOW Um DML externo modificou linhas na janela de substituição atual.
REPLACE_WHERE_NOT_DETERMINISTIC O predicado usa expressões não determinísticas.
PRIOR_REPLACE_WHERE_NOT_DETERMINISTIC A atualização anterior usou um predicado não determinístico.
UNSUPPORTED_REPLACE_WHERE_PREDICATE O predicado não pode ser enviado por push para qualquer origem, a janela atual inclui linhas não processadas pelo predicado anterior ou a execução usa uma substituição de predicado.

Exemplos

Os exemplos a seguir mostram padrões comuns de fluxo REPLACE WHERE .

Exemplo 1: Manter agregações históricas de uma fonte de retenção limitada

Este exemplo mantém agregações diárias indefinidamente, mesmo após os dados brutos serem retirados da tabela de origem (retenção de 3 dias):

CREATE OR REFRESH STREAMING TABLE events_agg
FLOW REPLACE WHERE date >= date_add(current_date(), -3) BY NAME
SELECT
  date,
  key,
  SUM(val) AS agg
FROM events_raw
GROUP BY ALL;

Exemplo 2: impedir a recomputação quando uma tabela de dimensão for alterada

Este exemplo mantém as linhas de fatos históricos inalteradas quando os atributos de dimensão mudam:

CREATE OR REFRESH STREAMING TABLE fact_dim_join
FLOW REPLACE WHERE f.date >= date_add(current_date(), -1) BY NAME
SELECT
  f.date,
  f.user_id,
  d.region,
  f.revenue
FROM fact_table f
JOIN dim_users d
  ON f.user_id = d.user_id;

Se a região de um usuário for alterada, somente as linhas recentes serão recomputadas. As linhas históricas mantêm o valor da região no momento em que foram gravadas.

Exemplo 3: Adicionar uma nova métrica sem recompusar o histórico completo

Este exemplo mostra como evoluir a definição de uma tabela e preencher retroativamente apenas um intervalo específico:

  1. Defina a tabela inicial:

    CREATE OR REFRESH STREAMING TABLE clickstream_daily
    FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
    SELECT
      event_date,
      page_id,
      COUNT(*) AS clicks
    FROM clickstream_raw
    GROUP BY ALL;
    
  2. Atualize a consulta para adicionar uniq_users:

    CREATE OR REFRESH STREAMING TABLE clickstream_daily
    FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
    SELECT
      event_date,
      page_id,
      COUNT(*) AS clicks,
      COUNT(DISTINCT user_id) AS uniq_users
    FROM clickstream_raw
    GROUP BY ALL;
    

    As linhas anteriores à janela de 7 dias contêm NULL para uniq_users.

Exemplo 4: Iterar em uma janela pequena antes de fazer backup do histórico completo

Este exemplo mostra como validar a lógica de consulta em uma pequena janela de dados antes de processar o intervalo histórico completo.

Comece com uma janela curta para validar as métricas e iterar na lógica de negócios com custos de computação mais baixos:

CREATE OR REFRESH STREAMING TABLE revenue_attribution
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
  event_date,
  campaign_id,
  SUM(revenue) AS total_revenue
FROM marketing_events
GROUP BY ALL;

Uma janela curta recalcula apenas os últimos 7 dias a cada atualização, portanto revise a consulta quantas vezes forem necessárias antes de iniciar uma execução histórica completa.

Depois que a consulta for finalizada, use DML para fazer backup do intervalo histórico completo:

INSERT INTO revenue_attribution
SELECT
  event_date,
  campaign_id,
  SUM(revenue) AS total_revenue
FROM marketing_events
WHERE event_date < date_add(current_date(), -7)
GROUP BY ALL;