Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
Aplica-se a: ✅ Fabric Engenharia de Dados e Ciência de Dados
Sessões de alta concorrência (HC) permitem que vários chamadores partilhem uma única sessão Spark sem interferir entre si. Em vez de provisionar uma sessão separada para cada tarefa, obtém uma sessão HC e a API do Fabric atribui-lhe um REPL isolado dentro de uma sessão subjacente partilhada.
Neste artigo, utiliza a API Fabric Livy para obter sessões HC, verificar a agrupagem das sessões, executar declarações em paralelo e confirmar o isolamento do REPL.
Pré-requisitos
- Capacidade Premium ou de Avaliação do Fabric com um Lakehouse.
- Um cliente remoto como Visual Studio Code com PySpark e Python 3.10+.
- Um principal de serviço Microsoft Entra (SPN) com acesso ao espaço de trabalho. Regista uma candidatura no Microsoft identity platform.
- Um segredo de cliente para o principal do serviço. Adicione e gere credenciais de candidatura.
Substitua os espaços reservados {Entra_TenantID}, {Entra_ClientID}, {Entra_ClientSecret}, {Fabric_WorkspaceID} e {Fabric_LakehouseID} pelos seus valores ao seguir os exemplos deste artigo.
O que são sessões de alta concorrência?
Sessões de alta concorrência (HC) permitem que múltiplos utilizadores ou processos partilhem uma única sessão Spark. Cada interlocutor recebe um REPL (Read-Eval-Print Loop) isolado dentro da sessão partilhada. Declarações de diferentes chamadas não interferem umas com as outras.
Empacotamento de sessões
Quando crias duas sessões HC com o mesmo sessionTag, a API Fabric empacota-as na mesma sessão Livy subjacente. Cada sessão de HC recebe a sua própria REPL, que proporciona:
- Eficiência de recursos: Vários utilizadores partilham uma sessão Spark em vez de cada um criar a sua própria.
- Isolamento do REPL: As variáveis e o estado de um REPL não são visíveis para os outros.
- Execução paralela: Instruções em diferentes REPLs podem correr em simultâneo.
Identificadores de Chaves
| ID | Único por | Usado para |
|---|---|---|
Sessão de HC id |
Sessão de HC | Estado da sondagem, apagar sessão |
sessionId |
Sessão Livy (partilhada quando compactada) | URLs de declarações |
replId |
REPL (contexto isolado) | URLs de declarações |
Importante
Os sessionId e replId só estão disponíveis uma vez que a sessão HC alcance o estado Idle.
Como as sessões de HC diferem das sessões normais do Apache Livy
| Aspeto | Sessão regular de Livy | Sessão de HC |
|---|---|---|
| Ponto final | .../sessions |
.../highConcurrencySessions |
| Declarações | Submetido diretamente à sessão | Submetido por meio de um REPL (/repls/{replId}/statements) |
| Aquisição | A sessão passa a ser idle diretamente |
NotStarted então AcquiringHighConcurrencySession então Idle |
| Embalagem de Sessões | Não aplicável | Opcional sessionTag para partilhar sessões subjacentes do Spark |
Passo a passo
1. Autenticar com Microsoft Entra
Adquira um token de acesso usando o fluxo de credenciais do cliente SPN. Substitui os valores provisórios pelas tuas credenciais reais.
from msal import ConfidentialClientApplication
# Configuration — Replace with your actual values
tenant_id = "{Entra_TenantID}" # Microsoft Entra tenant ID
client_id = "{Entra_ClientID}" # Service principal application ID
client_secret = "{Entra_ClientSecret}" # Service principal client secret
# OAuth settings
authority = f"https://login.microsoftonline.com/{tenant_id}"
scope = "https://analysis.windows.net/powerbi/api/.default"
app = ConfidentialClientApplication(
client_id=client_id,
authority=authority,
client_credential=client_secret,
)
result = app.acquire_token_for_client(scopes=[scope])
if "access_token" in result:
token = result["access_token"]
print("Access token acquired successfully.")
else:
raise RuntimeError(
f"Failed to acquire token: {result.get('error_description', 'unknown error')}"
)
2. Criar duas sessões HC com a mesma etiqueta de sessão
Crie duas sessões HC usando sessionTag: "demo-tag". Como partilham a mesma etiqueta, a API da Fabric empacota-as na mesma sessão Livy subjacente. Cada sessão tem o seu próprio REPL isolado.
import json
import requests
# Fabric resource IDs — Replace with your actual values
workspace_id = "{Fabric_WorkspaceID}"
lakehouse_id = "{Fabric_LakehouseID}"
# Construct the HC session endpoint URL
livy_base_url = (
f"https://api.fabric.microsoft.com/v1"
f"/workspaces/{workspace_id}"
f"/lakehouses/{lakehouse_id}"
f"/livyapi/versions/2023-12-01"
f"/highConcurrencySessions"
)
headers = {"Authorization": f"Bearer {token}"}
session_tag = "demo-tag"
print(f"HC session endpoint: {livy_base_url}")
print(f"Session tag: {session_tag}")
print()
# Create HC Session A
print("Creating HC Session A...")
resp_a = requests.post(livy_base_url, headers=headers, json={"sessionTag": session_tag})
assert resp_a.status_code == 202, f"Failed: {resp_a.status_code} — {resp_a.text}"
session_a = resp_a.json()
hc_id_a = session_a["id"]
print(f" HC session A id: {hc_id_a} state: {session_a['state']}")
# Create HC Session B
print("Creating HC Session B...")
resp_b = requests.post(livy_base_url, headers=headers, json={"sessionTag": session_tag})
assert resp_b.status_code == 202, f"Failed: {resp_b.status_code} — {resp_b.text}"
session_b = resp_b.json()
hc_id_b = session_b["id"]
print(f" HC session B id: {hc_id_b} state: {session_b['state']}")
session_url_a = f"{livy_base_url}/{hc_id_a}"
session_url_b = f"{livy_base_url}/{hc_id_b}"
3. Interrogar ambas as sessões até estarem prontas e verificar a embalagem da sessão
Cada sessão transita por estes estados: NotStarted, AcquiringHighConcurrencySession, e depois Idle.
Uma vez que ambas as sessões são Idle, a saída confirma os seguintes detalhes sobre o empacotamento das sessões:
- Os dois IDs de sessão HC (
hc_id_aehc_id_b) são diferentes, confirmando que cada chamada "acquire" devolveu uma sessão HC distinta. - Os IDs subjacentes das sessões de Livy (
sessionId_aesessionId_b) coincidem, confirmando que ambas as sessões de HC foram agrupadas na mesma sessão de Livy. - Os IDs REPL (
replId_aereplId_b) são diferentes, confirmando que cada sessão HC tem o seu próprio contexto de execução isolado.
O código seguinte sonda ambas as sessões até estarem prontas e exibe o resultado da verificação:
import time
ACQUIRING_STATES = {"NotStarted", "starting", "AcquiringHighConcurrencySession"}
POLL_INTERVAL = 5
def poll_until_ready(url, label):
"""Poll an HC session until it leaves the acquisition states."""
print(f"[{label}] Polling...")
while True:
resp = requests.get(url, headers=headers, timeout=30)
resp.raise_for_status()
data = resp.json()
state = data.get("state", "unknown")
print(f" [{label}] state={state} sessionId={data.get('sessionId', 'N/A')} replId={data.get('replId', 'N/A')}")
if state in ("Dead", "Killed", "Failed"):
raise RuntimeError(f"[{label}] Session failed: {state}")
if state not in ACQUIRING_STATES:
return data
time.sleep(POLL_INTERVAL)
ready_a = poll_until_ready(session_url_a, "A")
ready_b = poll_until_ready(session_url_b, "B")
livy_session_id_a = ready_a["sessionId"]
livy_session_id_b = ready_b["sessionId"]
repl_id_a = ready_a["replId"]
repl_id_b = ready_b["replId"]
print()
print("=" * 50)
print("SESSION PACKING VERIFICATION")
print("=" * 50)
print(f"HC session A id: {hc_id_a}")
print(f"HC session B id: {hc_id_b}")
print(f"HC IDs differ: {hc_id_a != hc_id_b}")
print()
print(f"Livy sessionId A: {livy_session_id_a}")
print(f"Livy sessionId B: {livy_session_id_b}")
print(f"Same Livy session: {livy_session_id_a == livy_session_id_b}")
print()
print(f"REPL A: {repl_id_a}")
print(f"REPL B: {repl_id_b}")
print(f"REPLs differ: {repl_id_a != repl_id_b}")
4. Submeter declarações a ambos os REPLs em paralelo
Submeta dois pedidos POST (um por REPL) antes de consultar qualquer um para obter os resultados. Como os REPLs partilham a mesma sessão Spark, ambas as instruções podem correr em simultâneo. Este código também define a poll_statement função auxiliar usada nos passos restantes.
# Build statement URLs for each REPL
stmts_url_a = f"{livy_base_url}/{livy_session_id_a}/repls/{repl_id_a}/statements"
stmts_url_b = f"{livy_base_url}/{livy_session_id_b}/repls/{repl_id_b}/statements"
# Fire both statement POSTs before polling
print("Submitting to REPL A: print('Hello from REPL A')")
resp_a = requests.post(stmts_url_a, headers=headers, json={"code": "print('Hello from REPL A')", "kind": "pyspark"})
assert resp_a.status_code in (200, 201), f"Failed: {resp_a.text}"
stmt_a = resp_a.json()
stmt_url_a = f"{stmts_url_a}/{stmt_a['id']}"
print("Submitting to REPL B: print('Hello from REPL B')")
resp_b = requests.post(stmts_url_b, headers=headers, json={"code": "print('Hello from REPL B')", "kind": "pyspark"})
assert resp_b.status_code in (200, 201), f"Failed: {resp_b.text}"
stmt_b = resp_b.json()
stmt_url_b = f"{stmts_url_b}/{stmt_b['id']}"
print("Both statements submitted. Polling for results...")
# Poll both statements
def poll_statement(url, label):
while True:
resp = requests.get(url, headers=headers, timeout=30)
resp.raise_for_status()
data = resp.json()
if data.get("state") not in ("waiting", "running"):
return data
time.sleep(5)
result_a = poll_statement(stmt_url_a, "A")
result_b = poll_statement(stmt_url_b, "B")
output_a = result_a.get("output", {}).get("data", {}).get("text/plain", "")
output_b = result_b.get("output", {}).get("data", {}).get("text/plain", "")
print()
print("=" * 50)
print("PARALLEL EXECUTION RESULTS")
print("=" * 50)
print(f"REPL A output: {output_a}")
print(f"REPL B output: {output_b}")
5. Verificar o isolamento do REPL
Define uma variável x = 42 no REPL A e depois tenta aceder a ela a partir do REPL B. Embora ambos os REPLs partilhem a mesma sessão Spark, as suas variáveis são isoladas.
# Set x = 42 in REPL A
print("[A] Setting x = 42...")
resp = requests.post(stmts_url_a, headers=headers, json={"code": "x = 42; print(x)", "kind": "pyspark"})
stmt_url = f"{stmts_url_a}/{resp.json()['id']}"
result_a = poll_statement(stmt_url, "A")
output_a = result_a.get("output", {}).get("data", {}).get("text/plain", "")
print(f"[A] Output: {output_a}")
# Try to read x from REPL B — should get NameError
print("\n[B] Trying to read x (expect NameError)...")
code_b = "try:\n print(x)\nexcept NameError as e:\n print(f'NameError: {e}')"
resp = requests.post(stmts_url_b, headers=headers, json={"code": code_b, "kind": "pyspark"})
stmt_url = f"{stmts_url_b}/{resp.json()['id']}"
result_b = poll_statement(stmt_url, "B")
output_b = result_b.get("output", {}).get("data", {}).get("text/plain", "")
print(f"[B] Output: {output_b}")
print()
print("=" * 50)
print("REPL ISOLATION RESULTS")
print("=" * 50)
print(f"REPL A (x = 42): {output_a}")
print(f"REPL B (print(x)): {output_b}")
6. Limpar ambas as sessões de HC
Elimina as duas sessões HC para libertar recursos. Use a sessão HC id, não a subjacente sessionId.
for label, url in [("A", session_url_a), ("B", session_url_b)]:
print(f"[{label}] Deleting HC session...")
resp = requests.delete(url, headers=headers)
if resp.status_code in (200, 204):
print(f"[{label}] Deleted successfully.")
elif resp.status_code == 404:
print(f"[{label}] Already deleted.")
else:
print(f"[{label}] Unexpected response: {resp.status_code} — {resp.text}")
Veja os seus empregos no hub de monitorização
- Navegue para Monitor no menu de navegação à esquerda.
- Selecione o nome da atividade mais recente para ver os detalhes da sessão.
- Note que ambas as sessões HC partilham a mesma sessão subjacente do Spark, o que confirma o empacotamento da sessão.
Referência de API endpoints
| Funcionamento | Método | Ponto final |
|---|---|---|
| Criar sessão HC | POST |
/v1/workspaces/{workspaceId}/lakehouses/{lakehouseId}/livyapi/versions/2023-12-01/highConcurrencySessions |
| Obtenha sessão de HC | GET |
.../highConcurrencySessions/{highConcurrencySessionId} |
| Eliminar sessão HC | DELETE |
.../highConcurrencySessions/{highConcurrencySessionId} |
| Enviar declaração | POST |
.../highConcurrencySessions/{sessionId}/repls/{replId}/statements |
| Obter declaração | GET |
.../highConcurrencySessions/{sessionId}/repls/{replId}/statements/{statementId} |
| Anular declaração | POST |
.../highConcurrencySessions/{sessionId}/repls/{replId}/statements/{statementId}/cancel |
Observação
As operações de Criar, Obter e Eliminar usam a sessão HC id. As operações de instrução utilizam o Livy sessionId subjacente.