Fabricの高コンカレンシーセッションを開始するためのLivy APIの使い方

適用対象:✅ Fabric Data Engineering および Data Science

高コンカレンシー (HC) セッションを使用すると、複数の呼び出し元が互いに干渉することなく 1 つの Spark セッションを共有できます。 ワークロードごとに個別のセッションをプロビジョニングする代わりに、HC セッションを取得し、Fabric API によって、共有の基になるセッション内で分離された REPL が割り当てられます。

この記事では、Fabric Livy API を使用して、HC セッションの取得、セッションパッキングの検証、ステートメントの並列実行、REPL 分離の確認を行います。

前提条件

この記事の例に従うときは、プレースホルダー {Entra_TenantID}{Entra_ClientID}{Entra_ClientSecret}{Fabric_WorkspaceID}{Fabric_LakehouseID} を実際の値に置き換えます。

高並行性のセッションとは何ですか?

高コンカレンシー (HC) セッションでは、複数のユーザーまたはプロセスが 1 つの Spark セッションを共有できます。 各呼び出し元は、共有セッション内で分離された REPL (読み取り・評価・出力ループ) を利用できます。 異なる呼び出し元からのステートメントは相互に干渉しません。

セッションのパッキング

同じsessionTagを使用して2つのHCセッションを作成すると、Fabric APIによって、それらは同じ基礎となるLivyセッションにパックされます。 各 HC セッションは、次の機能を提供する独自の REPL を取得します。

  • リソース効率: 複数のユーザーが、それぞれ独自の Spark セッションを作成する代わりに、1 つの Spark セッションを共有します。
  • REPL 分離: 1 つの REPL 内の変数と状態は、他の REPL には表示されません。
  • 並列実行: さまざまな REPL のステートメントを同時に実行できます。

キー ID

ID 一意 用途
HC セッション id HC セッション ポーリング状態、セッションの削除
sessionId Livy セッション (圧縮時に共有 ) ステートメントの URL
replId REPL (分離コンテキスト) ステートメントの URL

Important

sessionIdreplIdは、HC セッションがIdle状態に達した後にのみ使用できます。

HC セッションと通常の Livy セッションの違い

特徴 通常の Livy セッション HC セッション
エンドポイント .../sessions .../highConcurrencySessions
ステートメント セッションに直接送信 REPL を通じて送信 (/repls/{replId}/statements)
取得 セッションが直接 idle になる NotStarted次にAcquiringHighConcurrencySessionIdle
セッションのパッキング 適用なし 基になる Spark セッションを共有するためのオプションのsessionTag

ステップバイステップのチュートリアル

1. Microsoft Entraで認証する

SPN クライアント資格情報フローを使用してアクセス トークンを取得します。 プレースホルダーの値を実際の資格情報に置き換えます。

from msal import ConfidentialClientApplication

# Configuration — Replace with your actual values
tenant_id = "{Entra_TenantID}"       # Microsoft Entra tenant ID
client_id = "{Entra_ClientID}"       # Service principal application ID
client_secret = "{Entra_ClientSecret}"  # Service principal client secret

# OAuth settings
authority = f"https://login.microsoftonline.com/{tenant_id}"
scope = "https://analysis.windows.net/powerbi/api/.default"

app = ConfidentialClientApplication(
    client_id=client_id,
    authority=authority,
    client_credential=client_secret,
)

result = app.acquire_token_for_client(scopes=[scope])

if "access_token" in result:
    token = result["access_token"]
    print("Access token acquired successfully.")
else:
    raise RuntimeError(
        f"Failed to acquire token: {result.get('error_description', 'unknown error')}"
    )

2. 同じセッション タグを持つ 2 つの HC セッションを作成する

sessionTag: "demo-tag"を使用して 2 つの HC セッションを作成します。 これらのタグは同じタグを共有するため、Fabric API はそれらを同じ基になる Livy セッションにまとめます。 各セッションは、独自の分離された REPL を取得します。

import json
import requests

# Fabric resource IDs — Replace with your actual values
workspace_id = "{Fabric_WorkspaceID}"
lakehouse_id = "{Fabric_LakehouseID}"

# Construct the HC session endpoint URL
livy_base_url = (
    f"https://api.fabric.microsoft.com/v1"
    f"/workspaces/{workspace_id}"
    f"/lakehouses/{lakehouse_id}"
    f"/livyapi/versions/2023-12-01"
    f"/highConcurrencySessions"
)

headers = {"Authorization": f"Bearer {token}"}
session_tag = "demo-tag"

