Kom i gang med Livy API for Fabric high concurrency-økter

Gjelder for:✅ Fabric Data Engineering og Data Science

Økter med høy samtidighet (HC) lar flere innringere dele en enkelt Spark-økt uten å forstyrre hverandre. I stedet for å opprette en separat økt for hver arbeidsbelastning, skaffer du deg en HC-sesjon, og Fabric API-et tildeler den en isolert REPL innenfor en delt underliggende sesjon.

I denne artikkelen bruker du Fabric Livy API for å hente HC-økter, verifisere session packing, kjøre setninger parallelt og bekrefte REPL-isolasjon.

Forutsetninger

Bytt ut plassholderne {Entra_TenantID}, {Entra_ClientID}, {Entra_ClientSecret}, {Fabric_WorkspaceID}, og {Fabric_LakehouseID} med verdiene dine når du følger eksemplene i denne artikkelen.

Hva er økter med høy samtidighet?

Høy samtidighet (HC)-økter lar flere brukere eller prosesser dele en enkelt Spark-økt. Hver innringer får en isolert REPL (Read-Eval-Print Loop) innenfor den delte økten. Uttalelser fra forskjellige innringere forstyrrer ikke hverandre.

Samlingspakking

Når du lager to HC-økter med samme sessionTag, pakker Fabric API-et dem på samme underliggende Livy-økt. Hver HC-økt får sin egen REPL, som gir:

  • Ressurseffektivitet: Flere brukere deler én Spark-økt i stedet for at hver lager sin egen.
  • REPL-isolasjon: Variabler og tilstand i én REPL er ikke synlige for andre.
  • Parallell utførelse: Setninger på forskjellige REPL-er kan kjøres samtidig.

Nøkkel-ID-er

ID Unik per Brukt for
HC-økt id HC-økt Poll-status, slett økt
sessionId Livy-sesjon (delt når den er pakket) Setnings-URL-er
replId REPL (isolert kontekst) Setnings-URL-er

Viktig!

Og sessionIdreplId er kun tilgjengelige når HC-sesjonen når Idle staten.

Hvordan HC-økter skiller seg fra vanlige Livy-økter

Aspekt Vanlig Livius-sesjon HC-økt
Endepunkt .../sessions .../highConcurrencySessions
Uttalelser Innsendt direkte til sesjonen Innsendt via en REPL (/repls/{replId}/statements)
Oppkjøp Sesjonen blir idle direkte NotStarted da AcquiringHighConcurrencySession da Idle
Samlingspakking Gjelder ikke Valgfritt sessionTag å dele underliggende Spark-økter

Steg-for-steg-gjennomgang

1. Autentiser deg med Microsoft Entra

Skaff en tilgangstoken ved å bruke SPN-klient-legitimasjonsflyten. Erstatt plassholderverdiene med dine faktiske kvalifikasjoner.

from msal import ConfidentialClientApplication

# Configuration — Replace with your actual values
tenant_id = "{Entra_TenantID}"       # Microsoft Entra tenant ID
client_id = "{Entra_ClientID}"       # Service principal application ID
client_secret = "{Entra_ClientSecret}"  # Service principal client secret

# OAuth settings
authority = f"https://login.microsoftonline.com/{tenant_id}"
scope = "https://analysis.windows.net/powerbi/api/.default"

app = ConfidentialClientApplication(
    client_id=client_id,
    authority=authority,
    client_credential=client_secret,
)

result = app.acquire_token_for_client(scopes=[scope])

if "access_token" in result:
    token = result["access_token"]
    print("Access token acquired successfully.")
else:
    raise RuntimeError(
        f"Failed to acquire token: {result.get('error_description', 'unknown error')}"
    )

2. Opprett to HC-økter med samme øktstagg

Lag to HC-økter ved å bruke sessionTag: "demo-tag". Fordi de deler samme tag, pakker Fabric API-et dem inn på samme underliggende Livy-økt. Hver økt får sin egen isolerte REPL.

import json
import requests

# Fabric resource IDs — Replace with your actual values
workspace_id = "{Fabric_WorkspaceID}"
lakehouse_id = "{Fabric_LakehouseID}"

# Construct the HC session endpoint URL
livy_base_url = (
    f"https://api.fabric.microsoft.com/v1"
    f"/workspaces/{workspace_id}"
    f"/lakehouses/{lakehouse_id}"
    f"/livyapi/versions/2023-12-01"
    f"/highConcurrencySessions"
)

headers = {"Authorization": f"Bearer {token}"}
session_tag = "demo-tag"

print(f"HC session endpoint: {livy_base_url}")
print(f"Session tag: {session_tag}")
print()

