Verwenden von Apache Spark (unterstützt von Azure Synapse Analytics) in Ihrer Machine Learning-Pipeline (veraltet)

GILT FÜR:Azure Machine Learning SDK v1 für Python

Wichtig

Dieser Artikel enthält Informationen zur Verwendung des Azure Machine Learning SDK v1. SDK v1 ist ab dem 31. März 2025 veraltet. Der Support für sie endet am 30. Juni 2026. Sie können SDK v1 bis zu diesem Datum installieren und verwenden. Ihre vorhandenen Workflows mit SDK v1 funktionieren weiterhin nach dem Enddatum des Supports. Sie können jedoch Sicherheitsrisiken oder Breaking Changes im Falle von Architekturänderungen im Produkt ausgesetzt sein.

Es wird empfohlen, vor dem 30. Juni 2026 zum SDK v2 zu wechseln. Weitere Informationen zu SDK v2 finden Sie unter Was ist Azure Machine Learning CLI und Python SDK v2? und die SDK v2-Referenz.

Warnung

Die Integration von Azure Synapse Analytics mit Azure Machine Learning, die im Python SDK v1 verfügbar war, ist veraltet. Benutzende können Synapse Workspace, das bei Azure Machine Learning registriert ist, weiterhin als verknüpften Dienst verwenden. Ein neuer Synapse-Arbeitsbereich kann jedoch nicht mehr bei Azure Machine Learning als verknüpfter Dienst registriert werden. Wir empfehlen die Verwendung von serverlosen Spark Compute- und angeschlossenen Synapse Spark-Pools, die in CLI v2 und Python SDK v2 verfügbar sind. Weitere Informationen finden Sie unter Konfigurieren von Apache Spark-Aufträgen in Azure Machine Learning.

Hinweis

Abgelöste Pakete: Die folgenden SDK v1 Pakete, die in diesem Artikel verwendet werden, werden abgelöst: azureml-pipeline, azureml-pipeline-core, azureml-pipeline-steps, azureml-train-core und azureml-pipeline-internal. Klassen wie SynapseSparkStep, PythonScriptStep und Pipeline sind Teil dieser auslaufenden Pakete.

In diesem Artikel erfahren Sie, wie Sie Apache Spark-Pools verwenden, unterstützt von Azure Synapse Analytics, als Computeziel für einen Datenvorbereitungsschritt in einer Azure Machine Learning-Pipeline. Sie erfahren, wie eine einzelne Pipeline Computeressourcen verwenden kann, die für einen bestimmten Schritt geeignet sind, z. B. Datenvorbereitung oder Training. Sie werden außerdem lernen, wie die Daten für den Spark-Schritt vorbereitet und an den nächsten Schritt übergeben werden.

Voraussetzungen

Erstellen und verwalten Sie Ihre Apache Spark-Pools in einem Azure Synapse Analytics-Arbeitsbereich. Wenn Sie einen Apache Spark-Pool mit einem Azure Machine Learning-Arbeitsbereich integrieren möchten, müssen Sie eine Verknüpfung mit dem Azure Synapse Analytics-Arbeitsbereich herstellen. Sobald Sie Ihren Azure Machine Learning-Arbeitsbereich und Ihre Azure Synapse Analytics-Arbeitsbereiche verknüpft haben, können Sie einen Apache Spark-Pool anfügen mit

  • Azure Machine Learning Studio

  • Python SDK, wie weiter unten erläutert

  • Azure Resource Manager (ARM)-Vorlage. Weitere Informationen finden Sie unter Beispiel-ARM-Vorlage

    • Sie können die Befehlszeile verwenden, um die ARM-Vorlage zu befolgen, den verknüpften Dienst hinzuzufügen und den Apache Spark-Pool mit diesem Codebeispiel anzufügen:
    az deployment group create --name --resource-group <rg_name> --template-file "azuredeploy.json" --parameters @"azuredeploy.parameters.json"
    

Wichtig

Um erfolgreich eine Verknüpfung mit dem Synapse-Arbeitsbereich herzustellen, benötigen Sie die Rolle Besitzer im Synapse-Arbeitsbereich. Überprüfen Sie Ihren Zugriff im Azure-Portal.

Der verknüpfte Dienst erhält bei der Erstellung eine vom System zugewiesene verwaltete Identität (SAI). Sie müssen diesen Linkdienst SAI der Rolle "Synapse Apache Spark administrator" von Synapse Studio zuweisen, damit er den Spark-Auftrag übermitteln kann. Weitere Informationen finden Sie unter "Verwalten von Synapse RBAC-Rollenzuweisungen" in Synapse Studio .

Der Azure Machine Learning-Arbeitsbereichsbenutzer benötigt auch die Rolle "Mitwirkender" aus dem Azure-Portal für die Ressourcenverwaltung.

