Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Von Bedeutung
Dieses Feature befindet sich in der Betaversion. Arbeitsbereichsadministratoren können den Zugriff auf dieses Feature über die Vorschauseite steuern. Siehe Manage Azure Databricks Previews.
Diese Seite zeigt, wie Sie einen Connector für eine Quelle erstellen, die in Lakeflow Connect noch nicht unterstützt wird. Erstellen und testen Sie zuerst Ihren Connector lokal mithilfe der Tools und Vorlagen im Lakeflow Community Connectors-Repository auf GitHub. Das Repository enthält KI-basierte Entwicklungstools, die bei jeder Phase unterstützt werden, einschließlich Quellforschung, Authentifizierungseinrichtung, Implementierung und Tests.
Wenn Ihr benutzerdefinierter Connector einsatzbereit ist, probieren Sie ihn in Ihrem Azure Databricks Arbeitsbereich aus, und registrieren Sie ihn dann bei der Community, indem Sie eine Pullanforderung öffnen.
Informationen zum Verwenden eines registrierten Community-Connectors finden Sie unter Verwenden eines registrierten Community-Connectors.
Anforderungen
Bevor Sie beginnen, stellen Sie sicher, dass Sie folgendes haben:
- Python 3.10 oder höher
- Ein Azure Databricks Arbeitsbereich mit aktiviertem Unity-Katalog
- API-Anmeldeinformationen für die Quelle, mit der Sie eine Verbindung herstellen möchten
- Git lokal installiert
Einrichten des Repositorys
Klonen Sie das Lakeflow Community Connectors-Repository , und installieren Sie die Entwicklungsabhängigkeiten.
Klonen Sie das Repository:
git clone https://github.com/databrickslabs/lakeflow-community-connectors.git cd lakeflow-community-connectorsErstellen Sie eine virtuelle Umgebung, und installieren Sie Abhängigkeiten:
python -m venv .venv source .venv/bin/activate pip install -e ".[dev]"Kopieren Sie ein vorhandenes Connector-Verzeichnis (z. B.
connectors/stripe/) als Ausgangspunkt für Ihren neuen Connector.cp -r connectors/stripe connectors/<your-source>
Implementieren der LakeflowConnect Schnittstelle
Jeder Communityconnector implementiert die LakeflowConnect Schnittstelle, die definiert, wie Ihr Connector authentifiziert, Tabellen ermittelt, Schemas zurückgibt und Daten liest.
class LakeflowConnect:
def __init__(self, options: dict[str, str]) -> None:
"""Initialize with connection parameters"""
def list_tables(self) -> list[str]:
"""Return names of all tables supported by this connector."""
def get_table_schema(self, table_name: str, table_options: dict[str, str]) -> StructType:
"""Return the Spark schema for a table."""
def read_table_metadata(self, table_name: str, table_options: dict[str, str]) -> dict:
"""Return metadata: primary_keys, cursor_field, ingestion_type
(snapshot|cdc|cdc_with_deletes|append)."""
def read_table(self, table_name: str, start_offset: dict,
table_options: dict[str, str]) -> (Iterator[dict], dict):
"""Yield records as JSON dicts and return the next offset
for incremental reads."""
def read_table_deletes(self, table_name: str, start_offset: dict,
table_options: dict[str, str]) -> (Iterator[dict], dict):
"""Optional: Only required if ingestion_type is 'cdc_with_deletes'."""
Methodenbeschreibungen
| Methode | Description |
|---|---|
__init__ |
Empfängt die Verbindungsparameter als Wörterbuch und initialisiert den API-Client für Ihre Quelle. |
list_tables |
Gibt die Namen aller Tabellen (oder API-Endpunkte) zurück, die Ihr Connector verfügbar macht. Azure Databricks verwendet diese Liste, um die Tabellenauswahl-UI aufzufüllen. |
get_table_schema |
Gibt einen Spark StructType zurück, der das Schema für die angegebene Tabelle beschreibt. Wird vor der ersten Pipelineausführung und bei jeder Ausführung aufgerufen, wenn die Schemaentwicklung aktiviert ist. |
read_table_metadata |
Gibt ein Wörterbuch mit primary_keys, cursor_fieldund ingestion_type. Das ingestion_type muss eine von snapshot, cdc, cdc_with_deletes oder append sein. |
read_table |
Liefert Datensätze als Python Wörterbücher und gibt den nächsten Offset für inkrementelle Lesevorgänge zurück. Bei der ersten Ausführung start_offset ist leer. Bei nachfolgenden Ausführungen enthält dies den Offset, der von der vorherigen Ausführung zurückgegeben wird. |
read_table_deletes |
Wahlfrei. Implementieren Sie diese Methode nur, wenn ingestion_typecdc_with_deletes ist. Gibt gelöschte Datensatzschlüssel zurück und gibt den nächsten Offset zurück. |
Entwickeln Sie Ihren Connector
Führen Sie die folgenden Schritte aus, um einen neuen Connector zu erstellen und zu überprüfen:
Recherchieren Sie die Quell-API: Untersuchen Sie die API-Spezifikationen, Authentifizierungsmechanismen, Ratelimits und verfügbare Datenschemas. Identifizieren Sie, welche Tabellen oder Endpunkte freigegeben werden sollen.
Authentifizierung einrichten: Generieren Sie die Verbindungsspezifikation, konfigurieren Sie Anmeldeinformationen für die Quelle, und überprüfen Sie die Konnektivität aus Ihrer Entwicklungsumgebung.
Implementieren Sie den Connector: Codieren Sie alle erforderlichen
LakeflowConnectSchnittstellenmethoden, um eine Verbindung mit der Quell-API herzustellen und Daten im erwarteten Format zurückzugeben.Testen und iterieren: Führen Sie die standardmäßigen Testsuites mit einem echten Quellsystem aus und beheben Sie alle Probleme. Details finden Sie unter "Testen des Connectors ".
Dokumentieren Sie den Verbinder: Schreiben Sie eine benutzerorientierte
README.mdDatei, und generieren Sie die YaML-Datei der Konnektorspezifikation, die die konfigurierbaren Parameter des Connectors beschreibt.Erstellen Sie das Bereitstellungsartefakt: Führen Sie das Buildskript aus, um das Einzeldateiartefakt zu erzeugen, das in einem Arbeitsbereich bereitgestellt werden kann.
Testen des Verbinders
Das Repository bietet mehrere Testansätze:
Generische Testsuite (erforderlich)
Stellt eine Verbindung zu einer echten Quelle her, indem Ihre bereitgestellten Anmeldeinformationen verwendet werden, um die End-to-End-Funktionalität einschließlich Authentifizierung, Schemaermittlung und Datenauslesungen zu verifizieren.
python -m pytest tests/generic/ --connector <your-source> --credentials credentials.json
Zurückschreiben von Tests (empfohlen)
Führt Schreib-Lese-Überprüfungszyklen aus, um inkrementelle Lese- und Löschvorgänge zu validieren. Dadurch wird bestätigt, dass die Offsetnachverfolgung und die CDC-Logik ordnungsgemäß funktionieren.
python -m pytest tests/writeback/ --connector <your-source> --credentials credentials.json
Einheitentests
Schreiben Sie Komponententests für jede komplexe benutzerdefinierte Logik in Ihrem Connector, z. B. Paginierungsbehandlung, Typkoersion oder Fehlerwiederherstellung.
Bereitstellungsartefakt erstellen
Nachdem Ihr Connector die Test-Suiten bestanden hat, führen Sie das Zusammenführungsskript aus, um ein Bereitstellungsartefakt in einer Datei zu generieren. Die Pipeline verwendet diese Datei zur Laufzeit anstelle des vollständigen Repositorys.
python tools/scripts/merge_python_source.py --connector <your-source>
Dies erzeugt eine eigenständige Python Datei in dist/<your-source>/, die alle Connectorcode und Abhängigkeiten enthält.
Erstellen einer Aufnahmepipeline
Um Ihren Connector zu testen:
In der Randleiste Des Azure Databricks Arbeitsbereichs, klicken Sie auf +Neu>Daten hinzufügen und wählen Sie dann + Community Connector hinzufügen unter Community Connectors aus.
Geben Sie für den Quellnamen den Namen des Connectors ein.
Geben Sie für GitHub Repository-URL die URL des GitHub Repositorys ein, das den Quellcode des Connectors hosten soll.
Klicken Sie auf "Verbinder hinzufügen".
Klicken Sie auf +Verbindung erstellen , oder wählen Sie eine vorhandene Verbindung aus, und klicken Sie dann auf Weiter.
Geben Sie für den Pipelinenamen einen Namen für die Pipeline ein.
Geben Sie für den Speicherort des Ereignisprotokolls einen Katalognamen und einen Schemanamen ein. Azure Databricks speichert hier das Pipelineereignisprotokoll. Aufgenommene Tabellen werden auch standardmäßig hier geschrieben.
Geben Sie für den Stammpfad Ihren Arbeitsbereichspfad ein (z. B.
/Workspace/Users/<your-email>/connectors). Azure Databricks klont und speichert den Connector-Quellcode hier.Klicken Sie auf "Pipeline erstellen".
Im Pipeline-Editor öffnen Sie
ingest.pyund ändern das Objekte-Feld, um die Tabellen einzufügen, die Sie importieren möchten. Beispiel:from databricks.labs.community_connector.pipeline import ingest pipeline_spec = { "connection_name": "my_connector_connection", # Required: UC connection name "objects": [ {"table": {"source_table": "my_table"}}, ], } ingest(spark, pipeline_spec)Führen Sie die Pipeline manuell aus, oder planen Sie sie.
Pipelinekonfigurationsoptionen
Sie können die folgenden Optionen konfigurieren in ingest.py:
| Auswahl | Description |
|---|---|
connection_name |
Erforderlich. Der Name der Verbindung, die Authentifizierungsanmeldeinformationen für die Quelle speichert. |
objects |
Erforderlich. Eine Liste der zu aufnehmenden Tabellen. Jeder Eintrag weist das Format {"table": {"source_table": "..."}}auf. Sie können auch ein optionales destination_table innerhalb des table Objekts angeben. |
destination_catalog |
Der Katalog, in dem importierte Tabellen abgelegt werden. Standardeinstellung für den Katalog, der während der Pipelineerstellung festgelegt wurde. |
destination_schema |
Das Schema, in das eingelesene Tabellen geschrieben werden. Standardeinstellung für das Schema, das während der Pipelineerstellung festgelegt wurde. |
scd_type |
Die langsam ändernde Dimensionstrategie: SCD_TYPE_1, SCD_TYPE_2 oder APPEND_ONLY. Wird standardmäßig auf SCD_TYPE_1 festgelegt. |
primary_keys |
Überschreiben Sie die voreingestellten Primärschlüssel einer Tabelle. Geben Sie eine Liste von Spaltennamen an. |
Registrieren Sie Ihren Connector
Nachdem Sie Ihren Connector erstellt und getestet haben, öffnen Sie eine Pull-Anforderung im Lakeflow Community Connectors-Repository , um ihn der Community zur Verfügung zu stellen.