Partilhar via


Execução e orquestração de notebooks com NotebookUtils

Use os utilitários do caderno para executar um caderno, executar vários cadernos em paralelo ou sair de um caderno com um valor. Execute o seguinte comando para obter uma visão geral dos métodos disponíveis:

notebookutils.notebook.help()

A tabela seguinte lista os métodos de execução e orquestração disponíveis em notebooks:

Método Signature Descrição
run run(path: str, timeout_seconds: int = 90, arguments: dict = None, workspace: str = ""): str Executa um bloco de anotações e retorna seu valor de saída.
runMultiple runMultiple(dag: Any, config: dict = None): dict[str, dict[str, Any]] Executa vários cadernos em simultâneo com suporte para relações de dependência.
validateDAG validateDAG(dag: Any): bool Valida se uma definição DAG está corretamente estruturada.
exit exit(value: str): None Sai do caderno atual com um valor.

Para operações CRUD de notebook (criar, obter, atualizar, eliminar, listar), consulte Gerir artefatos de notebook.

Observação

O config parâmetro em runMultiple() só está disponível em Python. Scala e R não suportam este parâmetro.

Observação

Os utilitários de notebook não são aplicáveis para definições de trabalho do Apache Spark (SJD).

Referenciar um bloco de notas

O run() método faz referência a um caderno e devolve o seu valor de saída. Você pode executar chamadas de função de aninhamento em um bloco de anotações interativamente ou em um pipeline. O notebook que está a ser referenciado é executado no pool Spark do notebook que invoca esta função.

notebookutils.notebook.run("notebook name", <timeout_seconds>, <arguments>, <workspace>)

Por exemplo:

notebookutils.notebook.run("Sample1", 90, {"input": 20 })

Valor de retorno

O run() método devolve a cadeia exata passada no notebookutils.notebook.exit(value) caderno filho. Se exit() não for chamada no caderno filho, é devolvida uma cadeia vazia ("").

Os cadernos Fabric também suportam a referência de cadernos entre espaços de trabalho, especificando o ID do espaço de trabalho.

notebookutils.notebook.run("Sample1", 90, {"input": 20 }, "fe0a6e2a-a909-4aa3-a698-0a651de790aa")

Abra o link snapshot na saída da célula para inspecionar a execução de referência. O snapshot capta os resultados das execuções e ajuda-te a depurar o caderno referenciado.

Captura de ecrã do resultado da execução de referência.

Captura de ecrã de um exemplo de instantâneo.

Configurar cadernos filhos para receber parâmetros

Quando crias um notebook filho chamado através de run() ou runMultiple(), configura uma célula de parâmetros para que o notebook possa receber argumentos do notebook principal:

  1. Cria uma célula de código com valores de parâmetros padrão.
  2. Marque a célula como uma célula de parâmetros selecionando Marcar célula como parâmetros na interface do caderno.
  3. Durante a execução, os valores das células dos parâmetros são substituídos pelos argumentos que são passados pelo principal.
# This cell should be marked as "parameters" cell
# Default values are overridden when the notebook is called
date = "2024-01-01"
region = "US"

Sugestão

Os valores de saída são sempre cadeias de caracteres. Se precisar de um valor numérico no notebook pai, converta o resultado após a recuperação (por exemplo, int(result)).

Considerações

  • O bloco de anotações de referência entre espaços de trabalho é suportado pelas versões 1.2 e superiores do runtime.
  • Se usares os arquivos em Recurso de Caderno, usa notebookutils.nbResPath no caderno referenciado para assegurar que aponte para a mesma pasta da execução interativa.
  • Execução de referência permite que os notebooks filhos sejam executados somente se usarem a mesma casa de dados do pai, herdarem a casa de dados do pai, ou se nenhum deles definir uma. A execução é bloqueada se a criança especificar uma casa de lago diferente do caderno principal. Para contornar esta verificação, defina useRootDefaultLakehouse: True nos argumentos.
  • Não chames notebookutils.notebook.exit(value) dentro de um try-catch bloco. A chamada de saída não terá efeito quando estiver envolvida no manuseamento de exceções.

Referência: executar vários cadernos em paralelo

Use notebookutils.notebook.runMultiple() para executar vários cadernos em paralelo ou numa estrutura topológica pré-definida. A API utiliza uma implementação multithread dentro de uma sessão do Spark, o que significa que os notebooks referenciados partilham recursos de computação.

Com notebookutils.notebook.runMultiple(), você pode:

  • Execute vários notebooks simultaneamente, sem precisar esperar que cada um termine.

  • Especifique as dependências e a ordem de execução de seus blocos de anotações, usando um formato JSON simples.

  • Otimize o uso dos recursos de computação do Spark e reduza o custo de seus projetos do Fabric.

  • Visualize as capturas de ecrã de cada registro de execução do notebook na saída e depure/monitore convenientemente as tarefas do notebook.

  • Obtenha o valor de saída de cada atividade executiva e use-o em tarefas a jusante.

