次の方法で共有


Apache Spark を使用した対話型データ整形

データ ラングリングは、機械学習プロジェクトの重要な側面です。 この記事では、Azure Synapse によってサポートされるサーバーレス Apache Spark コンピューティングで Azure Machine Learning ノートブックを実行して、対話型のデータ ラングリングを実行する方法について説明します。

この記事では、サーバーレス Spark コンピューティングをアタッチして構成する方法について説明します。 次に、サーバーレス Spark コンピューティングを使用して、複数のソースからデータにアクセスしてラングリングする方法について説明します。

前提条件

詳細については、以下を参照してください。

ノートブック セッションでサーバーレス Spark コンピューティングを使用する

サーバーレス Spark コンピューティングを使用することは、対話型データ ラングリングのために Spark クラスターにアクセスする最も簡単な方法です。 Synapse Spark プールに接続されたフル マネージドのサーバーレス Spark コンピューティングは、Azure Machine Learning ノートブックで直接使用できます。

次のいずれかのデータ アクセスとラングリング ソースとメソッドを使用するには、ファイルまたはノートブック ページの上部にある [コンピューティング] の横にある Azure Machine Learning サーバーレス Spark>Serverless Spark Compute - Available を選択して Spark サーバーレス コンピューティング をアタッチします。 コンピューティングがセッションにアタッチされるまでに 1 ~ 2 分かかる場合があります。

サーバーレス Spark セッションを構成する

サーバーレス Spark コンピューティングをアタッチした後、いくつかの値を設定または変更することで、Spark セッションを構成できます。 Spark セッションを構成するには:

  1. ファイルまたはノートブック ページの左上にある [ セッションの構成] を 選択します。
  2. [ セッションの構成 ] 画面で、次のいずれかの設定を変更します。
    • [コンピューティング] ウィンドウで、次の操作を行います。

      • [ノード サイズ] のドロップダウン メニューから別のサイズを選択して、マシン のサイズを変更します
      • Executor を動的に割り当てるかどうかを選択します。
      • Spark セッションの Executor の数を選択します。
      • ドロップダウン メニューから使用可能な場合は、別の Executor サイズ を選択します。
    • [設定] ウィンドウで、次の操作を行います。

      • Apache Spark のバージョンを 3.5 以外のバージョン (使用可能な場合) に変更します。

        Important

        Azure Synapse Runtime for Apache Spark 3.4 は、2026 年 3 月 31 日にサポートが終了します。 継続的なサポートのために Apache Spark 3.5 に移行します。 詳細については、 Azure Synapse ランタイムに関するページを参照してください。

      • セッション タイムアウトを防ぐのに役立つセッション タイムアウト値を分単位で大きい数値に変更します。

      • [ 構成設定] で、 プロパティ 名/値の設定を追加して、必要に応じてセッションを構成します。

        ヒント

        セッション レベルの Conda パッケージを使用する場合は、値が spark.hadoop.aml.enable_cachetrue 構成プロパティを追加すると、Spark セッションのコールド 開始時刻が短縮される可能性があります。 セッション レベルの Conda パッケージを使用したセッションコールド スタートは、通常、最初に 10 分から 15 分かかります。 以降のセッションコールドは、構成変数が true に設定された状態で開始されます。通常、3 ~ 5 分かかります。

    • [Python パッケージ] ウィンドウで、次の操作を行います。

      • Conda ファイルを使用してセッションを構成するには、[ Conda ファイルのアップロード] を選択します。 [conda ファイルの選択] の横にある [参照] を選択し、コンピューター上の適切な Conda YAML ファイルを参照して開いてアップロードします。
      • カスタム環境を使用するには、[ カスタム環境 ] を選択し、[環境の 種類] でカスタム環境を選択します。 詳細については、「 ソフトウェア環境の管理」を参照してください。
  3. [ 適用] を選択して、すべての構成を適用します。

セッション構成の変更は保持され、接続されているサーバーレス Spark コンピューティングを使用する他のノートブック セッションで使用できます。

Azure Data Lake Storage からのデータのインポートとラングリング

Azure Data Lake Storage アカウントに格納されているデータにアクセスしてラングリングするには、abfss://またはサービス プリンシパル ベースのアクセス権を持つ プロトコル URI を使用します。 ユーザー ID パススルーには、追加の構成は必要ありません。

いずれかの方法を使用するには、ユーザー ID またはサービス プリンシパルに、Azure Data Lake Storage アカウントの 共同作成者 ロールと ストレージ BLOB データ共同作成者ロールの割り当てが 必要です。

ユーザー ID パススルーの場合は、次のデータ ラングリング コード サンプルを実行して、abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/<PATH_TO_DATA>pyspark.pandas形式のデータ URI を使用します。 <STORAGE_ACCOUNT_NAME> プレースホルダーを Azure Data Lake Storage アカウントの名前に置き換え、<FILE_SYSTEM_NAME>をデータ コンテナーの名前に置き換えます。

