Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
Note
A funcionalidade Lakebase Change Data Feed está em pré-visualização pública.
O que é o Feed de Dados de Alterações do Lakebase?
A Lakebase introduz um Feed de Dados de Alteração (CDF) nativo, desbloqueando os seus dados operacionais para pipelines, modelos e aplicações a jusante. Cada inserção, atualização e eliminação numa tabela Lakebase Postgres é capturada do registo de escrita antecipada e armazenada como uma nova linha numa tabela Delta gerida pelo Unity Catalog, agrupada e limpa a cada ~15 segundos. O histórico de alterações é armazenado num formato aberto que qualquer motor de computação pode ler.
As tabelas de destino seguem a mesma forma do Delta Change Data Feed: cada linha transporta um _pg_change_type, um LSN, um ID de transação e um carimbo temporal. As alterações operacionais tornam-se uma fonte de referência para ETL, auditoria e consumidores a jusante — sem ter de implementar uma infraestrutura externa de CDC.
Casos de uso
O Lakebase CDF traz dados operacionais para o lakehouse para que os pipelines e as aplicações subsequentes possam reagir às alterações à medida que estas ocorrem.
| Caso de uso | Description |
|---|---|
| Pipelines ETL | Use Lakebase como fonte de bronze para os oleodutos medallion. Criar tarefas incrementais de SDP ou do Spark Structured Streaming com base no feed de alterações e atualizar as tabelas silver e gold a jusante. |
| Registos de auditoria | Mantenha um histórico completo e consultável de cada inserção, atualização e eliminação numa tabela Lakebase para conformidade e forense. O histórico é imutável Delta. |
| Sistemas externos | Armazene os dados de alterações do Lakebase num formato aberto que qualquer motor possa consumir. Como o destino é uma tabela Delta no Unity Catalog, sistemas externos e leitores que não são Databricks podem aceder diretamente ao feed. |
Ativar esta pré-visualização
Um administrador do espaço de trabalho tem de ativar a funcionalidade de pré-visualização do feed de dados de alteração do Lakebase na página Pré-visualizações do espaço de trabalho.
Requirements
- Lakebase Autoscaling: Um projeto Lakebase Autoscaling a executar o Postgres 17.
-
Base de dados fonte: As tabelas devem residir na
databricks_postgresbase de dados no Lakebase. Cada projeto é criado com esta base de dados predefinida. Esta é uma limitação conhecida. - Catálogo Unity: A identidade que configura o CDF necessita de USE CATALOG, USE SCHEMA e CREATE TABLE no catálogo e esquema de destino. Ver Conceder permissões sobre um objeto.
- Armazenamento predefinido: Os catálogos de destino configurados com armazenamento predefinido não são suportados.
- Projeto Lakebase: O seu papel de Postgres requer permissões CAN MANAGE no projeto Lakebase. Os proprietários do projeto têm PODEM GERIR por defeito. Veja Gerir permissões de projeto.
- Tipos de dados: Veja Mapeamento de tipos de dados. Tipos sem equivalente direto em Delta são armazenados como STRING.
Criação da Lakebase CDF
Para começar, define a identidade da réplica completa nas tabelas que queres no feed (Passo 1), depois inicia o CDF na aplicação Lakebase (Passo 2). Os seus dados aparecem como tabelas Delta lb_<table_name>_history no catálogo e esquema do Unity Catalog que escolher.
Passo 1: Definir a identidade da réplica completa
Para que uma tabela do Lakebase participe no CDF, deve ter REPLICA IDENTITY FULL definido. Por defeito, o Postgres regista apenas a chave primária quando uma linha é atualizada ou eliminada. Configurar a identidade completa instrui o Postgres a registar tanto o estado da linha antes da alteração como depois da alteração no registo write-ahead, de que o CDF necessita para construir um histórico completo de alterações.
Pode executar estes comandos no Lakebase SQL Editor ou em qualquer cliente Postgres.
Tabela única
ALTER TABLE <table_name> REPLICA IDENTITY FULL;
Todas as tabelas existentes num esquema
Para definir a identidade de réplica em cada tabela existente num esquema (public neste exemplo), execute:
DO $$
DECLARE r record;
BEGIN
FOR r IN
SELECT table_schema, table_name
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_type = 'BASE TABLE'
LOOP
EXECUTE format(
'ALTER TABLE %I.%I REPLICA IDENTITY FULL;',
r.table_schema, r.table_name
);
END LOOP;
END $$;
Aplicação automática a tabelas futuras
Para que todas as tabelas recém-criadas recebam REPLICA IDENTITY FULLautomaticamente , instale um gatilho de evento Postgres. Executa-se após cada CREATE TABLE e define a identidade na nova tabela:
CREATE OR REPLACE FUNCTION public.set_full_replica_identity()
RETURNS event_trigger
LANGUAGE plpgsql
AS $$
DECLARE
obj record;
BEGIN
FOR obj IN
SELECT * FROM pg_event_trigger_ddl_commands()
WHERE command_tag = 'CREATE TABLE'
LOOP
EXECUTE format(
'ALTER TABLE %s REPLICA IDENTITY FULL;',
obj.object_identity
);
END LOOP;
END $$;
CREATE EVENT TRIGGER set_full_replica_identity_on_create
ON ddl_command_end
WHEN TAG IN ('CREATE TABLE')
EXECUTE FUNCTION public.set_full_replica_identity();
Combine o acionador de evento com o ciclo no separador anterior para abranger tanto as tabelas existentes como as tabelas futuras numa única configuração.
Verifique quais as tabelas com identidade de réplica definida
Para ver que tabelas num esquema têm identidade réplica configurada, execute:
SELECT n.nspname AS table_schema,
c.relname AS table_name,
CASE c.relreplident
WHEN 'd' THEN 'default'
WHEN 'n' THEN 'nothing'
WHEN 'f' THEN 'full'
WHEN 'i' THEN 'index'
END AS replica_identity
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE c.relkind = 'r'
AND n.nspname = 'public'
ORDER BY n.nspname, c.relname;
Só as linhas com replica_identity = 'full' estão prontas para CDF.
Passo 2: Iniciar o feed de alterações de dados
O Lakebase CDF é configurado ao nível do esquema. Uma vez iniciado, todas as tabelas atuais e futuras do esquema de origem são incluídas no feed.
- No teu espaço de trabalho Azure Databricks, abre Lakebase Postgres no comutador de aplicações (canto superior direito).
- Seleciona o teu projeto Lakebase e o ramo que queres usar (por exemplo, produção ou principal).
- Abra a visão geral do ramo e clique no separador Alterar Feed de Dados .
- Clique em Iniciar.
- No diálogo de configuração:
-
Base de dados: É predefinida como
databricks_postgres. - Esquema: Selecione o esquema Postgres de origem.
- Para o catálogo: Selecione o catálogo de destino do Unity Catalog.
- Esquema: Selecione o esquema do Catálogo Unity de destino.
-
Base de dados: É predefinida como
- Clica em Iniciar para iniciar a transmissão.
As tabelas aparecem no destino como lb_<table_name>_history. Para os encontrar, abra Catálogo na barra lateral, navegue até ao catálogo e esquema de destino e abra o separador Tabelas.
O separador Alterar Feed de Dados no Lakebase tem dois subseparadores:
- Esquemas: Lista cada esquema de origem, o seu catálogo de destino e esquema no Unity Catalog, e um estado.
-
Tabelas: Lista cada tabela de origem, a tabela
lb_<table_name>_historyde destino, o estado (StreamingouSnapshotting), o LSN consolidado (até que ponto o feed escreveu no Delta, apresentado como-enquanto ainda está no snapshot inicial) e a Última atualização (a última vez que a tabela recebeu alterações).
Também pode inspecionar o estado do feed do Postgres executando isto no Lakebase SQL Editor:
SELECT * FROM wal2delta.tables;
O resultado inclui table_oid, status (STREAMING ou SNAPSHOTTING), committed_lsn e last_write_time em cada tabela.
Importante
O que é o wal2delta? O Lakebase CDF é alimentado pela extensão wal2delta Postgres, que corre dentro do cálculo Lakebase. Utiliza descodificação lógica para capturar alterações do write-ahead log (WAL) e grava-as em tabelas Delta no Unity Catalog.
Esquema da tabela de destino
O CDF escreve uma tabela Delta por cada tabela de origem, com o nome lb_<table_name>_history no catálogo e esquema de destino. Além das colunas de origem, cada linha contém estas colunas do sistema:
| Coluna | Tipo | Description |
|---|---|---|
_pg_change_type |
TEXTO | Tipo de operação: insert, delete, update_preimage, ou update_postimage. |
_pg_lsn |
BIGINT | Número de sequência do registo do Postgres. |
_pg_xid |
INTEGER | ID de Transação Postgres. |
_timestamp |
TIMESTAMP | Carimbo temporal em que a alteração foi processada (sem fuso horário). |
_sort_by |
BIGINT | Chave de ordenação monótona usada para ordenar todas as alterações. |
Padrões comuns de alteração
-
Captura instantânea inicial: Da primeira vez que o CDF é executado numa tabela Lakebase existente, cada uma das linhas existentes é registada com
_pg_change_type = 'insert'. -
Atualizações: Uma atualização produz duas linhas: uma com
_pg_change_type = 'update_preimage'(linha antiga) e outra com_pg_change_type = 'update_postimage'(linha nova). -
Apaga: Uma eliminação produz uma linha com
_pg_change_type = 'delete'.
Estes são os mesmos eventos de alteração que o Delta Change Data Feed, pelo que aplicam-se os mesmos padrões a jusante.
Comportamento operacional
-
Nomeação das colisões: Se duas tabelas de origem mapearem para o mesmo nome de destino (por exemplo,
sales.usersemarketing.usersambas mapearem paralb_users_history), CDF escreve a primeira paralb_users_historye auto-sufixa a segunda paralb_users_history_1. Podes renomear qualquer uma das tabelas de destino no Unity Catalog e o feed continua a funcionar. - Âmbito ao nível do esquema: Quando inicias o CDF num esquema Lakebase, todas as tabelas atuais e futuras desse esquema estão incluídas. As tabelas vazias são ignoradas — uma tabela deve ter pelo menos uma linha para aparecer no destino.
- Tabelas de origem eliminadas: Se deixares uma tabela no Lakebase, a tabela Delta de destino no Unity Catalog é preservada.
Construir pipelines a jusante
O Lakebase CDF foi concebido para oleodutos a jusante que reagem a alterações operacionais. Os padrões abaixo mostram três formas de consumir a ração, ordenadas do mais simples ao mais flexível.
Exemplo de cenário. Uma aplicação de comércio eletrónico regista encomendas numa tabela Postgres orders , cada linha contendo um item_id e quantity. A equipa de logística precisa de níveis de inventário em tempo real. Com o CDF, todas as alterações feitas a orders são armazenadas na tabela Delta lb_orders_history no Unity Catalog. Os pipelines a jusante leem esse feed de alterações e atualizam uma inventory_levels tabela sempre que uma encomenda é feita, editada ou cancelada.
Calcular o inventário atual com uma vista materializada
O padrão mais simples é uma vista SQL materializada sobre a tabela de histórico. O MV atualiza-se incrementalmente à medida que surgem novos eventos de mudança, e os consumidores a jusante interrogam-no como qualquer outra tabela.
CREATE MATERIALIZED VIEW inventory_levels AS
SELECT
item_id,
SUM(
CASE
-- New orders (and the "new half" of updates) decrement inventory
WHEN _pg_change_type IN ('insert', 'update_postimage') THEN -quantity
-- Cancellations (and the "old half" of updates) restore inventory
WHEN _pg_change_type IN ('delete', 'update_preimage') THEN quantity
ELSE 0
END
) AS current_inventory,
MAX(_timestamp) AS last_transaction_ts,
MAX(_pg_lsn) AS last_lsn
FROM lb_orders_history
GROUP BY item_id;
As duas linhas geradas para cada atualização anulam-se mutuamente, exceto pela variação líquida, pelo que a soma acumulada se mantém correta à medida que as encomendas vão sendo editadas.
Alterações de fluxo com Spark Declarative Pipelines
Para uma arquitetura medallion estruturada, utilize Spark Declarative Pipelines (SDP) para declarar tabelas de bronze, prata e ouro. O SDP executa-os como um pipeline interligado, com os pontos de controlo e a gestão de dependências tratados automaticamente.
import dlt
from pyspark.sql import functions as F
@dlt.table
def inventory_adjustments():
return (
spark.readStream.table("<catalog>.<schema>.lb_orders_history")
.withColumn(
"delta",
F.when(F.col("_pg_change_type").isin("insert", "update_postimage"), -F.col("quantity"))
.when(F.col("_pg_change_type").isin("delete", "update_preimage"), F.col("quantity"))
.otherwise(0),
)
.select("item_id", "delta", "_timestamp")
)
@dlt.expect_or_drop("non_negative_stock", "on_hand >= 0")
@dlt.table
def inventory_levels():
return (
spark.read.table("LIVE.inventory_adjustments")
.groupBy("item_id")
.agg(F.sum("delta").alias("on_hand"))
)
inventory_adjustments lê lb_orders_history de forma incremental com readStream e gera um delta por evento.
inventory_levels agrega por item_id para calcular o stock atual. A expectativa elimina linhas que poderiam levar a ações para negativo, sinalizando um bug a montante.
Para um guia completo de ponta a ponta, veja Tutorial: Construir um pipeline ETL usando captura de dados de alteração.
Processamento personalizado com Spark Structured Streaming
Quando precisar de controlo total — por exemplo, fusões personalizadas, efeitos secundários ou vários destinos — leia diretamente a tabela de histórico com o Spark Structured Streaming e use foreachBatch para escrever no destino pretendido.
from pyspark.sql import functions as F
from delta.tables import DeltaTable
def update_inventory(batch_df, batch_id):
deltas = (
batch_df
.withColumn(
"delta",
F.when(F.col("_pg_change_type").isin("insert", "update_postimage"), -F.col("quantity"))
.when(F.col("_pg_change_type").isin("delete", "update_preimage"), F.col("quantity"))
.otherwise(0),
)
.groupBy("item_id")
.agg(F.sum("delta").alias("delta"))
)
target = DeltaTable.forName(spark, "<catalog>.<schema>.inventory_levels")
(target.alias("t")
.merge(deltas.alias("s"), "t.item_id = s.item_id")
.whenMatchedUpdate(set={"on_hand": F.expr("t.on_hand + s.delta")})
.whenNotMatchedInsert(values={"item_id": "s.item_id", "on_hand": "s.delta"})
.execute())
(spark.readStream.table("<catalog>.<schema>.lb_orders_history")
.writeStream
.foreachBatch(update_inventory)
.option("checkpointLocation", "/Volumes/<catalog>/<schema>/checkpoints/inventory_levels")
.start())
Cada microlote agrupa os eventos de alteração por item_id e combina os deltas finais em inventory_levels.
Incremental por conceção. Cada lb_<table_name>_history tabela é uma tabela Delta apenas para acrescentar. Cada alteração na origem é registada como uma nova linha, com _pg_change_type a assinalar a operação. Databricks SQL vistas materializadas, fluxos do Lakeflow Spark Declarative Pipelines e jobs do Spark Structured Streaming processam incrementalmente novas linhas a partir do registo de transações do Delta, pelo que os pipelines subsequentes só executam trabalho proporcional ao que foi alterado. Não precisa de ativar o Delta Change Data Feed na tabela de histórico porque a semântica de mudança já está codificada nos dados da linha.
Mapeamento de tipos de dados
O CDF suporta a maioria dos tipos padrão de primitivas PostgreSQL. Tipos sem equivalente direto em Delta são armazenados como STRING.
| Tipo PostgreSQL | tipo Azure Databricks Delta | Notes |
|---|---|---|
| BOOLEAN | BOOLEAN | |
| INT, SMALLINT, BIGINT | INT, SMALLINT, BIGINT | |
| TEXTO, VARCHAR, CHAR | STRING | |
| JSONB | STRING | Armazenado como uma string JSON. |
| ENUM | STRING | Guardado como a etiqueta de enum. |
| NUMÉRICO / DECIMAL | DECIMAL ou STRING | Utiliza a precisão/escala da fonte sempre que possível. Realiza reescalonamento sem perdas para valores de precisão/escala incompatíveis. Volta para STRING quando a precisão excede 38 ou quando a precisão/escala não está definida (NUMERIC ilimitado). Todas as colunas NUMERIC/DECIMAL são anuláveis porque os valores NaN são mapeados para NULL. Ver tipos numéricos PostgreSQL. |
| DATE | DATE | |
| TIMESTAMP | TIMESTAMP_NTZ | |
| CARIMBO DE DATA/HORA COM FUSO HORÁRIO | TIMESTAMP | |
| FLUTUAR, DOBRAR | FLUTUAR, DOBRAR |
Tipos armazenados como STRING:
-
Geografia/Geometria (Pós-SIG): Tipos da extensão PostGIS (por exemplo,
geometry,geography). -
Vetor (pgvector): O tipo
vectorda extensão pgvector. -
Tipos compostos/estruturas: Tipos personalizados definidos com
CREATE TYPE ... AS (field_name type, ...). Estes são tipos em forma de linha com campos nomeados. -
Mapa: Tipos de chave-valor semelhantes a mapas, como hstore (da
hstoreextensão). O Postgres não tem um tipo de mapa incorporado.hstoreé a forma comum de armazenar pares-chave-valor numa coluna.
Gerir alterações de esquemas
-
Renomear uma tabela no Postgres (por exemplo,
ALTER TABLE users RENAME TO customers) permite que o feed continue. O nome da tabela Delta de destino não muda — mantém-selb_users_history. - Alterações no esquema (adicionar uma coluna, eliminar uma coluna ou alterar o tipo de dado de uma coluna) desencadeiam uma nova captura da tabela afetada. O CDF relê toda a tabela do Postgres e reescreve-a para a tabela Delta de destino.
Desativar o Lakebase CDF
Desativar o CDF interrompe o feed de todos os esquemas Lakebase do projeto.
- No teu espaço de trabalho Azure Databricks, abre Lakebase Postgres no comutador de aplicações (canto superior direito).
- Selecione o seu projeto Lakebase e a ramificação onde configurou o CDF.
- Abra Descrição geral da ramificação e clique no separador Change Data Feed.
- Clique em Desabilitar. No diálogo de confirmação, reveja o aviso de que as alterações deixarão de fluir para as tabelas Delta, depois clique novamente em Desativar para confirmar.
Desativar o CDF não reinicia o seu cálculo.
Advertência
Se voltar a ativar o CDF mais tarde, o sistema não efetua uma nova captura completa. Quaisquer alterações que tenham ocorrido enquanto o CDF estava desativado estão permanentemente ausentes das tabelas Delta de destino.
Limitações e resolução de problemas
Pode ver o estado por tabela (snapshoting, skip ou streaming) no separador Change Data Feed , ou executando isto no Lakebase:
SELECT * FROM wal2delta.tables;
Razões comuns pelas quais uma tabela não aparece no feed:
-
REPLICA IDENTITY FULLnão definida: ExecuteALTER TABLE <table_name> REPLICA IDENTITY FULL;para a tabela. Ver Passo 1: Defina a identidade da réplica completa. - Tabelas particionadas: Tabelas particionadas Lakebase não são suportadas. Um esquema que contém tabelas particionadas faz com que essas tabelas falhem.
- Mesas vazias: Uma tabela com zero linhas é ignorada até que exista pelo menos uma linha.
Passos seguintes
- Construir ETL incremental com Pipelines Declarativos Spark. Veja o Tutorial: Construa um pipeline ETL usando captura de dados de alterações para um guia completo.
- Consulte a camada bronze com Databricks SQL. Veja : Comece com data warehousing usando Databricks SQL.
- Histórico de auditoria com consultas de viagem no tempo nas tabelas Delta de destino.