print(f"HC session endpoint: {livy_base_url}")
print(f"Session tag: {session_tag}")
print()

# Create HC Session A
print("Creating HC Session A...")
resp_a = requests.post(livy_base_url, headers=headers, json={"sessionTag": session_tag})
assert resp_a.status_code == 202, f"Failed: {resp_a.status_code} — {resp_a.text}"
session_a = resp_a.json()
hc_id_a = session_a["id"]
print(f"  HC session A id: {hc_id_a}  state: {session_a['state']}")

# Create HC Session B
print("Creating HC Session B...")
resp_b = requests.post(livy_base_url, headers=headers, json={"sessionTag": session_tag})
assert resp_b.status_code == 202, f"Failed: {resp_b.status_code} — {resp_b.text}"
session_b = resp_b.json()
hc_id_b = session_b["id"]
print(f"  HC session B id: {hc_id_b}  state: {session_b['state']}")

session_url_a = f"{livy_base_url}/{hc_id_a}"
session_url_b = f"{livy_base_url}/{hc_id_b}"

3. 準備が整うまで両方のセッションをポーリングし、セッションパッキングを確認する

各セッションは、 NotStartedAcquiringHighConcurrencySessionIdleの状態で遷移します。

両方のセッションが Idleされると、出力はセッションパッキングに関する次の詳細を確認します。

  • 2 つの HC セッション ID (hc_id_ahc_id_b) が異なり、各 "取得" 呼び出しによって個別の HC セッションが返されたことを確認します。
  • 基になる Livy セッション ID (sessionId_asessionId_b) が一致し、両方の HC セッションが 同じ Livy セッションにパックされたことを確認します。
  • REPL ID (replId_areplId_b) は異なり、各 HC セッションに独自の分離された実行コンテキストがあることを確認します。

次のコードは、準備ができるまで両方のセッションをポーリングし、検証出力を出力します。

import time

ACQUIRING_STATES = {"NotStarted", "starting", "AcquiringHighConcurrencySession"}
POLL_INTERVAL = 5

def poll_until_ready(url, label):
    """Poll an HC session until it leaves the acquisition states."""
    print(f"[{label}] Polling...")
    while True:
        resp = requests.get(url, headers=headers, timeout=30)
        resp.raise_for_status()
        data = resp.json()
        state = data.get("state", "unknown")
        print(f"  [{label}] state={state}  sessionId={data.get('sessionId', 'N/A')}  replId={data.get('replId', 'N/A')}")
        if state in ("Dead", "Killed", "Failed"):
            raise RuntimeError(f"[{label}] Session failed: {state}")
        if state not in ACQUIRING_STATES:
            return data
        time.sleep(POLL_INTERVAL)

ready_a = poll_until_ready(session_url_a, "A")
ready_b = poll_until_ready(session_url_b, "B")

livy_session_id_a = ready_a["sessionId"]
livy_session_id_b = ready_b["sessionId"]
repl_id_a = ready_a["replId"]
repl_id_b = ready_b["replId"]

print()
print("=" * 50)
print("SESSION PACKING VERIFICATION")
print("=" * 50)
print(f"HC session A id:    {hc_id_a}")
print(f"HC session B id:    {hc_id_b}")
print(f"HC IDs differ:      {hc_id_a != hc_id_b}")
print()
print(f"Livy sessionId A:   {livy_session_id_a}")
print(f"Livy sessionId B:   {livy_session_id_b}")
print(f"Same Livy session:  {livy_session_id_a == livy_session_id_b}")
print()
print(f"REPL A:             {repl_id_a}")
print(f"REPL B:             {repl_id_b}")
print(f"REPLs differ:       {repl_id_a != repl_id_b}")

4. ステートメントを両方の REPL に並列で送信する

結果をポーリングする前に、2 つの POST 要求 (REPL ごとに 1 つ) を送信します。 REPL は同じ Spark セッションを共有するため、両方のステートメントを同時に実行できます。 このコードでは、残りの手順で使用する poll_statement ヘルパー関数も定義します。

# Build statement URLs for each REPL
stmts_url_a = f"{livy_base_url}/{livy_session_id_a}/repls/{repl_id_a}/statements"
stmts_url_b = f"{livy_base_url}/{livy_session_id_b}/repls/{repl_id_b}/statements"

# Fire both statement POSTs before polling
print("Submitting to REPL A: print('Hello from REPL A')")
resp_a = requests.post(stmts_url_a, headers=headers, json={"code": "print('Hello from REPL A')", "kind": "pyspark"})
assert resp_a.status_code in (200, 201), f"Failed: {resp_a.text}"
stmt_a = resp_a.json()
stmt_url_a = f"{stmts_url_a}/{stmt_a['id']}"

