Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Kernkonzepte zur Unterstützung der Größenanpassung, Optimierung und Problembehandlung. Lesen Sie dies zuerst, wenn Sie noch nicht mit Spark in Fabric arbeiten.
Allgemeine Gebote und Verbote
Szenario: Sie sind neu bei Spark. Was sollte man tun und was nicht?
| Anwendungsfall | Bewährte Methoden |
|---|---|
| Verwenden optimierter serialisierter Formate | Do: Bevorzugen Sie Formate wie Avro, Parquet oder Optimized Row Columnar (ORC), da sie Schemas einbetten, kompakt sind und die Speicherung und Verarbeitung optimieren. In Fabric sollten Sie das Delta-Format zur Gewährleistung von Atomarität, Konsistenz, Isolation, Haltbarkeit (ACID) und Leistungsvorteilen verwenden. |
| Seien Sie vorsichtig mit XML/JSON | Verlassen Sie sich nicht auf die Schemaableitung für große JAVAScript Object Notation (JSON)- oder EXTensible Markup Language (XML)-Dateien, da Spark das gesamte Dataset zum Ableiten des Schemas liest, wodurch die Verarbeitung verlangsamt und der Speicher intensiv verbraucht wird. Stellen Sie beim Lesen von JSON/XML ein statisches primäres Schema bereit, oder verwenden .option("samplingRatio", 0.1) Sie zum Beschleunigen von Lesevorgängen, beachten Sie jedoch, dass beim Lesen des Beispiels möglicherweise ein Fehler auftritt, wenn das vollständige Dataset nicht dargestellt wird. Ein sichererer Ansatz leitet ein Schema aus einem repräsentativen Beispiel ab und speichert es für alle Lesevorgänge.Vermeiden Sie die Analyse großer XML-Dateien. Die XML-Analyse wird aufgrund von Tagverarbeitung und Typ casting inhärent langsamer ausgeführt. |
| Optimieren von Verknüpfungen und Filtern | Do: Wenden Sie die Spaltenkürzung und Zeilenfilterung vor den Join-Operationen an, um die Shuffle- und Datennutzung zu reduzieren. Der Catalyst-Optimierer übernimmt automatisch die Verarbeitung von Prädikaten-Pushdown, wenn Sie die DataFrame-APIs verwenden. Vermeiden Sie RDD-APIs (Resilient Distributed Dataset), da sie Katalysatoroptimierungen umgehen. |
| Bevorzugen Sie DataFrames über RDDs | Nutzen Sie DataFrames anstelle von RDDs für die meisten Operationen. DataFrames verwenden den Catalyst-Optimierer und das Tungsten-Ausführungsmodul für eine effiziente Ausführung. |
| Aktivieren der Ausführung adaptiver Abfragen (AQE) | Do: Aktivieren Sie AQE, um Shuffle-Partitionen dynamisch zu optimieren und verzerrte Daten automatisch zu behandeln. |
Speicherverwaltung des Executors
Szenario: Sie möchten die Speicherverwaltung des Executors für die Leistungsoptimierung verstehen.
Selbst wenn ein Executor mit 56 GB Arbeitsspeicher konfiguriert ist, lässt Spark nicht zu, dass er direkt für Benutzerdaten verwendet wird. Spark Core teilt und verwaltet den Ausführungsspeicher:
Reservierter Speicher: Ein fester Teil, der für System- und Spark-internen Overhead reserviert ist (z. B. Java Virtual Machine (JVM), Interne).
Benutzerspeicher: Speichert benutzerdefinierte Funktionen (USER Defined Functions, UDFs), lokale Variablen, Datenstrukturen (Listen, Karten, Wörterbücher) und Objekte, die während der Berechnung erstellt wurden.
Speicherspeicher: Enthält zwischengespeicherte/beibehaltene Daten, Übertragungsvariablen und Shuffle-Daten, die zwischengespeichert werden können.
Ausführungsspeicher: Wird für die Zwischenberechnung (Shuffles, Verknüpfungen, Sortierungen, Aggregationen) verwendet.
Dynamische Speicherfreigabe: Die Grenze zwischen Speicher- und Ausführungsspeicher ist verschiebbar. Spark kann Speicher von einer Region zur anderen ausleihen, was eine flexible Speicherauslastung ermöglicht.
Spill: Tritt auf, wenn entweder der Speicher- oder der Ausführungspeicherbedarf den verfügbaren Speicher überschreitet, nachdem er ausgeliehen wurde. Dies erzwingt, dass Daten auf die Festplatte geschrieben werden, was die Leistung beeinflussen kann.
Fehler außerhalb des Arbeitsspeichers (OOM)
Szenario: Spark-Aufträge schlagen mit OOM-Fehlern (Out of Memory) fehl.
Treiber OOM:
Treiber-OOM-Fehler treten auf, wenn der Spark-Treiber den zugewiesenen Speicher überschreitet.
Häufige Ursache: Treiberlastige Vorgänge wie collect(), countByKey()oder große toPandas() Aufrufe, die zu viele Daten in den Treiberspeicher ziehen.
Minderung: Vermeiden Sie, wenn möglich, treiberintensive Vorgänge. Wenn unvermeidbar, erhöhen Sie die Fahrergröße und den Benchmark, um die optimale Konfiguration zu finden.
Executor Out of Memory (OOM):
OOM-Fehler treten auf, wenn ein Spark-Executor den zugewiesenen Speicher überschreitet.
Häufige Ursache: Speicher- und rechenintensive Transformationen für große Datasets (z. B. breite Verknüpfungen, Aggregationen, Shuffles) oder zwischengespeicherte/persistente Datasets, die den verfügbaren Speicher des Executors überschreiten (Ausführung + Speicherbereiche).
Maßnahmen: Erhöhen Sie bei Bedarf den Speicher des Executors, optimieren Sie die Spark-Speicherfraktionen (spark.memory.fraction, spark.memory.storageFraction) und sorgen Sie für gezielte Persistenz. Stellen Sie sicher, dass zwischengespeicherte Daten in den verfügbaren Arbeitsspeicher passen.
Datenverzerrungen
Symptome von Verzerrung
- Einige Aufgaben dauern länger als andere in der Spark-UI (Phasenaufgaben zeigen schweres Tail).
- Große Lücke zwischen median und maximalen Vorgangszeiten in Phasenmetriken.
- Phasen mit großen Shuffle-Lese- oder Schreibgrößen für einige Partitionen.
Häufige Ursachen:
- Ungleiche Datenverteilung für die Verknüpfungs-/Gruppenschlüssel (Hot Keys).
- Falsche Partitionierung oder zu wenige Partitionen für das Datenvolume.
- Upstream-Datenanomalien, die große Datensätze oder viele NULL/leere Schlüssel erzeugen.
Milderung:
- Neu partitionieren oder zusammenlegen, um die Parallelität der Partitionen zu erhöhen und die Größen auszugleichen.
- Wenden Sie Schlüssel-Salting oder benutzerdefinierte Partitionierung an, um heiße Schlüssel auf Partitionen zu verteilen.
- Verwenden Sie AQE (adaptive Abfrageausführung), um Nach-Shuffle-Partitionen zusammenzukoppeln und Skew-Join-Optimierungen zu aktivieren.
- Verwenden Sie Broadcast-Verknüpfungen für kleine Nachschlagetabellen, um Umverteilungen vollständig zu vermeiden.
- Speichern Sie gleichgewichtete Zwischendatensätze vor den ressourcenintensiven Phasen, und führen Sie den Job erneut aus.
Bewährte Methoden für UDF
Szenario: Sie müssen benutzerdefinierte Logik anwenden, die nicht über integrierte DataFrame-Funktionen ausgedrückt werden kann.
Verwenden Sie nach Möglichkeit Spark DataFrame-APIs. Der Katalysatoroptimierer optimiert integrierte Funktionen und führt sie nativ auf dem JVM aus, sodass sie die beste Leistung erzielen.
Wenn Sie eine UDF (User Defined Function) verwenden müssen, vermeiden Sie reguläre PySpark Python UDFs. Betrachten Sie stattdessen die folgenden Alternativen:
Pandas UDFs (auch als Vectorized UDFs bezeichnet): Verwenden Sie Apache Arrow für eine effiziente Datenübertragung zwischen JVM und Python. Pandas UDFs ermöglichen vektorisierte Vorgänge und verbessern die Leistung im Vergleich zu Python-UDFs in Zeilen nach Zeile erheblich.
Scala/Java UDFs: Führen Sie direkt auf dem JVM aus, und vermeiden Sie den Python-Serialisierungsaufwand. Scala/Java UDFs übertreffen in der Regel Python UDFs.
Seien Sie vorsichtig mit Python UDFs. Jeder Executor startet einen separaten Python-Prozess, der Serialisierung und Deserialisierung von Daten zwischen dem JVM und Python erfordert. Dadurch entsteht ein Leistungsengpass, insbesondere im großen Maßstab.
Fehlerprotokollierung
Szenario: Bewährte Methoden für die Fehlerprotokollierung in Fabric Spark
Verwenden Sie
log4janstelle dessenprint(), was den Fahrer stark belastet. Mitlog4j, können Sie auf Protokolle in Treiberprotokollen zugreifen und sie durchsuchen (z. B. mit dem Loggernamen: PySparkLogger).Umschließen Sie Lese-, Schreib- und Transformationsvorgänge in try und except-Blöcke. Verwenden Sie
logger.errorfür Ausnahmen undlogger.infofür Fortschrittsmeldungen.Python-Protokollierung: Ideal für Protokollierungsvorgänge, Statusaktualisierungen oder Debugginginformationen aus Code, der nur auf dem Spark-Treiber ausgeführt wird. Das Protokollierungsmodul von Python wird nicht an Executorprotokolle weitergegeben. Weitere Informationen finden Sie in der Dokumentation zum Entwickeln, Ausführen und Verwalten von Notizbüchern.
Spark log4j: Der Standard für die robuste Anwendungsprotokollierung auf Produktionsebene in Spark, da sie systemintern in Spark-Treiber-/Executorprotokolle integriert wird.
Beispiel log4j-Verwendung in PySpark:
import traceback # Get log4j logger log4jLogger = spark._jvm.org.apache.log4j logger = log4jLogger.LogManager.getLogger("PySparkLogger") logger.info("Application started.") try: # Create DataFrame with 20 records data = [(f"Name{i}", i) for i in range(1, 21)] # 20 records df = spark.createDataFrame(data, ["name", "age"]) logger.info("DataFrame created successfully with 20 records.") df.show(s) # 's' is not defined -> will throw error but the application will not fail except Exception as e: logger.error(f"Error while creating or showing DataFrame: {str(e)}\n{traceback.format_exc()}")Zentrale Fehlerüberwachung:
Verwenden Sie die Diagnoseemitter-Erweiterung (Monitor Apache Spark-Anwendungen mit Azure Log Analytics) in der Umgebung und verknüpfen Sie sie mit den Notizbüchern, in denen Spark-Anwendungen ausgeführt werden. Der Emitter kann Ereignisprotokolle, benutzerdefinierte Protokolle (z. B. Log4j) und Metriken an Azure Log Analytics/Azure Storage/Azure Event Hubs senden. Übergeben Sie den Log4j-Namen an die Eigenschaft:
spark.synapse.diagnostic.emitter.\<destination\>.filter.loggerName.match.Darüber hinaus können Sie beim Debuggen fehlgeschlagene Zeilen/Datensätze auch in Lakehouse-Tabellen (LH) sammeln, um fehlerhafte Daten auf Ebene der Einzeldatensätze zu erfassen.