Das folgende Codebeispiel zeigt, wie verknüpfte Dienste in Ihrem Arbeitsbereich abgerufen werden:

from azureml.core import Workspace, LinkedService, SynapseWorkspaceLinkedServiceConfiguration

ws = Workspace.from_config()

for service in LinkedService.list(ws) : 
    print(f"Service: {service}")

# Retrieve a known linked service
linked_service = LinkedService.get(ws, 'synapselink1')

In diesem Codebeispiel Workspace.from_config() greift auf Ihren Azure Machine Learning-Arbeitsbereich mit der Konfiguration in der config.json Datei zu. (Weitere Informationen finden Sie unter Erstellen einer Arbeitsbereichskonfigurationsdatei). Anschließend gibt der Code alle verknüpften Dienste aus, die im Arbeitsbereich verfügbar sind. Schließlich ruft LinkedService.get() einen verknüpften Dienst namens „'synapselink1'“ ab.

Anfügen Ihres Apache Spark-Pools als Computeziel für Azure Machine Learning

Um Ihren Apache Spark-Pool zur Unterstützung eines Schritts in Ihrer Machine Learning-Pipeline zu verwenden, müssen Sie ihn als ComputeTarget für den Pipelineschritt anfügen, wie im folgenden Codebeispiel dargestellt.

from azureml.core.compute import SynapseCompute, ComputeTarget

attach_config = SynapseCompute.attach_configuration(
        linked_service = linked_service,
        type="SynapseSpark",
        pool_name="spark01") # This name comes from your Synapse workspace

synapse_compute=ComputeTarget.attach(
        workspace=ws,
        name='link1-spark01',
        attach_configuration=attach_config)

synapse_compute.wait_for_completion()

Der Code konfiguriert zuerst SynapseCompute. Das Argument „linked_service“ ist das LinkedService-Objekt, das Sie im vorherigen Schritt erstellt oder abgerufen haben. Das Argument „type“ muss SynapseSpark sein. Das Argument „pool_name“ in SynapseCompute.attach_configuration() muss mit einem vorhandenen Pool in Ihrem Azure Synapse Analytics-Arbeitsbereich übereinstimmen. Weitere Informationen zum Erstellen eines Apache Spark-Pools im Azure Synapse Analytics-Arbeitsbereich finden Sie unter Schnellstart: Erstellen eines serverlosen Apache Spark-Pools mithilfe von Synapse Studio. Der attach_config-Typ ist ComputeTargetAttachConfiguration.

Nachdem Sie die Konfiguration erstellt haben, erstellen Sie eine ComputeTarget für maschinelles Lernen, indem Sie die Workspace- und ComputeTargetAttachConfiguration-Werte sowie den Namen übergeben, mit dem Sie innerhalb des Machine-Learning-Arbeitsbereichs auf die Rechenressourcen verweisen möchten. Der Aufruf von ComputeTarget.attach() ist asynchron, sodass das Beispiel blockiert wird, bis der Aufruf abgeschlossen ist.

Erstellen eines SynapseSparkStep-Elements, das den verknüpften Apache Spark-Pool verwendet

Hinweis

Informationen zu SDK v2/CLI v2-Ansätzen für Spark-Aufträge und -Pipelines finden Sie unter Konfigurieren von Apache Spark-Aufträgen in Azure Machine Learning. Serverless-Compute von Spark und zugehörige Synapse Spark Pools sind in CLI v2 und Python SDK v2 verfügbar und bieten eine Alternative zu den in diesem Artikel gezeigten v1-basierten SynapseSparkStep.

Das Beispielnotebook Spark-Auftrag im Apache Spark-Pool definiert eine einfache Machine Learning-Pipeline. Zuerst definiert das Notebook einen Datenaufbereitungsschritt, der durch das im vorherigen Schritt definierte synapse_compute unterstützt wird. Anschließend definiert das Notebook einen Trainingsschritt, der von einem Computeziel unterstützt wird, das besser für das Training geeignet ist. Das Beispielnotebook verwendet die Titanic-Überlebensdatenbank, um Dateneingaben und -ausgaben anzuzeigen. Tatsächlich werden die Daten nicht bereinigt und kein Vorhersagemodell erstellt. Da dieses Beispiel nicht wirklich ein Training beinhaltet, verwendet der Trainingsschritt eine kostengünstige, CPU-basierte Computeressource.

Daten fließen in eine Machine Learning-Pipeline über DatasetConsumptionConfig-Objekte ein, die Tabellendaten oder eine Reihe von Dateien enthalten können. Die Daten stammen häufig aus Dateien im Blobspeicher eines Datenspeichers für den Arbeitsbereich. Das folgende Codebeispiel zeigt typischen Code, der Eingaben für eine Machine Learning-Pipeline erstellt:

