Introdução à API Livy para sessões de alta simultaneidade do Fabric

Aplica-se a:✅ Engenharia de Dados e Ciência de Dados do Fabric

Sessões de alta simultaneidade (HC) permitem que vários chamadores compartilhem uma única sessão do Spark sem interferir entre si. Em vez de provisionar uma sessão separada para cada carga de trabalho, você adquire uma sessão HC e a API Fabric atribui a ela um REPL isolado em uma sessão subjacente compartilhada.

Neste artigo, você usará a API Fabric Livy para adquirir sessões HC, verificar o empacotamento de sessões, executar instruções em paralelo e confirmar o isolamento REPL.

Pré-requisitos

Substitua os espaços reservados {Entra_TenantID}, {Entra_ClientID}, {Entra_ClientSecret}, {Fabric_WorkspaceID} e {Fabric_LakehouseID} por seus valores ao seguir os exemplos neste artigo.

O que são sessões de alta simultaneidade?

Sessões de alta simultaneidade (HC) permitem que vários usuários ou processos compartilhem uma única sessão do Spark. Cada chamador obtém um REPL isolado (Read-Eval-Print Loop) em uma sessão compartilhada. Declarações de diferentes chamadores não interferem entre si.

Empacotamento de sessão

Quando você cria duas sessões de HC com o mesmo sessionTag, a API Fabric as empacota na mesma sessão subjacente do Livy. Cada sessão de HC obtém seu próprio REPL, que fornece:

  • Eficiência de recursos: vários usuários compartilham uma sessão do Spark em vez de cada uma criar a sua própria.
  • Isolamento de REPL: variáveis e estado em um REPL não são visíveis para outras pessoas.
  • Execução paralela: instruções em repls diferentes podem ser executadas simultaneamente.

IDs de chave

ID Exclusivo por Usado para
Sessão de HC id Sessão de HC Status da consulta, excluir sessão
sessionId Sessão Livy (compartilhada quando empacotada) URLs de declaração
replId REPL (contexto isolado) URLs de declaração

Importante

sessionId e replId só estarão disponíveis quando a sessão HC atingir o estado Idle.

Como as sessões de HC diferem das sessões regulares do Livy

Aspecto Sessão regular do Livy Sessão de HC
Ponto de extremidade .../sessions .../highConcurrencySessions
Declarações Enviado diretamente para a sessão Enviado por meio de um REPL (/repls/{replId}/statements)
Aquisição A sessão transforma-se diretamente em idle NotStartedem seguida,AcquiringHighConcurrencySessionIdle
Empacotamento de sessão Não aplicável Opcional sessionTag para compartilhar sessões subjacentes do Spark

Guia passo a passo

1. Autenticar com Microsoft Entra

Adquira um token de acesso usando o fluxo de credenciais do cliente SPN. Substitua os valores de espaço reservado por suas credenciais autênticas.

from msal import ConfidentialClientApplication

# Configuration — Replace with your actual values
tenant_id = "{Entra_TenantID}"       # Microsoft Entra tenant ID
client_id = "{Entra_ClientID}"       # Service principal application ID
client_secret = "{Entra_ClientSecret}"  # Service principal client secret

# OAuth settings
authority = f"https://login.microsoftonline.com/{tenant_id}"
scope = "https://analysis.windows.net/powerbi/api/.default"

app = ConfidentialClientApplication(
    client_id=client_id,
    authority=authority,
    client_credential=client_secret,
)

result = app.acquire_token_for_client(scopes=[scope])

if "access_token" in result:
    token = result["access_token"]
    print("Access token acquired successfully.")
else:
    raise RuntimeError(
        f"Failed to acquire token: {result.get('error_description', 'unknown error')}"
    )

2. Crie duas sessões de HC com a mesma tag de sessão

Criar duas sessões de HC usando sessionTag: "demo-tag". Como eles compartilham a mesma tag, a API de Fabric os empacota na mesma sessão subjacente Livy. Cada sessão obtém seu próprio REPL isolado.

import json
import requests

# Fabric resource IDs — Replace with your actual values
workspace_id = "{Fabric_WorkspaceID}"
lakehouse_id = "{Fabric_LakehouseID}"

# Construct the HC session endpoint URL
livy_base_url = (
    f"https://api.fabric.microsoft.com/v1"
    f"/workspaces/{workspace_id}"
    f"/lakehouses/{lakehouse_id}"
    f"/livyapi/versions/2023-12-01"
    f"/highConcurrencySessions"
)

headers = {"Authorization": f"Bearer {token}"}
session_tag = "demo-tag"

