Freigeben über


Interaktive Datenaufbereitung mit Apache Spark

Das Wrangling von Daten ist ein wichtiger Aspekt von Machine Learning-Projekten. In diesem Artikel erfahren Sie, wie Sie interaktive Datenverarbeitung durchführen, indem Sie Azure Machine Learning-Notebooks auf einer serverlosen Apache Spark-Umgebung ausführen, die von Azure Synapse unterstützt wird.

In diesem Artikel wird erläutert, wie Sie eine serverlose Spark-Berechnung anfügen und konfigurieren. Der Artikel zeigt dann, wie Sie die serverlose Spark-Berechnung verwenden, um auf Daten aus mehreren Quellen zuzugreifen und diese zu wrangieren.

Voraussetzungen

Weitere Informationen finden Sie unter:

Verwenden Sie serverloses Spark-Computing in Notebook-Sitzungen

Die Verwendung einer serverlosen Spark-Berechnung ist die einfachste Möglichkeit, auf einen Spark-Cluster zuzugreifen, um interaktive Daten zu wrangieren. Ein vollständig verwalteter serverloser Spark-Compute, der an einen Synapse Spark-Pool angefügt ist, ist direkt in Azure Machine Learning-Notizbüchern verfügbar.

Um eine der folgenden Quellen und Methoden für den Datenzugriff und das Wrangling zu verwenden, ordnen Sie den Serverless-Compute von Spark zu, indem Sie Azure Machine Learning Serverless Spark>Serverless Spark Compute - Verfügbar neben Compute oben auf der Seite der Datei oder des Notebooks auswählen. Es kann eine oder zwei Minuten dauern, bis die Berechnung an die Sitzung angefügt wird.

Konfigurieren einer serverlosen Spark-Sitzung

Nachdem Sie die serverlose Spark-Berechnung angefügt haben, können Sie die Spark-Sitzung konfigurieren, indem Sie mehrere Werte festlegen oder ändern. So konfigurieren Sie die Spark-Sitzung:

  1. Wählen Sie oben links auf der Datei- oder Notizbuchseite " Sitzung konfigurieren" aus.
  2. Ändern Sie auf dem Bildschirm " Sitzung konfigurieren " eine der folgenden Einstellungen:
    • Im Bereich "Berechnen" :

      • Ändern Sie die Computergröße, indem Sie im Dropdownmenü unter "Knotengröße" eine andere Größe auswählen.
      • Wählen Sie aus, ob Sie Dynamische Zuweisung von Executors wünschen oder nicht.
      • Wählen Sie die Anzahl der Executors für die Spark-Sitzung aus.
      • Wählen Sie im Dropdownmenü eine andere Executor-Größe aus.
    • Im Einstellungsbereich :

      • Ändern Sie die Apache Spark-Version in eine andere Version als 3.5, falls verfügbar.

        Von Bedeutung

        Azure Synapse Runtime für Apache Spark 3.4 erreicht das Ende des Supports am 31. März 2026. Migrieren Sie zu Apache Spark 3.5, um weiterhin Unterstützung zu erhalten. Weitere Informationen finden Sie unter Azure Synapse-Laufzeiten.

      • Ändern Sie den Sitzungstimeoutwert in Minuten in eine höhere Zahl, um Sitzungstimeouts zu verhindern.

      • Fügen Sie unter "Konfigurationseinstellungen" Eigenschaftennamen-/Werteinstellungen hinzu, um die Sitzung nach Bedarf zu konfigurieren.

        Tipp

        Wenn Sie Conda-Pakete auf Sitzungsebene verwenden, kann das Hinzufügen der spark.hadoop.aml.enable_cache Konfigurationseigenschaft mit Wert truedie Kaltstartzeit der Spark-Sitzung verbessern. Ein Kaltstart einer Sitzung mit Conda-Paketen auf Sitzungsebene dauert in der Regel 10 bis 15 Minuten beim ersten Mal. Nachfolgende Kaltstarts von Sitzungen, bei denen die Konfigurationsvariable auf TRUE festgelegt ist, dauern in der Regel drei bis fünf Minuten.

    • Im Bereich "Python-Pakete ":

      • Wenn Sie eine Conda-Datei verwenden möchten, um Ihre Sitzung zu konfigurieren, wählen Sie "Conda-Datei hochladen" aus. Wählen Sie neben "Conda-Datei auswählen" die Option "Durchsuchen" aus, und öffnen Sie dann die entsprechende Conda YAML-Datei auf Ihrem Computer, um sie hochzuladen.
      • Wenn Sie eine benutzerdefinierte Umgebung verwenden möchten, wählen Sie "Benutzerdefinierte Umgebung " und dann eine benutzerdefinierte Umgebung unter "Umgebungstyp" aus. Weitere Informationen finden Sie unter Verwalten von Softwareumgebungen.
  3. Wählen Sie Anwenden aus, um alle Konfigurationen anzuwenden.

