Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Importante
Esse recurso está em Beta. Os administradores do workspace podem controlar o acesso a esse recurso na página Visualizações . Consulte Gerenciar visualizações do Azure Databricks.
Esta página mostra como criar um conector para uma origem que ainda não tem suporte no Lakeflow Connect. Primeiro, crie e teste seu conector localmente usando as ferramentas e modelos no repositório Lakeflow Community Connectors no GitHub. O repositório inclui ferramentas de desenvolvimento habilitadas para IA para ajudar em cada fase, incluindo pesquisa de origem, configuração de autenticação, implementação e teste.
Quando o conector personalizado estiver pronto para uso, experimente-o no workspace Azure Databricks e registre-o na comunidade abrindo uma solicitação de pull.
Para usar um conector de comunidade registrado, consulte Usar um conector de comunidade registrado.
Requisitos
Antes de começar, verifique se você tem estes itens:
- Python 3.10 ou superior
- Um workspace do Azure Databricks com o Unity Catalog habilitado
- Credenciais de API para a origem à qual você deseja se conectar
- Git instalado localmente
Configurar o repositório
Clone o repositório Lakeflow Community Connectors e instale as dependências de desenvolvimento.
Por favor, clone o repositório:
git clone https://github.com/databrickslabs/lakeflow-community-connectors.git cd lakeflow-community-connectorsCrie um ambiente virtual e instale as dependências:
python -m venv .venv source .venv/bin/activate pip install -e ".[dev]"Copie um diretório do conector existente (por exemplo,
connectors/stripe/) como ponto de partida para o novo conector:cp -r connectors/stripe connectors/<your-source>
Implementar a LakeflowConnect interface
Cada conector de comunidade implementa a LakeflowConnect interface, que define como seu conector se autentica, descobre tabelas, retorna esquemas e lê dados.
class LakeflowConnect:
def __init__(self, options: dict[str, str]) -> None:
"""Initialize with connection parameters"""
def list_tables(self) -> list[str]:
"""Return names of all tables supported by this connector."""
def get_table_schema(self, table_name: str, table_options: dict[str, str]) -> StructType:
"""Return the Spark schema for a table."""
def read_table_metadata(self, table_name: str, table_options: dict[str, str]) -> dict:
"""Return metadata: primary_keys, cursor_field, ingestion_type
(snapshot|cdc|cdc_with_deletes|append)."""
def read_table(self, table_name: str, start_offset: dict,
table_options: dict[str, str]) -> (Iterator[dict], dict):
"""Yield records as JSON dicts and return the next offset
for incremental reads."""
def read_table_deletes(self, table_name: str, start_offset: dict,
table_options: dict[str, str]) -> (Iterator[dict], dict):
"""Optional: Only required if ingestion_type is 'cdc_with_deletes'."""
Descrições do método
| Método | Description |
|---|---|
__init__ |
Recebe os parâmetros de conexão como um dicionário e inicializa o cliente de API para sua origem. |
list_tables |
Retorna os nomes de todas as tabelas (ou pontos de extremidade de API) que o conector expõe. Azure Databricks usa essa lista para preencher a interface de seleção de tabelas. |
get_table_schema |
Retorna um Spark StructType que descreve o esquema da tabela fornecida. Chamado antes da primeira execução de pipeline e em cada execução quando a evolução do esquema está habilitada. |
read_table_metadata |
Retorna um dicionário com primary_keys, cursor_fielde ingestion_type. O ingestion_type deve ser um de snapshot, cdc, cdc_with_deletesou append. |
read_table |
Gera registros como Python dicionários e retorna o próximo deslocamento para leituras incrementais. Na primeira execução, start_offset está vazio. Em execuções subsequentes, ele contém o deslocamento retornado pela execução anterior. |
read_table_deletes |
Opcional. Implemente esse método somente caso ingestion_type for cdc_with_deletes. Gera chaves de registro excluídas e retorna o próximo deslocamento. |
Desenvolver seu conector
Siga estas etapas para criar e validar um novo conector:
Pesquise a API de origem: estudo as especificações da API da origem, os mecanismos de autenticação, os limites de taxa e os esquemas de dados disponíveis. Identifique quais tabelas ou endpoints expor.
Configurar a autenticação: gerar a especificação de conexão, configurar credenciais para a origem e verificar a conectividade do seu ambiente de desenvolvimento.
Implemente o conector: codificar todos os métodos de interface necessários
LakeflowConnectpara se conectar à API de origem e retornar dados no formato esperado.Testar e iterar: execute os pacotes de teste padrão em um sistema de origem real e corrija quaisquer problemas. Consulte Testar seu conector para obter detalhes.
Documente o conector: escreva um arquivo YAML de especificação do conector voltado para o usuário
README.mde gere o arquivo YAML de especificação do conector que descreve os parâmetros configuráveis do conector.Crie o artefato de implantação: execute o script de construção para produzir o artefato de arquivo único que pode ser implantado em um ambiente de trabalho.
Testar o conector
O repositório fornece várias abordagens de teste:
Pacote de teste genérico (obrigatório)
Conecta-se a uma fonte real usando suas credenciais fornecidas para verificar a funcionalidade de ponta a ponta, incluindo autenticação, descoberta de esquema e leituras de dados.
python -m pytest tests/generic/ --connector <your-source> --credentials credentials.json
Teste de write-back (recomendado)
Executa ciclos de gravação-leitura-verificação para validar leituras e exclusões incrementais. Isso confirma que o rastreamento de offset e a lógica CDC funcionam corretamente.
python -m pytest tests/writeback/ --connector <your-source> --credentials credentials.json
Testes de unidade
Escreva testes de unidade para qualquer lógica personalizada complexa em seu conector, como tratamento de paginação, coerção de tipo ou recuperação de erro.
Construir o artefato de implantação
Depois que o conector passar nas suítes de teste, execute o script de integração para gerar um artefato único para implantação. O pipeline usa esse arquivo em runtime em vez do repositório completo.
python tools/scripts/merge_python_source.py --connector <your-source>
Isso produz um arquivo Python independente em dist/<your-source>/ que inclui todo o código do conector e dependências.
Criar um pipeline de ingestão
Para testar o conector:
Na barra lateral do workspace Azure Databricks, clique em +Novo>Adicionar ou carregar dados e selecione + Adicionar Conector da Comunidade em Conectores da Comunidade.
Para Nome da fonte, insira o nome do seu conector.
Para URL do repositório do GitHub, insira o endereço do repositório do GitHub que contém o código-fonte do seu conector.
Clique em Adicionar Conector.
Clique em + Criar conexão ou selecione uma conexão existente e clique em Avançar.
Para nome do pipeline, insira um nome para o pipeline.
Para local do log de eventos, insira um nome de catálogo e um nome de esquema. Azure Databricks armazena o log de eventos do pipeline aqui. As tabelas ingeridas também são escritas aqui por padrão.
Para caminho raiz, insira o caminho do workspace (por exemplo,
/Workspace/Users/<your-email>/connectors). Azure Databricks clona e armazena o código-fonte do conector aqui.Clique em Criar pipeline.
No editor de pipeline, abra
ingest.pye modifique o campo objetos para incluir as tabelas que você deseja ingerir. Por exemplo:from databricks.labs.community_connector.pipeline import ingest pipeline_spec = { "connection_name": "my_connector_connection", # Required: UC connection name "objects": [ {"table": {"source_table": "my_table"}}, ], } ingest(spark, pipeline_spec)Execute o pipeline manualmente ou agende-o.
Opções de configuração de pipeline
Você pode configurar as seguintes opções em ingest.py:
| Opção | Description |
|---|---|
connection_name |
Obrigatório O nome da conexão que armazena credenciais de autenticação para a origem. |
objects |
Obrigatório Uma lista de tabelas a serem ingeridas. Cada entrada tem o formato {"table": {"source_table": "..."}}. Você também pode especificar um opcional destination_table dentro do table objeto. |
destination_catalog |
O catálogo em que as tabelas ingeridas são gravadas. O padrão é o catálogo definido durante a criação do pipeline. |
destination_schema |
O esquema em que as tabelas ingeridas são gravadas. Pode ser definido como o esquema estabelecido durante a criação do pipeline. |
scd_type |
A estratégia de dimensão de alteração lenta: SCD_TYPE_1, SCD_TYPE_2, ou APPEND_ONLY. Usa SCD_TYPE_1 como padrão. |
primary_keys |
Substitua as chaves primárias padrão de uma tabela. Forneça uma lista de nomes de coluna. |
Registre seu conector
Depois de compilar e testar o conector, abra uma solicitação pull no repositório Lakeflow Community Connectors para disponibilizá-lo para a comunidade.