Freigeben über


Datenflussaktivität in Azure Data Factory und Azure Synapse Analytics

GILT FÜR: Azure Data Factory Azure Synapse Analytics

Tipp

Testen Sie Data Factory in Microsoft Fabric, eine All-in-One-Analyselösung für Unternehmen. Microsoft Fabric deckt alle Aufgaben ab, von der Datenverschiebung bis hin zu Data Science, Echtzeitanalysen, Business Intelligence und Berichterstellung. Erfahren Sie, wie Sie kostenlos eine neue Testversion starten!

Verwenden Sie die Datenflussaktivität, um Daten mithilfe von Mapping-Datenflüssen zu transformieren und zu verschieben. Wenn Sie mit Datenflüssen noch nicht vertraut sind, finden Sie weitere Informationen in der Übersicht über Mapping Data Flow.

Erstellen einer Datenflussaktivität über die Benutzeroberfläche

Führen Sie die folgenden Schritte aus, um eine Datenflussaktivität in einer Pipeline zu verwenden:

  1. Suchen Sie im Aktivitätenbereich der Pipeline nach Datenfluss, und ziehen Sie eine Datenflussaktivität auf die Pipeline-Canvas.

  2. Wählen Sie die neue Datenflussaktivität auf der Leinwand aus, falls sie nicht bereits ausgewählt ist, und wählen Sie anschließend die Registerkarte Einstellungen, um die Details zu bearbeiten.

    Zeigt die Benutzeroberfläche für eine Datenfluss-Aktivität.

  3. Mit dem Prüfpunktschlüssel wird der Prüfpunkt festgelegt, wenn der Datenfluss für die geänderte Datenerfassung verwendet wird. Sie können ihn überschreiben. Datenflussaktivitäten verwenden einen GUID-Wert als Prüfpunktschlüssel anstelle von „Pipelinename + Aktivitätsname“, damit der CDC-Status (Change Data Capture) des Kunden auch dann nachverfolgt werden kann, wenn Umbenennungsaktionen durchgeführt werden. Alle vorhandenen Datenflussaktivitäten verwenden aus Gründen der Abwärtskompatibilität den Schlüssel mit dem alten Muster. Die Option für Prüfpunktschlüssel nach der Veröffentlichung einer neuen Datenflussaktivität mit aktivierter Change Data Capture-Datenflussressource ist nachfolgend dargestellt:

    Zeigt die Benutzeroberfläche für eine Datenflussaktivität mit einem Prüfpunktschlüssel.

  4. Wählen Sie einen vorhandenen Datenfluss aus, oder erstellen Sie mithilfe der Schaltfläche „Neu“ einen neuen. Wählen Sie nach Bedarf weitere Optionen aus, um Ihre Konfiguration abzuschließen.

Syntax

