Criar um conector personalizado

Importante

Esse recurso está em Beta. Os administradores do workspace podem controlar o acesso a esse recurso na página Visualizações . Consulte Gerenciar visualizações do Azure Databricks.

Esta página mostra como criar um conector para uma origem que ainda não tem suporte no Lakeflow Connect. Primeiro, crie e teste seu conector localmente usando as ferramentas e modelos no repositório Lakeflow Community Connectors no GitHub. O repositório inclui ferramentas de desenvolvimento habilitadas para IA para ajudar em cada fase, incluindo pesquisa de origem, configuração de autenticação, implementação e teste.

Quando o conector personalizado estiver pronto para uso, experimente-o no workspace Azure Databricks e registre-o na comunidade abrindo uma solicitação de pull.

Para usar um conector de comunidade registrado, consulte Usar um conector de comunidade registrado.

Requisitos

Antes de começar, verifique se você tem estes itens:

  • Python 3.10 ou superior
  • Um workspace do Azure Databricks com o Unity Catalog habilitado
  • Credenciais de API para a origem à qual você deseja se conectar
  • Git instalado localmente

Configurar o repositório

Clone o repositório Lakeflow Community Connectors e instale as dependências de desenvolvimento.

  1. Por favor, clone o repositório:

    git clone https://github.com/databrickslabs/lakeflow-community-connectors.git
    cd lakeflow-community-connectors
    
  2. Crie um ambiente virtual e instale as dependências:

    python -m venv .venv
    source .venv/bin/activate
    pip install -e ".[dev]"
    
  3. Copie um diretório do conector existente (por exemplo, connectors/stripe/) como ponto de partida para o novo conector:

    cp -r connectors/stripe connectors/<your-source>
    

Implementar a LakeflowConnect interface

Cada conector de comunidade implementa a LakeflowConnect interface, que define como seu conector se autentica, descobre tabelas, retorna esquemas e lê dados.

class LakeflowConnect:
    def __init__(self, options: dict[str, str]) -> None:
        """Initialize with connection parameters"""

    def list_tables(self) -> list[str]:
        """Return names of all tables supported by this connector."""

    def get_table_schema(self, table_name: str, table_options: dict[str, str]) -> StructType:
        """Return the Spark schema for a table."""

    def read_table_metadata(self, table_name: str, table_options: dict[str, str]) -> dict:
        """Return metadata: primary_keys, cursor_field, ingestion_type
        (snapshot|cdc|cdc_with_deletes|append)."""

    def read_table(self, table_name: str, start_offset: dict,
                   table_options: dict[str, str]) -> (Iterator[dict], dict):
        """Yield records as JSON dicts and return the next offset
        for incremental reads."""

    def read_table_deletes(self, table_name: str, start_offset: dict,
                           table_options: dict[str, str]) -> (Iterator[dict], dict):
        """Optional: Only required if ingestion_type is 'cdc_with_deletes'."""

Descrições do método

Método Description
__init__ Recebe os parâmetros de conexão como um dicionário e inicializa o cliente de API para sua origem.
list_tables Retorna os nomes de todas as tabelas (ou pontos de extremidade de API) que o conector expõe. Azure Databricks usa essa lista para preencher a interface de seleção de tabelas.
get_table_schema Retorna um Spark StructType que descreve o esquema da tabela fornecida. Chamado antes da primeira execução de pipeline e em cada execução quando a evolução do esquema está habilitada.
read_table_metadata Retorna um dicionário com primary_keys, cursor_fielde ingestion_type. O ingestion_type deve ser um de snapshot, cdc, cdc_with_deletesou append.
read_table Gera registros como Python dicionários e retorna o próximo deslocamento para leituras incrementais. Na primeira execução, start_offset está vazio. Em execuções subsequentes, ele contém o deslocamento retornado pela execução anterior.
read_table_deletes Opcional. Implemente esse método somente caso ingestion_type for cdc_with_deletes. Gera chaves de registro excluídas e retorna o próximo deslocamento.

Desenvolver seu conector

