Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Aplica-se a:✅ Engenharia de Dados e Ciência de Dados do Fabric
Sessões de alta simultaneidade (HC) permitem que vários chamadores compartilhem uma única sessão do Spark sem interferir entre si. Em vez de provisionar uma sessão separada para cada carga de trabalho, você adquire uma sessão HC e a API Fabric atribui a ela um REPL isolado em uma sessão subjacente compartilhada.
Neste artigo, você usará a API Fabric Livy para adquirir sessões HC, verificar o empacotamento de sessões, executar instruções em paralelo e confirmar o isolamento REPL.
Pré-requisitos
- Capacidade Premium ou Capacidade de Avaliação de Fabric com um Lakehouse.
- Um cliente remoto, como Visual Studio Code com PySpark e Python 3.10+.
- Uma entidade de serviço Microsoft Entra (SPN) com acesso ao workspace. Registre um aplicativo na plataforma de identidade da Microsoft.
- Um segredo do cliente para a entidade de serviço. Adicione e gerencie as credenciais do aplicativo.
Substitua os espaços reservados {Entra_TenantID}, {Entra_ClientID}, {Entra_ClientSecret}, {Fabric_WorkspaceID} e {Fabric_LakehouseID} por seus valores ao seguir os exemplos neste artigo.
O que são sessões de alta simultaneidade?
Sessões de alta simultaneidade (HC) permitem que vários usuários ou processos compartilhem uma única sessão do Spark. Cada chamador obtém um REPL isolado (Read-Eval-Print Loop) em uma sessão compartilhada. Declarações de diferentes chamadores não interferem entre si.
Empacotamento de sessão
Quando você cria duas sessões de HC com o mesmo sessionTag, a API Fabric as empacota na mesma sessão subjacente do Livy. Cada sessão de HC obtém seu próprio REPL, que fornece:
- Eficiência de recursos: vários usuários compartilham uma sessão do Spark em vez de cada uma criar a sua própria.
- Isolamento de REPL: variáveis e estado em um REPL não são visíveis para outras pessoas.
- Execução paralela: instruções em repls diferentes podem ser executadas simultaneamente.
IDs de chave
| ID | Exclusivo por | Usado para |
|---|---|---|
Sessão de HC id |
Sessão de HC | Status da consulta, excluir sessão |
sessionId |
Sessão Livy (compartilhada quando empacotada) | URLs de declaração |
replId |
REPL (contexto isolado) | URLs de declaração |
Importante
sessionId e replId só estarão disponíveis quando a sessão HC atingir o estado Idle.
Como as sessões de HC diferem das sessões regulares do Livy
| Aspecto | Sessão regular do Livy | Sessão de HC |
|---|---|---|
| Ponto de extremidade | .../sessions |
.../highConcurrencySessions |
| Declarações | Enviado diretamente para a sessão | Enviado por meio de um REPL (/repls/{replId}/statements) |
| Aquisição | A sessão transforma-se diretamente em idle |
NotStartedem seguida,AcquiringHighConcurrencySessionIdle |
| Empacotamento de sessão | Não aplicável | Opcional sessionTag para compartilhar sessões subjacentes do Spark |
Guia passo a passo
1. Autenticar com Microsoft Entra
Adquira um token de acesso usando o fluxo de credenciais do cliente SPN. Substitua os valores de espaço reservado por suas credenciais autênticas.
from msal import ConfidentialClientApplication
# Configuration — Replace with your actual values
tenant_id = "{Entra_TenantID}" # Microsoft Entra tenant ID
client_id = "{Entra_ClientID}" # Service principal application ID
client_secret = "{Entra_ClientSecret}" # Service principal client secret
# OAuth settings
authority = f"https://login.microsoftonline.com/{tenant_id}"
scope = "https://analysis.windows.net/powerbi/api/.default"
app = ConfidentialClientApplication(
client_id=client_id,
authority=authority,
client_credential=client_secret,
)
result = app.acquire_token_for_client(scopes=[scope])
if "access_token" in result:
token = result["access_token"]
print("Access token acquired successfully.")
else:
raise RuntimeError(
f"Failed to acquire token: {result.get('error_description', 'unknown error')}"
)
2. Crie duas sessões de HC com a mesma tag de sessão
Criar duas sessões de HC usando sessionTag: "demo-tag". Como eles compartilham a mesma tag, a API de Fabric os empacota na mesma sessão subjacente Livy. Cada sessão obtém seu próprio REPL isolado.
import json
import requests
# Fabric resource IDs — Replace with your actual values
workspace_id = "{Fabric_WorkspaceID}"
lakehouse_id = "{Fabric_LakehouseID}"
# Construct the HC session endpoint URL
livy_base_url = (
f"https://api.fabric.microsoft.com/v1"
f"/workspaces/{workspace_id}"
f"/lakehouses/{lakehouse_id}"
f"/livyapi/versions/2023-12-01"
f"/highConcurrencySessions"
)
headers = {"Authorization": f"Bearer {token}"}
session_tag = "demo-tag"
print(f"HC session endpoint: {livy_base_url}")
print(f"Session tag: {session_tag}")
print()
# Create HC Session A
print("Creating HC Session A...")
resp_a = requests.post(livy_base_url, headers=headers, json={"sessionTag": session_tag})
assert resp_a.status_code == 202, f"Failed: {resp_a.status_code} — {resp_a.text}"
session_a = resp_a.json()
hc_id_a = session_a["id"]
print(f" HC session A id: {hc_id_a} state: {session_a['state']}")
# Create HC Session B
print("Creating HC Session B...")
resp_b = requests.post(livy_base_url, headers=headers, json={"sessionTag": session_tag})
assert resp_b.status_code == 202, f"Failed: {resp_b.status_code} — {resp_b.text}"
session_b = resp_b.json()
hc_id_b = session_b["id"]
print(f" HC session B id: {hc_id_b} state: {session_b['state']}")
session_url_a = f"{livy_base_url}/{hc_id_a}"
session_url_b = f"{livy_base_url}/{hc_id_b}"
3. Sondar ambas as sessões até que estejam prontas e verificar a agregação das sessões
Cada sessão faz a transição por esses estados: NotStarted, AcquiringHighConcurrencySessione depois Idle.
Depois que ambas as sessões estiverem Idle, a saída confirmará os seguintes detalhes sobre o empacotamento de sessão:
- As duas IDs de sessão HC (
hc_id_aehc_id_b) são diferentes, confirmando que cada chamada "acquire" retornou uma sessão HC distinta. - Os IDs de sessão Livy subjacentes (
sessionId_aesessionId_b) correspondem, o que confirma que ambas as sessões de HC foram empacotadas na mesma sessão Livy. - As IDs REPL (
replId_aereplId_b) são diferentes, confirmando que cada sessão de HC tem seu próprio contexto de execução isolado.
O código a seguir sonda ambas as sessões até que elas estejam prontas e imprima a saída de verificação:
import time
ACQUIRING_STATES = {"NotStarted", "starting", "AcquiringHighConcurrencySession"}
POLL_INTERVAL = 5
def poll_until_ready(url, label):
"""Poll an HC session until it leaves the acquisition states."""
print(f"[{label}] Polling...")
while True:
resp = requests.get(url, headers=headers, timeout=30)
resp.raise_for_status()
data = resp.json()
state = data.get("state", "unknown")
print(f" [{label}] state={state} sessionId={data.get('sessionId', 'N/A')} replId={data.get('replId', 'N/A')}")
if state in ("Dead", "Killed", "Failed"):
raise RuntimeError(f"[{label}] Session failed: {state}")
if state not in ACQUIRING_STATES:
return data
time.sleep(POLL_INTERVAL)
ready_a = poll_until_ready(session_url_a, "A")
ready_b = poll_until_ready(session_url_b, "B")
livy_session_id_a = ready_a["sessionId"]
livy_session_id_b = ready_b["sessionId"]
repl_id_a = ready_a["replId"]
repl_id_b = ready_b["replId"]
print()
print("=" * 50)
print("SESSION PACKING VERIFICATION")
print("=" * 50)
print(f"HC session A id: {hc_id_a}")
print(f"HC session B id: {hc_id_b}")
print(f"HC IDs differ: {hc_id_a != hc_id_b}")
print()
print(f"Livy sessionId A: {livy_session_id_a}")
print(f"Livy sessionId B: {livy_session_id_b}")
print(f"Same Livy session: {livy_session_id_a == livy_session_id_b}")
print()
print(f"REPL A: {repl_id_a}")
print(f"REPL B: {repl_id_b}")
print(f"REPLs differ: {repl_id_a != repl_id_b}")
4. Enviar comandos para ambas as REPLs em paralelo
Envie duas solicitações POST (uma por REPL) antes de sondar os resultados. Como os REPLs compartilham a mesma sessão do Spark, ambas as instruções podem ser executadas simultaneamente. Esse código também define a poll_statement função auxiliar usada nas etapas restantes.
# Build statement URLs for each REPL
stmts_url_a = f"{livy_base_url}/{livy_session_id_a}/repls/{repl_id_a}/statements"
stmts_url_b = f"{livy_base_url}/{livy_session_id_b}/repls/{repl_id_b}/statements"
# Fire both statement POSTs before polling
print("Submitting to REPL A: print('Hello from REPL A')")
resp_a = requests.post(stmts_url_a, headers=headers, json={"code": "print('Hello from REPL A')", "kind": "pyspark"})
assert resp_a.status_code in (200, 201), f"Failed: {resp_a.text}"
stmt_a = resp_a.json()
stmt_url_a = f"{stmts_url_a}/{stmt_a['id']}"
print("Submitting to REPL B: print('Hello from REPL B')")
resp_b = requests.post(stmts_url_b, headers=headers, json={"code": "print('Hello from REPL B')", "kind": "pyspark"})
assert resp_b.status_code in (200, 201), f"Failed: {resp_b.text}"
stmt_b = resp_b.json()
stmt_url_b = f"{stmts_url_b}/{stmt_b['id']}"
print("Both statements submitted. Polling for results...")
# Poll both statements
def poll_statement(url, label):
while True:
resp = requests.get(url, headers=headers, timeout=30)
resp.raise_for_status()
data = resp.json()
if data.get("state") not in ("waiting", "running"):
return data
time.sleep(5)
result_a = poll_statement(stmt_url_a, "A")
result_b = poll_statement(stmt_url_b, "B")
output_a = result_a.get("output", {}).get("data", {}).get("text/plain", "")
output_b = result_b.get("output", {}).get("data", {}).get("text/plain", "")
print()
print("=" * 50)
print("PARALLEL EXECUTION RESULTS")
print("=" * 50)
print(f"REPL A output: {output_a}")
print(f"REPL B output: {output_b}")
5. Verificar o isolamento de REPL
Defina uma variável x = 42 no REPL A e tente acessá-la do REPL B. Embora ambos os REPLs compartilhem a mesma sessão do Spark, suas variáveis são isoladas.
# Set x = 42 in REPL A
print("[A] Setting x = 42...")
resp = requests.post(stmts_url_a, headers=headers, json={"code": "x = 42; print(x)", "kind": "pyspark"})
stmt_url = f"{stmts_url_a}/{resp.json()['id']}"
result_a = poll_statement(stmt_url, "A")
output_a = result_a.get("output", {}).get("data", {}).get("text/plain", "")
print(f"[A] Output: {output_a}")
# Try to read x from REPL B — should get NameError
print("\n[B] Trying to read x (expect NameError)...")
code_b = "try:\n print(x)\nexcept NameError as e:\n print(f'NameError: {e}')"
resp = requests.post(stmts_url_b, headers=headers, json={"code": code_b, "kind": "pyspark"})
stmt_url = f"{stmts_url_b}/{resp.json()['id']}"
result_b = poll_statement(stmt_url, "B")
output_b = result_b.get("output", {}).get("data", {}).get("text/plain", "")
print(f"[B] Output: {output_b}")
print()
print("=" * 50)
print("REPL ISOLATION RESULTS")
print("=" * 50)
print(f"REPL A (x = 42): {output_a}")
print(f"REPL B (print(x)): {output_b}")
6. Limpar ambas as sessões de HC
Exclua ambas as sessões de HC para liberar recursos. Use a sessão HC id, não a subjacente sessionId.
for label, url in [("A", session_url_a), ("B", session_url_b)]:
print(f"[{label}] Deleting HC session...")
resp = requests.delete(url, headers=headers)
if resp.status_code in (200, 204):
print(f"[{label}] Deleted successfully.")
elif resp.status_code == 404:
print(f"[{label}] Already deleted.")
else:
print(f"[{label}] Unexpected response: {resp.status_code} — {resp.text}")
Exibir seus trabalhos no hub de monitoramento
- Navegue até Monitor na navegação do lado esquerdo.
- Selecione o nome da atividade mais recente para exibir os detalhes da sessão.
- Observe que ambas as sessões de HC compartilham a mesma sessão do Spark subjacente, o que confirma o agrupamento de sessões.
Referência de endpoints da API
| Operação | Método | Ponto final |
|---|---|---|
| Criar sessão de HC | POST |
/v1/workspaces/{workspaceId}/lakehouses/{lakehouseId}/livyapi/versions/2023-12-01/highConcurrencySessions |
| Obter sessão de HC | GET |
.../highConcurrencySessions/{highConcurrencySessionId} |
| Excluir sessão de HC | DELETE |
.../highConcurrencySessions/{highConcurrencySessionId} |
| Enviar Instrução | POST |
.../highConcurrencySessions/{sessionId}/repls/{replId}/statements |
| Instrução Get | GET |
.../highConcurrencySessions/{sessionId}/repls/{replId}/statements/{statementId} |
| Instrução de Cancelamento | POST |
.../highConcurrencySessions/{sessionId}/repls/{replId}/statements/{statementId}/cancel |
Observação
As operações Criar, Obter e Excluir usam a sessão HC id. As operações de declaração usam o Livy sessionId subjacente.