print(f"HC session endpoint: {livy_base_url}")
print(f"Session tag: {session_tag}")
print()

# Create HC Session A
print("Creating HC Session A...")
resp_a = requests.post(livy_base_url, headers=headers, json={"sessionTag": session_tag})
assert resp_a.status_code == 202, f"Failed: {resp_a.status_code} — {resp_a.text}"
session_a = resp_a.json()
hc_id_a = session_a["id"]
print(f"  HC session A id: {hc_id_a}  state: {session_a['state']}")

# Create HC Session B
print("Creating HC Session B...")
resp_b = requests.post(livy_base_url, headers=headers, json={"sessionTag": session_tag})
assert resp_b.status_code == 202, f"Failed: {resp_b.status_code} — {resp_b.text}"
session_b = resp_b.json()
hc_id_b = session_b["id"]
print(f"  HC session B id: {hc_id_b}  state: {session_b['state']}")

session_url_a = f"{livy_base_url}/{hc_id_a}"
session_url_b = f"{livy_base_url}/{hc_id_b}"

3. Sondar ambas as sessões até que estejam prontas e verificar a agregação das sessões

Cada sessão faz a transição por esses estados: NotStarted, AcquiringHighConcurrencySessione depois Idle.

Depois que ambas as sessões estiverem Idle, a saída confirmará os seguintes detalhes sobre o empacotamento de sessão:

  • As duas IDs de sessão HC (hc_id_a e hc_id_b) são diferentes, confirmando que cada chamada "acquire" retornou uma sessão HC distinta.
  • Os IDs de sessão Livy subjacentes (sessionId_a e sessionId_b) correspondem, o que confirma que ambas as sessões de HC foram empacotadas na mesma sessão Livy.
  • As IDs REPL (replId_a e replId_b) são diferentes, confirmando que cada sessão de HC tem seu próprio contexto de execução isolado.

O código a seguir sonda ambas as sessões até que elas estejam prontas e imprima a saída de verificação:

import time

ACQUIRING_STATES = {"NotStarted", "starting", "AcquiringHighConcurrencySession"}
POLL_INTERVAL = 5

def poll_until_ready(url, label):
    """Poll an HC session until it leaves the acquisition states."""
    print(f"[{label}] Polling...")
    while True:
        resp = requests.get(url, headers=headers, timeout=30)
        resp.raise_for_status()
        data = resp.json()
        state = data.get("state", "unknown")
        print(f"  [{label}] state={state}  sessionId={data.get('sessionId', 'N/A')}  replId={data.get('replId', 'N/A')}")
        if state in ("Dead", "Killed", "Failed"):
            raise RuntimeError(f"[{label}] Session failed: {state}")
        if state not in ACQUIRING_STATES:
            return data
        time.sleep(POLL_INTERVAL)

ready_a = poll_until_ready(session_url_a, "A")
ready_b = poll_until_ready(session_url_b, "B")

livy_session_id_a = ready_a["sessionId"]
livy_session_id_b = ready_b["sessionId"]
repl_id_a = ready_a["replId"]
repl_id_b = ready_b["replId"]

print()
print("=" * 50)
print("SESSION PACKING VERIFICATION")
print("=" * 50)
print(f"HC session A id:    {hc_id_a}")
print(f"HC session B id:    {hc_id_b}")
print(f"HC IDs differ:      {hc_id_a != hc_id_b}")
print()
print(f"Livy sessionId A:   {livy_session_id_a}")
print(f"Livy sessionId B:   {livy_session_id_b}")
print(f"Same Livy session:  {livy_session_id_a == livy_session_id_b}")
print()
print(f"REPL A:             {repl_id_a}")
print(f"REPL B:             {repl_id_b}")
print(f"REPLs differ:       {repl_id_a != repl_id_b}")

4. Enviar comandos para ambas as REPLs em paralelo

Envie duas solicitações POST (uma por REPL) antes de sondar os resultados. Como os REPLs compartilham a mesma sessão do Spark, ambas as instruções podem ser executadas simultaneamente. Esse código também define a poll_statement função auxiliar usada nas etapas restantes.

# Build statement URLs for each REPL
stmts_url_a = f"{livy_base_url}/{livy_session_id_a}/repls/{repl_id_a}/statements"
stmts_url_b = f"{livy_base_url}/{livy_session_id_b}/repls/{repl_id_b}/statements"

# Fire both statement POSTs before polling
print("Submitting to REPL A: print('Hello from REPL A')")
resp_a = requests.post(stmts_url_a, headers=headers, json={"code": "print('Hello from REPL A')", "kind": "pyspark"})
assert resp_a.status_code in (200, 201), f"Failed: {resp_a.text}"
stmt_a = resp_a.json()
stmt_url_a = f"{stmts_url_a}/{stmt_a['id']}"

