Comienza con la API de Livy para sesiones de alta concurrencia de Fabric

Se aplica a:✅ Fabric Data Engineering and Data Science

Las sesiones de alta simultaneidad (HC) permiten que varios usuarios compartan una sola sesión de Spark sin interferir unos con otros. En lugar de aprovisionar una sesión independiente para cada carga de trabajo, se adquiere una sesión de HC y la API de Fabric asigna a esta sesión una REPL aislada dentro de una sesión subyacente compartida.

En este artículo, utiliza la API de Fabric Livy para adquirir sesiones de HC, verificar el empaquetado de sesiones, realizar ejecuciones en paralelo y confirmar el aislamiento de REPL.

Prerrequisitos

Reemplace los marcadores de posición {Entra_TenantID}, {Entra_ClientID}, {Entra_ClientSecret}, {Fabric_WorkspaceID} y {Fabric_LakehouseID} por los valores al seguir los ejemplos de este artículo.

¿Qué son las sesiones de simultaneidad alta?

Las sesiones de alta simultaneidad (HC) permiten a varios usuarios o procesos compartir una sola sesión de Spark. Cada usuario obtiene un REPL aislado (Bucle de Leer-Evaluar-Imprimir) dentro de la sesión compartida. Las afirmaciones de diferentes llamadas no interfieren entre sí.

Empaquetado de sesiones

Al crear dos sesiones de HC con la misma sessionTag, la API de Fabric las agrupa en la misma sesión subyacente Livy. Cada sesión de HC obtiene su propia REPL, que proporciona:

  • Eficiencia de los recursos: varios usuarios comparten una única sesión de Spark en lugar de que cada uno cree la suya propia.
  • Aislamiento de REPL: las variables y el estado de una REPL no son visibles para otros.
  • Ejecución en paralelo: las instrucciones en diferentes REPLs se pueden ejecutar simultáneamente.

Identificadores de clave

ID Único por Se usa para
Sesión de HC id Sesión de HC Estado de sondeo, eliminación de sesión
sessionId Sesión de Livy (compartida cuando está empaquetada) URLs de Declaración
replId REPL (contexto aislado) URLs de Declaración

Importante

El sessionId y replId solo están disponibles una vez que la sesión HC alcanza el estado Idle.

Diferencias entre las sesiones de HC y las sesiones normales de Livy

Aspecto Sesión regular de Livy Sesión de HC
Punto final .../sessions .../highConcurrencySessions
Declaraciones Enviado directamente a la sesión Enviado a través de un REPL (/repls/{replId}/statements)
Adquisición La sesión se convierte directamente en idle NotStarteda continuación,AcquiringHighConcurrencySessionIdle
Empaquetado de sesiones No es aplicable Opcional sessionTag para compartir sesiones de Spark subyacentes

Tutorial paso a paso

1. Autenticación con Microsoft Entra

Adquiera un token de acceso mediante el flujo de credenciales de cliente de SPN. Sustituya los valores de marcador de posición por sus credenciales reales.

from msal import ConfidentialClientApplication

# Configuration — Replace with your actual values
tenant_id = "{Entra_TenantID}"       # Microsoft Entra tenant ID
client_id = "{Entra_ClientID}"       # Service principal application ID
client_secret = "{Entra_ClientSecret}"  # Service principal client secret

# OAuth settings
authority = f"https://login.microsoftonline.com/{tenant_id}"
scope = "https://analysis.windows.net/powerbi/api/.default"

app = ConfidentialClientApplication(
    client_id=client_id,
    authority=authority,
    client_credential=client_secret,
)

result = app.acquire_token_for_client(scopes=[scope])

if "access_token" in result:
    token = result["access_token"]
    print("Access token acquired successfully.")
else:
    raise RuntimeError(
        f"Failed to acquire token: {result.get('error_description', 'unknown error')}"
    )

2. Crear dos sesiones HC con la misma etiqueta de sesión

Cree dos sesiones de HC mediante sessionTag: "demo-tag". Dado que comparten la misma etiqueta, la API de Fabric los empaqueta en la misma Sesión subyacente de Livy. Cada sesión obtiene su propio entorno interactivo de evaluación (REPL) aislado.

import json
import requests

# Fabric resource IDs — Replace with your actual values
workspace_id = "{Fabric_WorkspaceID}"
lakehouse_id = "{Fabric_LakehouseID}"

# Construct the HC session endpoint URL
livy_base_url = (
    f"https://api.fabric.microsoft.com/v1"
    f"/workspaces/{workspace_id}"
    f"/lakehouses/{lakehouse_id}"
    f"/livyapi/versions/2023-12-01"
    f"/highConcurrencySessions"
)

