Merk
Tilgang til denne siden krever autorisasjon. Du kan prøve å logge på eller endre kataloger.
Tilgang til denne siden krever autorisasjon. Du kan prøve å endre kataloger.
Gjelder for:✅ Fabric Data Engineering og Data Science
Økter med høy samtidighet (HC) lar flere innringere dele en enkelt Spark-økt uten å forstyrre hverandre. I stedet for å opprette en separat økt for hver arbeidsbelastning, skaffer du deg en HC-sesjon, og Fabric API-et tildeler den en isolert REPL innenfor en delt underliggende sesjon.
I denne artikkelen bruker du Fabric Livy API for å hente HC-økter, verifisere session packing, kjøre setninger parallelt og bekrefte REPL-isolasjon.
Forutsetninger
- Fabric Premium- eller prøvekapasitet med et Lakehouse.
- En ekstern klient som Visual Studio Code med PySpark og Python 3.10+.
- En Microsoft Entra tjenesteprinsipp (SPN) med arbeidsområdetilgang. Registrer en søknad hos Microsoft identity platform.
- En klienthemmelighet for tjenestelederen. Legg til og administrer applikasjonslegitimasjon.
Bytt ut plassholderne {Entra_TenantID}, {Entra_ClientID}, {Entra_ClientSecret}, {Fabric_WorkspaceID}, og {Fabric_LakehouseID} med verdiene dine når du følger eksemplene i denne artikkelen.
Hva er økter med høy samtidighet?
Høy samtidighet (HC)-økter lar flere brukere eller prosesser dele en enkelt Spark-økt. Hver innringer får en isolert REPL (Read-Eval-Print Loop) innenfor den delte økten. Uttalelser fra forskjellige innringere forstyrrer ikke hverandre.
Samlingspakking
Når du lager to HC-økter med samme sessionTag, pakker Fabric API-et dem på samme underliggende Livy-økt. Hver HC-økt får sin egen REPL, som gir:
- Ressurseffektivitet: Flere brukere deler én Spark-økt i stedet for at hver lager sin egen.
- REPL-isolasjon: Variabler og tilstand i én REPL er ikke synlige for andre.
- Parallell utførelse: Setninger på forskjellige REPL-er kan kjøres samtidig.
Nøkkel-ID-er
| ID | Unik per | Brukt for |
|---|---|---|
HC-økt id |
HC-økt | Poll-status, slett økt |
sessionId |
Livy-sesjon (delt når den er pakket) | Setnings-URL-er |
replId |
REPL (isolert kontekst) | Setnings-URL-er |
Viktig!
Og sessionIdreplId er kun tilgjengelige når HC-sesjonen når Idle staten.
Hvordan HC-økter skiller seg fra vanlige Livy-økter
| Aspekt | Vanlig Livius-sesjon | HC-økt |
|---|---|---|
| Endepunkt | .../sessions |
.../highConcurrencySessions |
| Uttalelser | Innsendt direkte til sesjonen | Innsendt via en REPL (/repls/{replId}/statements) |
| Oppkjøp | Sesjonen blir idle direkte |
NotStarted da AcquiringHighConcurrencySession da Idle |
| Samlingspakking | Gjelder ikke | Valgfritt sessionTag å dele underliggende Spark-økter |
Steg-for-steg-gjennomgang
1. Autentiser deg med Microsoft Entra
Skaff en tilgangstoken ved å bruke SPN-klient-legitimasjonsflyten. Erstatt plassholderverdiene med dine faktiske kvalifikasjoner.
from msal import ConfidentialClientApplication
# Configuration — Replace with your actual values
tenant_id = "{Entra_TenantID}" # Microsoft Entra tenant ID
client_id = "{Entra_ClientID}" # Service principal application ID
client_secret = "{Entra_ClientSecret}" # Service principal client secret
# OAuth settings
authority = f"https://login.microsoftonline.com/{tenant_id}"
scope = "https://analysis.windows.net/powerbi/api/.default"
app = ConfidentialClientApplication(
client_id=client_id,
authority=authority,
client_credential=client_secret,
)
result = app.acquire_token_for_client(scopes=[scope])
if "access_token" in result:
token = result["access_token"]
print("Access token acquired successfully.")
else:
raise RuntimeError(
f"Failed to acquire token: {result.get('error_description', 'unknown error')}"
)
2. Opprett to HC-økter med samme øktstagg
Lag to HC-økter ved å bruke sessionTag: "demo-tag". Fordi de deler samme tag, pakker Fabric API-et dem inn på samme underliggende Livy-økt. Hver økt får sin egen isolerte REPL.
import json
import requests
# Fabric resource IDs — Replace with your actual values
workspace_id = "{Fabric_WorkspaceID}"
lakehouse_id = "{Fabric_LakehouseID}"
# Construct the HC session endpoint URL
livy_base_url = (
f"https://api.fabric.microsoft.com/v1"
f"/workspaces/{workspace_id}"
f"/lakehouses/{lakehouse_id}"
f"/livyapi/versions/2023-12-01"
f"/highConcurrencySessions"
)
headers = {"Authorization": f"Bearer {token}"}
session_tag = "demo-tag"
print(f"HC session endpoint: {livy_base_url}")
print(f"Session tag: {session_tag}")
print()
# Create HC Session A
print("Creating HC Session A...")
resp_a = requests.post(livy_base_url, headers=headers, json={"sessionTag": session_tag})
assert resp_a.status_code == 202, f"Failed: {resp_a.status_code} — {resp_a.text}"
session_a = resp_a.json()
hc_id_a = session_a["id"]
print(f" HC session A id: {hc_id_a} state: {session_a['state']}")
# Create HC Session B
print("Creating HC Session B...")
resp_b = requests.post(livy_base_url, headers=headers, json={"sessionTag": session_tag})
assert resp_b.status_code == 202, f"Failed: {resp_b.status_code} — {resp_b.text}"
session_b = resp_b.json()
hc_id_b = session_b["id"]
print(f" HC session B id: {hc_id_b} state: {session_b['state']}")
session_url_a = f"{livy_base_url}/{hc_id_a}"
session_url_b = f"{livy_base_url}/{hc_id_b}"
3. Poll begge øktene til de er klare og verifiser session packing
Hver økt går gjennom disse tilstandene: NotStarted, AcquiringHighConcurrencySession, og deretter Idle.
Når begge øktene er Idle, bekrefter utdataene følgende detaljer om sesjonspakking:
- De to HC-sesjons-IDene (
hc_id_aoghc_id_b) er forskjellige, noe som bekrefter at hvert "acquire"-anrop returnerte en distinkt HC-økt. - De underliggende Livy-øktene (
sessionId_aogsessionId_b) stemmer overens, noe som bekrefter at begge HC-øktene var pakket inn på samme Livy-økt. - REPL-ID-ene (
replId_aogreplId_b) er forskjellige, noe som bekrefter at hver HC-økt har sin egen isolerte utførelseskontekst.
Følgende kode sjekker begge øktene til de er klare og skriver ut verifiseringsresultatene:
import time
ACQUIRING_STATES = {"NotStarted", "starting", "AcquiringHighConcurrencySession"}
POLL_INTERVAL = 5
def poll_until_ready(url, label):
"""Poll an HC session until it leaves the acquisition states."""
print(f"[{label}] Polling...")
while True:
resp = requests.get(url, headers=headers, timeout=30)
resp.raise_for_status()
data = resp.json()
state = data.get("state", "unknown")
print(f" [{label}] state={state} sessionId={data.get('sessionId', 'N/A')} replId={data.get('replId', 'N/A')}")
if state in ("Dead", "Killed", "Failed"):
raise RuntimeError(f"[{label}] Session failed: {state}")
if state not in ACQUIRING_STATES:
return data
time.sleep(POLL_INTERVAL)
ready_a = poll_until_ready(session_url_a, "A")
ready_b = poll_until_ready(session_url_b, "B")
livy_session_id_a = ready_a["sessionId"]
livy_session_id_b = ready_b["sessionId"]
repl_id_a = ready_a["replId"]
repl_id_b = ready_b["replId"]
print()
print("=" * 50)
print("SESSION PACKING VERIFICATION")
print("=" * 50)
print(f"HC session A id: {hc_id_a}")
print(f"HC session B id: {hc_id_b}")
print(f"HC IDs differ: {hc_id_a != hc_id_b}")
print()
print(f"Livy sessionId A: {livy_session_id_a}")
print(f"Livy sessionId B: {livy_session_id_b}")
print(f"Same Livy session: {livy_session_id_a == livy_session_id_b}")
print()
print(f"REPL A: {repl_id_a}")
print(f"REPL B: {repl_id_b}")
print(f"REPLs differ: {repl_id_a != repl_id_b}")
4. Send inn uttalelser til begge REPL-ene parallelt
Send inn to POST-forespørsler (én per REPL) før du sjekker resultatene. Fordi REPL-ene deler samme Spark-økt, kan begge setningene kjøres samtidig. Denne koden definerer poll_statement også hjelpefunksjonen som brukes i de resterende stegene.
# Build statement URLs for each REPL
stmts_url_a = f"{livy_base_url}/{livy_session_id_a}/repls/{repl_id_a}/statements"
stmts_url_b = f"{livy_base_url}/{livy_session_id_b}/repls/{repl_id_b}/statements"
# Fire both statement POSTs before polling
print("Submitting to REPL A: print('Hello from REPL A')")
resp_a = requests.post(stmts_url_a, headers=headers, json={"code": "print('Hello from REPL A')", "kind": "pyspark"})
assert resp_a.status_code in (200, 201), f"Failed: {resp_a.text}"
stmt_a = resp_a.json()
stmt_url_a = f"{stmts_url_a}/{stmt_a['id']}"
print("Submitting to REPL B: print('Hello from REPL B')")
resp_b = requests.post(stmts_url_b, headers=headers, json={"code": "print('Hello from REPL B')", "kind": "pyspark"})
assert resp_b.status_code in (200, 201), f"Failed: {resp_b.text}"
stmt_b = resp_b.json()
stmt_url_b = f"{stmts_url_b}/{stmt_b['id']}"
print("Both statements submitted. Polling for results...")
# Poll both statements
def poll_statement(url, label):
while True:
resp = requests.get(url, headers=headers, timeout=30)
resp.raise_for_status()
data = resp.json()
if data.get("state") not in ("waiting", "running"):
return data
time.sleep(5)
result_a = poll_statement(stmt_url_a, "A")
result_b = poll_statement(stmt_url_b, "B")
output_a = result_a.get("output", {}).get("data", {}).get("text/plain", "")
output_b = result_b.get("output", {}).get("data", {}).get("text/plain", "")
print()
print("=" * 50)
print("PARALLEL EXECUTION RESULTS")
print("=" * 50)
print(f"REPL A output: {output_a}")
print(f"REPL B output: {output_b}")
5. Verifiser REPL-isolasjon
Sett en variabel x = 42 i REPL A, og prøv deretter å få tilgang til den fra REPL B. Selv om begge REPL-ene deler samme Spark-økt, er variablene deres isolerte.
# Set x = 42 in REPL A
print("[A] Setting x = 42...")
resp = requests.post(stmts_url_a, headers=headers, json={"code": "x = 42; print(x)", "kind": "pyspark"})
stmt_url = f"{stmts_url_a}/{resp.json()['id']}"
result_a = poll_statement(stmt_url, "A")
output_a = result_a.get("output", {}).get("data", {}).get("text/plain", "")
print(f"[A] Output: {output_a}")
# Try to read x from REPL B — should get NameError
print("\n[B] Trying to read x (expect NameError)...")
code_b = "try:\n print(x)\nexcept NameError as e:\n print(f'NameError: {e}')"
resp = requests.post(stmts_url_b, headers=headers, json={"code": code_b, "kind": "pyspark"})
stmt_url = f"{stmts_url_b}/{resp.json()['id']}"
result_b = poll_statement(stmt_url, "B")
output_b = result_b.get("output", {}).get("data", {}).get("text/plain", "")
print(f"[B] Output: {output_b}")
print()
print("=" * 50)
print("REPL ISOLATION RESULTS")
print("=" * 50)
print(f"REPL A (x = 42): {output_a}")
print(f"REPL B (print(x)): {output_b}")
6. Rydd opp i begge HC-øktene
Slett begge HC-øktene for å frigjøre ressurser. Bruk HC-økten id, ikke den underliggende sessionId.
for label, url in [("A", session_url_a), ("B", session_url_b)]:
print(f"[{label}] Deleting HC session...")
resp = requests.delete(url, headers=headers)
if resp.status_code in (200, 204):
print(f"[{label}] Deleted successfully.")
elif resp.status_code == 404:
print(f"[{label}] Already deleted.")
else:
print(f"[{label}] Unexpected response: {resp.status_code} — {resp.text}")
Se jobbene dine i overvåkingshuben
- Naviger til Monitor i navigasjonen på venstre side.
- Velg det nyeste aktivitetsnavnet for å se sesjonsdetaljer.
- Legg merke til at begge HC-øktene deler den samme underliggende Spark-økten, som bekrefter session packing.
API-endepunktsreferanse
| Operasjon | Metode | Endepunkt |
|---|---|---|
| Opprett HC-økt | POST |
/v1/workspaces/{workspaceId}/lakehouses/{lakehouseId}/livyapi/versions/2023-12-01/highConcurrencySessions |
| Få en HC-økt | GET |
.../highConcurrencySessions/{highConcurrencySessionId} |
| Slett HC-økten | DELETE |
.../highConcurrencySessions/{highConcurrencySessionId} |
| Send inn uttalelse | POST |
.../highConcurrencySessions/{sessionId}/repls/{replId}/statements |
| Få uttalelse | GET |
.../highConcurrencySessions/{sessionId}/repls/{replId}/statements/{statementId} |
| Avbryt-setningen | POST |
.../highConcurrencySessions/{sessionId}/repls/{replId}/statements/{statementId}/cancel |
Bemerkning
Opprett, hent og slett-operasjoner bruker HC-sesjonen id. Setningsoperasjoner bruker den underliggende Livy sessionId.