print("Submitting to REPL B: print('Hello from REPL B')")
resp_b = requests.post(stmts_url_b, headers=headers, json={"code": "print('Hello from REPL B')", "kind": "pyspark"})
assert resp_b.status_code in (200, 201), f"Failed: {resp_b.text}"
stmt_b = resp_b.json()
stmt_url_b = f"{stmts_url_b}/{stmt_b['id']}"

print("Both statements submitted. Polling for results...")

# Poll both statements
def poll_statement(url, label):
    while True:
        resp = requests.get(url, headers=headers, timeout=30)
        resp.raise_for_status()
        data = resp.json()
        if data.get("state") not in ("waiting", "running"):
            return data
        time.sleep(5)

result_a = poll_statement(stmt_url_a, "A")
result_b = poll_statement(stmt_url_b, "B")

output_a = result_a.get("output", {}).get("data", {}).get("text/plain", "")
output_b = result_b.get("output", {}).get("data", {}).get("text/plain", "")

print()
print("=" * 50)
print("PARALLEL EXECUTION RESULTS")
print("=" * 50)
print(f"REPL A output: {output_a}")
print(f"REPL B output: {output_b}")

5. Verificar o isolamento de REPL

Defina uma variável x = 42 no REPL A e tente acessá-la do REPL B. Embora ambos os REPLs compartilhem a mesma sessão do Spark, suas variáveis são isoladas.

# Set x = 42 in REPL A
print("[A] Setting x = 42...")
resp = requests.post(stmts_url_a, headers=headers, json={"code": "x = 42; print(x)", "kind": "pyspark"})
stmt_url = f"{stmts_url_a}/{resp.json()['id']}"
result_a = poll_statement(stmt_url, "A")
output_a = result_a.get("output", {}).get("data", {}).get("text/plain", "")
print(f"[A] Output: {output_a}")

# Try to read x from REPL B — should get NameError
print("\n[B] Trying to read x (expect NameError)...")
code_b = "try:\n    print(x)\nexcept NameError as e:\n    print(f'NameError: {e}')"
resp = requests.post(stmts_url_b, headers=headers, json={"code": code_b, "kind": "pyspark"})
stmt_url = f"{stmts_url_b}/{resp.json()['id']}"
result_b = poll_statement(stmt_url, "B")
output_b = result_b.get("output", {}).get("data", {}).get("text/plain", "")
print(f"[B] Output: {output_b}")

print()
print("=" * 50)
print("REPL ISOLATION RESULTS")
print("=" * 50)
print(f"REPL A (x = 42): {output_a}")
print(f"REPL B (print(x)): {output_b}")

6. Limpar ambas as sessões de HC

Exclua ambas as sessões de HC para liberar recursos. Use a sessão HC id, não a subjacente sessionId.

for label, url in [("A", session_url_a), ("B", session_url_b)]:
    print(f"[{label}] Deleting HC session...")
    resp = requests.delete(url, headers=headers)
    if resp.status_code in (200, 204):
        print(f"[{label}] Deleted successfully.")
    elif resp.status_code == 404:
        print(f"[{label}] Already deleted.")
    else:
        print(f"[{label}] Unexpected response: {resp.status_code} — {resp.text}")

Exibir seus trabalhos no hub de monitoramento

  1. Navegue até Monitor na navegação do lado esquerdo.
  2. Selecione o nome da atividade mais recente para exibir os detalhes da sessão.
  3. Observe que ambas as sessões de HC compartilham a mesma sessão do Spark subjacente, o que confirma o agrupamento de sessões.

Referência de endpoints da API

Operação Método Ponto final
Criar sessão de HC POST /v1/workspaces/{workspaceId}/lakehouses/{lakehouseId}/livyapi/versions/2023-12-01/highConcurrencySessions
Obter sessão de HC GET .../highConcurrencySessions/{highConcurrencySessionId}
Excluir sessão de HC DELETE .../highConcurrencySessions/{highConcurrencySessionId}
Enviar Instrução POST .../highConcurrencySessions/{sessionId}/repls/{replId}/statements
Instrução Get GET .../highConcurrencySessions/{sessionId}/repls/{replId}/statements/{statementId}
Instrução de Cancelamento POST .../highConcurrencySessions/{sessionId}/repls/{replId}/statements/{statementId}/cancel

Observação

As operações Criar, Obter e Excluir usam a sessão HC id. As operações de declaração usam o Livy sessionId subjacente.