Erstellen einer MySQL-Aufnahmepipeline

Von Bedeutung

Der MySQL-Connector befindet sich in der öffentlichen Vorschau. Wenden Sie sich an Ihr Azure Databricks-Kontoteam, um den Zugriff anzufordern.

Erfahren Sie, wie Sie Daten aus MySQL mithilfe von Lakeflow Connect in Azure Databricks aufnehmen. Der MySQL-Connector unterstützt Amazon RDS für MySQL, Aurora MySQL, Azure-Datenbank für MySQL, Google Cloud SQL für MySQL und MySQL, die auf EC2 ausgeführt wird.

Bevor Sie anfangen

  • Um ein Aufnahmegateway und eine Aufnahmepipeline zu erstellen, müssen Sie die folgenden Anforderungen erfüllen:

    • Ihr Arbeitsbereich ist für Unity Catalog aktiviert.

    • Serverlose Berechnung ist für Ihren Arbeitsbereich aktiviert. Siehe Serverlose Computeanforderungen.

    • Wenn Sie eine Verbindung erstellen möchten: Sie verfügen über CREATE CONNECTION Berechtigungen für den Metastore. Weitere Informationen finden Sie unter Verwalten von Berechtigungen in Unity Catalog.

      Wenn Ihr Connector die benutzeroberflächenbasierte Pipelineerstellung unterstützt, können Sie die Verbindung und die Pipeline gleichzeitig erstellen, indem Sie die Schritte auf dieser Seite ausführen. Wenn Sie jedoch apibasierte Pipelineerstellung verwenden, müssen Sie die Verbindung im Katalog-Explorer erstellen, bevor Sie die Schritte auf dieser Seite ausführen. Siehe Herstellen einer Verbindung mit verwalteten Aufnahmequellen.

    • Wenn Sie beabsichtigen, eine vorhandene Verbindung zu verwenden: Sie verfügen über USE CONNECTION Berechtigungen oder ALL PRIVILEGES für die Verbindung.

    • Sie verfügen über USE CATALOG-Berechtigungen für den Zielkatalog.

    • Sie verfügen über USE SCHEMA, CREATE TABLE und CREATE VOLUME Berechtigungen für ein vorhandenes Schema oder über CREATE SCHEMA Berechtigungen für den Zielkatalog.

    • Uneingeschränkte Berechtigungen zum Erstellen von Clustern oder einer benutzerdefinierten Richtlinie (nur API). Eine benutzerdefinierte Richtlinie für das Gateway muss die folgenden Anforderungen erfüllen:

      • Familie: Job-Compute

      • Außerkraftsetzungen der Richtlinienfamilie:

        {
          "cluster_type": {
            "type": "fixed",
            "value": "dlt"
          },
          "num_workers": {
            "type": "unlimited",
            "defaultValue": 1,
            "isOptional": true
          },
          "runtime_engine": {
            "type": "fixed",
            "value": "STANDARD",
            "hidden": true
          }
        }
        
      • Mit der folgenden Berechnungsrichtlinie können Azure Databricks das Aufnahmegateway skalieren, um die Anforderungen Ihrer Workload zu erfüllen. Die Mindestanforderung beträgt 4 Kerne. Für eine bessere Leistung bei der Momentaufnahmenextraktion empfiehlt Databricks jedoch die Verwendung größerer Instanzentypen mit mehr Arbeitsspeicher und CPU-Kernen.

        {
          "driver_node_type_id": {
            "type": "fixed",
            "value": "Standard_E8d_v4"
          },
          "node_type_id": {
            "type": "fixed",
            "value": "Standard_F4s"
          }
        }
        

      Weitere Informationen zu Clusterrichtlinien finden Sie unter Auswählen einer Richtlinie für Rechenressourcen.

  • Um von MySQL zu importieren, müssen Sie auch die Quellenkonfiguration abschließen.

Option 1: Azure Databricks-Benutzeroberfläche

