Comece com a API do Livy para sessões de alta concorrência do Fabric

Aplica-se a: ✅ Fabric Engenharia de Dados e Ciência de Dados

Sessões de alta concorrência (HC) permitem que vários chamadores partilhem uma única sessão Spark sem interferir entre si. Em vez de provisionar uma sessão separada para cada tarefa, obtém uma sessão HC e a API do Fabric atribui-lhe um REPL isolado dentro de uma sessão subjacente partilhada.

Neste artigo, utiliza a API Fabric Livy para obter sessões HC, verificar a agrupagem das sessões, executar declarações em paralelo e confirmar o isolamento do REPL.

Pré-requisitos

Substitua os espaços reservados {Entra_TenantID}, {Entra_ClientID}, {Entra_ClientSecret}, {Fabric_WorkspaceID} e {Fabric_LakehouseID} pelos seus valores ao seguir os exemplos deste artigo.

O que são sessões de alta concorrência?

Sessões de alta concorrência (HC) permitem que múltiplos utilizadores ou processos partilhem uma única sessão Spark. Cada interlocutor recebe um REPL (Read-Eval-Print Loop) isolado dentro da sessão partilhada. Declarações de diferentes chamadas não interferem umas com as outras.

Empacotamento de sessões

Quando crias duas sessões HC com o mesmo sessionTag, a API Fabric empacota-as na mesma sessão Livy subjacente. Cada sessão de HC recebe a sua própria REPL, que proporciona:

  • Eficiência de recursos: Vários utilizadores partilham uma sessão Spark em vez de cada um criar a sua própria.
  • Isolamento do REPL: As variáveis e o estado de um REPL não são visíveis para os outros.
  • Execução paralela: Instruções em diferentes REPLs podem correr em simultâneo.

Identificadores de Chaves

ID Único por Usado para
Sessão de HC id Sessão de HC Estado da sondagem, apagar sessão
sessionId Sessão Livy (partilhada quando compactada) URLs de declarações
replId REPL (contexto isolado) URLs de declarações

Importante

Os sessionId e replId só estão disponíveis uma vez que a sessão HC alcance o estado Idle.

Como as sessões de HC diferem das sessões normais do Apache Livy

Aspeto Sessão regular de Livy Sessão de HC
Ponto final .../sessions .../highConcurrencySessions
Declarações Submetido diretamente à sessão Submetido por meio de um REPL (/repls/{replId}/statements)
Aquisição A sessão passa a ser idle diretamente NotStarted então AcquiringHighConcurrencySession então Idle
Embalagem de Sessões Não aplicável Opcional sessionTag para partilhar sessões subjacentes do Spark

Passo a passo

1. Autenticar com Microsoft Entra

Adquira um token de acesso usando o fluxo de credenciais do cliente SPN. Substitui os valores provisórios pelas tuas credenciais reais.

from msal import ConfidentialClientApplication

# Configuration — Replace with your actual values
tenant_id = "{Entra_TenantID}"       # Microsoft Entra tenant ID
client_id = "{Entra_ClientID}"       # Service principal application ID
client_secret = "{Entra_ClientSecret}"  # Service principal client secret

# OAuth settings
authority = f"https://login.microsoftonline.com/{tenant_id}"
scope = "https://analysis.windows.net/powerbi/api/.default"

app = ConfidentialClientApplication(
    client_id=client_id,
    authority=authority,
    client_credential=client_secret,
)

result = app.acquire_token_for_client(scopes=[scope])

if "access_token" in result:
    token = result["access_token"]
    print("Access token acquired successfully.")
else:
    raise RuntimeError(
        f"Failed to acquire token: {result.get('error_description', 'unknown error')}"
    )

2. Criar duas sessões HC com a mesma etiqueta de sessão

Crie duas sessões HC usando sessionTag: "demo-tag". Como partilham a mesma etiqueta, a API da Fabric empacota-as na mesma sessão Livy subjacente. Cada sessão tem o seu próprio REPL isolado.

import json
import requests

# Fabric resource IDs — Replace with your actual values
workspace_id = "{Fabric_WorkspaceID}"
lakehouse_id = "{Fabric_LakehouseID}"

# Construct the HC session endpoint URL
livy_base_url = (
    f"https://api.fabric.microsoft.com/v1"
    f"/workspaces/{workspace_id}"
    f"/lakehouses/{lakehouse_id}"
    f"/livyapi/versions/2023-12-01"
    f"/highConcurrencySessions"
)

headers = {"Authorization": f"Bearer {token}"}
session_tag = "demo-tag"