# Create HC Session A
print("Creating HC Session A...")
resp_a = requests.post(livy_base_url, headers=headers, json={"sessionTag": session_tag})
assert resp_a.status_code == 202, f"Failed: {resp_a.status_code} — {resp_a.text}"
session_a = resp_a.json()
hc_id_a = session_a["id"]
print(f"  HC session A id: {hc_id_a}  state: {session_a['state']}")

# Create HC Session B
print("Creating HC Session B...")
resp_b = requests.post(livy_base_url, headers=headers, json={"sessionTag": session_tag})
assert resp_b.status_code == 202, f"Failed: {resp_b.status_code} — {resp_b.text}"
session_b = resp_b.json()
hc_id_b = session_b["id"]
print(f"  HC session B id: {hc_id_b}  state: {session_b['state']}")

session_url_a = f"{livy_base_url}/{hc_id_a}"
session_url_b = f"{livy_base_url}/{hc_id_b}"

3. Poll begge øktene til de er klare og verifiser session packing

Hver økt går gjennom disse tilstandene: NotStarted, AcquiringHighConcurrencySession, og deretter Idle.

Når begge øktene er Idle, bekrefter utdataene følgende detaljer om sesjonspakking:

  • De to HC-sesjons-IDene (hc_id_a og hc_id_b) er forskjellige, noe som bekrefter at hvert "acquire"-anrop returnerte en distinkt HC-økt.
  • De underliggende Livy-øktene (sessionId_a og sessionId_b) stemmer overens, noe som bekrefter at begge HC-øktene var pakket inn på samme Livy-økt.
  • REPL-ID-ene (replId_a og replId_b) er forskjellige, noe som bekrefter at hver HC-økt har sin egen isolerte utførelseskontekst.

Følgende kode sjekker begge øktene til de er klare og skriver ut verifiseringsresultatene:

import time

ACQUIRING_STATES = {"NotStarted", "starting", "AcquiringHighConcurrencySession"}
POLL_INTERVAL = 5

def poll_until_ready(url, label):
    """Poll an HC session until it leaves the acquisition states."""
    print(f"[{label}] Polling...")
    while True:
        resp = requests.get(url, headers=headers, timeout=30)
        resp.raise_for_status()
        data = resp.json()
        state = data.get("state", "unknown")
        print(f"  [{label}] state={state}  sessionId={data.get('sessionId', 'N/A')}  replId={data.get('replId', 'N/A')}")
        if state in ("Dead", "Killed", "Failed"):
            raise RuntimeError(f"[{label}] Session failed: {state}")
        if state not in ACQUIRING_STATES:
            return data
        time.sleep(POLL_INTERVAL)

ready_a = poll_until_ready(session_url_a, "A")
ready_b = poll_until_ready(session_url_b, "B")

livy_session_id_a = ready_a["sessionId"]
livy_session_id_b = ready_b["sessionId"]
repl_id_a = ready_a["replId"]
repl_id_b = ready_b["replId"]

print()
print("=" * 50)
print("SESSION PACKING VERIFICATION")
print("=" * 50)
print(f"HC session A id:    {hc_id_a}")
print(f"HC session B id:    {hc_id_b}")
print(f"HC IDs differ:      {hc_id_a != hc_id_b}")
print()
print(f"Livy sessionId A:   {livy_session_id_a}")
print(f"Livy sessionId B:   {livy_session_id_b}")
print(f"Same Livy session:  {livy_session_id_a == livy_session_id_b}")
print()
print(f"REPL A:             {repl_id_a}")
print(f"REPL B:             {repl_id_b}")
print(f"REPLs differ:       {repl_id_a != repl_id_b}")

4. Send inn uttalelser til begge REPL-ene parallelt

Send inn to POST-forespørsler (én per REPL) før du sjekker resultatene. Fordi REPL-ene deler samme Spark-økt, kan begge setningene kjøres samtidig. Denne koden definerer poll_statement også hjelpefunksjonen som brukes i de resterende stegene.

# Build statement URLs for each REPL
stmts_url_a = f"{livy_base_url}/{livy_session_id_a}/repls/{repl_id_a}/statements"
stmts_url_b = f"{livy_base_url}/{livy_session_id_b}/repls/{repl_id_b}/statements"

# Fire both statement POSTs before polling
print("Submitting to REPL A: print('Hello from REPL A')")
resp_a = requests.post(stmts_url_a, headers=headers, json={"code": "print('Hello from REPL A')", "kind": "pyspark"})
assert resp_a.status_code in (200, 201), f"Failed: {resp_a.text}"
stmt_a = resp_a.json()
stmt_url_a = f"{stmts_url_a}/{stmt_a['id']}"