Administratoren können gleichzeitig in der Benutzeroberfläche eine Verbindung und eine Pipeline erstellen. Dies ist die einfachste Möglichkeit zum Erstellen von verwalteten Aufnahmepipelines.

  1. Klicken Sie in der Randleiste des Azure Databricks-Arbeitsbereichs auf "Datenaufnahme".

  2. Klicken Sie auf der Seite "Daten hinzufügen" unter "Databricks-Connectors" auf "MySQL". Der Erfassungs-Assistent wird geöffnet.

  3. Geben Sie auf der Seite Ingestion-Gateway des Assistenten einen eindeutigen Namen für das Gateway ein.

  4. Wählen Sie einen Katalog und ein Schema für die Staging-Aufnahmedaten aus, und klicken Sie dann auf "Weiter".

  5. Geben Sie auf der Seite "Ingestion-Pipeline " einen eindeutigen Namen für die Pipeline ein.

  6. Wählen Sie für den Zielkatalog einen Katalog aus, um die aufgenommenen Daten zu speichern.

  7. Wählen Sie die Unity-Katalogverbindung aus, die die für den Zugriff auf die Quelldaten erforderlichen Anmeldeinformationen speichert.

    Wenn keine Verbindungen mit der Quelle vorhanden sind, klicken Sie auf "Verbindung erstellen" , und geben Sie die Authentifizierungsdetails ein, die Sie aus der Quelleinrichtung erhalten haben. Sie müssen über CREATE CONNECTION-Berechtigungen für den Metaspeicher verfügen.

    Hinweis

    Die Schaltfläche " Verbindung testen " kann für MySQL-Benutzer mit sha256_password oder caching_sha2_password Authentifizierung fehlschlagen. Dies ist eine bekannte Einschränkung. Sie können weiterhin mit dem Erstellen der Verbindung fortfahren.

  8. Klicken Sie auf "Pipeline erstellen", und fahren Sie fort.

  9. Wählen Sie auf der Seite "Quelle " die Datenbanken und Tabellen aus, die aufgenommen werden sollen.

  10. Ändern Sie optional die Standardeinstellung für die Verlaufsverfolgung. Weitere Informationen finden Sie unter Aktivieren der Verlaufsverfolgung (SCD-Typ 2).

  11. Klicke auf Weiter.

  12. Wählen Sie auf der Seite "Ziel " den Unity-Katalog und das Schema aus, in das geschrieben werden soll.

    Wenn Sie kein vorhandenes Schema verwenden möchten, klicken Sie auf "Schema erstellen". Sie müssen über die Berechtigungen USE CATALOG und CREATE SCHEMA für den übergeordneten Katalog verfügen.

  13. Klicken Sie auf Speichern und fortfahren.

  14. (Optional) Klicken Sie auf der Seite "Einstellungen" auf " Zeitplan erstellen". Legen Sie die Häufigkeit fest, mit der die Zieltabellen aktualisiert werden.

  15. (Optional) Festlegen von E-Mail-Benachrichtigungen für Erfolg oder Fehler des Pipelinevorgangs.

  16. Klicken Sie auf "Speichern", und führen Sie die Pipelineaus.

Option 2: Programmgesteuerte Schnittstellen

Bevor Sie deklarative Automatisierungspakete, Databricks-APIs, Databricks-SDKs, die Databricks CLI oder Terraform aufnehmen, müssen Sie Zugriff auf eine vorhandene Unity-Katalogverbindung haben. Anweisungen finden Sie unter Herstellen einer Verbindung mit verwalteten Aufnahmequellen.

Erstellen des Staging-Katalogs und des Schemas

Der Stagingkatalog und das Stagingschema können mit dem Zielkatalog und -schema identisch sein. Bei dem Staging-Katalog kann es sich nicht um einen externen Katalog handeln.

export CONNECTION_NAME="my_mysql_connection"
export TARGET_CATALOG="main"
export TARGET_SCHEMA="lakeflow_mysql_connector"
export STAGING_CATALOG=$TARGET_CATALOG
export STAGING_SCHEMA=$TARGET_SCHEMA
export DB_HOST="mysql-instance.region.rds.amazonaws.com"
export DB_PORT="3306"
export DB_USER="databricks_replication"
export DB_PASSWORD="your_secure_password"

output=$(databricks connections create --json '{
  "name": "'"$CONNECTION_NAME"'",
  "connection_type": "MYSQL",
  "options": {
    "host": "'"$DB_HOST"'",
    "port": "'"$DB_PORT"'",
    "user": "'"$DB_USER"'",
    "password": "'"$DB_PASSWORD"'"
  }
}')

export CONNECTION_ID=$(echo $output | jq -r '.connection_id')

Erstellen Sie das Gateway und die Aufnahmepipeline

Das Aufnahmegateway extrahiert Momentaufnahmen und ändert Daten aus der Quelldatenbank und speichert sie in einem Unity-Katalog-Stagingvolume. Sie müssen das Gateway als fortlaufende Pipeline ausführen. Dadurch werden Binlog-Aufbewahrungsrichtlinien für die Quelldatenbank berücksichtigt.

Die Erfassungspipeline wendet die Momentaufnahme- und Änderungsdaten aus dem Stagingvolume in Zielstreamingtabellen an.

Deklarative Automatisierungspakete