print(f"HC session endpoint: {livy_base_url}")
print(f"Session tag: {session_tag}")
print()

# Create HC Session A
print("Creating HC Session A...")
resp_a = requests.post(livy_base_url, headers=headers, json={"sessionTag": session_tag})
assert resp_a.status_code == 202, f"Failed: {resp_a.status_code} — {resp_a.text}"
session_a = resp_a.json()
hc_id_a = session_a["id"]
print(f"  HC session A id: {hc_id_a}  state: {session_a['state']}")

# Create HC Session B
print("Creating HC Session B...")
resp_b = requests.post(livy_base_url, headers=headers, json={"sessionTag": session_tag})
assert resp_b.status_code == 202, f"Failed: {resp_b.status_code} — {resp_b.text}"
session_b = resp_b.json()
hc_id_b = session_b["id"]
print(f"  HC session B id: {hc_id_b}  state: {session_b['state']}")

session_url_a = f"{livy_base_url}/{hc_id_a}"
session_url_b = f"{livy_base_url}/{hc_id_b}"

3. Interrogar ambas as sessões até estarem prontas e verificar a embalagem da sessão

Cada sessão transita por estes estados: NotStarted, AcquiringHighConcurrencySession, e depois Idle.

Uma vez que ambas as sessões são Idle, a saída confirma os seguintes detalhes sobre o empacotamento das sessões:

  • Os dois IDs de sessão HC (hc_id_a e hc_id_b) são diferentes, confirmando que cada chamada "acquire" devolveu uma sessão HC distinta.
  • Os IDs subjacentes das sessões de Livy (sessionId_a e sessionId_b) coincidem, confirmando que ambas as sessões de HC foram agrupadas na mesma sessão de Livy.
  • Os IDs REPL (replId_a e replId_b) são diferentes, confirmando que cada sessão HC tem o seu próprio contexto de execução isolado.

O código seguinte sonda ambas as sessões até estarem prontas e exibe o resultado da verificação:

import time

ACQUIRING_STATES = {"NotStarted", "starting", "AcquiringHighConcurrencySession"}
POLL_INTERVAL = 5

def poll_until_ready(url, label):
    """Poll an HC session until it leaves the acquisition states."""
    print(f"[{label}] Polling...")
    while True:
        resp = requests.get(url, headers=headers, timeout=30)
        resp.raise_for_status()
        data = resp.json()
        state = data.get("state", "unknown")
        print(f"  [{label}] state={state}  sessionId={data.get('sessionId', 'N/A')}  replId={data.get('replId', 'N/A')}")
        if state in ("Dead", "Killed", "Failed"):
            raise RuntimeError(f"[{label}] Session failed: {state}")
        if state not in ACQUIRING_STATES:
            return data
        time.sleep(POLL_INTERVAL)

ready_a = poll_until_ready(session_url_a, "A")
ready_b = poll_until_ready(session_url_b, "B")

livy_session_id_a = ready_a["sessionId"]
livy_session_id_b = ready_b["sessionId"]
repl_id_a = ready_a["replId"]
repl_id_b = ready_b["replId"]

print()
print("=" * 50)
print("SESSION PACKING VERIFICATION")
print("=" * 50)
print(f"HC session A id:    {hc_id_a}")
print(f"HC session B id:    {hc_id_b}")
print(f"HC IDs differ:      {hc_id_a != hc_id_b}")
print()
print(f"Livy sessionId A:   {livy_session_id_a}")
print(f"Livy sessionId B:   {livy_session_id_b}")
print(f"Same Livy session:  {livy_session_id_a == livy_session_id_b}")
print()
print(f"REPL A:             {repl_id_a}")
print(f"REPL B:             {repl_id_b}")
print(f"REPLs differ:       {repl_id_a != repl_id_b}")

4. Submeter declarações a ambos os REPLs em paralelo

Submeta dois pedidos POST (um por REPL) antes de consultar qualquer um para obter os resultados. Como os REPLs partilham a mesma sessão Spark, ambas as instruções podem correr em simultâneo. Este código também define a poll_statement função auxiliar usada nos passos restantes.

# Build statement URLs for each REPL
stmts_url_a = f"{livy_base_url}/{livy_session_id_a}/repls/{repl_id_a}/statements"
stmts_url_b = f"{livy_base_url}/{livy_session_id_b}/repls/{repl_id_b}/statements"

# Fire both statement POSTs before polling
print("Submitting to REPL A: print('Hello from REPL A')")
resp_a = requests.post(stmts_url_a, headers=headers, json={"code": "print('Hello from REPL A')", "kind": "pyspark"})
assert resp_a.status_code in (200, 201), f"Failed: {resp_a.text}"
stmt_a = resp_a.json()
stmt_url_a = f"{stmts_url_a}/{stmt_a['id']}"

