Servir dados de lakehouse com tabelas sincronizadas

As tabelas sincronizadas permitem-lhe fornecer dados de lakehouse através do Lakebase Postgres. As tabelas do Unity Catalog sincronizam-se no Postgres para que as aplicações possam consultar diretamente os dados do lakehouse com baixa latência. Este processo é vulgarmente conhecido como ETL inverso. O Lakehouse está otimizado para análises e enriquecimento, enquanto o Lakebase foi concebido para cargas de trabalho operacionais que requerem consultas rápidas tipo lookup e consistência transacional.

Diagrama de arquitetura que mostra o fluxo de dados do lakehouse para a Lakebase e depois para aplicações

O que são tabelas sincronizadas?

As tabelas sincronizadas permitem-lhe fornecer dados de nível analítico do Unity Catalog através do Lakebase Postgres, tornando-os disponíveis para aplicações que necessitam de consultas de baixa latência e transações ACID completas. Eles fazem a ponte entre o armazenamento analítico e os sistemas operacionais, mantendo os seus dados prontos para serem usados em aplicações em tempo real.

Fontes suportadas

As tabelas sincronizadas suportam os seguintes tipos de fonte do Catálogo Unity:

  • Tabelas Delta geridas e externas
  • Tabelas Iceberg geridas pelo sistema e externas
  • Opiniões e pontos de vista materializados

Como funciona

Tabelas sincronizadas Databricks criam uma cópia gerida dos seus dados do Catálogo Unity no Lakebase. Quando crias uma tabela sincronizada, obtém:

  1. Uma tabela sincronizada no Unity Catalog que faz referência ao pipeline de sincronização
  2. Uma tabela Postgres no Lakebase (apenas leitura, consultável pelas suas aplicações)

Diagrama que mostra a relação de três tabelas em tabelas sincronizadas

Por exemplo, pode sincronizar tabelas de ouro, funcionalidades projetadas ou saídas de ML de analytics.gold.user_profiles numa nova tabela analytics.gold.user_profiles_synced sincronizada. No Postgres, o nome do esquema do Catálogo Unity torna-se o nome do esquema Postgres, pelo que isto aparece como gold.user_profiles_synced:

SELECT * FROM gold.user_profiles_synced WHERE user_id = 12345;

As aplicações ligam-se aos drivers Postgres padrão e consultam os dados sincronizados juntamente com o seu próprio estado operacional.

Advertência

Embora seja possível modificar uma tabela sincronizada diretamente no Postgres, o Azure Databricks recomenda estritamente executar apenas consultas de leitura para proteger a integridade dos dados com a fonte. Para operações suportadas em tabelas sincronizadas, veja Operações permitidas em tabelas sincronizadas no Postgres.

Os pipelines de sincronização usam os Lakeflow Spark Declarative Pipelines geridos para atualizar continuamente tanto a tabela sincronizada do Unity Catalog como a tabela Postgres com as alterações provenientes da tabela de origem. Cada sincronização pode usar até 16 ligações à sua base de dados Lakebase.

O Lakebase Postgres suporta até 1.000 ligações concorrentes com garantias transacionais, para que as aplicações possam ler dados enriquecidos enquanto também tratam de inserções, atualizações e eliminações na mesma base de dados.

Modos de sincronização

Escolha o modo de sincronização certo com base nas necessidades da sua aplicação:

Mode Descrição Quando utilizar Desempenho
Instantâneo Cópia única de todos os dados A fonte altera >10% das linhas por ciclo, ou a fonte não suporta CDF (views, tabelas Iceberg) 10 vezes mais eficiente se modificar >10% de dados fonte
Acionado Atualizações agendadas que correm sob demanda ou em intervalos As linhas de origem mudam numa cadência conhecida. Inserções, atualizações e eliminações são propagadas a cada atualização. Bom equilíbrio custo/atraso. É caro se funcionar <em intervalos de 5 minutos
Contínuo Streaming em tempo real com segundos de latência As alterações têm de aparecer no Lakebase quase em tempo real Menor atraso, maior custo. Intervalos mínimos de 15 segundos

