Fluxo de Alterações de Dados do Lakebase

Note

A funcionalidade Lakebase Change Data Feed está em pré-visualização pública.

O que é o Feed de Dados de Alterações do Lakebase?

A Lakebase introduz um Feed de Dados de Alteração (CDF) nativo, desbloqueando os seus dados operacionais para pipelines, modelos e aplicações a jusante. Cada inserção, atualização e eliminação numa tabela Lakebase Postgres é capturada do registo de escrita antecipada e armazenada como uma nova linha numa tabela Delta gerida pelo Unity Catalog, agrupada e limpa a cada ~15 segundos. O histórico de alterações é armazenado num formato aberto que qualquer motor de computação pode ler.

As tabelas de destino seguem a mesma forma do Delta Change Data Feed: cada linha transporta um _pg_change_type, um LSN, um ID de transação e um carimbo temporal. As alterações operacionais tornam-se uma fonte de referência para ETL, auditoria e consumidores a jusante — sem ter de implementar uma infraestrutura externa de CDC.

Fluxo de dados Lakebase CDF do Postgres através do wal2delta para as tabelas Delta no Unity Catalog.

Casos de uso

O Lakebase CDF traz dados operacionais para o lakehouse para que os pipelines e as aplicações subsequentes possam reagir às alterações à medida que estas ocorrem.

Caso de uso Description
Pipelines ETL Use Lakebase como fonte de bronze para os oleodutos medallion. Criar tarefas incrementais de SDP ou do Spark Structured Streaming com base no feed de alterações e atualizar as tabelas silver e gold a jusante.
Registos de auditoria Mantenha um histórico completo e consultável de cada inserção, atualização e eliminação numa tabela Lakebase para conformidade e forense. O histórico é imutável Delta.
Sistemas externos Armazene os dados de alterações do Lakebase num formato aberto que qualquer motor possa consumir. Como o destino é uma tabela Delta no Unity Catalog, sistemas externos e leitores que não são Databricks podem aceder diretamente ao feed.

Ativar esta pré-visualização

Um administrador do espaço de trabalho tem de ativar a funcionalidade de pré-visualização do feed de dados de alteração do Lakebase na página Pré-visualizações do espaço de trabalho.

Requirements

  • Lakebase Autoscaling: Um projeto Lakebase Autoscaling a executar o Postgres 17.
  • Base de dados fonte: As tabelas devem residir na databricks_postgres base de dados no Lakebase. Cada projeto é criado com esta base de dados predefinida. Esta é uma limitação conhecida.
  • Catálogo Unity: A identidade que configura o CDF necessita de USE CATALOG, USE SCHEMA e CREATE TABLE no catálogo e esquema de destino. Ver Conceder permissões sobre um objeto.
  • Armazenamento predefinido: Os catálogos de destino configurados com armazenamento predefinido não são suportados.
  • Projeto Lakebase: O seu papel de Postgres requer permissões CAN MANAGE no projeto Lakebase. Os proprietários do projeto têm PODEM GERIR por defeito. Veja Gerir permissões de projeto.
  • Tipos de dados: Veja Mapeamento de tipos de dados. Tipos sem equivalente direto em Delta são armazenados como STRING.

Criação da Lakebase CDF

Para começar, define a identidade da réplica completa nas tabelas que queres no feed (Passo 1), depois inicia o CDF na aplicação Lakebase (Passo 2). Os seus dados aparecem como tabelas Delta lb_<table_name>_history no catálogo e esquema do Unity Catalog que escolher.

Passo 1: Definir a identidade da réplica completa

Para que uma tabela do Lakebase participe no CDF, deve ter REPLICA IDENTITY FULL definido. Por defeito, o Postgres regista apenas a chave primária quando uma linha é atualizada ou eliminada. Configurar a identidade completa instrui o Postgres a registar tanto o estado da linha antes da alteração como depois da alteração no registo write-ahead, de que o CDF necessita para construir um histórico completo de alterações.

Pode executar estes comandos no Lakebase SQL Editor ou em qualquer cliente Postgres.

Tabela única

