Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Ao lidar com grandes quantidades de dados, você precisa de um pipeline que possa processar apenas os registros novos e alterados em vez de reprocessar todo o conjunto de dados. Isso é chamado de ETL incremental. No Databricks SQL, você pode criar pipelines ETL incrementais usando tabelas de streaming e exibições materializadas, sem escrever código de procedimento ou agendar atualizações manuais.
Este tutorial orienta você por um padrão comum: acompanhar as alterações do produto ao longo do tempo. Você cria uma tabela de origem, captura eventos de alteração, cria uma tabela de dimensão que preserva o histórico completo de cada produto e adiciona uma camada de relatório agregada na parte superior.
O principal recurso neste tutorial é AUTO CDC. Em um armazém tradicional, você escreveria instruções complexas MERGE INTO para reconciliar eventos de inserção, atualização e exclusão em uma tabela de destino. Essa abordagem é propensa a erros, especialmente quando os eventos chegam fora de ordem.
AUTO CDC trata disso para você. Você declara a chave de negócios, a coluna de sequenciamento e se deseja scd tipo 1 (somente valor mais recente) ou SCD Tipo 2 (histórico completo) e Azure Databricks aplica a lógica de mesclagem correta automaticamente. Para obter uma visão geral do CDC, consulte as APIs AUTO CDC: Simplificar a captura de dados de alteração com pipelines.
Ao final deste tutorial, você terá:
- Criou uma tabela de origem que acompanha as alterações com o Feed de Dados de Alterações.
- Inspecionou os dados brutos de alteração para entender o fluxo de eventos CDC.
- Foi usado
AUTO CDCpara criar uma tabela de dimensões SCD Tipo 2 a partir desses eventos. - Eventos de exclusão processados incrementalmente por meio do pipeline.
- Criou uma visão materializada que mantém de forma incremental um relatório agregado.
- Configurado
SCHEDULE REFRESH EVERY 1 DAYpara que as alterações sejam propagadas automaticamente por meio do pipeline.
Requisitos
Para concluir este tutorial, você deve atender aos seguintes requisitos:
- Um workspace do Azure Databricks com Unity Catalog habilitado.
- Um sql warehouse (sem servidor ou profissional).
- Tenha permissão para criar um recurso de computação ou acesso a um recurso de computação.
- Computação sem servidor habilitada para sua conta. Consulte Recursos com disponibilidade regional limitada.
Etapa 1: Configurar seu catálogo e esquema
Abra o editor de SQL do Databricks e defina seu catálogo e esquema de trabalho. Você deve ter permissão para USE o catálogo e o esquema selecionado:
USE CATALOG <your-catalog>;
USE SCHEMA <your-schema>;
Etapa 2: Criar uma tabela de origem e carregar dados
Crie uma tabela products com Use o feed de dados de alterações do Delta Lake no CDF (Azure Databricks) habilitado. CDF é um recurso do Delta Lake que registra cada inserção, atualização e exclusão como um log de alterações que pode ser consultado. Isso é semelhante a um fluxo CDC de um sistema de origem transacional, exceto que as alterações são capturadas diretamente na tabela Delta, em vez de em um registro externo. Você utiliza o CDF aqui para gerar os eventos de alteração que serão consumidos pelo pipeline subsequente.
Crie a tabela e carregue os registros iniciais:
CREATE OR REPLACE TABLE products ( product_id INT, product_name STRING, category STRING, warehouse STRING ) TBLPROPERTIES (delta.enableChangeDataFeed = true); INSERT INTO products VALUES (1, 'Spoon', 'Cutlery', 'Seattle'), (2, 'Fork', 'Cutlery', 'Portland'), (3, 'Knife', 'Cutlery', 'Denver'), (4, 'Chair', 'Furniture', 'Austin'), (5, 'Table', 'Furniture', 'Chicago'), (6, 'Lamp', 'Lighting', 'Boston'), (7, 'Mug', 'Kitchenware', 'Seattle'), (8, 'Plate', 'Kitchenware', 'Atlanta'), (9, 'Bowl', 'Kitchenware', 'Dallas'), (10, 'Glass', 'Kitchenware', 'Phoenix');Simule alterações upstream, incluindo novos produtos, uma movimentação de armazém e uma reatribuição de categoria:
INSERT INTO products VALUES (11, 'Napkin', 'Dining', 'San Francisco'), (12, 'Coaster', 'Dining', 'New York'); UPDATE products SET warehouse = 'Los Angeles' WHERE product_id = 1; UPDATE products SET category = 'Dining' WHERE product_id = 2;
Etapa 3: Consultar o feed de dados de alteração
Antes de criar o pipeline downstream, é útil examinar os eventos de alteração brutos para que você possa entender o que AUTO CDC irá processar. A table_changes() função lê o log do CDF e retorna todas as operações capturadas junto com colunas de metadados:
SELECT
product_id, product_name, warehouse,
_change_type, _commit_version
FROM table_changes('products', 1)
ORDER BY _commit_version, product_id;
Por exemplo, o Spoon tem três eventos: um insert (Seattle), um update_preimage (Seattle) e um update_postimage (Los Angeles).
Observe que uma única alteração lógica (por exemplo, mover o Spoon para um warehouse diferente) produz vários eventos: uma pré-imagem e uma postimagem. Em um armazém tradicional, você escreveria uma MERGE instrução para reconciliar todos esses eventos em uma tabela de destino, manipulando inserções, atualizações e exclusões com lógica separada e certificando-se de que os eventos sejam aplicados na ordem correta. Essa é exatamente a complexidade que AUTO CDC elimina na próxima etapa.
Etapa 4: Criar uma dimensão SCD Tipo 2 com AUTO CDC
Importante
AUTO CDC está em Beta. Requer o Databricks Runtime 17.3 ou superior.
Uma tabela de streaming processa dados incrementalmente. Em cada atualização, ela lê apenas as novas linhas desde a última execução, portanto, ela não precisa reprocessar o conjunto de dados completo. Isso o torna adequado para fontes de alto volume ou de alteração frequente.
AUTO CDC adiciona o processamento de captura de dados de alteração na parte superior de uma tabela de streaming. Em vez de escrever uma instrução MERGE INTO que manipula manualmente inserções, atualizações e exclusões, você declara a chave de negócios e a coluna de sequenciamento e permite que Azure Databricks aplique a lógica correta.
AUTO CDC também manipula eventos fora de ordem automaticamente, o que é um problema comum ao usar MERGE INTO para lidar com eventos que chegam de sistemas distribuídos ou cargas em lote com carimbos de data/hora sobrepostos.
A instrução a seguir cria uma tabela SCD Tipo 2 que preserva o histórico de versão completo de cada produto. Cada versão obtém __START_AT e __END_AT carimbos de data/hora. Um NULL in __END_AT marca a versão atual.
CREATE OR REFRESH STREAMING TABLE products_history
SCHEDULE REFRESH EVERY 1 DAY
FLOW AUTO CDC
FROM STREAM products WITH (readChangeFeed = true)
KEYS (product_id)
APPLY AS DELETE WHEN _change_type = 'delete'
SEQUENCE BY _commit_timestamp
COLUMNS * EXCEPT (_change_type, _commit_version, _commit_timestamp)
STORED AS SCD TYPE 2;
-
SCHEDULE REFRESH EVERY 1 DAY: atualiza a tabela diariamente. -
FLOW AUTO CDC: declara este como um fluxo CDC. Azure Databricks aplica a inserção, atualização e exclusão de semântica automaticamente. -
KEYS (product_id): a chave de negócio. Eventos com a mesma chave são mesclados em linhas versionadas. -
APPLY AS DELETE WHEN _change_type = 'delete': fecha a versão atual quando um evento de exclusão chega. Isso permite definir a condição que identifica um evento de exclusão. -
SEQUENCE BY _commit_timestamp: estabelece a ordenação de eventos. Lida com as chegadas fora de ordem corretamente. -
STORED AS SCD TYPE 2: mantém o histórico completo.AUTO CDCdá suporte ao SCD Tipo 1 e SCD Tipo 2.
Consulte a tabela de dimensões:
SELECT product_id, product_name, warehouse, __START_AT, __END_AT
FROM products_history
ORDER BY product_id, __START_AT;
- Colher: duas versões. Seattle (fechado,
__END_ATset) e Los Angeles (atual,__END_AT = NULL). - Bifurcação: duas versões. Categoria de cutelaria (fechada) e Categoria de jantar (atual).
- Guardanapo e Porta-Copo: uma versão cada (recém-inseridos,
__END_AT = NULL). - Todos os outros produtos: uma versão cada (
__END_AT = NULL).
Etapa 5: Processar exclusões por meio do pipeline
Agora simule dois produtos descontinuados excluindo-os da tabela de origem:
DELETE FROM products WHERE product_id = 9;
DELETE FROM products WHERE product_id = 10;
Esses eventos de exclusão são gravados no log do CDF, mas a tabela de streaming ainda não os viu. Atualize a tabela de streaming para processar os novos eventos:
REFRESH STREAMING TABLE products_history;
Consulte a tabela de dimensões para verificar se as exclusões foram aplicadas:
SELECT product_id, product_name, warehouse, __START_AT, __END_AT
FROM products_history
ORDER BY product_id, __START_AT;
Bowl e Glass estão agora encerrados com o conjunto __END_AT, marcando-os como descontinuados. Todos os outros produtos atuais permanecem inalterados. A tabela de streaming só processou os novos eventos de exclusão sem reprocessar as inserções e atualizações da atualização anterior.
Etapa 6: Criar uma visão materializada agregada
Agora que você tem uma tabela de dimensões que permanece atual com as alterações de origem, você pode adicionar uma camada de relatório na parte superior.
Uma exibição materializada armazena os resultados da consulta pré-computada como uma tabela física. Ao contrário de uma visão regular, que reexecuta a consulta sempre que você lê a partir dela, uma visão materializada persiste os resultados e apenas recalcula as linhas afetadas por alterações upstream em cada atualização. Isso o torna adequado para dashboards e relatórios em que o desempenho da consulta importa.
CREATE OR REPLACE MATERIALIZED VIEW products_by_category
SCHEDULE REFRESH EVERY 1 DAY
AS
SELECT
category,
COUNT(*) AS active_products
FROM products_history
WHERE __END_AT IS NULL
GROUP BY category;
SCHEDULE REFRESH EVERY 1 DAY significa que essa visualização é atualizada em um agendamento diário. Combinado com o mesmo agendamento na tabela de streaming, agora você tem um pipeline de três estágios em que as alterações na tabela original propagam-se em cascata, atingindo a dimensão e a agregação em cada ciclo de atualização. Não há nenhuma atualização manual a ser executada.
SELECT * FROM products_by_category ORDER BY active_products DESC;
Etapa 7: Verificar a cascata de ponta a ponta
Para verificar a cascata completa do pipeline, faça uma alteração na tabela de origem:
UPDATE products SET warehouse = 'Seattle' WHERE product_id = 3;
The Knife se muda de Denver para Seattle. Essa única alteração de DML dispara a cascata completa do pipeline, demonstrando como os três estágios funcionam juntos:
-
productsregistra o evento de alteração por meio do CDF. -
products_historyprocessa o evento e adiciona uma nova versão para o Knife. -
products_by_categoryrecomputa apenas a linha Cutlery afetada.
Verificar:
SELECT product_id, product_name, warehouse, __START_AT, __END_AT
FROM products_history
WHERE product_id = 3
ORDER BY __START_AT;
SELECT * FROM products_by_category ORDER BY active_products DESC;
Limpeza
Para limpar os recursos criados por este tutorial, use o seguinte SQL:
DROP MATERIALIZED VIEW IF EXISTS products_by_category;
DROP STREAMING TABLE IF EXISTS products_history;
DROP TABLE IF EXISTS products;