Spark の基本

サイズ設定、最適化、トラブルシューティングを支える主要な概念。 Fabric で Spark を初めて使用する場合は、まずこの記事をお読みください。

一般的なやるべきこととやるべきでないこと

シナリオ: Spark を初めて使用する場合。 やるべきこととやってはいけないことは何ですか?
利用シーン ベスト プラクティス
最適化されたシリアル化形式を使用する Do: Avro、Parquet、Optimized Row Columnar (ORC) などの形式は、スキーマを埋め込み、コンパクトで、ストレージと処理を最適化するため、優先します。 Fabric では、アトミック性、一貫性、分離、持続性 (ACID) の保証とパフォーマンスの利点にデルタ形式を使用します
XML/JSON には注意してください Spark はデータセット全体を読み取ってスキーマを推論するため、大きな JavaScript Object Notation (JSON) ファイルまたは Extensible Markup Language (XML) ファイルのスキーマ推論に依存しないでください。この場合、処理が遅くなり、メモリが集中的に消費されます。

JSON/XML を読み取るときに静的プライマリ スキーマを指定するか、 .option("samplingRatio", 0.1) を使用して読み取りを高速化しますが、サンプルが完全なデータセットを表していない場合、読み取りが失敗する可能性があることに注意してください。 より安全な方法では、代表的なサンプルからスキーマが推論され、すべての読み取りで保持されます。

大きな XML ファイルの解析は避けてください。 XML 解析の実行は、タグ処理と型キャストにより本質的に遅くなります。
結合とフィルター処理を最適化する Do: 結合の前に列の排除と行レベルのフィルター処理を適用して、シャッフルとメモリ使用量を減らします。

DataFrame API を使用すると、Catalyst オプティマイザーによって述語のプッシュダウンが自動的に処理されます。 回復性のある分散データセット (RDD) API は Catalyst の最適化をバイパスするため、回避します。
RDD よりもデータフレームを優先する Do: ほとんどの操作で RDD の代わりに DataFrame を使用します。 DataFrame は、効率的な実行のために Catalyst オプティマイザーとタングステン実行エンジンを使用します。
アダプティブ クエリ実行を有効にする (AQE) 実行: AQE を有効にして、シャッフル パーティションを動的に最適化し、歪んだデータを自動的に処理します。

Executor のメモリ管理

シナリオ: パフォーマンス チューニングのための Executor メモリ管理を理解する必要があります。

Executor が 56 GB のメモリで構成されている場合でも、Spark では、そのすべてをユーザー データに直接使用することはできません。 Spark Core では、Executor メモリが分割および管理されます。

  • 予約済みメモリ: システムと Spark の内部オーバーヘッド (Java 仮想マシン (JVM)、内部など) 用に予約された固定部分。

  • ユーザー メモリ: ユーザー定義関数 (UDF)、ローカル変数、データ構造 (リスト、マップ、ディクショナリ)、および計算中に作成されたオブジェクトを格納します。

  • ストレージ メモリ: キャッシュ/永続化されたデータ、ブロードキャスト変数、キャッシュ可能なシャッフル データを保持します。

  • 実行メモリ: 中間計算 (シャッフル、結合、並べ替え、集計) に使用されます。

  • 動的メモリ共有: ストレージと実行メモリの境界は移動可能です。 Spark では、あるリージョンから他方のリージョンにメモリを借用できるため、柔軟なメモリ使用量が可能になります。

  • スピル: ストレージまたは実行メモリの需要が、借用した後の使用可能メモリを超えた場合に発生します。 これにより、データが強制的にディスクに保存され、パフォーマンスに影響する可能性があります。

    Spark メモリ管理とスピル処理の図。

メモリ不足 (OOM) エラー

シナリオ: Spark ジョブがメモリ不足 (OOM) エラーで失敗する。

ドライバー OOM:

ドライバー OOM エラーは、Spark ドライバーが割り当てられたメモリを超えたときに発生します。

一般的な原因は、ドライバー のメモリに大量のデータをプルする collect()countByKey()、または大きな toPandas() 呼び出しなどのドライバーの負荷の高い操作です。

軽減策: 可能な限りドライバーの負荷の高い操作を避けます。 避けられない場合は、ドライバーのサイズとベンチマークを大きくして、最適な構成を見つけます。

Executor のメモリ不足 (OOM):

Executor OOM エラーは、Spark Executor が割り当てられたメモリを超えたときに発生します。

一般的な原因: 大規模なデータセット (ワイド結合、集計、シャッフルなど) や、Executor の使用可能なメモリ (実行 + ストレージ領域) を超えるキャッシュまたは永続化されたデータセットに対するメモリとコンピューティング集中型の変換。

軽減策: 必要に応じて Executor メモリを増やし、Spark メモリの割合 (spark.memory.fraction、spark.memory.storageFraction) を調整し、選択的に永続化します。 キャッシュされたデータが使用可能なメモリに収まるようにします。

