Erstellen eines benutzerdefinierten Connectors

Von Bedeutung

Dieses Feature befindet sich in der Betaversion. Arbeitsbereichsadministratoren können den Zugriff auf dieses Feature über die Vorschauseite steuern. Siehe Manage Azure Databricks Previews.

Diese Seite zeigt, wie Sie einen Connector für eine Quelle erstellen, die in Lakeflow Connect noch nicht unterstützt wird. Erstellen und testen Sie zuerst Ihren Connector lokal mithilfe der Tools und Vorlagen im Lakeflow Community Connectors-Repository auf GitHub. Das Repository enthält KI-basierte Entwicklungstools, die bei jeder Phase unterstützt werden, einschließlich Quellforschung, Authentifizierungseinrichtung, Implementierung und Tests.

Wenn Ihr benutzerdefinierter Connector einsatzbereit ist, probieren Sie ihn in Ihrem Azure Databricks Arbeitsbereich aus, und registrieren Sie ihn dann bei der Community, indem Sie eine Pullanforderung öffnen.

Informationen zum Verwenden eines registrierten Community-Connectors finden Sie unter Verwenden eines registrierten Community-Connectors.

Anforderungen

Bevor Sie beginnen, stellen Sie sicher, dass Sie folgendes haben:

  • Python 3.10 oder höher
  • Ein Azure Databricks Arbeitsbereich mit aktiviertem Unity-Katalog
  • API-Anmeldeinformationen für die Quelle, mit der Sie eine Verbindung herstellen möchten
  • Git lokal installiert

Einrichten des Repositorys

Klonen Sie das Lakeflow Community Connectors-Repository , und installieren Sie die Entwicklungsabhängigkeiten.

  1. Klonen Sie das Repository:

    git clone https://github.com/databrickslabs/lakeflow-community-connectors.git
    cd lakeflow-community-connectors
    
  2. Erstellen Sie eine virtuelle Umgebung, und installieren Sie Abhängigkeiten:

    python -m venv .venv
    source .venv/bin/activate
    pip install -e ".[dev]"
    
  3. Kopieren Sie ein vorhandenes Connector-Verzeichnis (z. B. connectors/stripe/) als Ausgangspunkt für Ihren neuen Connector.

    cp -r connectors/stripe connectors/<your-source>
    

Implementieren der LakeflowConnect Schnittstelle

Jeder Communityconnector implementiert die LakeflowConnect Schnittstelle, die definiert, wie Ihr Connector authentifiziert, Tabellen ermittelt, Schemas zurückgibt und Daten liest.

class LakeflowConnect:
    def __init__(self, options: dict[str, str]) -> None:
        """Initialize with connection parameters"""

    def list_tables(self) -> list[str]:
        """Return names of all tables supported by this connector."""

    def get_table_schema(self, table_name: str, table_options: dict[str, str]) -> StructType:
        """Return the Spark schema for a table."""

    def read_table_metadata(self, table_name: str, table_options: dict[str, str]) -> dict:
        """Return metadata: primary_keys, cursor_field, ingestion_type
        (snapshot|cdc|cdc_with_deletes|append)."""

    def read_table(self, table_name: str, start_offset: dict,
                   table_options: dict[str, str]) -> (Iterator[dict], dict):
        """Yield records as JSON dicts and return the next offset
        for incremental reads."""

    def read_table_deletes(self, table_name: str, start_offset: dict,
                           table_options: dict[str, str]) -> (Iterator[dict], dict):
        """Optional: Only required if ingestion_type is 'cdc_with_deletes'."""

Methodenbeschreibungen

Methode Description
__init__ Empfängt die Verbindungsparameter als Wörterbuch und initialisiert den API-Client für Ihre Quelle.
list_tables Gibt die Namen aller Tabellen (oder API-Endpunkte) zurück, die Ihr Connector verfügbar macht. Azure Databricks verwendet diese Liste, um die Tabellenauswahl-UI aufzufüllen.
get_table_schema Gibt einen Spark StructType zurück, der das Schema für die angegebene Tabelle beschreibt. Wird vor der ersten Pipelineausführung und bei jeder Ausführung aufgerufen, wenn die Schemaentwicklung aktiviert ist.
read_table_metadata Gibt ein Wörterbuch mit primary_keys, cursor_fieldund ingestion_type. Das ingestion_type muss eine von snapshot, cdc, cdc_with_deletes oder append sein.
read_table Liefert Datensätze als Python Wörterbücher und gibt den nächsten Offset für inkrementelle Lesevorgänge zurück. Bei der ersten Ausführung start_offset ist leer. Bei nachfolgenden Ausführungen enthält dies den Offset, der von der vorherigen Ausführung zurückgegeben wird.
read_table_deletes Wahlfrei. Implementieren Sie diese Methode nur, wenn ingestion_typecdc_with_deletes ist. Gibt gelöschte Datensatzschlüssel zurück und gibt den nächsten Offset zurück.

Entwickeln Sie Ihren Connector