ALTER TABLE <table_name> REPLICA IDENTITY FULL;

Todas as tabelas existentes num esquema

Para definir a identidade de réplica em cada tabela existente num esquema (public neste exemplo), execute:

DO $$
DECLARE r record;
BEGIN
  FOR r IN
    SELECT table_schema, table_name
    FROM information_schema.tables
    WHERE table_schema = 'public'
      AND table_type = 'BASE TABLE'
  LOOP
    EXECUTE format(
      'ALTER TABLE %I.%I REPLICA IDENTITY FULL;',
      r.table_schema, r.table_name
    );
  END LOOP;
END $$;

Aplicação automática a tabelas futuras

Para que todas as tabelas recém-criadas recebam REPLICA IDENTITY FULLautomaticamente , instale um gatilho de evento Postgres. Executa-se após cada CREATE TABLE e define a identidade na nova tabela:

CREATE OR REPLACE FUNCTION public.set_full_replica_identity()
RETURNS event_trigger
LANGUAGE plpgsql
AS $$
DECLARE
  obj record;
BEGIN
  FOR obj IN
    SELECT * FROM pg_event_trigger_ddl_commands()
    WHERE command_tag = 'CREATE TABLE'
  LOOP
    EXECUTE format(
      'ALTER TABLE %s REPLICA IDENTITY FULL;',
      obj.object_identity
    );
  END LOOP;
END $$;

CREATE EVENT TRIGGER set_full_replica_identity_on_create
ON ddl_command_end
WHEN TAG IN ('CREATE TABLE')
EXECUTE FUNCTION public.set_full_replica_identity();

Combine o acionador de evento com o ciclo no separador anterior para abranger tanto as tabelas existentes como as tabelas futuras numa única configuração.

Verifique quais as tabelas com identidade de réplica definida

Para ver que tabelas num esquema têm identidade réplica configurada, execute:

SELECT n.nspname AS table_schema,
       c.relname AS table_name,
       CASE c.relreplident
         WHEN 'd' THEN 'default'
         WHEN 'n' THEN 'nothing'
         WHEN 'f' THEN 'full'
         WHEN 'i' THEN 'index'
       END AS replica_identity
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE c.relkind = 'r'
  AND n.nspname = 'public'
ORDER BY n.nspname, c.relname;

Só as linhas com replica_identity = 'full' estão prontas para CDF.

Passo 2: Iniciar o feed de alterações de dados

O Lakebase CDF é configurado ao nível do esquema. Uma vez iniciado, todas as tabelas atuais e futuras do esquema de origem são incluídas no feed.

  1. No teu espaço de trabalho Azure Databricks, abre Lakebase Postgres no comutador de aplicações (canto superior direito).
  2. Seleciona o teu projeto Lakebase e o ramo que queres usar (por exemplo, produção ou principal).
  3. Abra a visão geral do ramo e clique no separador Alterar Feed de Dados .
  4. Clique em Iniciar.
  5. No diálogo de configuração:
    • Base de dados: É predefinida como databricks_postgres.
    • Esquema: Selecione o esquema Postgres de origem.
    • Para o catálogo: Selecione o catálogo de destino do Unity Catalog.
    • Esquema: Selecione o esquema do Catálogo Unity de destino.
  6. Clica em Iniciar para iniciar a transmissão.

Visão geral do ramo com separador Change Data Feed que mostra Start e configuração do esquema.

As tabelas aparecem no destino como lb_<table_name>_history. Para os encontrar, abra Catálogo na barra lateral, navegue até ao catálogo e esquema de destino e abra o separador Tabelas.

O separador Alterar Feed de Dados no Lakebase tem dois subseparadores:

Os separadores secundários apresentam o mapeamento e o progresso de cada tabela.

  • Esquemas: Lista cada esquema de origem, o seu catálogo de destino e esquema no Unity Catalog, e um estado.
  • Tabelas: Lista cada tabela de origem, a tabela lb_<table_name>_history de destino, o estado (Streaming ou Snapshotting), o LSN consolidado (até que ponto o feed escreveu no Delta, apresentado como - enquanto ainda está no snapshot inicial) e a Última atualização (a última vez que a tabela recebeu alterações).