Os modos Triggered e Continuous requerem que o Change Data Feed (CDF) esteja ativado na sua tabela de origem. Se o CDF não estiver ativado, vais ver um aviso na interface com o comando exato ALTER TABLE para executar. Para mais detalhes sobre o Feed de Dados de Alteração, consulte Usar o feed de dados de alterações do Delta Lake nos Databricks.

Observação

Fontes que não suportam CDF (como vistas, vistas materializadas e tabelas Iceberg) só podem ser sincronizadas em modo Snapshot . Para o modo Snapshot, a fonte deve suportar SELECT *.

Exemplos de casos de uso

Pode usar tabelas sincronizadas para casos de uso de serviço de dados como:

  • Motores de personalização que servem perfis de utilizador frescos às aplicações Databricks
  • Aplicações que servem previsões de modelos ou valores de atributos calculados na arquitetura de dados lakehouse
  • Dashboards voltados para o cliente que servem KPIs em tempo real
  • Serviços de deteção de fraude que atribuem pontuações de risco para ação imediata
  • Ferramentas de suporte que disponibilizam registos de clientes enriquecidos a partir de dados de lakehouse

Criar uma tabela sincronizada

Pré-requisitos

Precisas:

  • Um espaço de trabalho Databricks com o Lakebase ativado.
  • Um projeto Lakebase (ver Criar um projeto).
  • Uma tabela do Unity Catalog para sincronizar.
  • Permissões para criar tabelas sincronizadas. Precisas USE_SCHEMA e CREATE_TABLE em qualquer esquema que usares.

Para modos Triggered ou Contínuo , o Change Data Feed deve estar ativado na sua tabela de origem:

ALTER TABLE your_catalog.your_schema.your_table
SET TBLPROPERTIES (delta.enableChangeDataFeed = true)

Para planeamento de capacidade e compatibilidade de tipos de dados, consulte Tipos de dados e compatibilidade e Planeamento de capacidade.

IU

  1. Vai a Catálogo na barra lateral do espaço de trabalho e seleciona a tabela do Catálogo Unity que queres sincronizar.

    Explorador de Catálogo mostrando uma tabela selecionada

  2. Clique em Criar>tabela sincronizada na vista de detalhes da tabela.

    Menu suspenso do botão Criar mostrando a opção Tabela Sincronizada

  3. No diálogo Criar tabela sincronizada :

    As listas de catálogo e de esquemas incluem apenas esquemas do Catálogo Unity onde o utilizador atual tem privilégios de USE_SCHEMA e CREATE_TABLE . Se não vires um esquema que esperas, confirma as tuas permissões com o administrador do catálogo.

    1. Nome da tabela: Introduza um nome para a sua tabela sincronizada (é criada no mesmo catálogo e esquema que a sua tabela de origem). Isto cria tanto uma tabela sincronizada do Unity Catalog como uma tabela Postgres que pode consultar.

    2. Tipo de base de dados: Escolha Lakebase Serverless (Autoscaling).

    3. Modo de sincronização: Escolha Snapshot, Triggered ou Contínuo conforme as suas necessidades (veja modos de sincronização acima).

    4. Configura as escolhas do teu projeto, branch e base de dados.

    5. Verifica se a chave primária está correta (normalmente detetada automaticamente).

      Importante

      As colunas na chave primária não podem ser nulas na tabela sincronizada. Linhas com nulos nas colunas primárias da chave são excluídas da sincronização.

    6. (Opcional) Se duas linhas puderem partilhar a mesma chave primária na tabela de origem, selecione uma chave de série temporal para configurar a deduplicação. Quando uma chave de série temporal é especificada, a tabela sincronizada contém apenas a linha com o valor da chave de série temporal mais recente para cada chave primária. Para o modo de falha sem chave de série temporal, veja Chaves duplicadas.

    Se escolheste o modo Triggered ou Continuous e ainda não ativaste o Change Data Feed, vais ver um aviso com o comando exato para executar. Para questões de compatibilidade de tipos de dados, consulte Tipos de dados e compatibilidade.

    Clique em Criar para criar a tabela sincronizada.

  4. Monitoriza a tabela sincronizada no Catálogo. O separador Visão Geral mostra o estado da sincronização, a configuração, o estado do pipeline e o carimbo temporal da última sincronização. Usa o Sync Now para atualização manual.