データの偏り

スキューの兆候

  • いくつかのタスクは、Spark UI の他のタスクよりも時間がかかります (ステージ タスクは重い尾を示します)。
  • ステージ メトリックの中央値と最大タスク時間の間の大きなギャップ。
  • いくつかのパーティションに対して大きなシャッフル読み取りまたは書き込みサイズを持つステージ。

一般的な原因:

  • 結合/グループ キー (ホット キー) の不均一なデータ分散。
  • データ ボリュームのパーティション分割が正しくないか、パーティションが少なすぎます。
  • 大量のレコードまたは多数の null/空のキーを生成するアップストリーム データの異常。

緩和:

  • パーティションの並列処理とバランス サイズを増やすために、パーティションを再分割または結合します。
  • キーソルトやカスタムパーティション分割を行い、パーティション間で頻繁にアクセスされるキーを均等に分散させます。
  • AQE (アダプティブ クエリ実行) を使用して、シャッフル後のパーティションを結合し、スキュー結合の最適化を有効にします。
  • 小さな参照テーブルにはブロードキャスト結合を利用し、シャッフルを完全に避けましょう。
  • 負荷の高いステージの前にバランスの取れた中間データセットを保持し、ジョブを再実行します。

UDF のベスト プラクティス

シナリオ: 組み込みの DataFrame 関数では表現できないカスタム ロジックを適用する必要があります。

可能な限り Spark DataFrame API を使用します。 Catalyst オプティマイザーは、組み込みの関数を最適化し、JVM 上でネイティブに実行するため、最適なパフォーマンスを実現します。

UDF (ユーザー定義関数) を使用する必要がある場合は、通常の PySpark Python UDF を使用しないでください。 代わりに、次の代替手段を検討してください。

  • Pandas UDF (ベクター化された UDF とも呼ばれます): JVM と Python 間の効率的なデータ転送には Apache Arrow を使用します。 Pandas UDF を使用すると、ベクター化された操作が可能であり、行単位の Python UDF と比較してパフォーマンスが大幅に向上します。

  • Scala/Java UDF: JVM で直接実行し、Python シリアル化のオーバーヘッドを回避します。 Scala/Java UDF は通常、Python UDF よりも優えています。

Python UDF には注意してください。 各 Executor は個別の Python プロセスを起動し、JVM と Python の間のデータのシリアル化と逆シリアル化を必要とします。 これにより、特に大規模なパフォーマンスのボトルネックが発生します。 

エラーのログ

シナリオ: Fabric Spark でのエラー ログ記録のベスト プラクティス
  1. ドライバーの負担が大きいlog4jではなく、print()を使用します。 log4jを使用すると、ドライバー ログのログにアクセスして検索できます (ロガー名を使用します (PySparkLogger など)。

    Spark ログの図。

  2. 読み取り、書き込み、変換を try ブロックと except ブロックでラップします。 例外には logger.error を使用し、進行状況メッセージには logger.info を使用します。

    • Python ログ: Spark ドライバーでのみ実行されるコードからの操作、状態の更新、またはデバッグ情報のログ記録に最適です。 Python のログ モジュールは、Executor ログに反映されません。 ノートブックの 開発、実行、管理に関するドキュメントを参照してください

    • Spark log4j: Spark のドライバー/Executor ログとネイティブに統合される、Spark での堅牢な運用レベルのアプリケーション ログ記録の標準。

    PySpark での log4j の使用例:

    import traceback
    # Get log4j logger
    log4jLogger = spark._jvm.org.apache.log4j
    logger = log4jLogger.LogManager.getLogger("PySparkLogger")
    logger.info("Application started.")
    try:
        # Create DataFrame with 20 records
        data = [(f"Name{i}", i) for i in range(1, 21)]  # 20 records
        df = spark.createDataFrame(data, ["name", "age"])
        logger.info("DataFrame created successfully with 20 records.")
        df.show(s)  # 's' is not defined -> will throw error but the application will not fail
    except Exception as e:
        logger.error(f"Error while creating or showing DataFrame: {str(e)}\n{traceback.format_exc()}")
    
  3. エラー監視を一元化する:

    • 環境内で診断エミッタ拡張機能 (Azure Log Analytics を使用して Apache Spark アプリケーションを監視) を使用し、Spark アプリケーションを実行しているノートブックにアタッチします。 エミッターは、イベント ログ、カスタム ログ (log4j など)、メトリックを Azure Log Analytics/Azure Storage/Azure Event Hubs に送信できます。 log4j 名をプロパティに渡します: spark.synapse.diagnostic.emitter.\<destination\>.filter.loggerName.match

    • さらに、デバッグのために、失敗した行/レコードを Lakehouse (LH) テーブルに収集して、レコード レベルの不適切なデータ キャプチャを行うこともできます。