import pyspark.pandas as pd

df = pd.read_csv(
    "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/titanic.csv",
    index_col="PassengerId",
)
df.fillna(
    value={"Cabin": "None"}, inplace=True
)  # Fill Cabin column with value "None" if missing
df.dropna(inplace=True)  # Drop the rows which still have any missing value
df.to_csv(
    "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/wrangled",
    index_col="PassengerId",
)

サービス プリンシパルを使用する

サービス プリンシパルを使用して Azure Data Lake Storage からデータにアクセスしてラングリングするには、最初に次のようにサービス プリンシパルを設定します。

  1. サービス プリンシパルを作成 し、 必要なストレージ BLOB データ共同作成者と Key Vault シークレットのユーザー ロールを割り当てます

  2. アプリの登録からサービス プリンシパル テナント ID、クライアント ID、クライアント シークレットの値を取得し、値の Azure Key Vault シークレットを作成 します。

  3. セッション構成に次のプロパティ名と値のペアを追加して、サービス プリンシパルのテナント ID、クライアント ID、およびクライアント シークレットを設定します。 <STORAGE_ACCOUNT_NAME>をストレージ アカウント名に置き換え、<TENANT_ID>をサービス プリンシパルテナント ID に置き換えます。

    プロパティ名 価値
    fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net アプリケーション (クライアント) ID 値
    fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net https://login.microsoftonline.com/<TENANT_ID>/oauth2/token
    fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net クライアント シークレットの値
  4. 次のコードを実行します。 コード内の get_secret() 呼び出しは、Key Vault 名と、サービス プリンシパル テナント ID、クライアント ID、クライアント シークレット用に作成された Key Vault シークレットの名前によって異なります。

    from pyspark.sql import SparkSession
    
    sc = SparkSession.builder.getOrCreate()
    token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
    
    # Set up service principal tenant ID, client ID, and secret from Azure Key Vault
    client_id = token_library.getSecret("<KEY_VAULT_NAME>", "<CLIENT_ID_SECRET_NAME>")
    tenant_id = token_library.getSecret("<KEY_VAULT_NAME>", "<TENANT_ID_SECRET_NAME>")
    client_secret = token_library.getSecret("<KEY_VAULT_NAME>", "<CLIENT_SECRET_NAME>")
    
    # Set up a service principal that has access to the data
    sc._jsc.hadoopConfiguration().set(
        "fs.azure.account.auth.type.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net", "OAuth"
    )
    sc._jsc.hadoopConfiguration().set(
        "fs.azure.account.oauth.provider.type.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
        "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    )
    sc._jsc.hadoopConfiguration().set(
        "fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
        client_id,
    )
    sc._jsc.hadoopConfiguration().set(
        "fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
        client_secret,
    )
    sc._jsc.hadoopConfiguration().set(
        "fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
        "https://login.microsoftonline.com/" + tenant_id + "/oauth2/token",
    )
    
  5. コード サンプルに示すように、形式のデータ URI を使用して、titanic.csvデータをインポートしてラングリングします。 <STORAGE_ACCOUNT_NAME> プレースホルダーを Azure Data Lake Storage アカウントの名前に置き換え、<FILE_SYSTEM_NAME>をデータ コンテナーの名前に置き換えます。

    import pyspark.pandas as pd
    
    df = pd.read_csv(
        "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/titanic.csv",
        index_col="PassengerId",
    )
    df.fillna(
        value={"Cabin": "None"}, inplace=True
    )  # Fill Cabin column with value "None" if missing
    df.dropna(inplace=True)  # Drop the rows which still have any missing value
    df.to_csv(
        "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/wrangled",
        index_col="PassengerId",
    )
    

Azure BLOB ストレージからのデータのインポートとラングリング

Azure Blob Storage データには、 ストレージ アカウント アクセス キー または Shared Access Signature (SAS) トークンのいずれかを使用してアクセスできます。 資格情報をシークレットとして Azure Key Vault に格納し、Spark セッション構成でプロパティとして設定します。

  1. 次のいずれかのコード スニペットを実行します。 コード スニペットの get_secret() 呼び出しには、キー コンテナーの名前と、Azure Blob Storage アカウントアクセス キーまたは SAS トークン用に作成されたシークレットの名前が必要です。

    • ストレージ アカウントのアクセス キーを構成するには、次のコード スニペットに示すように、 fs.azure.account.key.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net プロパティを設定します。

      from pyspark.sql import SparkSession
      
      sc = SparkSession.builder.getOrCreate()
      token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
      access_key = token_library.getSecret("<KEY_VAULT_NAME>", "<ACCESS_KEY_SECRET_NAME>")
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.key.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net", access_key
      )
      
    • SAS トークンを構成するには、次のコード スニペットに示すように、 fs.azure.sas.<BLOB_CONTAINER_NAME>.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net プロパティを設定します。

      from pyspark.sql import SparkSession
      
      sc = SparkSession.builder.getOrCreate()
      token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
      sas_token = token_library.getSecret("<KEY_VAULT_NAME>", "<SAS_TOKEN_SECRET_NAME>")
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.sas.<BLOB_CONTAINER_NAME>.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net",
          sas_token,
      )
      
  2. wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/<PATH_TO_DATA>形式のデータ URI を使用して、次のデータ ラングリング コードを実行します。

    import pyspark.pandas as pd
    
    df = pd.read_csv(
        "wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/data/titanic.csv",
        index_col="PassengerId",
    )
    df.fillna(
        value={"Cabin": "None"}, inplace=True
    )  # Fill Cabin column with value "None" if missing
    df.dropna(inplace=True)  # Drop the rows which still have any missing value
    df.to_csv(
        "wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/data/wrangled",
        index_col="PassengerId",
    )
    