Auf dieser Registerkarte wird beschrieben, wie Sie eine Aufnahmepipeline mithilfe von deklarativen Automatisierungspaketen bereitstellen. Bundles können YAML-Definitionen von Aufträgen und Aufgaben enthalten, mithilfe der Databricks CLI verwaltet und in verschiedenen Zielarbeitsbereichen (z. B. Entwicklung, Staging und Produktion) freigegeben und ausgeführt werden. Weitere Informationen finden Sie unter Deklarative Automatisierungsbundle.

  1. Erstellen Eines Bündels mithilfe der Databricks CLI:

    databricks bundle init
    
  2. Fügen Sie dem Bundle zwei neue Ressourcendateien hinzu:

    • Eine Pipelinedefinitionsdatei (z. B resources/mysql_pipeline.yml. ).
    • Eine Auftragsdefinitionsdatei, die die Häufigkeit der Datenaufnahme steuert (z. B. resources/mysql_job.yml).

    Im Folgenden finden Sie eine resources/mysql_pipeline.yml-Beispieldatei:

    variables:
      # Common variables used multiple places in the DAB definition.
      gateway_name:
        default: mysql-gateway
      dest_catalog:
        default: main
      dest_schema:
        default: ingest-destination-schema
    
    resources:
      pipelines:
        gateway:
          name: ${var.gateway_name}
          gateway_definition:
            connection_name: <mysql-connection>
            gateway_storage_catalog: main
            gateway_storage_schema: ${var.dest_schema}
            gateway_storage_name: ${var.gateway_name}
          catalog: ${var.dest_catalog}
          schema: ${var.dest_schema}
    
        pipeline_mysql:
          name: mysql-ingestion-pipeline
          ingestion_definition:
            ingestion_gateway_id: ${resources.pipelines.gateway.id}
            objects:
              # Modify this with your tables!
              - table:
                  # Ingest the table mydb.customers to dest_catalog.dest_schema.customers
                  source_schema: public
                  source_table: customers
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
              - schema:
                  # Ingest all tables in the mydb.sales schema to dest_catalog.dest_schema
                  # The destination table name will be the same as it is on the source
                  source_schema: sales
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
          catalog: ${var.dest_catalog}
          schema: ${var.dest_schema}
    

    Im Folgenden finden Sie eine resources/mysql_job.yml-Beispieldatei:

    resources:
      jobs:
        mysql_dab_job:
          name: mysql_dab_job
    
          trigger:
            # Run this job every day, exactly one day from the last run
            # See https://docs.databricks.com/api/workspace/jobs/create#trigger
            periodic:
              interval: 1
              unit: DAYS
    
          email_notifications:
            on_failure:
              - <email-address>
    
          tasks:
            - task_key: refresh_pipeline
              pipeline_task:
                pipeline_id: ${resources.pipelines.pipeline_mysql.id}
    
  3. Stellen Sie die Pipeline mithilfe der Databricks CLI bereit:

    databricks bundle deploy
    
Azure Databricks-Notizbuch

Aktualisieren Sie die Configuration-Zelle im folgenden Notebook mit der Quellverbindung, dem Zielkatalog, dem Zielschema und den Tabellen, um Daten aus der Quelle zu erfassen.

Erstellen einer Gateway- und Erfassungspipeline

Notebook abrufen

Databricks CLI

So erstellen Sie das Gateway:

export GATEWAY_PIPELINE_NAME="mysql-gateway"

output=$(databricks pipelines create --json '{
"name": "'"$GATEWAY_PIPELINE_NAME"'",
"catalog": "'"$STAGING_CATALOG"'",
"schema": "'"$STAGING_SCHEMA"'",
"gateway_definition": {
  "connection_id": "'"$CONNECTION_ID"'",
  "gateway_storage_catalog": "'"$STAGING_CATALOG"'",
  "gateway_storage_schema": "'"$STAGING_SCHEMA"'",
  "gateway_storage_name": "'"$GATEWAY_PIPELINE_NAME"'"
  }
}')

export GATEWAY_PIPELINE_ID=$(echo $output | jq -r '.pipeline_id')

So erstellen Sie die Erfassungspipeline:

export INGESTION_PIPELINE_NAME="mysql-ingestion-pipeline"

databricks pipelines create --json '{
"name": "'"$INGESTION_PIPELINE_NAME"'",
"catalog": "'"$TARGET_CATALOG"'",
"schema": "'"$TARGET_SCHEMA"'",
"ingestion_definition": {
  "ingestion_gateway_id": "'"$GATEWAY_PIPELINE_ID"'",
  "objects": [
    {"table": {
        "source_schema": "public",
        "source_table": "customers",
        "destination_catalog": "'"$TARGET_CATALOG"'",
        "destination_schema": "'"$TARGET_SCHEMA"'",
        "destination_table": "customers"
        }},
     {"schema": {
        "source_schema": "sales",
        "destination_catalog": "'"$TARGET_CATALOG"'",
        "destination_schema": "'"$TARGET_SCHEMA"'"
        }}
    ]
  }
}'
Terraform

Sie können Terraform verwenden, um MySQL-Aufnahmepipelines bereitzustellen und zu verwalten. Ein vollständiges Beispielframework, einschließlich Terraform-Konfigurationen zum Erstellen von Gateways und Aufnahmepipelinen, finden Sie im Repository für Lakeflow Connect Terraform-Beispiele auf GitHub.

Weitere Ressourcen