{
    "name": "MyDataFlowActivity",
    "type": "ExecuteDataFlow",
    "typeProperties": {
      "dataflow": {
         "referenceName": "MyDataFlow",
         "type": "DataFlowReference"
      },
      "compute": {
         "coreCount": 8,
         "computeType": "General"
      },
      "traceLevel": "Fine",
      "runConcurrently": true,
      "continueOnError": true,      
      "staging": {
          "linkedService": {
              "referenceName": "MyStagingLinkedService",
              "type": "LinkedServiceReference"
          },
          "folderPath": "my-container/my-folder"
      },
      "integrationRuntime": {
          "referenceName": "MyDataFlowIntegrationRuntime",
          "type": "IntegrationRuntimeReference"
      }
}

Typeigenschaften

Eigenschaft BESCHREIBUNG Zulässige Werte Erforderlich
Datenfluss Der Verweis auf den Datenfluss, der ausgeführt wird. DataFlowReference Ja
Integrationslaufzeit Die Computeumgebung, in der der Datenfluss ausgeführt wird. Wenn hier nichts angegeben wird, wird die Azure Integration Runtime mit automatischer Selbstauflösung verwendet. IntegrationRuntimeReference Nein
compute.coreCount Die Anzahl von Kernen, die im Spark-Cluster verwendet werden. Kann nur angegeben werden, wenn die Azure Integration Runtime für automatische Problemlösung verwendet wird. 8, 16, 32, 48, 80, 144, 272 Nein
Berechnen.TypBerechnung Die Art der Berechnung, die im Spark-Cluster verwendet wird. Kann nur angegeben werden, wenn die Azure Integration Runtime mit der Funktion zur automatischen Auflösung verwendet wird. „Allgemein“ Nein
Staging.VerknüpfterDienst Geben Sie das für PolyBase-Staging verwendete Speicherkonto an, wenn Sie eine Azure Synapse Analytics-Quelle oder -Senke verwenden.

Wenn Ihre Azure Storage-Instanz mit einem VNET-Dienstendpunkt konfiguriert ist, müssen Sie die Authentifizierung per verwalteter Identität mit für das Speicherkonto aktivierter Option „Vertrauenswürdigen Microsoft-Diensten den Zugriff auf dieses Speicherkonto erlauben“ verwenden. Informationen hierzu finden Sie unter Auswirkungen der Verwendung von VNET-Dienstendpunkten mit Azure Storage. Außerdem erhalten Sie Informationen zu den erforderlichen Konfigurationen für Azure Blob bzw. Azure Data Lake Storage Gen2.
LinkedServiceReference Nur wenn der Datenfluss Daten in Azure Synapse Analytics liest oder schreibt
staging.folderPath Wenn Sie eine Azure Synapse Analytics-Quelle oder -Senke verwenden, ist der Ordnerpfad im Blobspeicherkonto für das PolyBase-Staging erforderlich. String Nur wenn der Datenfluss Daten in Azure Synapse Analytics liest oder schreibt
traceLevel Festlegen des Protokollierungsniveaus bei der Ausführung Ihrer Datenflussaktivität Fein, Grob, Keine Nein

Ausführen eines Datenflusses

Dynamisches Bestimmen der Größe der Computekapazität für den Datenfluss zur Laufzeit

Die Eigenschaften „Anzahl Kerne“ und „Computetyp“ können dynamisch festgelegt werden, um die Größe der eingehenden Quelldaten zur Laufzeit anzupassen. Verwenden Sie Pipelineaktivitäten wie „Nachschlagen“ oder „Metadaten abrufen“, um die Größe der Daten im Quelldatenset zu bestimmen. Verwenden Sie dann in den Eigenschaften der Datenflussaktivität „Dynamischen Inhalt hinzufügen“. Sie können kleine, mittlere oder große Computegrößen auswählen. Wählen Sie optional "Benutzerdefiniert" aus, und konfigurieren Sie die Berechnungstypen und die Anzahl der Kerne manuell.

Dynamischer Datenfluss

Hier finden Sie ein kurzes Videotutorial, das diese Technik erläutert.

Datenfluss-Integrationslaufzeit

Wählen Sie die Integration Runtime aus, die für die Ausführung Ihrer Datenflussaktivität verwendet werden soll. Standardmäßig verwendet der Dienst die autoresolve Azure Integration Runtime mit vier Worker-Kernen. Diese Integration Runtime hat einen allgemeinen Rechentyp und wird in derselben Region wie Ihre Dienstinstanz ausgeführt. Für operationalisierte Pipelines wird dringend empfohlen, dass Sie Ihre eigenen Azure Integration Runtimes erstellen, um spezifische Regionen, den Computetyp, die Kernanzahl und die Gültigkeitsdauer (TTL) für die Ausführung Ihrer Datenflussaktivität festzulegen.

Für die meisten Produktionsworkloads wird mindestens ein Computetyp „Allgemeiner Zweck“ mit einer Konfiguration von 8+8 (insgesamt 16 virtuellen Kernen) und einer Lebensdauer (Time to live, TTL) von zehn Minuten empfohlen. Durch Festlegen eines niedrigen TTL-Werts kann die Azure IR einen „warmen“ Cluster beibehalten, der nicht mehrere Minuten zum Starten benötigt, so wie ein „kalter“ Cluster. Weitere Informationen finden Sie unter Azure Integration Runtime.

Azure Integration Runtime

Wichtig

Die Auswahl von Integration Runtime in der Datenflussaktivität bezieht sich nur auf getriggerte Ausführungen Ihrer Pipeline. Das Debuggen Ihrer Pipeline mit Datenflüssen wird auf dem in der Debugsitzung angegebenen Cluster ausgeführt.

PolyBase

Wenn Sie Azure Synapse Analytics als Senke oder Quelle verwenden, müssen Sie einen Staging-Speicherort für Ihren PolyBase-Batch-Ladevorgang auswählen. PolyBase ermöglicht das Batchladen per Massenvorgang, anstatt die Daten zeilenweise zu laden. PolyBase verkürzt die Ladezeit in Azure Synapse Analytics erheblich.

Prüfpunktschlüssel

Wenn Sie die Änderungserfassungsoption für Datenflussquellen verwenden, verwaltet ADF den Prüfpunkt automatisch für Sie. Der Standardprüfpunktschlüssel ist ein Hash des Datenflussnamens und des Pipelinenamens. Wenn Sie ein dynamisches Muster für Ihre Quelltabellen oder -ordner verwenden, können Sie hier diesen Hash überschreiben und selbst einen Wert für den Prüfpunktschlüssel festlegen.

Protokollierungsgrad

Wenn es nicht erforderlich ist, dass jede Pipelineausführung Ihrer Datenflussaktivitäten alle ausführlichen Telemetrieprotokolle vollständig protokolliert, können Sie den Protokolliergrad optional auf „Standard“ oder „Kein“ festlegen. Wenn Sie Ihre Datenflüsse im Modus „Ausführlich“ (Standard) ausführen, fordern Sie den Dienst auf, die Aktivität vollständig auf den einzelnen Partitionsebenen zu protokollieren, während die Datentransformation durchgeführt wird. Da dies ein kostspieliger Vorgang sein kann, kann die Aktivierung von „Ausführlich“ nur bei der Problembehandlung Ihren gesamten Datenfluss und die Pipelineleistung verbessern. Im Modus „Standard“ wird nur die Dauer der Transformation protokolliert, während der Modus „Kein“ lediglich eine Zusammenfassung der Werte für die Dauer bereitstellt.

Protokollierungsstufe

Sink-Eigenschaften

Mit der Gruppierungsfunktion in Datenflüssen können Sie sowohl die Ausführungsreihenfolge der Senken festlegen als auch mehrere Senken mithilfe derselben Gruppennummer zusammenfassen. Um die Verwaltung von Gruppen zu erleichtern, können Sie den Dienst anweisen, Senken innerhalb derselben Gruppe parallel auszuführen. Sie können die Senkengruppe auch so festlegen, dass sie selbst dann fortgesetzt wird, wenn bei einer der Senken ein Fehler auftritt.

Beim Standardverhalten einer Datenflusssenke wird jede Senke sequenziell ausgeführt, und der Datenfluss wird abgebrochen, wenn ein Fehler in der Senke auftritt. Außerdem werden alle Senken standardmäßig der gleichen Gruppe zugeordnet, es sei denn, Sie ändern die Datenflusseigenschaften und legen unterschiedliche Prioritäten für die Senken fest.

Sink-Eigenschaften

Nur erste Zeile

Diese Option ist nur für Datenflüsse verfügbar, bei denen Cache-Datenquellen für "Ausgabe an Aktivität" aktiviert sind. Die Ausgabe des Datenflusses, der direkt in Ihre Pipeline eingespeist wird, ist auf 2 MB beschränkt. Das Festlegen von "Nur erste Zeile" hilft Ihnen, die Datenausgabe des Datenflusses zu beschränken, indem die Ergebnisse der Datenflussaktivität direkt in die Pipeline eingespeist werden.

Parametrisieren von Datenflüssen

Parametrisierte Datasets

Wenn im Datenfluss parametrisierte Datasets verwendet werden, legen Sie die Parameterwerte auf der Registerkarte Einstellungen fest.

Ausführen von Datenflussparametern

Parametrisierte Datenflüsse

Wenn Ihr Datenfluss parametrisiert ist, legen Sie die dynamischen Werte der Datenflussparameter auf der Registerkarte Parameter fest. Sie können entweder die Pipelineausdruckssprache oder die Datenflussausdruckssprache verwenden, um dynamische oder literale Parameterwerte zuzuweisen. Weitere Informationen finden Sie unter Datenflussparameter.

Parametrisierte Rechen-Eigenschaften.

Sie können die Anzahl von Kernen oder den Computetyp parametrisieren, wenn Sie die Azure Integration Runtime mit automatischer Auflösung verwenden, und Werte für „compute.corecount“ und „compute.computetype“ angeben.

Ausführen von Datenflussparametern (Beispiel)

Debugging der Datenfluss-Aktivität in der Pipeline

Wenn Sie ein Debugging der Pipeline mit einer Datenflussaktivität ausführen möchten, müssen Sie den Datenfluss-Debugmodus über den Schieberegler Datenfluss debuggen auf der oberen Leiste aktivieren. Im Debugmodus können Sie den Datenfluss für einen aktiven Spark-Cluster ausführen. Weitere Informationen finden Sie unter Debugmodus.

Screenshot: Position der Schaltfläche „Debuggen“

Die Debug-Pipeline wird auf dem aktiven Debug-Cluster und nicht in der Integrationslaufzeitumgebung ausgeführt, die in den Einstellungen für die Datenflussaktivität angegeben ist. Beim Starten des Debugmodus können Sie die Computeumgebung für das Debuggen auswählen.

Überwachen der Datenflussaktivität

Die Datenflussaktivität verfügt über eine besondere Überwachungsoberfläche, auf der Sie Informationen zu Partitionierung, Phasenzeit und Datenherkunft anzeigen können. Sie öffnen den Überwachungsbereich über das Brillensymbol unter Aktionen. Weitere Informationen finden Sie unter Überwachen von Datenflüssen.

Verwenden der Ergebnisse der Datenflussaktivität in einer nachfolgenden Aktivität

Die Datenflussaktivität gibt Metriken bezüglich der Anzahl der Zeilen aus, die in jeder Senke geschrieben werden und die in jeder Quelle gelesen werden. Diese Ergebnisse werden im Abschnitt output des Ergebnisses der Aktivitätsausführung zurückgegeben. Die zurückgegebenen Metriken weisen das Format des folgenden JSON auf.

{
    "runStatus": {
        "metrics": {
            "<your sink name1>": {
                "rowsWritten": <number of rows written>,
                "sinkProcessingTime": <sink processing time in ms>,
                "sources": {
                    "<your source name1>": {
                        "rowsRead": <number of rows read>
                    },
                    "<your source name2>": {
                        "rowsRead": <number of rows read>
                    },
                    ...
                }
            },
            "<your sink name2>": {
                ...
            },
            ...
        }
    }
}

Verwenden Sie beispielsweise @activity('dataflowActivity').output.runStatus.metrics.sink1.rowsWritten, um die Anzahl der in die Senke namens „sink1“ in einer Aktivität namens „dataflowActivity“ geschriebenen Zeilen zu erhalten.

Verwenden Sie @activity('dataflowActivity').output.runStatus.metrics.sink1.sources.source1.rowsRead, um die Anzahl der von einer Quelle namens „source1“ gelesenen Zeilen abzurufen, die in dieser Senke verwendet wurden.

Hinweis

Wenn für eine Senke keine Zeilen geschrieben wurden, wird sie in Metriken nicht angezeigt. Die Existenz kann mit der contains-Funktion überprüft werden. Beispielsweise überprüft contains(activity('dataflowActivity').output.runStatus.metrics, 'sink1'), ob Zeilen in «sink1» geschrieben wurden.

Siehe unterstützte Ablaufsteuerungsaktivitäten: