Bemærk
Adgang til denne side kræver godkendelse. Du kan prøve at logge på eller ændre mapper.
Adgang til denne side kræver godkendelse. Du kan prøve at ændre mapper.
Gælder for:✅ Fabric Data Engineering og Data Science
Højsamtidige (HC) sessioner lader flere opkaldere dele en enkelt Spark-session uden at forstyrre hinanden. I stedet for at provisionere en separat session for hver arbejdsbelastning, får du en HC-session, og Fabric API'en tildeler den en isoleret REPL inden for en delt underliggende session.
I denne artikel bruger du Fabric Livy API'en til at hente HC-sessioner, verificere session packing, køre sætninger parallelt og bekræfte REPL-isolation.
Forudsætninger
- Fabric Premium eller prøvekapacitet med et Lakehouse.
- En fjernklient som Visual Studio Code med PySpark og Python 3.10+.
- En Microsoft Entra service principal (SPN) med adgang til arbejdsområdet. Registrer en ansøgning hos Microsoft identity platform.
- En klienthemmelighed for tjenestelederen. Tilføj og administrer ansøgningsoplysninger.
Erstat pladsholderne {Entra_TenantID}, {Entra_ClientID}, {Entra_ClientSecret}, {Fabric_WorkspaceID}, og {Fabric_LakehouseID} med dine værdier, når du følger eksemplerne i denne artikel.
Hvad er sessioner med høj samtidighed?
Højsamtidighedssessioner (HC) tillader flere brugere eller processer at dele en enkelt Spark-session. Hver kalder får en isoleret REPL (Read-Eval-Print Loop) inden for den delte session. Udtalelser fra forskellige opkaldere forstyrrer ikke hinanden.
Sessionspakning
Når du opretter to HC-sessioner med samme sessionTag, pakker Fabric API'en dem på den samme underliggende Livy-session. Hver HC-session får sin egen REPL, som giver:
- Ressourceeffektivitet: Flere brugere deler én Spark-session i stedet for at hver opretter deres egen.
- REPL-isolation: Variabler og tilstand i én REPL er ikke synlige for andre.
- Parallel eksekvering: Sætninger på forskellige REPL'er kan køre samtidigt.
Nøgle-ID'er
| id | Unikt per | Brugt til |
|---|---|---|
HC-session id |
HC-session | Afstemningsstatus, slet session |
sessionId |
Livy-session (delt når den er pakket) | Sætnings-URL'er |
replId |
REPL (isoleret kontekst) | Sætnings-URL'er |
Vigtigt!
Og sessionId er replId kun tilgængelige, når HC-sessionen når Idle staten.
Hvordan HC-sessioner adskiller sig fra almindelige Livy-sessioner
| Aspekt | Almindelig Livius-session | HC-session |
|---|---|---|
| Slutpunkt | .../sessions |
.../highConcurrencySessions |
| Udtalelser | Indsendt direkte til sessionen | Indsendt via en REPL (/repls/{replId}/statements) |
| Erhvervelse | Sessionen bliver idle direkte |
NotStarted så AcquiringHighConcurrencySession så Idle |
| Sessionspakning | Ikke anvendelig | Det er valgfrit sessionTag at dele underliggende Spark-sessioner |
Trin-for-trin gennemgang
1. Autentificér med Microsoft Entra
Erhverv et adgangstoken ved hjælp af SPN-klient-legitimationsflowet. Erstat pladsholderværdierne med dine faktiske legitimationsoplysninger.
from msal import ConfidentialClientApplication
# Configuration — Replace with your actual values
tenant_id = "{Entra_TenantID}" # Microsoft Entra tenant ID
client_id = "{Entra_ClientID}" # Service principal application ID
client_secret = "{Entra_ClientSecret}" # Service principal client secret
# OAuth settings
authority = f"https://login.microsoftonline.com/{tenant_id}"
scope = "https://analysis.windows.net/powerbi/api/.default"
app = ConfidentialClientApplication(
client_id=client_id,
authority=authority,
client_credential=client_secret,
)
result = app.acquire_token_for_client(scopes=[scope])
if "access_token" in result:
token = result["access_token"]
print("Access token acquired successfully.")
else:
raise RuntimeError(
f"Failed to acquire token: {result.get('error_description', 'unknown error')}"
)
2. Opret to HC-sessioner med det samme sessionstag
Opret to HC-sessioner ved hjælp af sessionTag: "demo-tag". Fordi de deler det samme tag, pakker Fabric API'en dem på den samme underliggende Livy session. Hver session får sin egen isolerede REPL.
import json
import requests
# Fabric resource IDs — Replace with your actual values
workspace_id = "{Fabric_WorkspaceID}"
lakehouse_id = "{Fabric_LakehouseID}"
# Construct the HC session endpoint URL
livy_base_url = (
f"https://api.fabric.microsoft.com/v1"
f"/workspaces/{workspace_id}"
f"/lakehouses/{lakehouse_id}"
f"/livyapi/versions/2023-12-01"
f"/highConcurrencySessions"
)
headers = {"Authorization": f"Bearer {token}"}
session_tag = "demo-tag"
print(f"HC session endpoint: {livy_base_url}")
print(f"Session tag: {session_tag}")
print()
# Create HC Session A
print("Creating HC Session A...")
resp_a = requests.post(livy_base_url, headers=headers, json={"sessionTag": session_tag})
assert resp_a.status_code == 202, f"Failed: {resp_a.status_code} — {resp_a.text}"
session_a = resp_a.json()
hc_id_a = session_a["id"]
print(f" HC session A id: {hc_id_a} state: {session_a['state']}")
# Create HC Session B
print("Creating HC Session B...")
resp_b = requests.post(livy_base_url, headers=headers, json={"sessionTag": session_tag})
assert resp_b.status_code == 202, f"Failed: {resp_b.status_code} — {resp_b.text}"
session_b = resp_b.json()
hc_id_b = session_b["id"]
print(f" HC session B id: {hc_id_b} state: {session_b['state']}")
session_url_a = f"{livy_base_url}/{hc_id_a}"
session_url_b = f"{livy_base_url}/{hc_id_b}"
3. Poll begge sessioner, indtil de er klar, og verificér session packing
Hver session gennemgår disse tilstande: NotStarted, AcquiringHighConcurrencySession, og derefter Idle.
Når begge sessioner er Idle, bekræfter outputtet følgende detaljer om sessionspakning:
- De to HC-sessions-ID'er (
hc_id_aoghc_id_b) er forskellige, hvilket bekræfter, at hvert "acquire"-kald returnerede en særskilt HC-session. - De underliggende Livy-sessions-ID'er (
sessionId_aogsessionId_b) matcher, hvilket bekræfter, at begge HC-sessioner blev pakket på samme Livy-session. - REPL-ID'erne (
replId_aogreplId_b) er forskellige, hvilket bekræfter, at hver HC-session har sin egen isolerede eksekveringskontekst.
Følgende kode spørger begge sessioner, indtil de er klar, og udskriver verifikationsoutputtet:
import time
ACQUIRING_STATES = {"NotStarted", "starting", "AcquiringHighConcurrencySession"}
POLL_INTERVAL = 5
def poll_until_ready(url, label):
"""Poll an HC session until it leaves the acquisition states."""
print(f"[{label}] Polling...")
while True:
resp = requests.get(url, headers=headers, timeout=30)
resp.raise_for_status()
data = resp.json()
state = data.get("state", "unknown")
print(f" [{label}] state={state} sessionId={data.get('sessionId', 'N/A')} replId={data.get('replId', 'N/A')}")
if state in ("Dead", "Killed", "Failed"):
raise RuntimeError(f"[{label}] Session failed: {state}")
if state not in ACQUIRING_STATES:
return data
time.sleep(POLL_INTERVAL)
ready_a = poll_until_ready(session_url_a, "A")
ready_b = poll_until_ready(session_url_b, "B")
livy_session_id_a = ready_a["sessionId"]
livy_session_id_b = ready_b["sessionId"]
repl_id_a = ready_a["replId"]
repl_id_b = ready_b["replId"]
print()
print("=" * 50)
print("SESSION PACKING VERIFICATION")
print("=" * 50)
print(f"HC session A id: {hc_id_a}")
print(f"HC session B id: {hc_id_b}")
print(f"HC IDs differ: {hc_id_a != hc_id_b}")
print()
print(f"Livy sessionId A: {livy_session_id_a}")
print(f"Livy sessionId B: {livy_session_id_b}")
print(f"Same Livy session: {livy_session_id_a == livy_session_id_b}")
print()
print(f"REPL A: {repl_id_a}")
print(f"REPL B: {repl_id_b}")
print(f"REPLs differ: {repl_id_a != repl_id_b}")
4. Indsend erklæringer til begge REPL'er parallelt
Indsend to POST-anmodninger (én pr. REPL) før du spørger om resultater. Fordi REPL'erne deler den samme Spark-session, kan begge udsagn køre samtidig. Denne kode definerer også hjælpefunktionen, poll_statement der bruges i de resterende trin.
# Build statement URLs for each REPL
stmts_url_a = f"{livy_base_url}/{livy_session_id_a}/repls/{repl_id_a}/statements"
stmts_url_b = f"{livy_base_url}/{livy_session_id_b}/repls/{repl_id_b}/statements"
# Fire both statement POSTs before polling
print("Submitting to REPL A: print('Hello from REPL A')")
resp_a = requests.post(stmts_url_a, headers=headers, json={"code": "print('Hello from REPL A')", "kind": "pyspark"})
assert resp_a.status_code in (200, 201), f"Failed: {resp_a.text}"
stmt_a = resp_a.json()
stmt_url_a = f"{stmts_url_a}/{stmt_a['id']}"
print("Submitting to REPL B: print('Hello from REPL B')")
resp_b = requests.post(stmts_url_b, headers=headers, json={"code": "print('Hello from REPL B')", "kind": "pyspark"})
assert resp_b.status_code in (200, 201), f"Failed: {resp_b.text}"
stmt_b = resp_b.json()
stmt_url_b = f"{stmts_url_b}/{stmt_b['id']}"
print("Both statements submitted. Polling for results...")
# Poll both statements
def poll_statement(url, label):
while True:
resp = requests.get(url, headers=headers, timeout=30)
resp.raise_for_status()
data = resp.json()
if data.get("state") not in ("waiting", "running"):
return data
time.sleep(5)
result_a = poll_statement(stmt_url_a, "A")
result_b = poll_statement(stmt_url_b, "B")
output_a = result_a.get("output", {}).get("data", {}).get("text/plain", "")
output_b = result_b.get("output", {}).get("data", {}).get("text/plain", "")
print()
print("=" * 50)
print("PARALLEL EXECUTION RESULTS")
print("=" * 50)
print(f"REPL A output: {output_a}")
print(f"REPL B output: {output_b}")
5. Verificér REPL-isolation
Sæt en variabel x = 42 i REPL A, og prøv derefter at tilgå den fra REPL B. Selvom begge REPL'er deler den samme Spark-session, er deres variable isolerede.
# Set x = 42 in REPL A
print("[A] Setting x = 42...")
resp = requests.post(stmts_url_a, headers=headers, json={"code": "x = 42; print(x)", "kind": "pyspark"})
stmt_url = f"{stmts_url_a}/{resp.json()['id']}"
result_a = poll_statement(stmt_url, "A")
output_a = result_a.get("output", {}).get("data", {}).get("text/plain", "")
print(f"[A] Output: {output_a}")
# Try to read x from REPL B — should get NameError
print("\n[B] Trying to read x (expect NameError)...")
code_b = "try:\n print(x)\nexcept NameError as e:\n print(f'NameError: {e}')"
resp = requests.post(stmts_url_b, headers=headers, json={"code": code_b, "kind": "pyspark"})
stmt_url = f"{stmts_url_b}/{resp.json()['id']}"
result_b = poll_statement(stmt_url, "B")
output_b = result_b.get("output", {}).get("data", {}).get("text/plain", "")
print(f"[B] Output: {output_b}")
print()
print("=" * 50)
print("REPL ISOLATION RESULTS")
print("=" * 50)
print(f"REPL A (x = 42): {output_a}")
print(f"REPL B (print(x)): {output_b}")
6. Ryd op i begge HC-sessioner
Slet begge HC-sessioner for at frigive ressourcer. Brug HC-sessionen id, ikke den underliggende sessionId.
for label, url in [("A", session_url_a), ("B", session_url_b)]:
print(f"[{label}] Deleting HC session...")
resp = requests.delete(url, headers=headers)
if resp.status_code in (200, 204):
print(f"[{label}] Deleted successfully.")
elif resp.status_code == 404:
print(f"[{label}] Already deleted.")
else:
print(f"[{label}] Unexpected response: {resp.status_code} — {resp.text}")
Se dine job i overvågningshubben
- Navigér til Monitor i venstre side-navigation.
- Vælg det seneste aktivitetsnavn for at se sessionsdetaljer.
- Bemærk, at begge HC-sessioner deler den samme underliggende Spark-session, som bekræfter session packing.
API-endpointsreference
| Operation | Metode | Slutpunkt |
|---|---|---|
| Opret HC-session | POST |
/v1/workspaces/{workspaceId}/lakehouses/{lakehouseId}/livyapi/versions/2023-12-01/highConcurrencySessions |
| Få HC-session | GET |
.../highConcurrencySessions/{highConcurrencySessionId} |
| Slet HC-sessionen | DELETE |
.../highConcurrencySessions/{highConcurrencySessionId} |
| Indsend erklæring | POST |
.../highConcurrencySessions/{sessionId}/repls/{replId}/statements |
| Få erklæring | GET |
.../highConcurrencySessions/{sessionId}/repls/{replId}/statements/{statementId} |
| Annulleringsudsagn | POST |
.../highConcurrencySessions/{sessionId}/repls/{replId}/statements/{statementId}/cancel |
Bemærkning
Opret, hent og slet-operationer bruger HC-sessionen id. Sætningsoperationer bruger den underliggende Livy sessionId.