Python SDK

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.postgres import (
    SyncedTable,
    SyncedTableSyncedTableSpec,
    SyncedTableSyncedTableSpecSyncedTableSchedulingPolicy,
)

w = WorkspaceClient()

synced_table = w.postgres.create_synced_table(
    synced_table=SyncedTable(spec=SyncedTableSyncedTableSpec(
        source_table_full_name="main.sales.orders",
        branch="projects/my-project/branches/production",
        primary_key_columns=["order_id"],
        scheduling_policy=SyncedTableSyncedTableSpecSyncedTableSchedulingPolicy.SNAPSHOT,
        postgres_database="mydb",
        create_database_objects_if_missing=True,
    )),
    synced_table_id="my-catalog.sales.orders",
).wait()

print(f"Synced table created: {synced_table.name}")

O synced_table_id utiliza o formato catalog.schema.table e torna-se o nome da tabela sincronizada no Unity Catalog. No Postgres, a tabela {table} é criada no esquema {schema}, dentro da base de dados que defines ( postgres_database aqui, mydb).

SDK de Java

import com.databricks.sdk.WorkspaceClient;
import com.databricks.sdk.service.postgres.*;
import java.util.List;

WorkspaceClient w = new WorkspaceClient();

SyncedTable syncedTable = w.postgres().createSyncedTable(
    new CreateSyncedTableRequest()
        .setSyncedTableId("my-catalog.sales.orders")
        .setSyncedTable(new SyncedTable()
            .setSpec(new SyncedTableSyncedTableSpec()
                .setSourceTableFullName("main.sales.orders")
                .setBranch("projects/my-project/branches/production")
                .setPrimaryKeyColumns(List.of("order_id"))
                .setSchedulingPolicy(SyncedTableSyncedTableSpecSyncedTableSchedulingPolicy.SNAPSHOT)
                .setPostgresDatabase("mydb")
                .setCreateDatabaseObjectsIfMissing(true))))
    .waitForCompletion();

System.out.println("Synced table created: " + syncedTable.getName());

encaracolar

curl -X POST "https://your-workspace.cloud.databricks.com/api/2.0/postgres/synced_tables?synced_table_id=my-catalog.sales.orders" \
  -H "Authorization: Bearer ${DATABRICKS_TOKEN}" \
  -H "Content-Type: application/json" \
  -d '{
    "spec": {
      "source_table_full_name": "main.sales.orders",
      "branch": "projects/my-project/branches/production",
      "primary_key_columns": ["order_id"],
      "scheduling_policy": "SNAPSHOT",
      "postgres_database": "mydb",
      "create_database_objects_if_missing": true
    }
  }'

Isto retorna a uma operação de longa duração. Sondagem o campo decifrado name até done: true. Ver Operações de longa duração. Para configuração de autenticação, veja Autenticação.

Agendar ou acionar sincronizações subsequentes

O instantâneo inicial é executado automaticamente no momento da criação. Para os modos Snapshot e Triggered , as sincronizações subsequentes devem ser disparadas explicitamente. O modo contínuo é autogestivo.

Tarefa de Sincronização de Tabela de Dados no Pipeline

A tarefa do pipeline de sincronização de tabelas da base de dados no Lakeflow Jobs executa o pipeline de uma tabela sincronizada como um passo do fluxo de trabalho. Configura o trabalho com um gatilho de atualização de tabela ou um agendamento.

Trigger nas atualizações da tabela de origem

