Armazene alterações do Postgres no Lakehouse

Note

O recurso de Feed de Dados de Alteração do Lakebase está em Visualização Pública.

Configure o Feed de Dados de Alterações (CDF) do Lakebase em uma tabela Postgres e veja as alterações em nível de linha aparecerem na tabela Delta de destino.

Etapas:Habilitar a captura de alterações → ② Iniciar o feed → ③ Acompanhe uma linha até o lakehouse → ④ Altere a linha e veja-a fluir pelo processo

Note

Este é um início rápido. Para obter a documentação completa, consulte Lakebase Change Data Feed.

Antes de começar

  • Verifique se você concluiu a obtenção de um banco de dados Postgres. Você precisa de um projeto Lakebase com a tabela de exemplo playing_with_lakebase.
  • Um catálogo e um esquema no Unity Catalog no qual você tem permissão CREATE TABLE.

Etapa 1: Habilitar a captura de alterações

O Postgres precisa dos dados completos da linha no write-ahead log para que o CDF funcione. Definir a identidade de réplica como full faz com que o Postgres registre os estados antigo e novo da linha para cada alteração.

No Editor de SQL do Lakebase, execute:

ALTER TABLE playing_with_lakebase REPLICA IDENTITY FULL;

Saiba mais: Definir a identidade da réplica em todas as tabelas em um esquema e aplicá-la automaticamente a novas tabelas

Etapa 2: Iniciar o feed

O CDF do Lakebase está configurado no nível do esquema. Todas as tabelas atuais e futuras no esquema de origem são incluídas automaticamente, para que você não escolha tabelas individuais.

No branch de produção, abra a guia Alterar Feed de Dados e clique em Iniciar. Escolha public como esquema de origem e depois selecione um catálogo e um esquema de destino no Unity Catalog. O snapshot inicial começa imediatamente, e lb_playing_with_lakebase_history aparece como uma tabela Delta no destino.

Iniciar a caixa de diálogo com a seleção de origem e de destino.

Saiba mais: Iniciar o feed de dados de alterações

Etapa 3: Siga uma linha até a casa do lago

Escolha uma linha do Lakebase. Dê uma olhada na linha id=2:

SELECT * FROM playing_with_lakebase WHERE id = 2;

Agora, encontre a mesma linha na tabela de histórico do Delta. Mude para um Databricks SQL Warehouse ou um notebook e execute:

SELECT * FROM <catalog>.<schema>.lb_playing_with_lakebase_history
WHERE id = 2;

Substitua <catalog> e <schema> pelo destino escolhido na Etapa 2. Você verá linha id=2 com o mesmo name e value como no Lakebase, além de colunas extras. O snapshot inicial gravou todas as linhas existentes no Delta como um evento insert, pois é isso que essa linha representa.

Essas colunas extras descrevem que tipo de evento cada linha representa (_pg_change_type), quando aconteceu (_timestamp) e as informações de ordenação do Postgres (_pg_lsn, _pg_xid).

Saiba mais:Mapeamento de tipo de dados |

Etapa 4: Mude a linha e veja-a fluir

De volta ao Editor de SQL do Lakebase, atualize a linha id=2:

UPDATE playing_with_lakebase SET value = 55.5 WHERE id = 2;

Aguarde alguns segundos para que a alteração apareça no feed e, em seguida, consulte novamente a tabela de histórico:

SELECT id, value, _pg_change_type, _timestamp
FROM <catalog>.<schema>.lb_playing_with_lakebase_history
WHERE id = 2
ORDER BY _pg_lsn DESC;

Tabela de histórico do Delta mostrando três linhas para id=2: update_preimage, update_postimage e insert

A id=2 linha agora aparece três vezes: a original insert, uma update_preimage com o valor antigo e uma update_postimage com o novo valor. Cada alteração na linha se torna uma nova linha de histórico, portanto, você sempre tem uma trilha de auditoria completa. As exclusões funcionam da mesma maneira, acrescentando uma linha com _pg_change_type = 'delete'.

Saiba mais: Padrões comuns de alteração | Criar pipelines downstream

Próximas Etapas