Armazenar as alterações do Postgres no lakehouse

Note

A funcionalidade Lakebase Change Data Feed está em pré-visualização pública.

Configura o Lakebase Change Data Feed (CDF) numa tabela Postgres e depois observa as alterações ao nível das linhas aparecerem na tabela Delta de destino.

Passos:Ativar a captura de alterações → ② Iniciar o feed → ③ Seguir uma linha até ao lakehouse → ④ Alterar a linha, vê-la fluir ao longo do processo

Note

Isto é um início rápido. Para documentação completa, consulte Lakebase Change Data Feed.

Antes de começar

  • Certifica-te de que concluíste Obter uma base de dados Postgres. Precisas de um projeto Lakebase com a playing_with_lakebase tabela de exemplo.
  • Um catálogo e esquema do Unity Catalog onde tens CREATE TABLE permissão.

Passo 1: Ativar a captura de alterações

O Postgres precisa de dados completos da linha no registo de escrita antecipada para que o CDF funcione. Definir a identidade da réplica como full faz com que o Postgres registe o estado antigo e o novo da linha em cada alteração.

No Editor de SQL do Lakebase, execute:

ALTER TABLE playing_with_lakebase REPLICA IDENTITY FULL;

Saiba mais: Defina a identidade de réplica em todas as tabelas de um esquema e aplique-a automaticamente a novas tabelas

Passo 2: Iniciar a alimentação

O Lakebase CDF é configurado ao nível do esquema. Todas as tabelas atuais e futuras no esquema de origem são incluídas automaticamente, por isso não escolhes tabelas individuais.

A partir do ramo de produção, abra o separador Alterar Feed de Dados e clique em Iniciar. Escolha public como o esquema de origem e, em seguida, escolha um catálogo e um esquema de destino no Catálogo Unity. O snapshot inicial começa imediatamente e lb_playing_with_lakebase_history aparece como uma tabela Delta no seu destino.

Inicia um diálogo com a seleção de origem e destino.

Saiba mais: Iniciar o fluxo de dados de alteração

Passo 3: Siga uma fila até à casa do lago

Escolhe uma linha de Lakebase. Dê uma vista de olhos na linha id=2:

SELECT * FROM playing_with_lakebase WHERE id = 2;

Agora encontre a mesma linha na tabela de história do Delta. Mude para um Databricks SQL warehouse ou notebook e execute:

SELECT * FROM <catalog>.<schema>.lb_playing_with_lakebase_history
WHERE id = 2;

Substitui <catalog> e <schema> pelo destino que escolheste no Passo 2. Verás uma linha id=2 com o mesmo name e value como no Lakebase, além de colunas extra. O snapshot inicial escreveu todas as linhas existentes no Delta como um evento insert, que é o que essa linha representa.

Essas colunas extra descrevem que tipo de evento cada linha representa (_pg_change_type), quando aconteceu (_timestamp), e a informação de encomenda do Postgres (_pg_lsn, _pg_xid).

Saiba mais: Esquema | Mapeamento de tipos de dados

Passo 4: Altere a linha, veja como a alteração se propaga

De volta ao Lakebase SQL Editor, atualize a linha id=2:

UPDATE playing_with_lakebase SET value = 55.5 WHERE id = 2;

Espere alguns segundos que a alteração apareça no feed e depois volte a consultar a tabela de histórico:

SELECT id, value, _pg_change_type, _timestamp
FROM <catalog>.<schema>.lb_playing_with_lakebase_history
WHERE id = 2
ORDER BY _pg_lsn DESC;

Tabela de histórico Delta que apresenta três linhas para id=2: update_preimage, update_postimage e insert

A linha id=2 aparece agora três vezes: a original insert, an update_preimage com o valor antigo, e an update_postimage com o valor novo. Cada alteração na linha torna-se uma nova linha de histórico, por isso tens sempre um registo de auditoria completo. As eliminações funcionam da mesma forma, adicionando uma linha com _pg_change_type = 'delete'.

Saiba mais: Padrões de alteração comuns | Criar pipelines a jusante

Passos seguintes