Também pode inspecionar o estado do feed do Postgres executando isto no Lakebase SQL Editor:

SELECT * FROM wal2delta.tables;

O resultado inclui table_oid, status (STREAMING ou SNAPSHOTTING), committed_lsn e last_write_time em cada tabela.

Importante

O que é o wal2delta? O Lakebase CDF é alimentado pela extensão wal2delta Postgres, que corre dentro do cálculo Lakebase. Utiliza descodificação lógica para capturar alterações do write-ahead log (WAL) e grava-as em tabelas Delta no Unity Catalog.

Esquema da tabela de destino

O CDF escreve uma tabela Delta por cada tabela de origem, com o nome lb_<table_name>_history no catálogo e esquema de destino. Além das colunas de origem, cada linha contém estas colunas do sistema:

Coluna Tipo Description
_pg_change_type TEXTO Tipo de operação: insert, delete, update_preimage, ou update_postimage.
_pg_lsn BIGINT Número de sequência do registo do Postgres.
_pg_xid INTEGER ID de Transação Postgres.
_timestamp TIMESTAMP Carimbo temporal em que a alteração foi processada (sem fuso horário).
_sort_by BIGINT Chave de ordenação monótona usada para ordenar todas as alterações.

Padrões comuns de alteração

  • Captura instantânea inicial: Da primeira vez que o CDF é executado numa tabela Lakebase existente, cada uma das linhas existentes é registada com _pg_change_type = 'insert'.
  • Atualizações: Uma atualização produz duas linhas: uma com _pg_change_type = 'update_preimage' (linha antiga) e outra com _pg_change_type = 'update_postimage' (linha nova).
  • Apaga: Uma eliminação produz uma linha com _pg_change_type = 'delete'.

Estes são os mesmos eventos de alteração que o Delta Change Data Feed, pelo que aplicam-se os mesmos padrões a jusante.

Comportamento operacional

  • Nomeação das colisões: Se duas tabelas de origem mapearem para o mesmo nome de destino (por exemplo, sales.users e marketing.users ambas mapearem para lb_users_history), CDF escreve a primeira para lb_users_history e auto-sufixa a segunda para lb_users_history_1. Podes renomear qualquer uma das tabelas de destino no Unity Catalog e o feed continua a funcionar.
  • Âmbito ao nível do esquema: Quando inicias o CDF num esquema Lakebase, todas as tabelas atuais e futuras desse esquema estão incluídas. As tabelas vazias são ignoradas — uma tabela deve ter pelo menos uma linha para aparecer no destino.
  • Tabelas de origem eliminadas: Se deixares uma tabela no Lakebase, a tabela Delta de destino no Unity Catalog é preservada.

Construir pipelines a jusante

O Lakebase CDF foi concebido para oleodutos a jusante que reagem a alterações operacionais. Os padrões abaixo mostram três formas de consumir a ração, ordenadas do mais simples ao mais flexível.

Exemplo de cenário. Uma aplicação de comércio eletrónico regista encomendas numa tabela Postgres orders , cada linha contendo um item_id e quantity. A equipa de logística precisa de níveis de inventário em tempo real. Com o CDF, todas as alterações feitas a orders são armazenadas na tabela Delta lb_orders_history no Unity Catalog. Os pipelines a jusante leem esse feed de alterações e atualizam uma inventory_levels tabela sempre que uma encomenda é feita, editada ou cancelada.

Calcular o inventário atual com uma vista materializada

O padrão mais simples é uma vista SQL materializada sobre a tabela de histórico. O MV atualiza-se incrementalmente à medida que surgem novos eventos de mudança, e os consumidores a jusante interrogam-no como qualquer outra tabela.

CREATE MATERIALIZED VIEW inventory_levels AS
SELECT
  item_id,
  SUM(
    CASE
      -- New orders (and the "new half" of updates) decrement inventory
      WHEN _pg_change_type IN ('insert', 'update_postimage') THEN -quantity
      -- Cancellations (and the "old half" of updates) restore inventory
      WHEN _pg_change_type IN ('delete', 'update_preimage') THEN quantity
      ELSE 0
    END
  ) AS current_inventory,
  MAX(_timestamp) AS last_transaction_ts,
  MAX(_pg_lsn) AS last_lsn
