Kom i gang med Livy API'en til Fabric high concurrency-sessioner

Gælder for:✅ Fabric Data Engineering og Data Science

Højsamtidige (HC) sessioner lader flere opkaldere dele en enkelt Spark-session uden at forstyrre hinanden. I stedet for at provisionere en separat session for hver arbejdsbelastning, får du en HC-session, og Fabric API'en tildeler den en isoleret REPL inden for en delt underliggende session.

I denne artikel bruger du Fabric Livy API'en til at hente HC-sessioner, verificere session packing, køre sætninger parallelt og bekræfte REPL-isolation.

Forudsætninger

Erstat pladsholderne {Entra_TenantID}, {Entra_ClientID}, {Entra_ClientSecret}, {Fabric_WorkspaceID}, og {Fabric_LakehouseID} med dine værdier, når du følger eksemplerne i denne artikel.

Hvad er sessioner med høj samtidighed?

Højsamtidighedssessioner (HC) tillader flere brugere eller processer at dele en enkelt Spark-session. Hver kalder får en isoleret REPL (Read-Eval-Print Loop) inden for den delte session. Udtalelser fra forskellige opkaldere forstyrrer ikke hinanden.

Sessionspakning

Når du opretter to HC-sessioner med samme sessionTag, pakker Fabric API'en dem på den samme underliggende Livy-session. Hver HC-session får sin egen REPL, som giver:

  • Ressourceeffektivitet: Flere brugere deler én Spark-session i stedet for at hver opretter deres egen.
  • REPL-isolation: Variabler og tilstand i én REPL er ikke synlige for andre.
  • Parallel eksekvering: Sætninger på forskellige REPL'er kan køre samtidigt.

Nøgle-ID'er

id Unikt per Brugt til
HC-session id HC-session Afstemningsstatus, slet session
sessionId Livy-session (delt når den er pakket) Sætnings-URL'er
replId REPL (isoleret kontekst) Sætnings-URL'er

Vigtigt!

Og sessionId er replId kun tilgængelige, når HC-sessionen når Idle staten.

Hvordan HC-sessioner adskiller sig fra almindelige Livy-sessioner

Aspekt Almindelig Livius-session HC-session
Slutpunkt .../sessions .../highConcurrencySessions
Udtalelser Indsendt direkte til sessionen Indsendt via en REPL (/repls/{replId}/statements)
Erhvervelse Sessionen bliver idle direkte NotStartedAcquiringHighConcurrencySessionIdle
Sessionspakning Ikke anvendelig Det er valgfrit sessionTag at dele underliggende Spark-sessioner

Trin-for-trin gennemgang

1. Autentificér med Microsoft Entra

Erhverv et adgangstoken ved hjælp af SPN-klient-legitimationsflowet. Erstat pladsholderværdierne med dine faktiske legitimationsoplysninger.

from msal import ConfidentialClientApplication

# Configuration — Replace with your actual values
tenant_id = "{Entra_TenantID}"       # Microsoft Entra tenant ID
client_id = "{Entra_ClientID}"       # Service principal application ID
client_secret = "{Entra_ClientSecret}"  # Service principal client secret

# OAuth settings
authority = f"https://login.microsoftonline.com/{tenant_id}"
scope = "https://analysis.windows.net/powerbi/api/.default"

app = ConfidentialClientApplication(
    client_id=client_id,
    authority=authority,
    client_credential=client_secret,
)

result = app.acquire_token_for_client(scopes=[scope])

if "access_token" in result:
    token = result["access_token"]
    print("Access token acquired successfully.")
else:
    raise RuntimeError(
        f"Failed to acquire token: {result.get('error_description', 'unknown error')}"
    )

2. Opret to HC-sessioner med det samme sessionstag

Opret to HC-sessioner ved hjælp af sessionTag: "demo-tag". Fordi de deler det samme tag, pakker Fabric API'en dem på den samme underliggende Livy session. Hver session får sin egen isolerede REPL.

import json
import requests

# Fabric resource IDs — Replace with your actual values
workspace_id = "{Fabric_WorkspaceID}"
lakehouse_id = "{Fabric_LakehouseID}"

# Construct the HC session endpoint URL
livy_base_url = (
    f"https://api.fabric.microsoft.com/v1"
    f"/workspaces/{workspace_id}"
    f"/lakehouses/{lakehouse_id}"
    f"/livyapi/versions/2023-12-01"
    f"/highConcurrencySessions"
)

headers = {"Authorization": f"Bearer {token}"}
session_tag = "demo-tag"