Die Sitzungskonfigurationsänderungen bleiben erhalten und stehen anderen Notebook-Sitzungen zur Verfügung, die die verbundenen serverlosen Spark-Compute-Ressourcen verwenden.

Importieren und Bearbeiten von Daten aus Azure Data Lake Storage

Um auf in Azure Data Lake Storage-Konten gespeicherte Daten zuzugreifen und diese zu bearbeiten, verwenden Sie einen abfss://-Protokoll-URI entweder mit Benutzeridentitäts-Passthrough oder dienstprinzipal-basiertem Zugriff. Bei Benutzeridentitätspassthrough ist keine zusätzliche Konfiguration erforderlich.

Um eine der beiden Methoden zu verwenden, muss die Benutzeridentität oder der Dienstprinzipal über Mitwirkende und Mitwirkende Blob DataRollenzuweisungen im Azure Data Lake Storage-Konto verfügen.

Für den Passthrough der Benutzeridentität führen Sie das folgende Code-Beispiel aus, um eine Daten-URI im Format abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/<PATH_TO_DATA> mit pyspark.pandas zu verwenden. Ersetzen Sie den <STORAGE_ACCOUNT_NAME> Platzhalter durch den Namen Ihres Azure Data Lake Storage-Kontos und <FILE_SYSTEM_NAME> durch den Namen des Datencontainers.

import pyspark.pandas as pd

df = pd.read_csv(
    "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/titanic.csv",
    index_col="PassengerId",
)
df.fillna(
    value={"Cabin": "None"}, inplace=True
)  # Fill Cabin column with value "None" if missing
df.dropna(inplace=True)  # Drop the rows which still have any missing value
df.to_csv(
    "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/wrangled",
    index_col="PassengerId",
)

Verwenden eines Dienstprinzipals