Siga estas etapas para criar e validar um novo conector:

  1. Pesquise a API de origem: estudo as especificações da API da origem, os mecanismos de autenticação, os limites de taxa e os esquemas de dados disponíveis. Identifique quais tabelas ou endpoints expor.

  2. Configurar a autenticação: gerar a especificação de conexão, configurar credenciais para a origem e verificar a conectividade do seu ambiente de desenvolvimento.

  3. Implemente o conector: codificar todos os métodos de interface necessários LakeflowConnect para se conectar à API de origem e retornar dados no formato esperado.

  4. Testar e iterar: execute os pacotes de teste padrão em um sistema de origem real e corrija quaisquer problemas. Consulte Testar seu conector para obter detalhes.

  5. Documente o conector: escreva um arquivo YAML de especificação do conector voltado para o usuário README.md e gere o arquivo YAML de especificação do conector que descreve os parâmetros configuráveis do conector.

  6. Crie o artefato de implantação: execute o script de construção para produzir o artefato de arquivo único que pode ser implantado em um ambiente de trabalho.

Testar o conector

O repositório fornece várias abordagens de teste:

Pacote de teste genérico (obrigatório)

Conecta-se a uma fonte real usando suas credenciais fornecidas para verificar a funcionalidade de ponta a ponta, incluindo autenticação, descoberta de esquema e leituras de dados.

python -m pytest tests/generic/ --connector <your-source> --credentials credentials.json

Executa ciclos de gravação-leitura-verificação para validar leituras e exclusões incrementais. Isso confirma que o rastreamento de offset e a lógica CDC funcionam corretamente.

python -m pytest tests/writeback/ --connector <your-source> --credentials credentials.json

Testes de unidade

Escreva testes de unidade para qualquer lógica personalizada complexa em seu conector, como tratamento de paginação, coerção de tipo ou recuperação de erro.

Construir o artefato de implantação

Depois que o conector passar nas suítes de teste, execute o script de integração para gerar um artefato único para implantação. O pipeline usa esse arquivo em runtime em vez do repositório completo.

python tools/scripts/merge_python_source.py --connector <your-source>

Isso produz um arquivo Python independente em dist/<your-source>/ que inclui todo o código do conector e dependências.

Criar um pipeline de ingestão

Para testar o conector:

  1. Na barra lateral do workspace Azure Databricks, clique em +Novo>Adicionar ou carregar dados e selecione + Adicionar Conector da Comunidade em Conectores da Comunidade.

  2. Para Nome da fonte, insira o nome do seu conector.

  3. Para URL do repositório do GitHub, insira o endereço do repositório do GitHub que contém o código-fonte do seu conector.

  4. Clique em Adicionar Conector.

  5. Clique em + Criar conexão ou selecione uma conexão existente e clique em Avançar.

  6. Para nome do pipeline, insira um nome para o pipeline.

  7. Para local do log de eventos, insira um nome de catálogo e um nome de esquema. Azure Databricks armazena o log de eventos do pipeline aqui. As tabelas ingeridas também são escritas aqui por padrão.

  8. Para caminho raiz, insira o caminho do workspace (por exemplo, /Workspace/Users/<your-email>/connectors). Azure Databricks clona e armazena o código-fonte do conector aqui.

  9. Clique em Criar pipeline.

  10. No editor de pipeline, abra ingest.py e modifique o campo objetos para incluir as tabelas que você deseja ingerir. Por exemplo:

    from databricks.labs.community_connector.pipeline import ingest
    
    pipeline_spec = {
        "connection_name": "my_connector_connection",  # Required: UC connection name
        "objects": [
            {"table": {"source_table": "my_table"}},
        ],
    }
    
    ingest(spark, pipeline_spec)
    
  11. Execute o pipeline manualmente ou agende-o.

Opções de configuração de pipeline

Você pode configurar as seguintes opções em ingest.py:

Opção Description
connection_name Obrigatório O nome da conexão que armazena credenciais de autenticação para a origem.
objects Obrigatório Uma lista de tabelas a serem ingeridas. Cada entrada tem o formato {"table": {"source_table": "..."}}. Você também pode especificar um opcional destination_table dentro do table objeto.
destination_catalog O catálogo em que as tabelas ingeridas são gravadas. O padrão é o catálogo definido durante a criação do pipeline.
destination_schema O esquema em que as tabelas ingeridas são gravadas. Pode ser definido como o esquema estabelecido durante a criação do pipeline.
scd_type A estratégia de dimensão de alteração lenta: SCD_TYPE_1, SCD_TYPE_2, ou APPEND_ONLY. Usa SCD_TYPE_1 como padrão.
primary_keys Substitua as chaves primárias padrão de uma tabela. Forneça uma lista de nomes de coluna.

Registre seu conector

Depois de compilar e testar o conector, abra uma solicitação pull no repositório Lakeflow Community Connectors para disponibilizá-lo para a comunidade.