Azure Machine Learning データストアからのデータのインポートとラングリング

Azure Machine Learning データストアからデータにアクセスするには、URI 形式でデータストア上のデータへのパスを定義しますazureml://datastores/<DATASTORE_NAME>/paths/<PATH_TO_DATA>

次のコードサンプルを実行し、titanic.csvデータをAzure Machine Learning データストアから読み取り、azureml://データストアURI とpyspark.pandasを使って整形します。

import pyspark.pandas as pd

df = pd.read_csv(
    "azureml://datastores/<DATASTORE_NAME>/paths/data/titanic.csv",
    index_col="PassengerId",
)
df.fillna(
    value={"Cabin": "None"}, inplace=True
)  # Fill Cabin column with value "None" if missing
df.dropna(inplace=True)  # Drop the rows which still have any missing value
df.to_csv(
    "azureml://datastores/<DATASTORE_NAME>/paths/data/wrangled",
    index_col="PassengerId",
)

Azure Machine Learning データストアは、Azure ストレージ アカウントのアクセス キー、SAS トークン、サービス プリンシパルの資格情報、または資格情報のないデータ アクセスを使用してデータにアクセスできます。 データストアの種類と基になる Azure ストレージ アカウントの種類に応じて、適切な認証メカニズムを選択します。

次の表は、Azure Machine Learning データストア内のデータにアクセスするための認証メカニズムをまとめたものです。

ストレージ アカウントの種類 資格情報のないデータ アクセス データ アクセス メカニズム ロールの割り当て
Azure BLOB いいえ アクセス キーまたは SAS トークン ロールの割り当ては必要ありません。
Azure BLOB はい ユーザー ID パススルー* ユーザー ID には、Azure Blob Storage アカウントで 適切なロールの割り当てが 必要です。
Azure Data Lake Storage いいえ サービス プリンシパル サービス プリンシパルには、Azure Data Lake Storage ストレージ アカウントに 適切なロールの割り当てが 必要です。
Azure Data Lake Storage はい ユーザー ID パススルー ユーザー ID には、Azure Data Lake Storage ストレージ アカウントで 適切なロールの割り当てが 必要です。

* ユーザー ID パススルーは、 論理的な削除 が有効になっていない場合にのみ、Azure Blob Storage アカウントを指す資格情報のないデータストアに対して機能します。

既定のファイル共有のデータにアクセスする

Azure Machine Learning Studio の既定のワークスペース ファイル共有は、Notebooks の [ファイル] タブの下のディレクトリ ツリーです。 ノートブック コードは、他の構成なしでファイルの絶対パスを使用して、 file:// プロトコルを使用して、このファイル共有に格納されているファイルに直接アクセスできます。 既定のファイル共有は、サーバーレス Spark コンピューティング プールとアタッチされた Synapse Spark プールの両方にマウントされます。

ファイル共有の使用を示すスクリーンショット。

次のコード スニペットは、既定のファイル共有のユーザー名のすぐ下にあるデータ フォルダーに格納されている titanic.csv ファイルから データ にアクセスし、ラングリングします。 <USER> プレースホルダーをユーザー名に置き換えます。

import os
import pyspark.pandas as pd

abspath = os.path.abspath(".")
file = "file://" + abspath + "/Users/<USER>/data/titanic.csv"
print(file)
df = pd.read_csv(file, index_col="PassengerId")
df.fillna(value={"Cabin" : "None"}, inplace=True) # Fill Cabin column with value "None" if missing
df.dropna(inplace=True) # Drop the rows which still have any missing value
output_path = "file://" + abspath + "/Users/<USER>/data/wrangled"
df.to_csv(output_path, index_col="PassengerId")