from azureml.core import Dataset

datastore = ws.get_default_datastore()
file_name = 'Titanic.csv'

titanic_tabular_dataset = Dataset.Tabular.from_delimited_files(path=[(datastore, file_name)])
step1_input1 = titanic_tabular_dataset.as_named_input("tabular_input")

# Example only: it wouldn't make sense to duplicate input data, especially one as tabular and the other as files
titanic_file_dataset = Dataset.File.from_files(path=[(datastore, file_name)])
step1_input2 = titanic_file_dataset.as_named_input("file_input").as_hdfs()

Das Codebeispiel geht davon aus, dass sich die Datei „Titanic.csv“ im Blobspeicher befindet. Der Code zeigt, wie die Datei als TabularDataset und auch als FileDataset gelesen wird. Der Code dient nur zu Demonstrationszwecken, da es verwirrend wäre, Eingaben zu duplizieren oder eine einzelne Datenquelle als eine tabelle enthaltende Ressource und streng als Datei zu interpretieren.

Wichtig

Um ein FileDataset als Eingabe zu verwenden, benötigen Sie mindestens eine azureml-core-Version von 1.20.0. Sie können dies mit der Environment-Klasse angeben, wie weiter unten beschrieben. Wenn ein Schritt abgeschlossen ist, können Sie die Ausgabedaten, wie in diesem Codebeispiel gezeigt, speichern:

from azureml.data import HDFSOutputDatasetConfig
step1_output = HDFSOutputDatasetConfig(destination=(datastore,"test")).register_on_complete(name="registered_dataset")

In diesem Codebeispiel würden die datastore-Daten in einer Datei mit dem Namen test gespeichert. Die Daten wären im Machine Learning-Arbeitsbereich als Dataset mit dem Namen registered_dataset verfügbar.

Zusätzlich zu den Daten kann ein Pipelineschritt auch schrittweise Python-Abhängigkeiten haben. Außerdem können einzelne SynapseSparkStep-Objekte ihre exakte Azure Synapse Apache Spark-Konfiguration angeben. Als Demonstration gibt das folgende Codebeispiel an, dass die azureml-core Paketversion mindestens 1.20.0sein muss. Wie bereits erwähnt, ist diese Anforderung für das azureml-core-Paket erforderlich, um ein FileDataset als Eingabe zu verwenden.

from azureml.core.environment import Environment
from azureml.pipeline.steps import SynapseSparkStep

env = Environment(name="myenv")
env.python.conda_dependencies.add_pip_package("azureml-core>=1.20.0")

step_1 = SynapseSparkStep(name = 'synapse-spark',
                          file = 'dataprep.py',
                          source_directory="./code", 
                          inputs=[step1_input1, step1_input2],
                          outputs=[step1_output],
                          arguments = ["--tabular_input", step1_input1, 
                                       "--file_input", step1_input2,
                                       "--output_dir", step1_output],
                          compute_target = 'link1-spark01',
                          driver_memory = "7g",
                          driver_cores = 4,
                          executor_memory = "7g",
                          executor_cores = 2,
                          num_executors = 1,
                          environment = env)

Dieser Code gibt einen einzelnen Schritt in der Azure Machine Learning-Pipeline an. Der environment-Wert dieses Codes legt eine bestimmte azureml-core-Version fest, und der Code kann bei Bedarf weitere Conda- oder Pip-Abhängigkeiten hinzufügen.

Der SynapseSparkStep zippt das Unterverzeichnis „./code“ vom lokalen Computer und lädt es hoch. Dieses Verzeichnis wird auf dem Computeserver neu erstellt, und der Schritt führt das Skript „dataprep.py“ aus diesem Verzeichnis aus. Die inputs und outputs dieses Schritts sind die Objekte step1_input1, step1_input2, und step1_output, die zuvor erläutert wurden. Die einfachste Möglichkeit, auf diese Werte innerhalb des dataprep.py-Skripts zuzugreifen, besteht darin, ihnen benannte arguments zuzuordnen.

Der nächste Satz von Argumenten für den SynapseSparkStep-Konstruktor steuert Apache Spark. Das compute_target ist die 'link1-spark01' Ressource, die wir zuvor als Berechnungsziel angefügt haben. Mit den anderen Parametern werden der Arbeitsspeicher und die Kerne angegeben, die wir verwenden möchten.

Das Beispielnotebook verwendet diesen Code für dataprep.py:

import os
import sys
import azureml.core
from pyspark.sql import SparkSession
from azureml.core import Run, Dataset

print(azureml.core.VERSION)
print(os.environ)

import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--tabular_input")
parser.add_argument("--file_input")
parser.add_argument("--output_dir")
args = parser.parse_args()