Corra notebookutils.notebook.help("runMultiple") para ver mais exemplos e detalhes de utilização.

Faça uma lista simples de cadernos

O exemplo seguinte executa uma lista de cadernos em paralelo:

notebookutils.notebook.runMultiple(["NotebookSimple", "NotebookSimple2"])

O resultado da execução do bloco de anotações raiz é o seguinte:

Captura de ecrã de uma lista de blocos de notas de referência.

Valor de retorno

O runMultiple() método devolve um dicionário onde cada chave é o nome da atividade e cada valor é um dicionário com as seguintes chaves:

  • exitVal: A string devolvida pela chamada do exit() caderno filho, ou uma string vazia se exit() não for chamada.
  • exception: Um objeto de erro se a atividade falhou, ou None se teve sucesso.

Executar cadernos com uma estrutura DAG

O exemplo seguinte executa cadernos numa estrutura DAG usando notebookutils.notebook.runMultiple().

# run multiple notebooks with parameters
DAG = {
    "activities": [
        {
            "name": "Process_1", # activity name, must be unique
            "path": "NotebookSimple", # notebook item name
            "timeoutPerCellInSeconds": 90, # max timeout for each cell, default to 90 seconds
            "args": {"p1": "changed value", "p2": 100}, # notebook parameters
            "workspace":"WorkspaceName" # both name and id are supported
        },
        {
            "name": "Process_2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 2", "p2": 200},
            "workspace":"id" # both name and id are supported
        },
        {
            "name": "Process_1.1",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 3", "p2": 300},
            "retry": 1,
            "retryIntervalInSeconds": 10,
            "dependencies": ["Process_1"] # list of activity names that this activity depends on
        }
    ],
    "timeoutInSeconds": 43200, # max timeout for the entire DAG, default to 12 hours
    "concurrency": 12 # max number of notebooks to run concurrently, default to 3x CPU cores, 0 means unlimited
}
notebookutils.notebook.runMultiple(DAG, {"displayDAGViaGraphviz": False})

O resultado da execução do bloco de anotações raiz é o seguinte:

Captura de ecrã referenciando uma lista de blocos de notas com parâmetros.

Referência do parâmetro DAG

A tabela seguinte descreve cada campo que pode usar na definição DAG:

Campo Nível Obrigatório Descrição
activities Raiz Sim Uma lista de objetos de atividade que definem os cadernos a executar.
timeoutInSeconds Raiz No Tempo limite para todo o DAG. O padrão é 43200 (12 horas).
concurrency Raiz No Número máximo de cadernos para correr em simultâneo. O padrão é 3 vezes o número de núcleos de CPU disponível. Defina este valor explicitamente se precisar de controlo mais apertado, ou use 0 para concorrência ilimitada.
name Activity Sim Um nome único para a atividade. Usado para identificar resultados e definir dependências.
path Activity Sim O nome do item do caderno ou o caminho a executar.
timeoutPerCellInSeconds Activity No Tempo máximo de espera para cada célula no caderno infantil. O padrão é 90 segundos.
args Activity No Um dicionário de parâmetros para passar ao caderno criança.
workspace Activity No O nome ou ID do espaço de trabalho onde o caderno se encontra. Por defeito, o notebook secundário corre no mesmo espaço de trabalho que o chamador.
retry Activity No Número de tentativas de repetição se a atividade falhar. O padrão é 0.
retryIntervalInSeconds Activity No Tempo de espera em segundos entre tentativas de repetição. O padrão é 0.
dependencies Activity No Uma lista de nomes de atividades que devem ser concluídas antes de esta atividade começar.

Valores de saída de referência entre atividades

Pode referenciar o valor de saída de uma atividade de dependência no args campo usando a @activity() expressão. Este padrão permite passar dados entre cadernos num DAG.

DAG = {
    "activities": [
        {
            "name": "Extract",
            "path": "ExtractData",
            "timeoutPerCellInSeconds": 120,
            "args": {"source": "prod_db"}
        },
        {
            "name": "Transform",
            "path": "TransformData",
            "timeoutPerCellInSeconds": 180,
            "args": {
                "data_path": "@activity('Extract').exitValue()"
            },
            "dependencies": ["Extract"]
        }
    ]
}

results = notebookutils.notebook.runMultiple(DAG)

Sugestão

Use a @activity('activity_name').exitValue() expressão no args campo para passar resultados de uma atividade para outra dentro de um DAG.

Constrói um DAG dinâmico

Pode gerar estruturas DAG programaticamente para cenários como processamento de dispersão em múltiplas partições:

def create_fan_out_dag(partitions):
    activities = []

    for partition in partitions:
        activities.append({
            "name": f"Process_{partition}",
            "path": "ProcessPartition",
            "timeoutPerCellInSeconds": 180,
            "args": {"partition": partition}
        })

    activities.append({
        "name": "Aggregate",
        "path": "AggregateResults",
        "timeoutPerCellInSeconds": 120,
        "dependencies": [f"Process_{p}" for p in partitions]
    })

    return {"activities": activities, "concurrency": 25}