Um einen Dienstprinzipal für den Zugriff auf und das Aufbereiten von Daten aus Azure Data Lake Storage zu verwenden, richten Sie zuerst den Dienstprinzipal wie folgt ein:

  1. Erstellen Sie einen Dienstprinzipal , und weisen Sie ihm die erforderlichen Rollen "Storage Blob Data Contributor" und "Key Vault Secrets User" zu.

  2. Rufen Sie die Werte für die Prinzipal-Mandanten-ID des Dienstes, die Client-ID und das Secret des Clients aus der App-Registrierung ab und erstellen Sie Azure Key Vault-Secrets für die Werte.

  3. Legen Sie die Dienstprinzipal-Mandanten-ID, die Client-ID und das Client Secret fest, indem Sie in der Sitzungskonfiguration die folgenden Paare aus Eigenschaftsnamen und Werten hinzufügen. Ersetzen Sie <STORAGE_ACCOUNT_NAME> mit dem Namen Ihres Speicherkontos und <TENANT_ID> durch die Dienstprinzipalmandanten-ID.

    Eigenschaftsname Wert
    fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net Anwendungs-ID-Wert (Client)
    fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net https://login.microsoftonline.com/<TENANT_ID>/oauth2/token
    fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net Client Secret Wert
  4. Führen Sie den folgenden Code aus. Der get_secret()-Aufruf im Code hängt vom Key Vault-Namen und den Namen der Key Vault-Secrets ab, die für die Dienstprinzipal-Mandanten-ID, die Client-ID und das Client-Secret erstellt wurden.

    from pyspark.sql import SparkSession
    
    sc = SparkSession.builder.getOrCreate()
    token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
    
    # Set up service principal tenant ID, client ID, and secret from Azure Key Vault
    client_id = token_library.getSecret("<KEY_VAULT_NAME>", "<CLIENT_ID_SECRET_NAME>")
    tenant_id = token_library.getSecret("<KEY_VAULT_NAME>", "<TENANT_ID_SECRET_NAME>")
    client_secret = token_library.getSecret("<KEY_VAULT_NAME>", "<CLIENT_SECRET_NAME>")
    
    # Set up a service principal that has access to the data
    sc._jsc.hadoopConfiguration().set(
        "fs.azure.account.auth.type.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net", "OAuth"
    )
    sc._jsc.hadoopConfiguration().set(
        "fs.azure.account.oauth.provider.type.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
        "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    )
    sc._jsc.hadoopConfiguration().set(
        "fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
        client_id,
    )
    sc._jsc.hadoopConfiguration().set(
        "fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
        client_secret,
    )
    sc._jsc.hadoopConfiguration().set(
        "fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
        "https://login.microsoftonline.com/" + tenant_id + "/oauth2/token",
    )
    
  5. Importieren und verarbeiten Sie die titanic.csv Daten unter Verwendung eines Daten-URIs im abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/<PATH_TO_DATA> Format, wie im Code-Beispiel gezeigt. Ersetzen Sie den <STORAGE_ACCOUNT_NAME> Platzhalter durch den Namen Ihres Azure Data Lake Storage-Kontos und <FILE_SYSTEM_NAME> durch den Namen des Datencontainers.

    import pyspark.pandas as pd
    
    df = pd.read_csv(
        "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/titanic.csv",
        index_col="PassengerId",
    )
    df.fillna(
        value={"Cabin": "None"}, inplace=True
    )  # Fill Cabin column with value "None" if missing
    df.dropna(inplace=True)  # Drop the rows which still have any missing value
    df.to_csv(
        "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/wrangled",
        index_col="PassengerId",
    )
    

Importieren und Aufbereiten von Daten aus Azure Blob Storage

Sie können auf Azure Blob Storage-Daten mit dem Zugriffsschlüssel für das Speicherkonto oder mit einem SAS-Token (Shared Access Signature) zugreifen. Speichern Sie die Anmeldeinformationen in Azure Key Vault als geheimen Schlüssel, und legen Sie sie in der Spark-Sitzungskonfiguration als Eigenschaft fest.

  1. Führen Sie einen der folgenden Codeausschnitte aus. Die get_secret()-Aufrufe in den Code-Schnipseln erfordern den Namen des Key Vault und die Namen der für das Azure Blob Storage-Konto erstellten Secret-Zugriffstoken oder SAS-Token.

    • Um einen Zugriffsschlüssel für Speicherkonten zu konfigurieren, legen Sie die fs.azure.account.key.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net Eigenschaft wie im folgenden Codeausschnitt dargestellt fest:

      from pyspark.sql import SparkSession
      
      sc = SparkSession.builder.getOrCreate()
      token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
      access_key = token_library.getSecret("<KEY_VAULT_NAME>", "<ACCESS_KEY_SECRET_NAME>")
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.key.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net", access_key
      )
      
    • Um ein SAS-Token zu konfigurieren, legen Sie die fs.azure.sas.<BLOB_CONTAINER_NAME>.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net Eigenschaft wie im folgenden Codeausschnitt gezeigt fest:

      from pyspark.sql import SparkSession
      
      sc = SparkSession.builder.getOrCreate()
      token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
      sas_token = token_library.getSecret("<KEY_VAULT_NAME>", "<SAS_TOKEN_SECRET_NAME>")
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.sas.<BLOB_CONTAINER_NAME>.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net",
          sas_token,
      )
      
  2. Führen Sie den folgenden Daten-Wrangling-Code mit dem Daten-URI aus, der als wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/<PATH_TO_DATA>formatiert ist.

    import pyspark.pandas as pd
    
    df = pd.read_csv(
        "wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/data/titanic.csv",
        index_col="PassengerId",
    )
    df.fillna(
        value={"Cabin": "None"}, inplace=True
    )  # Fill Cabin column with value "None" if missing
    df.dropna(inplace=True)  # Drop the rows which still have any missing value
    df.to_csv(
        "wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/data/wrangled",
        index_col="PassengerId",
    )
    

Importieren und Löschen von Daten aus einem Azure Machine Learning-Datenspeicher