headers = {"Authorization": f"Bearer {token}"}
session_tag = "demo-tag"

print(f"HC session endpoint: {livy_base_url}")
print(f"Session tag: {session_tag}")
print()

# Create HC Session A
print("Creating HC Session A...")
resp_a = requests.post(livy_base_url, headers=headers, json={"sessionTag": session_tag})
assert resp_a.status_code == 202, f"Failed: {resp_a.status_code} — {resp_a.text}"
session_a = resp_a.json()
hc_id_a = session_a["id"]
print(f"  HC session A id: {hc_id_a}  state: {session_a['state']}")

# Create HC Session B
print("Creating HC Session B...")
resp_b = requests.post(livy_base_url, headers=headers, json={"sessionTag": session_tag})
assert resp_b.status_code == 202, f"Failed: {resp_b.status_code} — {resp_b.text}"
session_b = resp_b.json()
hc_id_b = session_b["id"]
print(f"  HC session B id: {hc_id_b}  state: {session_b['state']}")

session_url_a = f"{livy_base_url}/{hc_id_a}"
session_url_b = f"{livy_base_url}/{hc_id_b}"

3. Sondear ambas sesiones hasta que estén listas y comprobar la agregación de sesiones

Cada sesión realiza la transición a través de estos estados: NotStarted, AcquiringHighConcurrencySessiony, a continuación Idle, .

Una vez que ambas sesiones son Idle, la salida confirma los siguientes detalles respecto al empaquetado de las sesiones:

  • Los dos identificadores de sesión de HC (hc_id_a y hc_id_b) son diferentes, lo que confirma que cada llamada "acquire" devolvió una sesión hc distinta.
  • Los identificadores de sesión de Livy subyacentes (sessionId_a y sessionId_b) coinciden, confirmando que ambas sesiones de HC se empaquetaron en la misma sesión de Livy.
  • Los identificadores de REPL (replId_a y replId_b) son diferentes, lo que confirma que cada sesión de HC tiene su propio contexto de ejecución aislado.

El código siguiente sondea ambas sesiones hasta que están listas e imprimen la salida de comprobación:

import time

ACQUIRING_STATES = {"NotStarted", "starting", "AcquiringHighConcurrencySession"}
POLL_INTERVAL = 5

def poll_until_ready(url, label):
    """Poll an HC session until it leaves the acquisition states."""
    print(f"[{label}] Polling...")
    while True:
        resp = requests.get(url, headers=headers, timeout=30)
        resp.raise_for_status()
        data = resp.json()
        state = data.get("state", "unknown")
        print(f"  [{label}] state={state}  sessionId={data.get('sessionId', 'N/A')}  replId={data.get('replId', 'N/A')}")
        if state in ("Dead", "Killed", "Failed"):
            raise RuntimeError(f"[{label}] Session failed: {state}")
        if state not in ACQUIRING_STATES:
            return data
        time.sleep(POLL_INTERVAL)

ready_a = poll_until_ready(session_url_a, "A")
ready_b = poll_until_ready(session_url_b, "B")

livy_session_id_a = ready_a["sessionId"]
livy_session_id_b = ready_b["sessionId"]
repl_id_a = ready_a["replId"]
repl_id_b = ready_b["replId"]

print()
print("=" * 50)
print("SESSION PACKING VERIFICATION")
print("=" * 50)
print(f"HC session A id:    {hc_id_a}")
print(f"HC session B id:    {hc_id_b}")
print(f"HC IDs differ:      {hc_id_a != hc_id_b}")
print()
print(f"Livy sessionId A:   {livy_session_id_a}")
print(f"Livy sessionId B:   {livy_session_id_b}")
print(f"Same Livy session:  {livy_session_id_a == livy_session_id_b}")
print()
print(f"REPL A:             {repl_id_a}")
print(f"REPL B:             {repl_id_b}")
print(f"REPLs differ:       {repl_id_a != repl_id_b}")

4. Enviar instrucciones a ambas REPL en paralelo

Envíe dos solicitudes POST (una por REPL) antes de sondear cualquiera de los resultados. Dado que las REPL comparten la misma sesión de Spark, ambas instrucciones se pueden ejecutar simultáneamente. Este código también define la poll_statement función auxiliar que se usa en los pasos restantes.

# Build statement URLs for each REPL
stmts_url_a = f"{livy_base_url}/{livy_session_id_a}/repls/{repl_id_a}/statements"
stmts_url_b = f"{livy_base_url}/{livy_session_id_b}/repls/{repl_id_b}/statements"

# Fire both statement POSTs before polling
print("Submitting to REPL A: print('Hello from REPL A')")
resp_a = requests.post(stmts_url_a, headers=headers, json={"code": "print('Hello from REPL A')", "kind": "pyspark"})
assert resp_a.status_code in (200, 201), f"Failed: {resp_a.text}"
stmt_a = resp_a.json()
stmt_url_a = f"{stmts_url_a}/{stmt_a['id']}"