partitions = ["2024-01", "2024-02", "2024-03", "2024-04"]
dag = create_fan_out_dag(partitions)

results = notebookutils.notebook.runMultiple(dag)

Validar um DAG

Use validateDAG() para verificar se a sua estrutura DAG é válida antes da execução. Deteta problemas como nomes duplicados de atividades, dependências em falta e referências circulares.

notebookutils.notebook.validateDAG(DAG)

Valor de retorno

O validateDAG() método retorna True se a estrutura DAG for válida ou levanta uma exceção se a validação falhar.

Sugestão

Ligue sempre validateDAG() antes de runMultiple() nos fluxos de trabalho de produção para detetar cedo erros estruturais.

Lidar com falhas do runMultiple

O runMultiple() método devolve um dicionário onde cada chave é o nome da atividade e cada valor contém um exitVal (string) e um exception (objeto de erro ou None). Pode inspecionar resultados parciais mesmo quando algumas atividades falham:

from notebookutils.common.exceptions import RunMultipleFailedException

try:
    results = notebookutils.notebook.runMultiple(DAG)
except RunMultipleFailedException as ex:
    results = ex.result

for activity_name, result in results.items():
    if result["exception"]:
        print(f"{activity_name} failed: {result['exception']}")
    else:
        print(f"{activity_name} succeeded: {result['exitVal']}")

Considerações

  • O grau de paralelismo da execução de vários blocos de anotações é restrito ao recurso de computação total disponível de uma sessão do Spark.
  • O número padrão de portáteis concorrentes é 3 vezes o número de núcleos disponíveis da CPU. Podes personalizar este valor, mas o paralelismo excessivo pode levar a problemas de estabilidade e desempenho devido ao elevado uso de recursos de computação. Se surgirem problemas, considere separar os blocos de anotações em várias runMultiple chamadas ou reduzir a simultaneidade ajustando o campo de simultaneidade no parâmetro DAG.
  • O timeout padrão para todo o DAG é de 12 horas, e o timeout padrão para cada célula num caderno criança é de 90 segundos. Você pode alterar o tempo limite definindo os campos timeoutInSeconds e timeoutPerCellInSeconds no parâmetro DAG.
  • Configure retry e retryIntervalInSeconds para atividades que possam falhar devido a problemas transitórios, como timeouts de rede ou indisponibilidade temporária de serviços.
  • Notebooks paralelos partilham recursos de computação numa única sessão Spark. Monitorizar a utilização de recursos para evitar pressão de memória e contenção da CPU.

Sair de um bloco de notas

O exit() método sai de um caderno com um valor. Você pode executar chamadas de função de aninhamento em um bloco de anotações interativamente ou em um pipeline.

  • Quando chamas uma exit() função a partir de um caderno de notas de forma interativa, o caderno Fabric lança uma exceção, salta a execução de células subsequentes e mantém a sessão do Spark viva.

  • Quando orquestra um notebook num pipeline que chama uma exit() função, a atividade do notebook retorna com um valor de saída. Isto completa a execução do pipeline e interrompe a sessão Spark.

  • Quando se chama a função exit() num caderno que está a ser referenciado, o Fabric Spark interrompe a execução adicional do caderno referenciado e continua a executar as próximas células no caderno principal que chama a função run(). Por exemplo: o Bloco de Anotações1 tem três células e chama uma exit() função na segunda célula. O Notebook2 tem cinco células e chama run(notebook1) na terceira célula. Quando executas o Notebook2, o Notebook1 para na segunda célula ao atingir a função exit(). O Notebook2 continua a executar a sua quarta célula e a quinta célula.

notebookutils.notebook.exit("value string")

Comportamento de retorno

O exit() método não devolve um valor. Termina o caderno atual e passa a string fornecida para o caderno ou pipeline que chama.

Observação

A exit() função sobrescreve a saída da célula atual. Para evitar perder a saída de outras instruções de código, chame notebookutils.notebook.exit() em uma célula separada.

Importante

Não chames notebookutils.notebook.exit() dentro de um try-catch bloco. A saída não terá efeito quando envolvida em tratamento de exceções. A exit() chamada tem de estar ao nível superior do seu código para funcionar corretamente.

Por exemplo:

O caderno Sample1 tem as seguintes duas células:

  • A célula 1 define um parâmetro de entrada com o valor padrão definido como 10.

  • A célula 2 sai do caderno com input como valor de saída.

Captura de tela mostrando um bloco de anotações de exemplo da função de saída.

Você pode executar o Sample1 em outro bloco de anotações com valores padrão:

exitVal = notebookutils.notebook.run("Sample1")
print (exitVal)

Output:

10

Você pode executar o Sample1 em outro bloco de anotações e definir o valor de entrada como 20:

exitVal = notebookutils.notebook.run("Sample1", 90, {"input": 20 })
print (exitVal)

Output:

20