次の方法で共有


Fabric の NotebookUtils ファイルのマウントとマウント解除

NotebookUtils では、Microsoft Spark Utilities パッケージを使用したファイルのマウント操作とマウント解除操作がサポートされています。 mountunmountgetMountPath()mounts() API を使用して、リモート ストレージ (ADLS Gen2、Azure Blob Storage、OneLake) をすべての作業ノード (ドライバー ノードとワーカー ノード) に接続できます。 ストレージ マウント ポイントを確立した後は、データがローカル ファイル システムに格納されているかのように、ローカル ファイル API を使用してアクセスできます。

マウント操作は、次の場合に特に便利です。

  • ローカル ファイル パスを必要とするライブラリを操作します。
  • クラウド ストレージ全体で一貫したファイル システム セマンティクスが必要です。
  • OneLake ショートカット (S3/GCS) に効率的にアクセスします。
  • 複数のストレージ バックエンドで動作する移植可能なコードを構築します。

API リファレンス

次の表は、使用可能なマウント API をまとめたものです。

メソッド 署名 説明
mount mount(source: String, mountPoint: String, extraConfigs: Map[String, Any] = None): Boolean 指定したマウント ポイントにリモート ストレージをマウントします。
unmount unmount(mountPoint: String, extraConfigs: Map[String, Any] = None): Boolean マウントポイントをアンマウントして削除します。
mounts mounts(extraOptions: Map[String, Any] = None): Array[MountPointInfo] 既存のすべてのマウント ポイントを詳細と共に一覧表示します。
getMountPath getMountPath(mountPoint: String, scope: String = ""): String マウント ポイントのローカル ファイル システム パスを取得します。

認証方法

マウント操作では、いくつかの認証方法がサポートされています。 ストレージの種類とセキュリティの要件に基づいて方法を選択します。

Microsoft Entra トークン認証では、ノートブック実行プログラムの ID (ユーザーまたはサービス プリンシパル) が使用されます。 マウント呼び出しでは明示的な資格情報は必要ないので、最も安全なオプションになります。 このオプションは、Lakehouse をマウントすることと、Fabric ワークスペースのストレージに使用します。

# Mount using Microsoft Entra token (no credentials needed)
notebookutils.fs.mount(
    "abfss://mycontainer@mystorageaccount.dfs.core.windows.net",
    "/mydata"
)

ヒント

可能な限り Microsoft Entra トークン認証を使用します。 これにより、資格情報の公開リスクが排除され、Fabric ワークスペース ストレージの追加のセットアップは必要ありません。

アカウント キー

ストレージ アカウントが Microsoft Entra 認証をサポートしていない場合、または外部またはサード パーティのストレージにアクセスする場合は、アカウント キーを使用します。 アカウント キーを Azure Key Vault に格納し、 notebookutils.credentials.getSecret API を使用して取得します。

# Retrieve account key from Azure Key Vault
accountKey = notebookutils.credentials.getSecret("<vaultURI>", "<secretName>")
notebookutils.fs.mount(
    "abfss://mycontainer@<accountname>.dfs.core.windows.net",
    "/test",
    {"accountKey": accountKey}
)

Shared Access Signature (SAS) トークン

アクセス許可スコープの時間制限付きアクセスには、 Shared Access Signature (SAS) トークンを使用します。 このオプションは、外部関係者に一時的なアクセス権を付与する必要がある場合に便利です。 SAS トークンを Azure Key Vault に格納します。

# Retrieve SAS token from Azure Key Vault
sasToken = notebookutils.credentials.getSecret("<vaultURI>", "<secretName>")
notebookutils.fs.mount(
    "abfss://mycontainer@<accountname>.dfs.core.windows.net",
    "/test",
    {"sasToken": sasToken}
)

Important

セキュリティ上の理由から、コードに資格情報を直接埋め込むのは避けてください。 ノートブックの出力に表示されるすべてのシークレットは自動的に編集されます。 詳細については、「秘密の編集」を参照してください。

ADLS Gen2 アカウントをマウントする

次の例では、Azure Data Lake Storage Gen2 をマウントする方法を示します。 Blob Storage と Azure ファイル共有のマウントも同様に機能します。

この例では、storegen2 という名前の Data Lake Storage Gen2 アカウントが 1 つあり、ノートブックの Spark セッションで /test にマウントする mycontainer という名前のコンテナーがあることを前提としています。

コンテナーのマウント場所を指定する画面を示すスクリーンショット。