FROM lb_orders_history
GROUP BY item_id;

As duas linhas geradas para cada atualização anulam-se mutuamente, exceto pela variação líquida, pelo que a soma acumulada se mantém correta à medida que as encomendas vão sendo editadas.

Alterações de fluxo com Spark Declarative Pipelines

Para uma arquitetura medallion estruturada, utilize Spark Declarative Pipelines (SDP) para declarar tabelas de bronze, prata e ouro. O SDP executa-os como um pipeline interligado, com os pontos de controlo e a gestão de dependências tratados automaticamente.

import dlt
from pyspark.sql import functions as F

@dlt.table
def inventory_adjustments():
    return (
        spark.readStream.table("<catalog>.<schema>.lb_orders_history")
        .withColumn(
            "delta",
            F.when(F.col("_pg_change_type").isin("insert", "update_postimage"), -F.col("quantity"))
             .when(F.col("_pg_change_type").isin("delete", "update_preimage"), F.col("quantity"))
             .otherwise(0),
        )
        .select("item_id", "delta", "_timestamp")
    )

@dlt.expect_or_drop("non_negative_stock", "on_hand >= 0")
@dlt.table
def inventory_levels():
    return (
        spark.read.table("LIVE.inventory_adjustments")
        .groupBy("item_id")
        .agg(F.sum("delta").alias("on_hand"))
    )

inventory_adjustmentslb_orders_history de forma incremental com readStream e gera um delta por evento. inventory_levels agrega por item_id para calcular o stock atual. A expectativa elimina linhas que poderiam levar a ações para negativo, sinalizando um bug a montante.

Para um guia completo de ponta a ponta, veja Tutorial: Construir um pipeline ETL usando captura de dados de alteração.

Processamento personalizado com Spark Structured Streaming

Quando precisar de controlo total — por exemplo, fusões personalizadas, efeitos secundários ou vários destinos — leia diretamente a tabela de histórico com o Spark Structured Streaming e use foreachBatch para escrever no destino pretendido.

from pyspark.sql import functions as F
from delta.tables import DeltaTable

def update_inventory(batch_df, batch_id):
    deltas = (
        batch_df
        .withColumn(
            "delta",
            F.when(F.col("_pg_change_type").isin("insert", "update_postimage"), -F.col("quantity"))
             .when(F.col("_pg_change_type").isin("delete", "update_preimage"), F.col("quantity"))
             .otherwise(0),
        )
        .groupBy("item_id")
        .agg(F.sum("delta").alias("delta"))
    )

    target = DeltaTable.forName(spark, "<catalog>.<schema>.inventory_levels")
    (target.alias("t")
        .merge(deltas.alias("s"), "t.item_id = s.item_id")
        .whenMatchedUpdate(set={"on_hand": F.expr("t.on_hand + s.delta")})
        .whenNotMatchedInsert(values={"item_id": "s.item_id", "on_hand": "s.delta"})
        .execute())

(spark.readStream.table("<catalog>.<schema>.lb_orders_history")
    .writeStream
    .foreachBatch(update_inventory)
    .option("checkpointLocation", "/Volumes/<catalog>/<schema>/checkpoints/inventory_levels")
    .start())

Cada microlote agrupa os eventos de alteração por item_id e combina os deltas finais em inventory_levels.

Incremental por conceção. Cada lb_<table_name>_history tabela é uma tabela Delta apenas para acrescentar. Cada alteração na origem é registada como uma nova linha, com _pg_change_type a assinalar a operação. Databricks SQL vistas materializadas, fluxos do Lakeflow Spark Declarative Pipelines e jobs do Spark Structured Streaming processam incrementalmente novas linhas a partir do registo de transações do Delta, pelo que os pipelines subsequentes só executam trabalho proporcional ao que foi alterado. Não precisa de ativar o Delta Change Data Feed na tabela de histórico porque a semântica de mudança já está codificada nos dados da linha.