print("Submitting to REPL B: print('Hello from REPL B')")
resp_b = requests.post(stmts_url_b, headers=headers, json={"code": "print('Hello from REPL B')", "kind": "pyspark"})
assert resp_b.status_code in (200, 201), f"Failed: {resp_b.text}"
stmt_b = resp_b.json()
stmt_url_b = f"{stmts_url_b}/{stmt_b['id']}"

print("Both statements submitted. Polling for results...")

# Poll both statements
def poll_statement(url, label):
    while True:
        resp = requests.get(url, headers=headers, timeout=30)
        resp.raise_for_status()
        data = resp.json()
        if data.get("state") not in ("waiting", "running"):
            return data
        time.sleep(5)

result_a = poll_statement(stmt_url_a, "A")
result_b = poll_statement(stmt_url_b, "B")

output_a = result_a.get("output", {}).get("data", {}).get("text/plain", "")
output_b = result_b.get("output", {}).get("data", {}).get("text/plain", "")

print()
print("=" * 50)
print("PARALLEL EXECUTION RESULTS")
print("=" * 50)
print(f"REPL A output: {output_a}")
print(f"REPL B output: {output_b}")

5. REPL の分離を確認する

REPL A x = 42 変数を設定し、REPL B からアクセスしてみます。両方の REPL が同じ Spark セッションを共有している場合でも、それらの 変数は分離されます

# Set x = 42 in REPL A
print("[A] Setting x = 42...")
resp = requests.post(stmts_url_a, headers=headers, json={"code": "x = 42; print(x)", "kind": "pyspark"})
stmt_url = f"{stmts_url_a}/{resp.json()['id']}"
result_a = poll_statement(stmt_url, "A")
output_a = result_a.get("output", {}).get("data", {}).get("text/plain", "")
print(f"[A] Output: {output_a}")

# Try to read x from REPL B — should get NameError
print("\n[B] Trying to read x (expect NameError)...")
code_b = "try:\n    print(x)\nexcept NameError as e:\n    print(f'NameError: {e}')"
resp = requests.post(stmts_url_b, headers=headers, json={"code": code_b, "kind": "pyspark"})
stmt_url = f"{stmts_url_b}/{resp.json()['id']}"
result_b = poll_statement(stmt_url, "B")
output_b = result_b.get("output", {}).get("data", {}).get("text/plain", "")
print(f"[B] Output: {output_b}")

print()
print("=" * 50)
print("REPL ISOLATION RESULTS")
print("=" * 50)
print(f"REPL A (x = 42): {output_a}")
print(f"REPL B (print(x)): {output_b}")

6. 両方の HC セッションをクリーンアップする

両方の HC セッションを削除してリソースを解放します。 基になるidではなく、HC セッション sessionIdを使用します。

for label, url in [("A", session_url_a), ("B", session_url_b)]:
    print(f"[{label}] Deleting HC session...")
    resp = requests.delete(url, headers=headers)
    if resp.status_code in (200, 204):
        print(f"[{label}] Deleted successfully.")
    elif resp.status_code == 404:
        print(f"[{label}] Already deleted.")
    else:
        print(f"[{label}] Unexpected response: {resp.status_code} — {resp.text}")

監視ハブで自分のジョブを表示する

  1. 左側のナビゲーションで [モニター ] に移動します。
  2. セッションの詳細を表示するには、最新のアクティビティ名を選択します。
  3. 両方の HC セッションが同じ基になる Spark セッションを共有していることに注意してください。これは、セッションのパッキングが行われていることを確認するものです。

API エンドポイント リファレンス

Operation メソッド エンドポイント
HC セッションを作成する POST /v1/workspaces/{workspaceId}/lakehouses/{lakehouseId}/livyapi/versions/2023-12-01/highConcurrencySessions
HC セッションを取得する GET .../highConcurrencySessions/{highConcurrencySessionId}
HC セッションを削除する DELETE .../highConcurrencySessions/{highConcurrencySessionId}
ステートメントを送信 POST .../highConcurrencySessions/{sessionId}/repls/{replId}/statements
ステートメントを取得 GET .../highConcurrencySessions/{sessionId}/repls/{replId}/statements/{statementId}
キャンセル ステートメント POST .../highConcurrencySessions/{sessionId}/repls/{replId}/statements/{statementId}/cancel

作成、取得、および削除の各操作では、HC セッション idを使用します。 ステートメント操作では、基になる Livy sessionIdが使用されます。