Führen Sie die folgenden Schritte aus, um einen neuen Connector zu erstellen und zu überprüfen:

  1. Recherchieren Sie die Quell-API: Untersuchen Sie die API-Spezifikationen, Authentifizierungsmechanismen, Ratelimits und verfügbare Datenschemas. Identifizieren Sie, welche Tabellen oder Endpunkte freigegeben werden sollen.

  2. Authentifizierung einrichten: Generieren Sie die Verbindungsspezifikation, konfigurieren Sie Anmeldeinformationen für die Quelle, und überprüfen Sie die Konnektivität aus Ihrer Entwicklungsumgebung.

  3. Implementieren Sie den Connector: Codieren Sie alle erforderlichen LakeflowConnect Schnittstellenmethoden, um eine Verbindung mit der Quell-API herzustellen und Daten im erwarteten Format zurückzugeben.

  4. Testen und iterieren: Führen Sie die standardmäßigen Testsuites mit einem echten Quellsystem aus und beheben Sie alle Probleme. Details finden Sie unter "Testen des Connectors ".

  5. Dokumentieren Sie den Verbinder: Schreiben Sie eine benutzerorientierte README.md Datei, und generieren Sie die YaML-Datei der Konnektorspezifikation, die die konfigurierbaren Parameter des Connectors beschreibt.

  6. Erstellen Sie das Bereitstellungsartefakt: Führen Sie das Buildskript aus, um das Einzeldateiartefakt zu erzeugen, das in einem Arbeitsbereich bereitgestellt werden kann.

Testen des Verbinders

Das Repository bietet mehrere Testansätze:

Generische Testsuite (erforderlich)

Stellt eine Verbindung zu einer echten Quelle her, indem Ihre bereitgestellten Anmeldeinformationen verwendet werden, um die End-to-End-Funktionalität einschließlich Authentifizierung, Schemaermittlung und Datenauslesungen zu verifizieren.

python -m pytest tests/generic/ --connector <your-source> --credentials credentials.json

Führt Schreib-Lese-Überprüfungszyklen aus, um inkrementelle Lese- und Löschvorgänge zu validieren. Dadurch wird bestätigt, dass die Offsetnachverfolgung und die CDC-Logik ordnungsgemäß funktionieren.

python -m pytest tests/writeback/ --connector <your-source> --credentials credentials.json

Einheitentests

Schreiben Sie Komponententests für jede komplexe benutzerdefinierte Logik in Ihrem Connector, z. B. Paginierungsbehandlung, Typkoersion oder Fehlerwiederherstellung.

Bereitstellungsartefakt erstellen

Nachdem Ihr Connector die Test-Suiten bestanden hat, führen Sie das Zusammenführungsskript aus, um ein Bereitstellungsartefakt in einer Datei zu generieren. Die Pipeline verwendet diese Datei zur Laufzeit anstelle des vollständigen Repositorys.

python tools/scripts/merge_python_source.py --connector <your-source>

Dies erzeugt eine eigenständige Python Datei in dist/<your-source>/, die alle Connectorcode und Abhängigkeiten enthält.

Erstellen einer Aufnahmepipeline

Um Ihren Connector zu testen:

  1. In der Randleiste Des Azure Databricks Arbeitsbereichs, klicken Sie auf +Neu>Daten hinzufügen und wählen Sie dann + Community Connector hinzufügen unter Community Connectors aus.

  2. Geben Sie für den Quellnamen den Namen des Connectors ein.

  3. Geben Sie für GitHub Repository-URL die URL des GitHub Repositorys ein, das den Quellcode des Connectors hosten soll.

  4. Klicken Sie auf "Verbinder hinzufügen".

  5. Klicken Sie auf +Verbindung erstellen , oder wählen Sie eine vorhandene Verbindung aus, und klicken Sie dann auf Weiter.

  6. Geben Sie für den Pipelinenamen einen Namen für die Pipeline ein.

  7. Geben Sie für den Speicherort des Ereignisprotokolls einen Katalognamen und einen Schemanamen ein. Azure Databricks speichert hier das Pipelineereignisprotokoll. Aufgenommene Tabellen werden auch standardmäßig hier geschrieben.

  8. Geben Sie für den Stammpfad Ihren Arbeitsbereichspfad ein (z. B. /Workspace/Users/<your-email>/connectors). Azure Databricks klont und speichert den Connector-Quellcode hier.

  9. Klicken Sie auf "Pipeline erstellen".

  10. Im Pipeline-Editor öffnen Sie ingest.py und ändern das Objekte-Feld, um die Tabellen einzufügen, die Sie importieren möchten. Beispiel:

    from databricks.labs.community_connector.pipeline import ingest
    
    pipeline_spec = {
        "connection_name": "my_connector_connection",  # Required: UC connection name
        "objects": [
            {"table": {"source_table": "my_table"}},
        ],
    }
    
    ingest(spark, pipeline_spec)
    
  11. Führen Sie die Pipeline manuell aus, oder planen Sie sie.

Pipelinekonfigurationsoptionen

Sie können die folgenden Optionen konfigurieren in ingest.py:

Auswahl Description
connection_name Erforderlich. Der Name der Verbindung, die Authentifizierungsanmeldeinformationen für die Quelle speichert.
objects Erforderlich. Eine Liste der zu aufnehmenden Tabellen. Jeder Eintrag weist das Format {"table": {"source_table": "..."}}auf. Sie können auch ein optionales destination_table innerhalb des table Objekts angeben.
destination_catalog Der Katalog, in dem importierte Tabellen abgelegt werden. Standardeinstellung für den Katalog, der während der Pipelineerstellung festgelegt wurde.
destination_schema Das Schema, in das eingelesene Tabellen geschrieben werden. Standardeinstellung für das Schema, das während der Pipelineerstellung festgelegt wurde.
scd_type Die langsam ändernde Dimensionstrategie: SCD_TYPE_1, SCD_TYPE_2 oder APPEND_ONLY. Wird standardmäßig auf SCD_TYPE_1 festgelegt.
primary_keys Überschreiben Sie die voreingestellten Primärschlüssel einer Tabelle. Geben Sie eine Liste von Spaltennamen an.

Registrieren Sie Ihren Connector

Nachdem Sie Ihren Connector erstellt und getestet haben, öffnen Sie eine Pull-Anforderung im Lakeflow Community Connectors-Repository , um ihn der Community zur Verfügung zu stellen.