print(f"HC session endpoint: {livy_base_url}")
print(f"Session tag: {session_tag}")
print()

# Create HC Session A
print("Creating HC Session A...")
resp_a = requests.post(livy_base_url, headers=headers, json={"sessionTag": session_tag})
assert resp_a.status_code == 202, f"Failed: {resp_a.status_code} — {resp_a.text}"
session_a = resp_a.json()
hc_id_a = session_a["id"]
print(f"  HC session A id: {hc_id_a}  state: {session_a['state']}")

# Create HC Session B
print("Creating HC Session B...")
resp_b = requests.post(livy_base_url, headers=headers, json={"sessionTag": session_tag})
assert resp_b.status_code == 202, f"Failed: {resp_b.status_code} — {resp_b.text}"
session_b = resp_b.json()
hc_id_b = session_b["id"]
print(f"  HC session B id: {hc_id_b}  state: {session_b['state']}")

session_url_a = f"{livy_base_url}/{hc_id_a}"
session_url_b = f"{livy_base_url}/{hc_id_b}"

3. Poll begge sessioner, indtil de er klar, og verificér session packing

Hver session gennemgår disse tilstande: NotStarted, AcquiringHighConcurrencySession, og derefter Idle.

Når begge sessioner er Idle, bekræfter outputtet følgende detaljer om sessionspakning:

  • De to HC-sessions-ID'er (hc_id_a og hc_id_b) er forskellige, hvilket bekræfter, at hvert "acquire"-kald returnerede en særskilt HC-session.
  • De underliggende Livy-sessions-ID'er (sessionId_a og sessionId_b) matcher, hvilket bekræfter, at begge HC-sessioner blev pakket på samme Livy-session.
  • REPL-ID'erne (replId_a og replId_b) er forskellige, hvilket bekræfter, at hver HC-session har sin egen isolerede eksekveringskontekst.

Følgende kode spørger begge sessioner, indtil de er klar, og udskriver verifikationsoutputtet:

import time

ACQUIRING_STATES = {"NotStarted", "starting", "AcquiringHighConcurrencySession"}
POLL_INTERVAL = 5

def poll_until_ready(url, label):
    """Poll an HC session until it leaves the acquisition states."""
    print(f"[{label}] Polling...")
    while True:
        resp = requests.get(url, headers=headers, timeout=30)
        resp.raise_for_status()
        data = resp.json()
        state = data.get("state", "unknown")
        print(f"  [{label}] state={state}  sessionId={data.get('sessionId', 'N/A')}  replId={data.get('replId', 'N/A')}")
        if state in ("Dead", "Killed", "Failed"):
            raise RuntimeError(f"[{label}] Session failed: {state}")
        if state not in ACQUIRING_STATES:
            return data
        time.sleep(POLL_INTERVAL)

ready_a = poll_until_ready(session_url_a, "A")
ready_b = poll_until_ready(session_url_b, "B")

livy_session_id_a = ready_a["sessionId"]
livy_session_id_b = ready_b["sessionId"]
repl_id_a = ready_a["replId"]
repl_id_b = ready_b["replId"]

print()
print("=" * 50)
print("SESSION PACKING VERIFICATION")
print("=" * 50)
print(f"HC session A id:    {hc_id_a}")
print(f"HC session B id:    {hc_id_b}")
print(f"HC IDs differ:      {hc_id_a != hc_id_b}")
print()
print(f"Livy sessionId A:   {livy_session_id_a}")
print(f"Livy sessionId B:   {livy_session_id_b}")
print(f"Same Livy session:  {livy_session_id_a == livy_session_id_b}")
print()
print(f"REPL A:             {repl_id_a}")
print(f"REPL B:             {repl_id_b}")
print(f"REPLs differ:       {repl_id_a != repl_id_b}")

4. Indsend erklæringer til begge REPL'er parallelt

Indsend to POST-anmodninger (én pr. REPL) før du spørger om resultater. Fordi REPL'erne deler den samme Spark-session, kan begge udsagn køre samtidig. Denne kode definerer også hjælpefunktionen, poll_statement der bruges i de resterende trin.

# Build statement URLs for each REPL
stmts_url_a = f"{livy_base_url}/{livy_session_id_a}/repls/{repl_id_a}/statements"
stmts_url_b = f"{livy_base_url}/{livy_session_id_b}/repls/{repl_id_b}/statements"

# Fire both statement POSTs before polling
print("Submitting to REPL A: print('Hello from REPL A')")
resp_a = requests.post(stmts_url_a, headers=headers, json={"code": "print('Hello from REPL A')", "kind": "pyspark"})
assert resp_a.status_code in (200, 201), f"Failed: {resp_a.text}"
stmt_a = resp_a.json()
stmt_url_a = f"{stmts_url_a}/{stmt_a['id']}"