O trabalho é acionado quando a tabela do Catálogo Unity de origem é atualizada. Com o modo Triggered , apenas novas alterações são aplicadas de forma incremental, proporcionando uma frescura quase em tempo real sem o custo sempre ativo do modo Contínuo.

  1. Na barra lateral, clique em Fluxos de Trabalho.
  2. Clica em Criar emprego ou abre um emprego existente.
  3. No separador Tarefas, clique + Adicionar outro tipo de tarefa.
  4. Em Ingestão e Transformação, selecione Pipeline de Sincronização de Tabela de Base de Dados.
  5. No campo Pipeline , selecione o pipeline associado à sua tabela sincronizada.
  6. Em Horários e Gatilhos, clique em Adicionar gatilho.
  7. Selecione a atualização da tabela como tipo de gatilho.
  8. Em Tabelas, selecione a tabela fonte do Catálogo Unity para monitorizar.
  9. Clique em Salvar.

Ativar por agendamento

Executa a sincronização a uma cadência fixa. Ideal para o modo Snapshot , onde uma atualização completa noturna ou semanal é tipicamente o padrão mais eficiente.

  1. Siga os passos 1–5 acima para adicionar uma tarefa de uma pipeline de sincronização de Tabela de Bases de Dados a um trabalho.
  2. Em Horários e Gatilhos, clique em Adicionar gatilho.
  3. Selecione Agendado como tipo de disparador.
  4. Define o teu cron schedule e fuso horário, depois clica em Guardar.

Verificar o estado da sincronização

Para verificar o estado atual e o último tempo de sincronização de uma tabela sincronizada:

IU

No Catálogo, navega até à tua tabela sincronizada e seleciona o separador Visão Geral. Mostra o estado atual da sincronização, o estado do pipeline e o carimbo temporal da última sincronização.

Python SDK

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

table = w.postgres.get_synced_table("synced_tables/my-catalog.sales.orders")
print(f"State: {table.status.detailed_state}")
print(f"Last sync: {table.status.last_sync_time}")
print(f"Message: {table.status.message}")

SDK de Java

import com.databricks.sdk.WorkspaceClient;
import com.databricks.sdk.service.postgres.SyncedTable;

WorkspaceClient w = new WorkspaceClient();

SyncedTable table = w.postgres().getSyncedTable("synced_tables/my-catalog.sales.orders");
System.out.println("State: " + table.getStatus().getDetailedState());
System.out.println("Last sync: " + table.getStatus().getLastSyncTime());
System.out.println("Message: " + table.getStatus().getMessage());

encaracolar

curl "https://your-workspace.cloud.databricks.com/api/2.0/postgres/synced_tables/my-catalog.sales.orders" \
  -H "Authorization: Bearer ${DATABRICKS_TOKEN}"

Tipos de dados e compatibilidade

Os tipos de dados do Unity Catalog são mapeados para tipos Postgres ao criar tabelas sincronizadas. Os tipos complexos (ARRAY, MAP, STRUCT) são armazenados como JSONB no Postgres.

Tipo de coluna de origem Tipo de coluna Postgres
BIGINT BIGINT
BINARY BYTEA
BOOLEANO BOOLEANO
DATE DATE
DECIMAL(p,s) NUMÉRICO
DUPLO Double Precision
FLOAT REAL
INT INTEIRO
INTERVALO INTERVALO
SMALLINT SMALLINT
STRING TEXTO
DATA E HORA CARIMBO DE DATA/HORA COM FUSO HORÁRIO
TIMESTAMP_NTZ MARCA TEMPORAL SEM ZONA HORÁRIA
TINYINT SMALLINT
ARRAY<tipoDeElemento> JSONB
MAPA<keyType,valueType> JSONB
STRUCT<nomeCampo:tipoCampo[, ...]> JSONB

Observação

GEOGRAFIA, GEOMETRIA, VARIANTES e tipos de OBJETO não são suportados.

Lidar com caracteres inválidos

Certos caracteres como bytes nulos (0x00) são permitidos nas colunas STRING, ARRAY, MAP ou STRUCT do Unity Catalog, mas não suportados em colunas Postgres TEXT ou JSONB. Isto pode causar falhas de sincronização com erros como:

