適用対象:✅ Fabric Data Engineering および Data Science
Livy API for Fabric Data Engineering を使用して Spark セッション ジョブを送信する方法について説明します。
前提条件
Fabric PremiumまたはTrial capacityのLakehouse
Visual Studio Code、Jupyter Notebooks、PySpark、Python用Microsoft Authentication Library (MSAL) のようなリモートクライアント
Microsoft Entra アプリ トークンのいずれか。
Microsoft ID プラットフォーム または、Microsoft Entra SPN トークン。
Microsoft Entra レイクハウス内のデータ。この例では、NYC Taxi & Limousine Commission の green_tripdata_2022_08 (レイクハウスに読み込まれた parquet ファイル) を使用します
Livy API で、操作用に統合エンドポイントを定義します。 この記事の例に従うときは、プレースホルダー {Entra_TenantID}、{Entra_ClientID}、{Fabric_WorkspaceID}、{Fabric_LakehouseID} を適切な値に置き換えます。
Livy API セッションのVisual Studio Codeを構成する
Fabric Lakehouse で Lakehouse Settings を選択します。
[Livy エンドポイント] セクションに移動します。
"セッションジョブの接続文字列(画像内の最初の赤いボックス)をコードにコピーします。"
Microsoft Entra 管理センター に移動し、アプリケーション (クライアント) ID とディレクトリ (テナント) ID の両方をコードにコピーします。
Microsoft Entra 管理センターにある Livy API アプリの概要を示すスクリーンショット
Microsoft Entra ユーザー トークンまたは Microsoft Entra SPN トークンを使用して Livy API Spark セッションを認証する
Microsoft Entra SPN トークンを使用して Livy API Spark セッションを認証する
Visual Studio Codeで
.ipynbノートブックを作成し、次のコードを挿入します。import sys from msal import ConfidentialClientApplication # Configuration - Replace with your actual values tenant_id = "Entra_TenantID" # Microsoft Entra tenant ID client_id = "Entra_ClientID" # Service Principal Application ID # Certificate paths - Update these paths to your certificate files certificate_path = "PATH_TO_YOUR_CERTIFICATE.pem" # Public certificate file private_key_path = "PATH_TO_YOUR_PRIVATE_KEY.pem" # Private key file certificate_thumbprint = "YOUR_CERTIFICATE_THUMBPRINT" # Certificate thumbprint # OAuth settings audience = "https://analysis.windows.net/powerbi/api/.default" authority = f"https://login.windows.net/{tenant_id}" def get_access_token(client_id, audience, authority, certificate_path, private_key_path, certificate_thumbprint=None): """ Get an app-only access token for a Service Principal using OAuth 2.0 client credentials flow. This function uses certificate-based authentication which is more secure than client secrets. Args: client_id (str): The Service Principal's client ID audience (str): The audience for the token (resource scope) authority (str): The OAuth authority URL certificate_path (str): Path to the certificate file (.pem format) private_key_path (str): Path to the private key file (.pem format) certificate_thumbprint (str): Certificate thumbprint (optional but recommended) Returns: str: The access token for API authentication Raises: Exception: If token acquisition fails """ try: # Read the certificate from PEM file with open(certificate_path, "r", encoding="utf-8") as f: certificate_pem = f.read() # Read the private key from PEM file with open(private_key_path, "r", encoding="utf-8") as f: private_key_pem = f.read() # Create the confidential client application app = ConfidentialClientApplication( client_id=client_id, authority=authority, client_credential={ "private_key": private_key_pem, "thumbprint": certificate_thumbprint, "certificate": certificate_pem } ) # Acquire token using client credentials flow token_response = app.acquire_token_for_client(scopes=[audience]) if "access_token" in token_response: print("Successfully acquired access token") return token_response["access_token"] else: raise Exception(f"Failed to retrieve token: {token_response.get('error_description', 'Unknown error')}") except FileNotFoundError as e: print(f"Certificate file not found: {e}") sys.exit(1) except Exception as e: print(f"Error retrieving token: {e}", file=sys.stderr) sys.exit(1) # Get the access token token = get_access_token(client_id, audience, authority, certificate_path, private_key_path, certificate_thumbprint)ノートブック セルを実行します。 返されたMicrosoft Entra トークンが表示されます。
Microsoft Entra ユーザー トークンを使用して Livy API Spark セッションを認証する
Visual Studio Codeで
.ipynbノートブックを作成し、次のコードを挿入します。from msal import PublicClientApplication import requests import time # Configuration - Replace with your actual values tenant_id = "Entra_TenantID" # Microsoft Entra tenant ID client_id = "Entra_ClientID" # Application ID (can be the same as above or different) # Required scopes for Livy API access scopes = [ "https://api.fabric.microsoft.com/Lakehouse.Execute.All", # Required — execute operations in lakehouses "https://api.fabric.microsoft.com/Lakehouse.Read.All", # Required — read lakehouse metadata "https://api.fabric.microsoft.com/Code.AccessFabric.All", # Required — general Fabric API access from Spark Runtime "https://api.fabric.microsoft.com/Code.AccessStorage.All", # Required — access OneLake and Azure storage from Spark Runtime ] # Optional scopes — add these only if your Spark jobs need access to the corresponding services: # "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All" # Optional — access Azure Key Vault from Spark Runtime # "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All" # Optional — access Azure Data Lake Storage Gen1 from Spark Runtime # "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All" # Optional — access Azure Data Explorer from Spark Runtime # "https://api.fabric.microsoft.com/Code.AccessSQL.All" # Optional — access Azure SQL audience tokens from Spark Runtime def get_access_token(tenant_id, client_id, scopes): """ Get an access token using interactive authentication. This method will open a browser window for user authentication. Args: tenant_id (str): The Microsoft Entra tenant ID client_id (str): The application client ID scopes (list): List of required permission scopes Returns: str: The access token, or None if authentication fails """ app = PublicClientApplication( client_id, authority=f"https://login.microsoftonline.com/{tenant_id}" ) print("Opening browser for interactive authentication...") token_response = app.acquire_token_interactive(scopes=scopes) if "access_token" in token_response: print("Successfully authenticated") return token_response["access_token"] else: print(f"Authentication failed: {token_response.get('error_description', 'Unknown error')}") return None # Uncomment the lines below to use interactive authentication token = get_access_token(tenant_id, client_id, scopes) print("Access token acquired via interactive login")ノートブック セルを実行します。 返されたMicrosoft Entra トークンが表示されます。
Livy API の Code.* スコープについて
Livy API を介して Spark ジョブを実行すると、 Code.* スコープによって、認証されたユーザーに代わって Spark ランタイムがアクセスできる外部サービスが制御されます。 2 つ必要です。残りは、ワークロードに応じてオプションです。
必要なコード.* スコープ
| Scope | 説明 |
|---|---|
Code.AccessFabric.All |
Microsoft Fabricへのアクセス トークンの取得を許可します。 すべての Livy API 操作に必要です。 |
Code.AccessStorage.All |
OneLake と Azure ストレージへのアクセス トークンの取得を許可します。 レイクハウス内のデータの読み取りと書き込みに必要です。 |
オプションコード.* スコープ
これらのスコープは、Spark ジョブが実行時に対応するAzure サービスにアクセスする必要がある場合にのみ追加します。
| Scope | 説明 | いつ使用するか |
|---|---|---|
Code.AccessAzureKeyvault.All |
Azure Key Vaultへのアクセス トークンの取得を許可します。 | Spark コードは、Azure Key Vaultからシークレット、キー、または証明書を取得します。 |
Code.AccessAzureDataLake.All |
Azure Data Lake Storage Gen1へのアクセス トークンの取得を許可します。 | Spark コードは、Azure Data Lake Storage Gen1 アカウントから読み取りまたは書き込みを行います。 |
Code.AccessAzureDataExplorer.All |
Azure Data Explorer (Kusto) へのアクセス トークンの取得を許可します。 | Spark コードは、Azure Data Explorer クラスターに対してクエリを実行したり、データを取り込んだりします。 |
Code.AccessSQL.All |
Azure SQLへのアクセス トークンの取得を許可します。 | Spark コードは、Azure SQL データベースに接続する必要があります。 |
注
Lakehouse.Execute.AllスコープとLakehouse.Read.All スコープも必要ですが、Code.* ファミリには含まれません。 Fabric Lakehouse 内での操作の実行およびメタデータの読み取りのために、それぞれの権限を付与します。
Livy API Spark セッションを作成する
ヒント
ワークロードで複数の Spark ステートメントを同時に実行する必要がある場合は、代わりに コンカレンシーの高いセッション を使用することを検討してください。 HC セッションは、基になる Livy セッションの再利用をシステムが管理している間、並列で実行される独立した実行コンテキストを提供します。
別のノートブック セルを追加し、このコードを挿入します。
import json import requests api_base_url = "https://api.fabric.microsoft.com/" # Base URL for Fabric APIs # Fabric Resource IDs - Replace with your workspace and lakehouse IDs workspace_id = "Fabric_WorkspaceID" lakehouse_id = "Fabric_LakehouseID" # Construct the Livy API session URL # URL pattern: {base_url}/v1/workspaces/{workspace_id}/lakehouses/{lakehouse_id}/livyapi/versions/{api_version}/sessions livy_api_session_url = (f"{api_base_url}v1/workspaces/{workspace_id}/lakehouses/{lakehouse_id}/" f"livyapi/versions/2023-12-01/sessions") # Set up authentication headers headers = {"Authorization": f"Bearer {token}"} print(f"Livy API URL: {livy_api_session_url}") print("Creating Livy session...") try: # Create a new Livy session with default configuration create_livy_session = requests.post(livy_api_session_url, headers=headers, json={}) # Check if the request was successful if create_livy_session.status_code == 202: session_info = create_livy_session.json() print('Livy session creation request submitted successfully') print(f'Session Info: {json.dumps(session_info, indent=2)}') # Extract session ID for future operations livy_session_id = session_info['id'] livy_session_url = f"{livy_api_session_url}/{livy_session_id}" print(f"Session ID: {livy_session_id}") print(f"Session URL: {livy_session_url}") else: print(f"Failed to create session. Status code: {create_livy_session.status_code}") print(f"Response: {create_livy_session.text}") except requests.exceptions.RequestException as e: print(f"Network error occurred: {e}") except json.JSONDecodeError as e: print(f"JSON decode error: {e}") print(f"Response text: {create_livy_session.text}") except Exception as e: print(f"Unexpected error: {e}")ノートブック セルを実行すると、Livy セッションの作成時に出力された 1 行が表示されます。
[監視ハブでジョブを表示する](#監視ハブでジョブを表示する) を使用して、Livy セッションが作成されたことを確認できます。
"Fabric"環境との統合
既定では、この Livy API セッションはワークスペースの既定のスターター プールに対して実行されます。 または、Fabric Environments Microsoft Fabric で環境を作成、構成、および使用して、Livy API セッションがこれらの Spark ジョブに使用する Spark プールをカスタマイズすることもできます。 Fabric環境を使用するには、前のノートブック セルをこの json ペイロードで更新します。
create_livy_session = requests.post(livy_base_url, headers = headers, json = {
"conf" : {
"spark.fabric.environmentDetails" : "{\"id\" : \""EnvironmentID""}"}
}
)
Livy API Spark セッションを使用して spark.sql ステートメントを送信する
別のノートブック セルを追加し、このコードを挿入します。
# call get session API import time table_name = "green_tripdata_2022" print("Checking session status...") # Get current session status get_session_response = requests.get(livy_session_url, headers=headers) session_status = get_session_response.json() print(f"Current session state: {session_status['state']}") # Wait for session to become idle (ready to accept statements) print("Waiting for session to become idle...") while session_status["state"] != "idle": print(f" Session state: {session_status['state']} - waiting 5 seconds...") time.sleep(5) get_session_response = requests.get(livy_session_url, headers=headers) session_status = get_session_response.json() print("Session is now idle and ready to accept statements") # Execute a Spark SQL statement execute_statement_url = f"{livy_session_url}/statements" # Define your Spark SQL query - Replace with your actual table and query payload_data = { "code": "spark.sql(\"SELECT * FROM {table_name} WHERE column_name = 'some_value' LIMIT 10\").show()", "kind": "spark" # Type of code (spark, pyspark, sql, etc.) } print("Submitting Spark SQL statement...") print(f"Query: {payload_data['code']}") try: # Submit the statement for execution execute_statement_response = requests.post(execute_statement_url, headers=headers, json=payload_data) if execute_statement_response.status_code == 200: statement_info = execute_statement_response.json() print('Statement submitted successfully') print(f"Statement Info: {json.dumps(statement_info, indent=2)}") # Get statement ID for monitoring statement_id = str(statement_info['id']) get_statement_url = f"{livy_session_url}/statements/{statement_id}" print(f"Statement ID: {statement_id}") # Monitor statement execution print("Monitoring statement execution...") get_statement_response = requests.get(get_statement_url, headers=headers) statement_status = get_statement_response.json() while statement_status["state"] != "available": print(f" Statement state: {statement_status['state']} - waiting 5 seconds...") time.sleep(5) get_statement_response = requests.get(get_statement_url, headers=headers) statement_status = get_statement_response.json() # Retrieve and display results print("Statement execution completed!") if 'output' in statement_status and 'data' in statement_status['output']: results = statement_status['output']['data']['text/plain'] print("Query Results:") print(results) else: print("No output data available") else: print(f"Failed to submit statement. Status code: {execute_statement_response.status_code}") print(f"Response: {execute_statement_response.text}") except Exception as e: print(f"Error executing statement: {e}")ノートブック セルを実行して、ジョブが送信され、結果が返されると、複数の増分行が出力されます。
Livy API Spark セッションを使用して 2 つ目の spark.sql ステートメントを送信する
別のノートブック セルを追加し、このコードを挿入します。
print("Executing additional Spark SQL statement...") # Wait for session to be idle again get_session_response = requests.get(livy_session_url, headers=headers) session_status = get_session_response.json() while session_status["state"] != "idle": print(f" Waiting for session to be idle... Current state: {session_status['state']}") time.sleep(5) get_session_response = requests.get(livy_session_url, headers=headers) session_status = get_session_response.json() # Execute another statement - Replace with your actual query payload_data = { "code": f"spark.sql(\"SELECT COUNT(*) as total_records FROM {table_name}\").show()", "kind": "spark" } print(f"Executing query: {payload_data['code']}") try: # Submit the second statement execute_statement_response = requests.post(execute_statement_url, headers=headers, json=payload_data) if execute_statement_response.status_code == 200: statement_info = execute_statement_response.json() print('Second statement submitted successfully') statement_id = str(statement_info['id']) get_statement_url = f"{livy_session_url}/statements/{statement_id}" # Monitor execution print("Monitoring statement execution...") get_statement_response = requests.get(get_statement_url, headers=headers) statement_status = get_statement_response.json() while statement_status["state"] != "available": print(f" Statement state: {statement_status['state']} - waiting 5 seconds...") time.sleep(5) get_statement_response = requests.get(get_statement_url, headers=headers) statement_status = get_statement_response.json() # Display results print("Second statement execution completed!") if 'output' in statement_status and 'data' in statement_status['output']: results = statement_status['output']['data']['text/plain'] print("Query Results:") print(results) else: print("No output data available") else: print(f"Failed to submit second statement. Status code: {execute_statement_response.status_code}") except Exception as e: print(f"Error executing second statement: {e}")ノートブック セルを実行して、ジョブが送信され、結果が返されると、複数の増分行が出力されます。
Livy セッションを終了する
別のノートブック セルを追加し、このコードを挿入します。
print("Cleaning up Livy session...") try: # Check current session status before deletion get_session_response = requests.get(livy_session_url, headers=headers) if get_session_response.status_code == 200: session_info = get_session_response.json() print(f"Session state before deletion: {session_info.get('state', 'unknown')}") print(f"Deleting session at: {livy_session_url}") # Delete the session delete_response = requests.delete(livy_session_url, headers=headers) if delete_response.status_code == 200: print("Session deleted successfully") elif delete_response.status_code == 404: print("Session was already deleted or not found") else: print(f"Delete request completed with status code: {delete_response.status_code}") print(f"Response: {delete_response.text}") print(f"Delete response details: {delete_response}") except requests.exceptions.RequestException as e: print(f"Network error during session deletion: {e}") except Exception as e: print(f"Error during session cleanup: {e}")
監視ハブでジョブを表示してください
左側のナビゲーション リンクの [監視] を選択して監視ハブにアクセスすると、さまざまな Apache Spark アクティビティを表示できます。
セッションが進行中または完了状態の場合、[モニター] に移動してセッションの状態を表示できます。
最新のアクティビティ名を選択して開きます。
この Livy API セッションのケースでは、以前のセッション送信、実行の詳細、Spark のバージョン、構成を確認できます。 右上の停止状態に注目してください。
プロセス全体を要約するには、Visual Studio Code、Microsoft Entra アプリ/SPN トークン、Livy API エンドポイント URL、Lakehouse に対する認証、最後に Session Livy API などのリモート クライアントが必要です。