Azure Synapse Studio チームは、Microsoft Spark Utilities (mssparkutils) パッケージに 2 つの新しいマウント/マウント解除 API を構築しました。 これらの API を使用して、すべての作業ノード (ドライバー ノードとワーカー ノード) にリモート ストレージ (Azure Blob Storage、Azure Data Lake Storage Gen2、または Azure ファイル共有) をアタッチできます。 ストレージを配置した後は、ローカル ファイル API を使って、ローカル ファイル システムに格納されているかのようにデータにアクセスできます。 詳細については、「Microsoft Spark ユーティリティの概要を参照してください。
この記事では、ワークスペースでマウントおよびマウント解除 API を使う方法について説明します。 学習内容:
- ファイル共有Data Lake Storage Gen2、Blob Storage、またはAzureをマウントする方法。
- ローカル ファイル システム API を使ってマウント ポイントの下にあるファイルにアクセスする方法。
-
mssparkutils fsAPI を使ってマウント ポイントの下にあるファイルにアクセスする方法。 - Spark 読み取り API を使ってマウント ポイントの下にあるファイルにアクセスする方法。
- マウント ポイントのマウントを解除する方法。
警告
Azure Data Lake Storage Gen1ストレージはサポートされていません。 マウント API を使用する前に、Azure Data Lake Storage Gen1 から Gen2 への移行ガイダンス に従って、Data Lake Storage Gen2に移行できます。
ストレージをマウントする
このセクションでは、例としてData Lake Storage Gen2ステップ バイ ステップでマウントする方法について説明します。 Blob StorageとAzureファイル共有のマウントも同様に機能します。
この例では、storegen2 という名前の 1 つのData Lake Storage Gen2 アカウントがあることを前提としています。 そのアカウントには mycontainer という名前のコンテナーが 1 つあり、それを Spark プールの /test にマウントします。
mycontainer という名前のコンテナーをマウントするには、mssparkutils で最初に、ユーザーがコンテナーにアクセスするためのアクセス許可を持っているかどうかを調べる必要があります。 現在、Azure Synapse Analyticsでは、トリガー マウント操作の 3 つの認証方法 (linkedService、accountKey、および sastoken) がサポートされています。
リンク サービスを使ってマウントする (推奨)
リンク サービスによるトリガー マウントをお勧めします。 この方法を使うと、mssparkutils でシークレットや認証値自体が格納されないため、セキュリティ リークが避けられます。 代わりに、mssparkutils は常にリンク サービスから認証値をフェッチして、リモート ストレージに BLOB データを要求します。
Data Lake Storage Gen2またはBlob Storageのリンクされたサービスを作成できます。 現在、Azure Synapse Analyticsでは、リンクされたサービスを作成するときに 2 つの認証方法がサポートされています。
アカウント キーを使ってリンク サービスを作成する
システム割り当てマネージド ID を使用してリンクされたサービスを作成する
重要
- 上記で作成したリンクされたサービスをAzure Data Lake Storage Gen2に管理されたプライベート エンドポイント (dfs URI) を使用する場合は、Azure Blob Storageを使用して別のセカンダリ マネージド プライベート エンドポイントを作成する必要があります。オプション (blob URI) を使用して、内部fsspec/adlfs コードが BlobServiceClient インターフェイスを使用して接続できるようにします。
- セカンダリ マネージド プライベート エンドポイントが正しく構成されていない場合、ServiceRequestError: Cannot connect to host [storageaccountname].blob.core.windows.net:443 ssl:True [Name or service not known] のようなエラー メッセージが表示されます
注意
認証方法としてマネージド ID を使ってリンク サービスを作成する場合は、ワークスペースの MSI ファイルに、マウントされるコンテナーのストレージ BLOB データ共同作成者ロールがあることを確認します。
リンクされたサービスを正常に作成したら、次のPythonコードを使用して、コンテナーを Spark プールに簡単にマウントできます。
mssparkutils.fs.mount(
"abfss://mycontainer@<accountname>.dfs.core.windows.net",
"/test",
{"linkedService": "mygen2account"}
)
注意
mssparkutils を使用できない場合、それをインポートすることが必要な場合があります。
from notebookutils import mssparkutils
使用する認証方法に関係なく、ルート フォルダーをマウントすることはお勧めしません。
マウント パラメーター:
- fileCacheTimeout: BLOB は、ローカルの一時フォルダーに既定で 120 秒間キャッシュされます。 この間、blobfuse では、ファイルが最新であるかどうかの確認は行われません。 必要に応じて、パラメーターの指定により既定のタイムアウト時間を変更できます。 複数のクライアントから同時にファイルの変更操作が行われる場合は、ローカル ファイルとリモート ファイルとの間に不整合が発生するのを避けるために、キャッシュ時間を短縮するか 0 に設定し、常にサーバーから最新のファイルが取得されるようにすることをお勧めします。
- timeout: マウント操作のタイムアウトは、デフォルトで 120 秒です。 必要に応じて、パラメーターの指定により既定のタイムアウト時間を変更できます。 Executor が多すぎる場合や、マウントのタイムアウトが発生する場合は、値を大きくすることをお勧めします。
- scope: scope パラメーターは、マウントのスコープを指定する場合に使います。 既定値は "job" です。スコープが "job" に設定されている場合、マウントは現在のクラスターにのみ表示されます。 スコープが "workspace" に設定されている場合、マウントは現在のワークスペース内にあるすべてのノートブックに表示されます。また、マウント ポイントは、なければ自動的に作成されます。 マウント ポイントをマウント解除するには、同じパラメーターをマウント解除 API に追加します。 ワークスペース レベルのマウントは、リンク サービス認証でのみサポートされます。
これらのパラメーターの使用例を示します。
mssparkutils.fs.mount(
"abfss://mycontainer@<accountname>.dfs.core.windows.net",
"/test",
{"linkedService":"mygen2account", "fileCacheTimeout": 120, "timeout": 120}
)
Shared Access Signature トークンまたはアカウント キーを使ってマウントする
リンク サービスによるマウントだけでなく、mssparkutils では、アカウント キーまたは Shared Access Signature (SAS) トークンをパラメーターとして明示的に渡してターゲットをマウントすることがサポートされています。
セキュリティ上の理由から、可能な場合はアカウント キーや SAS トークンの代わりにマネージド ID とMicrosoft Entra認証を使用することをお勧めします。 アカウント キーを使用する必要がある場合は、Azure Key Vaultに格納します (次のスクリーンショットの例を参照)。 その後、mssparkutil.credentials.getSecret API を使って取得できます。 詳細については、「
コード例を次に示します。
from notebookutils import mssparkutils
accountKey = mssparkutils.credentials.getSecret("MountKV","mySecret")
mssparkutils.fs.mount(
"abfss://mycontainer@<accountname>.dfs.core.windows.net",
"/test",
{"accountKey":accountKey}
)
注意
セキュリティ上の理由から、資格情報をコードに格納しないでください。
mssparkutils fs API を使ってマウント ポイントの下にあるファイルにアクセスする
マウント操作の主な目的は、ユーザーがローカル ファイル システムの API を使って、リモート ストレージ アカウントに格納されているデータにアクセスできるようすることです。 また、mssparkutils fs API を使用し、マウントされたパスをパラメーターで指定して、データにアクセスすることもできます。 ここで使用するパスの形式は少し異なります。
マウント API を使用して、Data Lake Storage Gen2 コンテナー mycontainer を /test にマウントしたとします。 ローカル ファイル システム API を使用してデータにアクセスする場合:
- 3.3 以下の Spark バージョンの場合、パス形式は
/synfs/{jobId}/test/{filename}。 - 3.4 以上の Spark バージョンの場合、パス形式は
/synfs/notebook/{jobId}/test/{filename}。
正確なパスを取得するには、mssparkutils.fs.getMountPath() を使うことをお勧めします。
path = mssparkutils.fs.getMountPath("/test")
注意
workspaceスコープでストレージをマウントすると、マウント ポイントが /synfs/workspace フォルダーの下に作成されます。 正確なパスを取得するには、 mssparkutils.fs.getMountPath("/test", "workspace") を使用する必要があります。
mssparkutils fs API を使用してデータにアクセスする場合、パス形式は次のようになります: synfs:/notebook/{jobId}/test/{filename}。 この場合の synfs は、マウントされたパスの一部ではなく、スキーマとして使われていることがわかります。 もちろん、ローカル ファイル システム スキーマを使用してデータにアクセスすることもできます。 たとえば、file:/synfs/notebook/{jobId}/test/{filename} のようにします。
次の 3 つの例では、mssparkutils fs を使ってマウント ポイント パスでファイルにアクセスする方法を示します。
ディレクトリのリストを取得します。
mssparkutils.fs.ls(f'file:{mssparkutils.fs.getMountPath("/test")}')ファイルの内容を読み取ります。
mssparkutils.fs.head(f'file:{mssparkutils.fs.getMountPath("/test")}/myFile.csv')ディレクトリを作成します。
mssparkutils.fs.mkdirs(f'file:{mssparkutils.fs.getMountPath("/test")}/myDir')
Spark 読み取り API を使ってマウント ポイントの下にあるファイルにアクセスする
Spark 読み取り API を使って、データにアクセスするためのパラメーターを指定できます。 ここでのパス形式は、 mssparkutils fs API を使用する場合と同じです。
マウントされたData Lake Storage Gen2 ストレージ アカウントからファイルを読み取る
次の例では、Data Lake Storage Gen2 ストレージ アカウントが既にマウントされていることを前提とし、マウント パスを使用してファイルを読み取ります。
%%pyspark
df = spark.read.load(f'file:{mssparkutils.fs.getMountPath("/test")}/myFile.csv', format='csv')
df.show()
注意
リンク サービスを使ってストレージをマウントする場合、データへのアクセスに synfs スキーマを使うには、必ず spark リンク サービス構成を明示的に設定する必要があります。 詳細については、リンク サービスの ADLS Gen2 ストレージに関するセクションを参照してください。
マウントされたBlob Storage アカウントからファイルを読み取る
Blob Storage アカウントをマウントし、mssparkutils または Spark API を使用してアクセスする場合は、マウント API を使用してコンテナーをマウントする前に、Spark 構成を使用して SAS トークンを明示的に構成する必要があります。
トリガーのマウント後に
mssparkutilsまたは Spark API を使用してBlob Storage アカウントにアクセスするには、次のコード例に示すように Spark 構成を更新します。 マウントの後でローカル ファイル API だけを使って Spark の構成にアクセスする場合は、このステップをバイパスできます。blob_sas_token = mssparkutils.credentials.getConnectionStringOrCreds("myblobstorageaccount") spark.conf.set('fs.azure.sas.mycontainer.<blobStorageAccountName>.blob.core.windows.net', blob_sas_token)リンクされたサービス
myblobstorageaccountを作成し、リンクされたサービスを使用してBlob Storage アカウントをマウントします。%%spark mssparkutils.fs.mount( "wasbs://mycontainer@<blobStorageAccountName>.blob.core.windows.net", "/test", Map("linkedService" -> "myblobstorageaccount") )Blob Storage コンテナーをマウントし、ローカル ファイル API を使用してマウント パスを使用してファイルを読み取ります。
# mount the Blob Storage container, and then read the file by using a mount path with open(mssparkutils.fs.getMountPath("/test") + "/myFile.txt") as f: print(f.read())Spark 読み取り API を使用して、マウントされたBlob Storage コンテナーからデータを読み取ります。
%%spark // mount blob storage container and then read file using mount path val df = spark.read.text(f'file:{mssparkutils.fs.getMountPath("/test")}/myFile.txt') df.show()
マウント パスのマウントを解除する
マウント ポイント (この例では /test) をマウント解除するには、次のコードを使います。
mssparkutils.fs.unmount("/test")
既知の制限事項
マウント解除メカニズムは自動ではありません。 アプリケーションの実行が完了したときに、マウント ポイントをマウント解除してディスク領域を解放するには、コードでマウント解除 API を明示的に呼び出す必要があります。 そうしないと、マウント ポイントは、アプリケーションの実行が完了した後もノードに存在します。
現時点では、Data Lake Storage Gen1 ストレージ アカウントのマウントはサポートされていません。
次のステップ
Azure Synapse Analytics - Synapse ワークスペースを監視する
- Microsoft Spark ユーティリティへの導入