print("Submitting to REPL B: print('Hello from REPL B')")
resp_b = requests.post(stmts_url_b, headers=headers, json={"code": "print('Hello from REPL B')", "kind": "pyspark"})
assert resp_b.status_code in (200, 201), f"Failed: {resp_b.text}"
stmt_b = resp_b.json()
stmt_url_b = f"{stmts_url_b}/{stmt_b['id']}"

print("Both statements submitted. Polling for results...")

# Poll both statements
def poll_statement(url, label):
    while True:
        resp = requests.get(url, headers=headers, timeout=30)
        resp.raise_for_status()
        data = resp.json()
        if data.get("state") not in ("waiting", "running"):
            return data
        time.sleep(5)

result_a = poll_statement(stmt_url_a, "A")
result_b = poll_statement(stmt_url_b, "B")

output_a = result_a.get("output", {}).get("data", {}).get("text/plain", "")
output_b = result_b.get("output", {}).get("data", {}).get("text/plain", "")

print()
print("=" * 50)
print("PARALLEL EXECUTION RESULTS")
print("=" * 50)
print(f"REPL A output: {output_a}")
print(f"REPL B output: {output_b}")

5. Verificér REPL-isolation

Sæt en variabel x = 42 i REPL A, og prøv derefter at tilgå den fra REPL B. Selvom begge REPL'er deler den samme Spark-session, er deres variable isolerede.

# Set x = 42 in REPL A
print("[A] Setting x = 42...")
resp = requests.post(stmts_url_a, headers=headers, json={"code": "x = 42; print(x)", "kind": "pyspark"})
stmt_url = f"{stmts_url_a}/{resp.json()['id']}"
result_a = poll_statement(stmt_url, "A")
output_a = result_a.get("output", {}).get("data", {}).get("text/plain", "")
print(f"[A] Output: {output_a}")

# Try to read x from REPL B — should get NameError
print("\n[B] Trying to read x (expect NameError)...")
code_b = "try:\n    print(x)\nexcept NameError as e:\n    print(f'NameError: {e}')"
resp = requests.post(stmts_url_b, headers=headers, json={"code": code_b, "kind": "pyspark"})
stmt_url = f"{stmts_url_b}/{resp.json()['id']}"
result_b = poll_statement(stmt_url, "B")
output_b = result_b.get("output", {}).get("data", {}).get("text/plain", "")
print(f"[B] Output: {output_b}")

print()
print("=" * 50)
print("REPL ISOLATION RESULTS")
print("=" * 50)
print(f"REPL A (x = 42): {output_a}")
print(f"REPL B (print(x)): {output_b}")

6. Ryd op i begge HC-sessioner

Slet begge HC-sessioner for at frigive ressourcer. Brug HC-sessionen id, ikke den underliggende sessionId.

for label, url in [("A", session_url_a), ("B", session_url_b)]:
    print(f"[{label}] Deleting HC session...")
    resp = requests.delete(url, headers=headers)
    if resp.status_code in (200, 204):
        print(f"[{label}] Deleted successfully.")
    elif resp.status_code == 404:
        print(f"[{label}] Already deleted.")
    else:
        print(f"[{label}] Unexpected response: {resp.status_code} — {resp.text}")

Se dine job i overvågningshubben

  1. Navigér til Monitor i venstre side-navigation.
  2. Vælg det seneste aktivitetsnavn for at se sessionsdetaljer.
  3. Bemærk, at begge HC-sessioner deler den samme underliggende Spark-session, som bekræfter session packing.

API-endpointsreference

Operation Metode Slutpunkt
Opret HC-session POST /v1/workspaces/{workspaceId}/lakehouses/{lakehouseId}/livyapi/versions/2023-12-01/highConcurrencySessions
Få HC-session GET .../highConcurrencySessions/{highConcurrencySessionId}
Slet HC-sessionen DELETE .../highConcurrencySessions/{highConcurrencySessionId}
Indsend erklæring POST .../highConcurrencySessions/{sessionId}/repls/{replId}/statements
Få erklæring GET .../highConcurrencySessions/{sessionId}/repls/{replId}/statements/{statementId}
Annulleringsudsagn POST .../highConcurrencySessions/{sessionId}/repls/{replId}/statements/{statementId}/cancel

Bemærkning

Opret, hent og slet-operationer bruger HC-sessionen id. Sætningsoperationer bruger den underliggende Livy sessionId.