ERROR: invalid byte sequence for encoding "UTF8": 0x00
ERROR: unsupported Unicode escape sequence DETAIL: \u0000 cannot be converted to text
  • O primeiro erro ocorre quando um byte nulo aparece em uma coluna de cadeia de caracteres de nível superior, que mapeia diretamente para o Postgres TEXT.
  • O segundo erro ocorre quando um byte nulo aparece numa cadeia aninhada dentro de um tipo complexo (STRUCT, , ou ARRAY), que é serializado como MAPJSONB. Durante a serialização, todas as cadeias de caracteres são convertidas para Postgres TEXT, onde \u0000 não é permitido.

Soluções:

  • Sanitizar campos de strings: Remover caracteres não suportados antes de sincronizar. Para bytes nulos nas colunas STRING:

    SELECT REPLACE(column_name, CAST(CHAR(0) AS STRING), '') AS cleaned_column FROM your_table
    
  • Converter para BINARY: Para colunas STRING onde é necessário preservar bytes brutos, converta para o tipo BINARY.

Planeamento de capacidade

Ao planear a implementação das tabelas sincronizadas, considere estes requisitos de recursos:

  • Utilização da ligação: Cada tabela sincronizada utiliza até 16 ligações à sua base de dados Lakebase, que contam para o limite de ligação da instância.
  • Limites de tamanho: O limite total de tamanho lógico dos dados em todas as tabelas sincronizadas é de 8 TB. As tabelas individuais não têm limites, mas o Databricks recomenda não ultrapassar 1 TB para tabelas que requerem atualizações.
  • Tamanho de atualização completa: Ao ativar uma atualização completa, a versão antiga no Postgres não é eliminada até que a nova sincronização esteja concluída. Ambas as versões contam temporariamente para o limite de tamanho do banco de dados lógico durante a atualização.
  • Tabelas por fonte: Uma única tabela de origem pode ter até 20 tabelas sincronizadas.
  • Requisitos de nomenclatura: Os nomes de bases de dados, esquemas e tabelas podem conter apenas caracteres alfanuméricos e sublinhados ([A-Za-z0-9_]+).
  • Orientação para identificadores de origem: Evite usar letras maiúsculas ou caracteres especiais nos nomes das colunas ou tabelas da tabela do Catálogo Unity de origem. Se os mantiver, deve colocar esses identificadores entre aspas ao fazer referência aos mesmos no PostgreSQL.
  • Evolução do esquema: Apenas as alterações aditivas do esquema (como adicionar colunas) são suportadas para os modos Triggered e Contínuo.
  • Chaves duplicadas: Se duas linhas tiverem a mesma chave primária na tabela de origem, o pipeline de sincronização falha a menos que configure a deduplicação usando uma chave de série temporal.
  • Idempotência da API: As APIs de tabelas sincronizadas são idempotentes, por isso repita a tentativa em caso de erros transitórios para garantir a execução atempada das operações.
  • Taxa de atualização: Para o Lakebase Autoscaling, o pipeline de sincronização suporta gravações contínuas e acionadas a aproximadamente 150 linhas por segundo por Unidade de Capacidade (CU) e gravações de instantâneo até 2.000 linhas por segundo por Unidade de Capacidade (CU).

Operações permitidas em tabelas sincronizadas no Postgres

O Azure Databricks recomenda realizar apenas as seguintes operações no Postgres para tabelas sincronizadas, de modo a evitar sobrescrições acidentais ou inconsistências de dados:

  • Consultas somente leitura
  • Criação de índices
  • Soltar a tabela (para liberar espaço depois de remover a tabela sincronizada do Unity Catalog)

Embora seja possível modificar tabelas sincronizadas no Postgres de outras formas, isso interfere com o pipeline de sincronização.

Propriedade e permissões

Se você criar um novo banco de dados, esquema ou tabela do Postgres, a propriedade do Postgres será definida da seguinte maneira:

  • A propriedade é atribuída ao utilizador que cria a base de dados, esquema ou tabela, caso o seu login Azure Databricks exista como função no Postgres. Para adicionar uma função de identidade Azure Databricks no Postgres, veja funções Postgres.
  • Caso contrário, a propriedade será atribuída ao proprietário do objeto pai no Postgres (normalmente o databricks_superuser).