mycontainer という名前のコンテナーをマウントするには、NotebookUtils が最初にコンテナーにアクセスするアクセス許可があるかどうかを確認する必要があります。 現在、Fabric では、トリガーマウント操作に対して 、Microsoft Entra トークン (既定)、 accountKeysasToken の 3 つの認証方法がサポートされています。

セキュリティ上の理由から、アカウント キーまたは SAS トークンを Azure Key Vault に格納します (次のスクリーンショットを参照)。 その後、notebookutils.credentials.getSecret API を使って取得できます。 Azure Key Vault の詳細については、「Azure Key Vault のマネージド ストレージ アカウント キーについて」をご覧ください。

Azure Key Vault にシークレットが格納されている場所を示すスクリーンショット。

accountKey メソッドのサンプル コード:

# get access token for keyvault resource
# You can also use the full audience, such as https://vault.azure.net.
accountKey = notebookutils.credentials.getSecret("<vaultURI>", "<secretName>")
notebookutils.fs.mount(  
    "abfss://mycontainer@<accountname>.dfs.core.windows.net",  
    "/test",  
    {"accountKey":accountKey}
)

sasToken のサンプル コード:

# get access token for keyvault resource
# You can also use the full audience, such as https://vault.azure.net.
sasToken = notebookutils.credentials.getSecret("<vaultURI>", "<secretName>")
notebookutils.fs.mount(  
    "abfss://mycontainer@<accountname>.dfs.core.windows.net",  
    "/test",  
    {"sasToken":sasToken}
)

マウント パラメーター

extraConfigs マップでは、次の省略可能なパラメーターを使用してマウント動作を調整できます。

  • fileCacheTimeout: BLOB は、既定で 120 秒間ローカル一時フォルダーにキャッシュされます。 この間、blobfuse はファイルが最新かどうかを確認しません。 このパラメーターを設定して、既定のタイムアウト時間を変更できます。 複数のクライアントが同時にファイルを変更する場合、ローカル ファイルとリモート ファイルの間の不整合を回避するには、キャッシュ時間を短縮するか、サーバーから常に最新のファイルを取得するように 0 に設定します。
  • timeout: マウント操作のタイムアウトは、既定では 30 秒です。 このパラメーターを設定して、既定のタイムアウト時間を変更できます。 Executor が多すぎる場合、またはマウントがタイムアウトになった場合は、値を大きくします。

これらのパラメーターの使用例を示します。

notebookutils.fs.mount(
   "abfss://mycontainer@<accountname>.dfs.core.windows.net",
   "/test",
   {"fileCacheTimeout": 120, "timeout": 30}
)

キャッシュ構成に関する推奨事項

アクセス パターンに基づいてキャッシュ タイムアウト値を選択します。

シナリオ 推奨 fileCacheTimeout メモ
読み取り負荷の高い単一クライアント 120 (既定値) パフォーマンスと鮮度のバランスが良い。
マルチクライアント アクセスをモデレートする 3060 古いデータのリスクを軽減します。
ファイルを変更する複数のクライアント 0 常にサーバーから最新のものを取得します。
ファイルはほとんど変更されない 300+ 読み取りパフォーマンスを最適化します。

ゼロ キャッシュ パターン

複数のクライアントが同時にファイルを変更する場合は、ゼロキャッシュ構成を使用して、常に最新バージョンをサーバーからフェッチします。

# For scenarios with multiple clients modifying files
# Use zero cache to always fetch the latest from the server
notebookutils.fs.mount(
    "abfss://shared@account.dfs.core.windows.net",
    "/shared_data",
    {"fileCacheTimeout": 0}
)

多数の Executor でマウントする場合、またはタイムアウト エラーが発生した場合は、 timeout パラメーターを増やします。

レイクハウスをマウントする

Lakehouse のマウントでは、Microsoft Entra トークン認証のみがサポートされます。 /<mount_name> に Lakehouse をマウントするためのサンプル コード:

notebookutils.fs.mount( 
 "abfss://<workspace_name>@onelake.dfs.fabric.microsoft.com/<lakehouse_name>.Lakehouse", 
 "/<mount_name>"
)

notebookutils fs API を使用してマウント ポイントの下のファイルにアクセスする

ローカル ファイル システム API を使用してリモート ストレージ内のデータにアクセスする場合は、マウント操作を使用します。 マウントされたパスで notebookutils.fs API を使用してマウントされたデータにアクセスすることもできますが、パスの形式は異なります。