# use dataset sdk to read tabular dataset
run_context = Run.get_context()
dataset = Dataset.get_by_id(run_context.experiment.workspace,id=args.tabular_input)
sdf = dataset.to_spark_dataframe()
sdf.show()

# use hdfs path to read file dataset
spark= SparkSession.builder.getOrCreate()
sdf = spark.read.option("header", "true").csv(args.file_input)
sdf.show()

sdf.coalesce(1).write\
.option("header", "true")\
.mode("append")\
.csv(args.output_dir)

Dieses Skript zur "Datenvorbereitung" führt keine echte Datentransformation durch, zeigt jedoch, wie Daten abgerufen, in einen Spark-Datenrahmen konvertiert und einige grundlegende Apache Spark-Manipulationen durchgeführt werden. Um die Ausgabe in Azure Machine Learning Studio zu finden, öffnen Sie den untergeordneten Auftrag, wählen die Registerkarte Ausgaben und Protokolle aus und öffnen die Datei „logs/azureml/driver/stdout“, wie in diesem Screenshot gezeigt:

Screenshot von Studio mit der Registerkarte „stdOut“ des untergeordneten Auftrags.

Verwenden von SynapseSparkStep in einer Pipeline

Im nächsten Beispiel wird die Ausgabe aus dem SynapseSparkStep, der im vorherigen Abschnitt erstellt wurde, verwendet. Andere Schritte in der Pipeline können möglicherweise ihre eigenen eindeutigen Umgebungen aufweisen und möglicherweise auf verschiedenen Computeressourcen ausgeführt werden, die für die jeweilige Aufgabe geeignet sind. Das Beispielnotebook führt den „Trainingsschritt“ auf einem kleinen CPU-Cluster aus:

from azureml.core.compute import AmlCompute

cpu_cluster_name = "cpucluster"

if cpu_cluster_name in ws.compute_targets:
    cpu_cluster = ComputeTarget(workspace=ws, name=cpu_cluster_name)
    print('Found existing cluster, use it.')
else:
    compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_D2_V2', max_nodes=1)
    cpu_cluster = ComputeTarget.create(ws, cpu_cluster_name, compute_config)
    print('Allocating new CPU compute cluster')

cpu_cluster.wait_for_completion(show_output=True)

step2_input = step1_output.as_input("step2_input").as_download()

step_2 = PythonScriptStep(script_name="train.py",
                          arguments=[step2_input],
                          inputs=[step2_input],
                          compute_target=cpu_cluster_name,
                          source_directory="./code",
                          allow_reuse=False)

Dieser Code erstellt die neue Computeressource, falls erforderlich. Anschließend konvertiert es das step1_output-Ergebnis in Eingaben für den Trainingsschritt. Die Option „as_download()“ bedeutet, dass die Daten auf die Computeressource verschoben werden, was zu einem schnelleren Zugriff führt. Wenn die Daten so umfangreich sind, dass sie nicht auf die Festplatte der lokalen Compute-Instanz passen, müssten Sie die Option „as_mount()“ verwenden, um die Daten mit dem FUSE-Dateisystem zu streamen. Das compute_target dieses zweiten Schrittes ist 'cpucluster', nicht die 'link1-spark01'-Ressource, die Sie im Schritt der Datenaufbereitung verwendet haben. In diesem Schritt wird anstelle des train.py-Skripts, das Sie im vorherigen Schritt verwendet haben, ein einfaches dataprep.py-Skript verwendet. Das Beispielnotebook enthält Details zum train.py-Skript.

Nachdem Sie alle Schritte definiert haben, können Sie Ihre Pipeline erstellen und ausführen.

from azureml.pipeline.core import Pipeline

pipeline = Pipeline(workspace=ws, steps=[step_1, step_2])
pipeline_run = pipeline.submit('synapse-pipeline', regenerate_outputs=True)

Dieser Code erstellt eine Pipeline, die aus dem von Azure Synapse Analytics unterstützten Datenaufbereitungsschritt (step_1) für Apache Spark-Pools und dem Trainingsschritt (step_2) besteht. Azure untersucht die Datenabhängigkeiten zwischen den Schritten, um das Ausführungsdiagramm zu berechnen. In diesem Fall gibt es nur eine einfache Abhängigkeit. Hier erfordert step2_input notwendigerweise step1_output.

Der Aufruf von pipeline.submit erstellt bei Bedarf ein Experiment mit dem Namen synapse-pipeline und startet darin asynchron einen Auftrag. Einzelne Schritte innerhalb der Pipeline werden als untergeordnete Aufträge dieses Hauptauftrags ausgeführt, und die Seite "Experimente" von Studio kann diese Schritte überwachen und überprüfen.

Nächste Schritte