Gerenciar o acesso sincronizado à tabela

Depois de criada uma tabela sincronizada, o databricks_superuser pode ler uma tabela sincronizada a partir do Postgres. O databricks_superuser tem pg_read_all_data, que permite que essa função seja lida de todas as tabelas. Ele também tem o pg_write_all_data privilégio, que permite que essa função escreva em todas as tabelas. Isso significa que um databricks_superuser também pode gravar em uma tabela sincronizada no Postgres. O Lakebase suporta esse comportamento de escrita caso você precise fazer alterações urgentes na tabela de destino. No entanto, o Azure Databricks recomenda fazer correções na sua tabela de código-fonte em vez disso.

  • O databricks_superuser também pode conceder esses privilégios a outros usuários:

    GRANT USAGE ON SCHEMA synced_table_schema TO user;
    
    GRANT SELECT ON synced_table_name TO user;
    
  • O databricks_superuser pode revogar estes privilégios:

    REVOKE USAGE ON SCHEMA synced_table_schema FROM user;
    
    REVOKE {SELECT | INSERT | UPDATE | DELETE} ON synced_table_name FROM user;
    

Gerenciar operações de tabela sincronizada

O databricks_superuser pode gerenciar quais usuários estão autorizados a executar operações específicas em uma tabela sincronizada. As operações suportadas para tabelas sincronizadas são:

  • CREATE INDEX
  • ALTER INDEX
  • DROP INDEX
  • DROP TABLE

Todas as outras operações DDL são negadas para tabelas sincronizadas.

Para conceder esses privilégios a usuários adicionais, o databricks_superuser deve primeiro criar uma extensão em databricks_auth:

CREATE EXTENSION IF NOT EXISTS databricks_auth;

Em seguida, o databricks_superuser pode adicionar um usuário para gerenciar uma tabela sincronizada:

SELECT databricks_synced_table_add_manager('"synced_table_schema"."synced_table"'::regclass, '[user]');

O databricks_superuser pode remover um usuário do gerenciamento de uma tabela sincronizada:

SELECT databricks_synced_table_remove_manager('[table]', '[user]');

O databricks_superuser pode ver todos os gerentes:

SELECT * FROM databricks_synced_table_managers;

Eliminar uma tabela sincronizada

Eliminar uma tabela sincronizada do Unity Catalog também elimina a tabela Postgres correspondente.

IU

No Catálogo, encontre a sua tabela sincronizada, clique no ícone do menu Kebab. e selecione Eliminar.

Python SDK

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

w.postgres.delete_synced_table("synced_tables/my-catalog.sales.orders").wait()

SDK de Java

import com.databricks.sdk.WorkspaceClient;

WorkspaceClient w = new WorkspaceClient();

w.postgres().deleteSyncedTable("synced_tables/my-catalog.sales.orders").waitForCompletion();

encaracolar

curl -X DELETE "https://your-workspace.cloud.databricks.com/api/2.0/postgres/synced_tables/my-catalog.sales.orders" \
  -H "Authorization: Bearer ${DATABRICKS_TOKEN}"

Mais informações

Tarefa Descrição
Criar um projeto Criar um projeto Lakebase
Conecte-se ao seu banco de dados Aprenda as opções de ligação para Lakebase
Base de dados de registos no Unity Catalog Torne os seus dados do Lakebase visíveis no Unity Catalog para governação unificada e consultas entre fontes
Integração com o Catálogo Unity Compreender a governação e as permissões

Integração de catálogos

  • Duplicação de catálogo: Criar uma tabela sincronizada em um catálogo padrão direcionado a um banco de dados Postgres que também é registrado como um catálogo de banco de dados separado faz com que a tabela sincronizada apareça no Unity Catalog sob os catálogos padrão e de banco de dados.

Outras opções

Para sincronizar dados em sistemas não-Databricks, consulte as soluções Partner Connect reverse ETL como Census e Hightouch.