print("Submitting to REPL B: print('Hello from REPL B')")
resp_b = requests.post(stmts_url_b, headers=headers, json={"code": "print('Hello from REPL B')", "kind": "pyspark"})
assert resp_b.status_code in (200, 201), f"Failed: {resp_b.text}"
stmt_b = resp_b.json()
stmt_url_b = f"{stmts_url_b}/{stmt_b['id']}"

print("Both statements submitted. Polling for results...")

# Poll both statements
def poll_statement(url, label):
    while True:
        resp = requests.get(url, headers=headers, timeout=30)
        resp.raise_for_status()
        data = resp.json()
        if data.get("state") not in ("waiting", "running"):
            return data
        time.sleep(5)

result_a = poll_statement(stmt_url_a, "A")
result_b = poll_statement(stmt_url_b, "B")

output_a = result_a.get("output", {}).get("data", {}).get("text/plain", "")
output_b = result_b.get("output", {}).get("data", {}).get("text/plain", "")

print()
print("=" * 50)
print("PARALLEL EXECUTION RESULTS")
print("=" * 50)
print(f"REPL A output: {output_a}")
print(f"REPL B output: {output_b}")

Comprobación del aislamiento de REPL

Establezca una variable x = 42 en REPL A y, a continuación, intente acceder a ella desde REPL B. Aunque ambas REPL comparten la misma sesión de Spark, sus variables están aisladas.

# Set x = 42 in REPL A
print("[A] Setting x = 42...")
resp = requests.post(stmts_url_a, headers=headers, json={"code": "x = 42; print(x)", "kind": "pyspark"})
stmt_url = f"{stmts_url_a}/{resp.json()['id']}"
result_a = poll_statement(stmt_url, "A")
output_a = result_a.get("output", {}).get("data", {}).get("text/plain", "")
print(f"[A] Output: {output_a}")

# Try to read x from REPL B — should get NameError
print("\n[B] Trying to read x (expect NameError)...")
code_b = "try:\n    print(x)\nexcept NameError as e:\n    print(f'NameError: {e}')"
resp = requests.post(stmts_url_b, headers=headers, json={"code": code_b, "kind": "pyspark"})
stmt_url = f"{stmts_url_b}/{resp.json()['id']}"
result_b = poll_statement(stmt_url, "B")
output_b = result_b.get("output", {}).get("data", {}).get("text/plain", "")
print(f"[B] Output: {output_b}")

print()
print("=" * 50)
print("REPL ISOLATION RESULTS")
print("=" * 50)
print(f"REPL A (x = 42): {output_a}")
print(f"REPL B (print(x)): {output_b}")

6. Limpieza de ambas sesiones de HC

Elimine ambas sesiones de HC para liberar recursos. Use la sesión id de HC, no la subyacente sessionId.

for label, url in [("A", session_url_a), ("B", session_url_b)]:
    print(f"[{label}] Deleting HC session...")
    resp = requests.delete(url, headers=headers)
    if resp.status_code in (200, 204):
        print(f"[{label}] Deleted successfully.")
    elif resp.status_code == 404:
        print(f"[{label}] Already deleted.")
    else:
        print(f"[{label}] Unexpected response: {resp.status_code} — {resp.text}")

Ver tus trabajos en el centro de supervisión

  1. Vaya a Monitor en el panel de navegación izquierdo.
  2. Seleccione el nombre de actividad más reciente para ver los detalles de la sesión.
  3. Tenga en cuenta que ambas sesiones de HC comparten la misma sesión de Spark subyacente, lo que confirma el empaquetado de sesiones.

Referencia de puntos finales de la API

Operación Método Punto final
Creación de una sesión de HC POST /v1/workspaces/{workspaceId}/lakehouses/{lakehouseId}/livyapi/versions/2023-12-01/highConcurrencySessions
Obtener sesión de HC GET .../highConcurrencySessions/{highConcurrencySessionId}
Eliminación de la sesión de HC DELETE .../highConcurrencySessions/{highConcurrencySessionId}
Enviar declaración POST .../highConcurrencySessions/{sessionId}/repls/{replId}/statements
Obtener declaración GET .../highConcurrencySessions/{sessionId}/repls/{replId}/statements/{statementId}
Cancelar instrucción POST .../highConcurrencySessions/{sessionId}/repls/{replId}/statements/{statementId}/cancel

Nota:

Las operaciones Create, Get y Delete usan la sesión de HC id. Las operaciones de instrucciones utilizan el Livy subyacente sessionId.