マウント API を使用して、Data Lake Storage Gen2 コンテナー mycontainer/test にマウントしたとします。 ローカル ファイル システム API を使ってそのデータにアクセスするときのパスの形式は、次のようになります。

/synfs/notebook/{sessionId}/test/{filename}

notebookutils fs API を使用してデータにアクセスする場合は、getMountPath()を使用して正確なパスを取得します。

path = notebookutils.fs.getMountPath("/test")
  • ディレクトリを一覧表示します。

    notebookutils.fs.ls(f"file://{notebookutils.fs.getMountPath('/test')}")
    
  • ファイルの内容を読み取る。

    notebookutils.fs.head(f"file://{notebookutils.fs.getMountPath('/test')}/myFile.txt")
    
  • ディレクトリを作成します。

    notebookutils.fs.mkdirs(f"file://{notebookutils.fs.getMountPath('/test')}/newdir")
    

ローカル パスを使用してマウント ポイント内のファイルにアクセスする

標準ファイル システムを使用して、マウント ポイント内のファイルの読み取りと書き込みを行うことができます。 次の Python の例は、このパターンを示しています。

#File read
with open(notebookutils.fs.getMountPath('/test2') + "/myFile.txt", "r") as f:
    print(f.read())
#File write
with open(notebookutils.fs.getMountPath('/test2') + "/myFile.txt", "w") as f:
    print(f.write("dummy data"))

既存のマウント ポイントを確認する

notebookutils.fs.mounts() API を使用して、既存のすべてのマウント ポイント情報を確認します。

notebookutils.fs.mounts()

ヒント

競合を回避するために、新しいマウント ポイントを作成する前に、常に mounts() を使用して既存のマウントを確認してください。

マウント前にマウントが存在するかどうかを確認する

existing_mounts = notebookutils.fs.mounts()
mount_point = "/mydata"

if any(m.mountPoint == mount_point for m in existing_mounts):
    print(f"Mount point {mount_point} already exists")
else:
    notebookutils.fs.mount(
        "abfss://container@account.dfs.core.windows.net",
        mount_point
    )
    print("Mount created successfully")

マウントポイントをアンマウントする

マウント ポイントのマウントを解除するには、次のコードを使用します (この例では/test )。

notebookutils.fs.unmount("/test")

Important

マウント解除メカニズムは自動的には適用されません。 アプリケーションの実行が完了したときに、マウント ポイントをマウント解除してディスク領域を解放するには、コードでマウント解除 API を明示的に呼び出す必要があります。 それ以外の場合、マウント ポイントは、アプリケーションの実行が完了した後もノードに存在します。

マウント-プロセス-アンマウントのワークフロー

信頼性の高いリソース管理のために、 try/finally ブロックでマウント操作をラップして、エラーが発生した場合でもクリーンアップが行われるようにします。

def process_with_mount(source_uri, mount_point):
    """Complete workflow: mount, process, unmount."""
    
    try:
        # Step 1: Check if already mounted
        existing = notebookutils.fs.mounts()
        if any(m.mountPoint == mount_point for m in existing):
            print(f"Already mounted at {mount_point}")
        else:
            notebookutils.fs.mount(source_uri, mount_point)
            print(f"Mounted {source_uri} at {mount_point}")
        
        # Step 2: Process data using local file system
        mount_path = notebookutils.fs.getMountPath(mount_point)
        
        with open(f"{mount_path}/data/input.txt", "r") as f:
            data = f.read()
        
        processed = data.upper()
        
        with open(f"{mount_path}/output/result.txt", "w") as f:
            f.write(processed)
        
        print("Processing complete")
        
    finally:
        # Step 3: Always unmount to release resources
        notebookutils.fs.unmount(mount_point)
        print(f"Unmounted {mount_point}")

process_with_mount(
    "abfss://mycontainer@mystorage.dfs.core.windows.net",
    "/temp_mount"
)

既知の制限

  • ** マウントはジョブレベルの設定である。 mounts API を使用して、マウント ポイントが既に存在するか、使用可能であるかを確認します。
  • マウント解除は自動的には行われません。 アプリケーションの実行が完了したら、コードでアンマウント API を呼び出してディスク領域を解放します。 それ以外の場合、マウント ポイントは、アプリケーションの実行が完了した後もノード上に残ります。
  • ADLS Gen1 ストレージ アカウントのマウントはサポートされていません。