print("Submitting to REPL B: print('Hello from REPL B')")
resp_b = requests.post(stmts_url_b, headers=headers, json={"code": "print('Hello from REPL B')", "kind": "pyspark"})
assert resp_b.status_code in (200, 201), f"Failed: {resp_b.text}"
stmt_b = resp_b.json()
stmt_url_b = f"{stmts_url_b}/{stmt_b['id']}"

print("Both statements submitted. Polling for results...")

# Poll both statements
def poll_statement(url, label):
    while True:
        resp = requests.get(url, headers=headers, timeout=30)
        resp.raise_for_status()
        data = resp.json()
        if data.get("state") not in ("waiting", "running"):
            return data
        time.sleep(5)

result_a = poll_statement(stmt_url_a, "A")
result_b = poll_statement(stmt_url_b, "B")

output_a = result_a.get("output", {}).get("data", {}).get("text/plain", "")
output_b = result_b.get("output", {}).get("data", {}).get("text/plain", "")

print()
print("=" * 50)
print("PARALLEL EXECUTION RESULTS")
print("=" * 50)
print(f"REPL A output: {output_a}")
print(f"REPL B output: {output_b}")

5. Verifiser REPL-isolasjon

Sett en variabel x = 42 i REPL A, og prøv deretter å få tilgang til den fra REPL B. Selv om begge REPL-ene deler samme Spark-økt, er variablene deres isolerte.

# Set x = 42 in REPL A
print("[A] Setting x = 42...")
resp = requests.post(stmts_url_a, headers=headers, json={"code": "x = 42; print(x)", "kind": "pyspark"})
stmt_url = f"{stmts_url_a}/{resp.json()['id']}"
result_a = poll_statement(stmt_url, "A")
output_a = result_a.get("output", {}).get("data", {}).get("text/plain", "")
print(f"[A] Output: {output_a}")

# Try to read x from REPL B — should get NameError
print("\n[B] Trying to read x (expect NameError)...")
code_b = "try:\n    print(x)\nexcept NameError as e:\n    print(f'NameError: {e}')"
resp = requests.post(stmts_url_b, headers=headers, json={"code": code_b, "kind": "pyspark"})
stmt_url = f"{stmts_url_b}/{resp.json()['id']}"
result_b = poll_statement(stmt_url, "B")
output_b = result_b.get("output", {}).get("data", {}).get("text/plain", "")
print(f"[B] Output: {output_b}")

print()
print("=" * 50)
print("REPL ISOLATION RESULTS")
print("=" * 50)
print(f"REPL A (x = 42): {output_a}")
print(f"REPL B (print(x)): {output_b}")

6. Rydd opp i begge HC-øktene

Slett begge HC-øktene for å frigjøre ressurser. Bruk HC-økten id, ikke den underliggende sessionId.

for label, url in [("A", session_url_a), ("B", session_url_b)]:
    print(f"[{label}] Deleting HC session...")
    resp = requests.delete(url, headers=headers)
    if resp.status_code in (200, 204):
        print(f"[{label}] Deleted successfully.")
    elif resp.status_code == 404:
        print(f"[{label}] Already deleted.")
    else:
        print(f"[{label}] Unexpected response: {resp.status_code} — {resp.text}")

Se jobbene dine i overvåkingshuben

  1. Naviger til Monitor i navigasjonen på venstre side.
  2. Velg det nyeste aktivitetsnavnet for å se sesjonsdetaljer.
  3. Legg merke til at begge HC-øktene deler den samme underliggende Spark-økten, som bekrefter session packing.

API-endepunktsreferanse

Operasjon Metode Endepunkt
Opprett HC-økt POST /v1/workspaces/{workspaceId}/lakehouses/{lakehouseId}/livyapi/versions/2023-12-01/highConcurrencySessions
Få en HC-økt GET .../highConcurrencySessions/{highConcurrencySessionId}
Slett HC-økten DELETE .../highConcurrencySessions/{highConcurrencySessionId}
Send inn uttalelse POST .../highConcurrencySessions/{sessionId}/repls/{replId}/statements
Få uttalelse GET .../highConcurrencySessions/{sessionId}/repls/{replId}/statements/{statementId}
Avbryt-setningen POST .../highConcurrencySessions/{sessionId}/repls/{replId}/statements/{statementId}/cancel

Bemerkning

Opprett, hent og slett-operasjoner bruker HC-sesjonen id. Setningsoperasjoner bruker den underliggende Livy sessionId.