Um auf Daten aus einem Azure Machine Learning-Datenspeicher zuzugreifen, definieren Sie einen Pfad zu Daten im Datenspeicher mit dem URI-Formatazureml://datastores/<DATASTORE_NAME>/paths/<PATH_TO_DATA>.

Führen Sie das folgende Code-Beispiel aus, um titanic.csv Daten aus einem Azure Machine Learning-Datenspeicher unter Verwendung von azureml:// Datenspeicher-URI und pyspark.pandas zu lesen und zu verarbeiten.

import pyspark.pandas as pd

df = pd.read_csv(
    "azureml://datastores/<DATASTORE_NAME>/paths/data/titanic.csv",
    index_col="PassengerId",
)
df.fillna(
    value={"Cabin": "None"}, inplace=True
)  # Fill Cabin column with value "None" if missing
df.dropna(inplace=True)  # Drop the rows which still have any missing value
df.to_csv(
    "azureml://datastores/<DATASTORE_NAME>/paths/data/wrangled",
    index_col="PassengerId",
)

Azure Machine Learning-Datenspeicher können über einen Azure Storage-Konto-Zugriffsschlüssel, ein SAS-Token, Anmeldeinformationen des Dienstes oder einen Datenzugriff ohne Anmeldeinformationen auf Daten zugreifen. Wählen Sie den geeigneten Authentifizierungsmechanismus abhängig vom Datenspeichertyp und dem zugrunde liegenden Azure-Speicherkontotyp aus.

In der folgenden Tabelle sind die Authentifizierungsmechanismen für den Zugriff auf Daten in Azure Machine Learning-Datenspeichern zusammengefasst:

Speicherkontotyp Zugriff auf Daten ohne Anmeldeinformationen Datenzugriffsmechanismus Rollenzuweisungen
Azure Blob Nein Zugriffsschlüssel oder SAS-Token Es sind keine Rollenzuweisungen erforderlich.
Azure Blob Ja Passthrough der Benutzeridentität Die Benutzeridentität sollte über entsprechende Rollenzuweisungen im Azure Blob Storage-Konto verfügen.
Azure Data Lake Storage Nein Dienstprinzipal Der Dienstprinzipal sollte über entsprechende Rollenzuweisungen im Azure Data Lake Storage-Speicherkonto verfügen.
Azure Data Lake Storage Ja Passthrough der Benutzeridentität Die Benutzeridentität sollte über entsprechende Rollenzuweisungen im Azure Data Lake Storage-Speicherkonto verfügen.

* Die Durchleitung der Benutzeridentität funktioniert bei Datenspeichern ohne Berechtigungsnachweis, die auf Azure Blob Storage-Konten verweisen, nur, wenn soft Delete nicht aktiviert ist.

Zugriff auf Daten in der Standard-Dateifreigabe

In Azure Machine Learning Studio ist Ihr Standard-Dateifreigaben im Arbeitsbereich der Verzeichnisbaum unter der Registerkarte Dateien in Notebooks. Der Notebook-Code kann direkt auf Dateien zugreifen, die in dieser Dateifreigabe mit dem file://-Protokoll gespeichert sind, wobei der absolute Pfad einer Datei ohne zusätzliche Konfiguration verwendet wird. Die Standard-Dateifreigabe wird sowohl in serverlose Spark Compute- als auch in angeschlossene Synapse Spark-Pools eingebunden.

Screenshot der Verwendung einer Dateifreigabe.

Der folgende Codeausschnitt greift auf Daten aus der titanic.csv Datei zu, die in einem Datenordner direkt unter dem Benutzernamen in der Standarddateifreigabe gespeichert ist. Ersetzen Sie den <USER> Platzhalter durch Ihren Benutzernamen.

import os
import pyspark.pandas as pd

abspath = os.path.abspath(".")
file = "file://" + abspath + "/Users/<USER>/data/titanic.csv"
print(file)
df = pd.read_csv(file, index_col="PassengerId")
df.fillna(value={"Cabin" : "None"}, inplace=True) # Fill Cabin column with value "None" if missing
df.dropna(inplace=True) # Drop the rows which still have any missing value
output_path = "file://" + abspath + "/Users/<USER>/data/wrangled"
df.to_csv(output_path, index_col="PassengerId")