Mapeamento de tipos de dados

O CDF suporta a maioria dos tipos padrão de primitivas PostgreSQL. Tipos sem equivalente direto em Delta são armazenados como STRING.

Tipo PostgreSQL tipo Azure Databricks Delta Notes
BOOLEAN BOOLEAN
INT, SMALLINT, BIGINT INT, SMALLINT, BIGINT
TEXTO, VARCHAR, CHAR STRING
JSONB STRING Armazenado como uma string JSON.
ENUM STRING Guardado como a etiqueta de enum.
NUMÉRICO / DECIMAL DECIMAL ou STRING Utiliza a precisão/escala da fonte sempre que possível. Realiza reescalonamento sem perdas para valores de precisão/escala incompatíveis. Volta para STRING quando a precisão excede 38 ou quando a precisão/escala não está definida (NUMERIC ilimitado). Todas as colunas NUMERIC/DECIMAL são anuláveis porque os valores NaN são mapeados para NULL. Ver tipos numéricos PostgreSQL.
DATE DATE
TIMESTAMP TIMESTAMP_NTZ
CARIMBO DE DATA/HORA COM FUSO HORÁRIO TIMESTAMP
FLUTUAR, DOBRAR FLUTUAR, DOBRAR

Tipos armazenados como STRING:

  • Geografia/Geometria (Pós-SIG): Tipos da extensão PostGIS (por exemplo, geometry, geography).
  • Vetor (pgvector): O tipo vector da extensão pgvector.
  • Tipos compostos/estruturas: Tipos personalizados definidos com CREATE TYPE ... AS (field_name type, ...). Estes são tipos em forma de linha com campos nomeados.
  • Mapa: Tipos de chave-valor semelhantes a mapas, como hstore (da hstore extensão). O Postgres não tem um tipo de mapa incorporado. hstore é a forma comum de armazenar pares-chave-valor numa coluna.

Gerir alterações de esquemas

  • Renomear uma tabela no Postgres (por exemplo, ALTER TABLE users RENAME TO customers) permite que o feed continue. O nome da tabela Delta de destino não muda — mantém-se lb_users_history.
  • Alterações no esquema (adicionar uma coluna, eliminar uma coluna ou alterar o tipo de dado de uma coluna) desencadeiam uma nova captura da tabela afetada. O CDF relê toda a tabela do Postgres e reescreve-a para a tabela Delta de destino.

Desativar o Lakebase CDF

Desativar o CDF interrompe o feed de todos os esquemas Lakebase do projeto.

  1. No teu espaço de trabalho Azure Databricks, abre Lakebase Postgres no comutador de aplicações (canto superior direito).
  2. Selecione o seu projeto Lakebase e a ramificação onde configurou o CDF.
  3. Abra Descrição geral da ramificação e clique no separador Change Data Feed.
  4. Clique em Desabilitar. No diálogo de confirmação, reveja o aviso de que as alterações deixarão de fluir para as tabelas Delta, depois clique novamente em Desativar para confirmar.

Desativar o CDF não reinicia o seu cálculo.

Advertência

Se voltar a ativar o CDF mais tarde, o sistema não efetua uma nova captura completa. Quaisquer alterações que tenham ocorrido enquanto o CDF estava desativado estão permanentemente ausentes das tabelas Delta de destino.

Limitações e resolução de problemas

Pode ver o estado por tabela (snapshoting, skip ou streaming) no separador Change Data Feed , ou executando isto no Lakebase:

SELECT * FROM wal2delta.tables;

Razões comuns pelas quais uma tabela não aparece no feed:

  • REPLICA IDENTITY FULL não definida: Execute ALTER TABLE <table_name> REPLICA IDENTITY FULL; para a tabela. Ver Passo 1: Defina a identidade da réplica completa.
  • Tabelas particionadas: Tabelas particionadas Lakebase não são suportadas. Um esquema que contém tabelas particionadas faz com que essas tabelas falhem.
  • Mesas vazias: Uma tabela com zero linhas é ignorada até que exista pelo menos uma linha.

Passos seguintes