Nota
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Se aplica a:✅ Fabric Data Engineering and Data Science
Las sesiones de alta simultaneidad (HC) permiten que varios usuarios compartan una sola sesión de Spark sin interferir unos con otros. En lugar de aprovisionar una sesión independiente para cada carga de trabajo, se adquiere una sesión de HC y la API de Fabric asigna a esta sesión una REPL aislada dentro de una sesión subyacente compartida.
En este artículo, utiliza la API de Fabric Livy para adquirir sesiones de HC, verificar el empaquetado de sesiones, realizar ejecuciones en paralelo y confirmar el aislamiento de REPL.
Prerrequisitos
- Capacidad de Fabric Premium o de prueba con una instancia de Lakehouse.
- Un cliente remoto, como Visual Studio Code con PySpark y Python 3.10+.
- Una entidad de servicio de Microsoft Entra (SPN) con acceso al espacio de trabajo. Registrar una aplicación en la plataforma de identidad de Microsoft.
- Un secreto de cliente para la entidad de servicio. Agregue y administre las credenciales de aplicación.
Reemplace los marcadores de posición {Entra_TenantID}, {Entra_ClientID}, {Entra_ClientSecret}, {Fabric_WorkspaceID} y {Fabric_LakehouseID} por los valores al seguir los ejemplos de este artículo.
¿Qué son las sesiones de simultaneidad alta?
Las sesiones de alta simultaneidad (HC) permiten a varios usuarios o procesos compartir una sola sesión de Spark. Cada usuario obtiene un REPL aislado (Bucle de Leer-Evaluar-Imprimir) dentro de la sesión compartida. Las afirmaciones de diferentes llamadas no interfieren entre sí.
Empaquetado de sesiones
Al crear dos sesiones de HC con la misma sessionTag, la API de Fabric las agrupa en la misma sesión subyacente Livy. Cada sesión de HC obtiene su propia REPL, que proporciona:
- Eficiencia de los recursos: varios usuarios comparten una única sesión de Spark en lugar de que cada uno cree la suya propia.
- Aislamiento de REPL: las variables y el estado de una REPL no son visibles para otros.
- Ejecución en paralelo: las instrucciones en diferentes REPLs se pueden ejecutar simultáneamente.
Identificadores de clave
| ID | Único por | Se usa para |
|---|---|---|
Sesión de HC id |
Sesión de HC | Estado de sondeo, eliminación de sesión |
sessionId |
Sesión de Livy (compartida cuando está empaquetada) | URLs de Declaración |
replId |
REPL (contexto aislado) | URLs de Declaración |
Importante
El sessionId y replId solo están disponibles una vez que la sesión HC alcanza el estado Idle.
Diferencias entre las sesiones de HC y las sesiones normales de Livy
| Aspecto | Sesión regular de Livy | Sesión de HC |
|---|---|---|
| Punto final | .../sessions |
.../highConcurrencySessions |
| Declaraciones | Enviado directamente a la sesión | Enviado a través de un REPL (/repls/{replId}/statements) |
| Adquisición | La sesión se convierte directamente en idle |
NotStarteda continuación,AcquiringHighConcurrencySessionIdle |
| Empaquetado de sesiones | No es aplicable | Opcional sessionTag para compartir sesiones de Spark subyacentes |
Tutorial paso a paso
1. Autenticación con Microsoft Entra
Adquiera un token de acceso mediante el flujo de credenciales de cliente de SPN. Sustituya los valores de marcador de posición por sus credenciales reales.
from msal import ConfidentialClientApplication
# Configuration — Replace with your actual values
tenant_id = "{Entra_TenantID}" # Microsoft Entra tenant ID
client_id = "{Entra_ClientID}" # Service principal application ID
client_secret = "{Entra_ClientSecret}" # Service principal client secret
# OAuth settings
authority = f"https://login.microsoftonline.com/{tenant_id}"
scope = "https://analysis.windows.net/powerbi/api/.default"
app = ConfidentialClientApplication(
client_id=client_id,
authority=authority,
client_credential=client_secret,
)
result = app.acquire_token_for_client(scopes=[scope])
if "access_token" in result:
token = result["access_token"]
print("Access token acquired successfully.")
else:
raise RuntimeError(
f"Failed to acquire token: {result.get('error_description', 'unknown error')}"
)
2. Crear dos sesiones HC con la misma etiqueta de sesión
Cree dos sesiones de HC mediante sessionTag: "demo-tag". Dado que comparten la misma etiqueta, la API de Fabric los empaqueta en la misma Sesión subyacente de Livy. Cada sesión obtiene su propio entorno interactivo de evaluación (REPL) aislado.
import json
import requests
# Fabric resource IDs — Replace with your actual values
workspace_id = "{Fabric_WorkspaceID}"
lakehouse_id = "{Fabric_LakehouseID}"
# Construct the HC session endpoint URL
livy_base_url = (
f"https://api.fabric.microsoft.com/v1"
f"/workspaces/{workspace_id}"
f"/lakehouses/{lakehouse_id}"
f"/livyapi/versions/2023-12-01"
f"/highConcurrencySessions"
)
headers = {"Authorization": f"Bearer {token}"}
session_tag = "demo-tag"
print(f"HC session endpoint: {livy_base_url}")
print(f"Session tag: {session_tag}")
print()
# Create HC Session A
print("Creating HC Session A...")
resp_a = requests.post(livy_base_url, headers=headers, json={"sessionTag": session_tag})
assert resp_a.status_code == 202, f"Failed: {resp_a.status_code} — {resp_a.text}"
session_a = resp_a.json()
hc_id_a = session_a["id"]
print(f" HC session A id: {hc_id_a} state: {session_a['state']}")
# Create HC Session B
print("Creating HC Session B...")
resp_b = requests.post(livy_base_url, headers=headers, json={"sessionTag": session_tag})
assert resp_b.status_code == 202, f"Failed: {resp_b.status_code} — {resp_b.text}"
session_b = resp_b.json()
hc_id_b = session_b["id"]
print(f" HC session B id: {hc_id_b} state: {session_b['state']}")
session_url_a = f"{livy_base_url}/{hc_id_a}"
session_url_b = f"{livy_base_url}/{hc_id_b}"
3. Sondear ambas sesiones hasta que estén listas y comprobar la agregación de sesiones
Cada sesión realiza la transición a través de estos estados: NotStarted, AcquiringHighConcurrencySessiony, a continuación Idle, .
Una vez que ambas sesiones son Idle, la salida confirma los siguientes detalles respecto al empaquetado de las sesiones:
- Los dos identificadores de sesión de HC (
hc_id_ayhc_id_b) son diferentes, lo que confirma que cada llamada "acquire" devolvió una sesión hc distinta. - Los identificadores de sesión de Livy subyacentes (
sessionId_aysessionId_b) coinciden, confirmando que ambas sesiones de HC se empaquetaron en la misma sesión de Livy. - Los identificadores de REPL (
replId_ayreplId_b) son diferentes, lo que confirma que cada sesión de HC tiene su propio contexto de ejecución aislado.
El código siguiente sondea ambas sesiones hasta que están listas e imprimen la salida de comprobación:
import time
ACQUIRING_STATES = {"NotStarted", "starting", "AcquiringHighConcurrencySession"}
POLL_INTERVAL = 5
def poll_until_ready(url, label):
"""Poll an HC session until it leaves the acquisition states."""
print(f"[{label}] Polling...")
while True:
resp = requests.get(url, headers=headers, timeout=30)
resp.raise_for_status()
data = resp.json()
state = data.get("state", "unknown")
print(f" [{label}] state={state} sessionId={data.get('sessionId', 'N/A')} replId={data.get('replId', 'N/A')}")
if state in ("Dead", "Killed", "Failed"):
raise RuntimeError(f"[{label}] Session failed: {state}")
if state not in ACQUIRING_STATES:
return data
time.sleep(POLL_INTERVAL)
ready_a = poll_until_ready(session_url_a, "A")
ready_b = poll_until_ready(session_url_b, "B")
livy_session_id_a = ready_a["sessionId"]
livy_session_id_b = ready_b["sessionId"]
repl_id_a = ready_a["replId"]
repl_id_b = ready_b["replId"]
print()
print("=" * 50)
print("SESSION PACKING VERIFICATION")
print("=" * 50)
print(f"HC session A id: {hc_id_a}")
print(f"HC session B id: {hc_id_b}")
print(f"HC IDs differ: {hc_id_a != hc_id_b}")
print()
print(f"Livy sessionId A: {livy_session_id_a}")
print(f"Livy sessionId B: {livy_session_id_b}")
print(f"Same Livy session: {livy_session_id_a == livy_session_id_b}")
print()
print(f"REPL A: {repl_id_a}")
print(f"REPL B: {repl_id_b}")
print(f"REPLs differ: {repl_id_a != repl_id_b}")
4. Enviar instrucciones a ambas REPL en paralelo
Envíe dos solicitudes POST (una por REPL) antes de sondear cualquiera de los resultados. Dado que las REPL comparten la misma sesión de Spark, ambas instrucciones se pueden ejecutar simultáneamente. Este código también define la poll_statement función auxiliar que se usa en los pasos restantes.
# Build statement URLs for each REPL
stmts_url_a = f"{livy_base_url}/{livy_session_id_a}/repls/{repl_id_a}/statements"
stmts_url_b = f"{livy_base_url}/{livy_session_id_b}/repls/{repl_id_b}/statements"
# Fire both statement POSTs before polling
print("Submitting to REPL A: print('Hello from REPL A')")
resp_a = requests.post(stmts_url_a, headers=headers, json={"code": "print('Hello from REPL A')", "kind": "pyspark"})
assert resp_a.status_code in (200, 201), f"Failed: {resp_a.text}"
stmt_a = resp_a.json()
stmt_url_a = f"{stmts_url_a}/{stmt_a['id']}"
print("Submitting to REPL B: print('Hello from REPL B')")
resp_b = requests.post(stmts_url_b, headers=headers, json={"code": "print('Hello from REPL B')", "kind": "pyspark"})
assert resp_b.status_code in (200, 201), f"Failed: {resp_b.text}"
stmt_b = resp_b.json()
stmt_url_b = f"{stmts_url_b}/{stmt_b['id']}"
print("Both statements submitted. Polling for results...")
# Poll both statements
def poll_statement(url, label):
while True:
resp = requests.get(url, headers=headers, timeout=30)
resp.raise_for_status()
data = resp.json()
if data.get("state") not in ("waiting", "running"):
return data
time.sleep(5)
result_a = poll_statement(stmt_url_a, "A")
result_b = poll_statement(stmt_url_b, "B")
output_a = result_a.get("output", {}).get("data", {}).get("text/plain", "")
output_b = result_b.get("output", {}).get("data", {}).get("text/plain", "")
print()
print("=" * 50)
print("PARALLEL EXECUTION RESULTS")
print("=" * 50)
print(f"REPL A output: {output_a}")
print(f"REPL B output: {output_b}")
Comprobación del aislamiento de REPL
Establezca una variable x = 42 en REPL A y, a continuación, intente acceder a ella desde REPL B. Aunque ambas REPL comparten la misma sesión de Spark, sus variables están aisladas.
# Set x = 42 in REPL A
print("[A] Setting x = 42...")
resp = requests.post(stmts_url_a, headers=headers, json={"code": "x = 42; print(x)", "kind": "pyspark"})
stmt_url = f"{stmts_url_a}/{resp.json()['id']}"
result_a = poll_statement(stmt_url, "A")
output_a = result_a.get("output", {}).get("data", {}).get("text/plain", "")
print(f"[A] Output: {output_a}")
# Try to read x from REPL B — should get NameError
print("\n[B] Trying to read x (expect NameError)...")
code_b = "try:\n print(x)\nexcept NameError as e:\n print(f'NameError: {e}')"
resp = requests.post(stmts_url_b, headers=headers, json={"code": code_b, "kind": "pyspark"})
stmt_url = f"{stmts_url_b}/{resp.json()['id']}"
result_b = poll_statement(stmt_url, "B")
output_b = result_b.get("output", {}).get("data", {}).get("text/plain", "")
print(f"[B] Output: {output_b}")
print()
print("=" * 50)
print("REPL ISOLATION RESULTS")
print("=" * 50)
print(f"REPL A (x = 42): {output_a}")
print(f"REPL B (print(x)): {output_b}")
6. Limpieza de ambas sesiones de HC
Elimine ambas sesiones de HC para liberar recursos. Use la sesión id de HC, no la subyacente sessionId.
for label, url in [("A", session_url_a), ("B", session_url_b)]:
print(f"[{label}] Deleting HC session...")
resp = requests.delete(url, headers=headers)
if resp.status_code in (200, 204):
print(f"[{label}] Deleted successfully.")
elif resp.status_code == 404:
print(f"[{label}] Already deleted.")
else:
print(f"[{label}] Unexpected response: {resp.status_code} — {resp.text}")
Ver tus trabajos en el centro de supervisión
- Vaya a Monitor en el panel de navegación izquierdo.
- Seleccione el nombre de actividad más reciente para ver los detalles de la sesión.
- Tenga en cuenta que ambas sesiones de HC comparten la misma sesión de Spark subyacente, lo que confirma el empaquetado de sesiones.
Referencia de puntos finales de la API
| Operación | Método | Punto final |
|---|---|---|
| Creación de una sesión de HC | POST |
/v1/workspaces/{workspaceId}/lakehouses/{lakehouseId}/livyapi/versions/2023-12-01/highConcurrencySessions |
| Obtener sesión de HC | GET |
.../highConcurrencySessions/{highConcurrencySessionId} |
| Eliminación de la sesión de HC | DELETE |
.../highConcurrencySessions/{highConcurrencySessionId} |
| Enviar declaración | POST |
.../highConcurrencySessions/{sessionId}/repls/{replId}/statements |
| Obtener declaración | GET |
.../highConcurrencySessions/{sessionId}/repls/{replId}/statements/{statementId} |
| Cancelar instrucción | POST |
.../highConcurrencySessions/{sessionId}/repls/{replId}/statements/{statementId}/cancel |
Nota:
Las operaciones Create, Get y Delete usan la sesión de HC id. Las operaciones de instrucciones utilizan el Livy subyacente sessionId.