print("Submitting to REPL B: print('Hello from REPL B')")
resp_b = requests.post(stmts_url_b, headers=headers, json={"code": "print('Hello from REPL B')", "kind": "pyspark"})
assert resp_b.status_code in (200, 201), f"Failed: {resp_b.text}"
stmt_b = resp_b.json()
stmt_url_b = f"{stmts_url_b}/{stmt_b['id']}"

print("Both statements submitted. Polling for results...")

# Poll both statements
def poll_statement(url, label):
    while True:
        resp = requests.get(url, headers=headers, timeout=30)
        resp.raise_for_status()
        data = resp.json()
        if data.get("state") not in ("waiting", "running"):
            return data
        time.sleep(5)

result_a = poll_statement(stmt_url_a, "A")
result_b = poll_statement(stmt_url_b, "B")

output_a = result_a.get("output", {}).get("data", {}).get("text/plain", "")
output_b = result_b.get("output", {}).get("data", {}).get("text/plain", "")

print()
print("=" * 50)
print("PARALLEL EXECUTION RESULTS")
print("=" * 50)
print(f"REPL A output: {output_a}")
print(f"REPL B output: {output_b}")

5. Verificar o isolamento do REPL

Define uma variável x = 42 no REPL A e depois tenta aceder a ela a partir do REPL B. Embora ambos os REPLs partilhem a mesma sessão Spark, as suas variáveis são isoladas.

# Set x = 42 in REPL A
print("[A] Setting x = 42...")
resp = requests.post(stmts_url_a, headers=headers, json={"code": "x = 42; print(x)", "kind": "pyspark"})
stmt_url = f"{stmts_url_a}/{resp.json()['id']}"
result_a = poll_statement(stmt_url, "A")
output_a = result_a.get("output", {}).get("data", {}).get("text/plain", "")
print(f"[A] Output: {output_a}")

# Try to read x from REPL B — should get NameError
print("\n[B] Trying to read x (expect NameError)...")
code_b = "try:\n    print(x)\nexcept NameError as e:\n    print(f'NameError: {e}')"
resp = requests.post(stmts_url_b, headers=headers, json={"code": code_b, "kind": "pyspark"})
stmt_url = f"{stmts_url_b}/{resp.json()['id']}"
result_b = poll_statement(stmt_url, "B")
output_b = result_b.get("output", {}).get("data", {}).get("text/plain", "")
print(f"[B] Output: {output_b}")

print()
print("=" * 50)
print("REPL ISOLATION RESULTS")
print("=" * 50)
print(f"REPL A (x = 42): {output_a}")
print(f"REPL B (print(x)): {output_b}")

6. Limpar ambas as sessões de HC

Elimina as duas sessões HC para libertar recursos. Use a sessão HC id, não a subjacente sessionId.

for label, url in [("A", session_url_a), ("B", session_url_b)]:
    print(f"[{label}] Deleting HC session...")
    resp = requests.delete(url, headers=headers)
    if resp.status_code in (200, 204):
        print(f"[{label}] Deleted successfully.")
    elif resp.status_code == 404:
        print(f"[{label}] Already deleted.")
    else:
        print(f"[{label}] Unexpected response: {resp.status_code} — {resp.text}")

Veja os seus empregos no hub de monitorização

  1. Navegue para Monitor no menu de navegação à esquerda.
  2. Selecione o nome da atividade mais recente para ver os detalhes da sessão.
  3. Note que ambas as sessões HC partilham a mesma sessão subjacente do Spark, o que confirma o empacotamento da sessão.

Referência de API endpoints

Funcionamento Método Ponto final
Criar sessão HC POST /v1/workspaces/{workspaceId}/lakehouses/{lakehouseId}/livyapi/versions/2023-12-01/highConcurrencySessions
Obtenha sessão de HC GET .../highConcurrencySessions/{highConcurrencySessionId}
Eliminar sessão HC DELETE .../highConcurrencySessions/{highConcurrencySessionId}
Enviar declaração POST .../highConcurrencySessions/{sessionId}/repls/{replId}/statements
Obter declaração GET .../highConcurrencySessions/{sessionId}/repls/{replId}/statements/{statementId}
Anular declaração POST .../highConcurrencySessions/{sessionId}/repls/{replId}/statements/{statementId}/cancel

Observação

As operações de Criar, Obter e Eliminar usam a sessão HC id. As operações de instrução utilizam o Livy sessionId subjacente.