NotebookUtils-körning och orkestrering

Använd anteckningsboksverktygen för att köra en anteckningsbok, köra flera anteckningsböcker parallellt eller avsluta en anteckningsbok med ett värde. Kör följande kommando för att få en översikt över tillgängliga metoder:

notebookutils.notebook.help()

I följande tabell visas de tillgängliga metoderna för körning och orkestrering av notebook-filer:

Metod Signature Beskrivning
run run(path: str, timeout_seconds: int = 90, arguments: dict = None, workspace: str = ""): str Kör en anteckningsbok och returnerar dess utgångsvärde.
runMultiple runMultiple(dag: Any, config: dict = None): dict[str, dict[str, Any]] Kör flera anteckningsböcker samtidigt med stöd för beroendesamband.
validateDAG validateDAG(dag: Any): bool Verifierar om en DAG-definition är korrekt strukturerad.
exit exit(value: str): None Avslutar den aktuella notebook-filen med ett värde.

För CRUD-åtgärder för notebook-filer (skapa, hämta, uppdatera, ta bort och lista), se Hantera notebook-artefakter.

Anmärkning

Parametern config i runMultiple() är endast tillgänglig i Python. Scala och R stöder inte den här parametern.

Anmärkning

Notebook-verktyg gäller inte för Apache Spark-jobbdefinitioner (SJD).

Referera till en anteckningsbok

Metoden run() refererar till en notebook-fil och returnerar dess slutvärde. Du kan köra kapslade funktionsanrop i en anteckningsbok interaktivt eller i en pipeline. Anteckningsboken som refereras körs på Sparkklustret i anteckningsboken som anropar den här funktionen.

notebookutils.notebook.run("notebook name", <timeout_seconds>, <arguments>, <workspace>)

Som exempel:

notebookutils.notebook.run("Sample1", 90, {"input": 20 })

Returvärde

Metoden run() returnerar den exakta sträng som skickas till notebookutils.notebook.exit(value) i den underordnade anteckningsboken. Om exit() inte anropas i den underliggande anteckningsboken returneras en tom sträng ("").

Anteckningsböcker i Fabric har också stöd för att referera till anteckningsböcker mellan arbetsytor genom att specificera arbetsyte-ID:t.

notebookutils.notebook.run("Sample1", 90, {"input": 20 }, "fe0a6e2a-a909-4aa3-a698-0a651de790aa")

Öppna ögonblicksbildens länk i cellutdata för att inspektera referenskörningen. Snapshoten samlar in körningsresultat och hjälper dig att felsöka den refererade notebook.

Skärmbild av resultatet av referenskörningen.

Skärmbild av ett exempel på en ögonblicksbild.

Konfigurera underordnade notebook-filer för att ta emot parametrar

När du skapar en underordnad notebook-fil som anropas via run() eller runMultiple()konfigurerar du en parametercell så att notebook-filen kan ta emot argument från den överordnade:

  1. Skapa en kodcell med standardparametervärden.
  2. Markera cellen som en parametercell genom att välja Markera cell som parametrar i notebook-användargränssnittet.
  3. Under exekveringen ersätts parametercellvärdena med argumenten som skickas från föräldern.
# This cell should be marked as "parameters" cell
# Default values are overridden when the notebook is called
date = "2024-01-01"
region = "US"

Tips/Råd

Slutvärden är alltid strängar. Om du behöver ett numeriskt värde i den överordnade notebook-filen konverterar du resultatet efter hämtningen (till exempel int(result)).

Överväganden

  • Referensanteckningsboken för gränsöverskridande arbetsytor stöds av version 1.2 och senare.
  • Om du använder filerna under Notebook-resurs använder du notebookutils.nbResPath i den refererade notebook-filen för att se till att den pekar på samma mapp som den interaktiva körningen.
  • Med referenskörning kan underordnade notebook-filer endast köras om de använder samma lakehouse som den överordnade, ärver den överordnades lakehouse eller om ingen av dem definierar ett lakehouse. Exekveringen blockeras om det underordnade anger ett annat lakehouse än den överordnade anteckningsboken. Om du vill kringgå den här kontrollen anger du useRootDefaultLakehouse: True argumenten.
  • Anropa inte notebookutils.notebook.exit(value) inne i ett try-catch block. Avslutsanropet har ingen effekt vid undantagshantering.

Referenskörning av flera notebook-filer parallellt

Använd notebookutils.notebook.runMultiple() för att köra flera notebook-filer parallellt eller i en fördefinierad topologisk struktur. API:et använder en multitrådad implementering i en Spark-session, vilket innebär att refererade notebooks delar datorkapacitet.

Med notebookutils.notebook.runMultiple()kan du:

  • Kör flera notebooks samtidigt, utan att behöva vänta på att var och en ska bli klar.

  • Ange beroenden och körningsordning för dina notebook-filer med hjälp av ett enkelt JSON-format.

  • Optimera användningen av Spark-beräkningsresurser och minska kostnaden för dina Fabric-projekt.

  • Visa ögonblicksbilder av utförandet av varje notebook i resultatet och felsök och övervaka notebook-uppgifter på ett bekvämt sätt.

  • Hämta slutvärdet för varje verkställande aktivitet och använd dem i underordnade uppgifter.

Kör notebookutils.notebook.help("runMultiple") för att visa fler exempel och användningsinformation.

Kör en enkel lista med notebook-filer

I följande exempel körs en lista över notebook-filer parallellt:

notebookutils.notebook.runMultiple(["NotebookSimple", "NotebookSimple2"])

Körningsresultatet från huvudanteckningsboken är följande:

Skärmbild av en lista över anteckningsböcker.

Returvärde

Metoden runMultiple() returnerar en ordlista där varje nyckel är aktivitetsnamnet och varje värde är en ordlista med följande nycklar:

  • exitVal: Strängen som returnerades av den underordnade notebook-filens exit() anrop eller en tom sträng om exit() den inte anropades.
  • exception: Ett felobjekt om aktiviteten misslyckades eller None om den lyckades.

Köra notebook-filer med en DAG-struktur

I följande exempel körs notebook-filer i en DAG-struktur med hjälp av notebookutils.notebook.runMultiple().

# run multiple notebooks with parameters
DAG = {
    "activities": [
        {
            "name": "Process_1", # activity name, must be unique
            "path": "NotebookSimple", # notebook item name
            "timeoutPerCellInSeconds": 90, # max timeout for each cell, default to 90 seconds
            "args": {"p1": "changed value", "p2": 100}, # notebook parameters
            "workspace":"WorkspaceName" # both name and id are supported
        },
        {
            "name": "Process_2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 2", "p2": 200},
            "workspace":"id" # both name and id are supported
        },
        {
            "name": "Process_1.1",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 3", "p2": 300},
            "retry": 1,
            "retryIntervalInSeconds": 10,
            "dependencies": ["Process_1"] # list of activity names that this activity depends on
        }
    ],
    "timeoutInSeconds": 43200, # max timeout for the entire DAG, default to 12 hours
    "concurrency": 12 # max number of notebooks to run concurrently, default to 3x CPU cores, 0 means unlimited
}
notebookutils.notebook.runMultiple(DAG, {"displayDAGViaGraphviz": False})

Körningsresultatet från huvudanteckningsboken är följande:

Skärmbild av en referenslista över anteckningsböcker med parametrar.

Referens för DAG-parametrar

I följande tabell beskrivs varje fält som du kan använda i DAG-definitionen:

Fält Nivå Obligatoriskt Beskrivning
activities Rot Ja En lista över aktivitetsobjekt som definierar vilka notebook-filer som ska köras.
timeoutInSeconds Rot No Maximal tidsgräns för hela DAG. Standardvärdet är 43200 (12 timmar).
concurrency Rot No Maximalt antal notebooks som ska köras samtidigt. Standardvärdet är 3 gånger det tillgängliga antalet processorkärnor. Ange det här värdet explicit om du behöver hårdare kontroll, eller använd 0 för obegränsad samtidighet.
name Activity Ja Ett unikt namn för aktiviteten. Används för att identifiera resultat och definiera beroenden.
path Activity Ja Namnet på anteckningsboksobjektet eller sökvägen som ska köras.
timeoutPerCellInSeconds Activity No Högsta timeout för varje cell i den underliggande notebook. Standardvärdet är 90 sekunder.
args Activity No En ordlista med parametrar som ska skickas till den underordnade notebook-filen.
workspace Activity No Arbetsytans namn eller ID där notebooken finns. Som standardinställning körs den underordnade notebooken på samma arbetsyta som den som anropar.
retry Activity No Antal återförsök om aktiviteten misslyckas. Standardvärdet är 0.
retryIntervalInSeconds Activity No Väntetid i sekunder mellan återförsök. Standardvärdet är 0.
dependencies Activity No En lista över aktivitetsnamn som måste slutföras innan den här aktiviteten startar.

Referensavslutsvärden mellan aktiviteter

Du kan referera till avslutsvärdet för en beroendeaktivitet i fältet args med hjälp av uttrycket @activity(). Med det här mönstret kan du skicka data mellan notebook-filer i en DAG.

DAG = {
    "activities": [
        {
            "name": "Extract",
            "path": "ExtractData",
            "timeoutPerCellInSeconds": 120,
            "args": {"source": "prod_db"}
        },
        {
            "name": "Transform",
            "path": "TransformData",
            "timeoutPerCellInSeconds": 180,
            "args": {
                "data_path": "@activity('Extract').exitValue()"
            },
            "dependencies": ["Extract"]
        }
    ]
}

results = notebookutils.notebook.runMultiple(DAG)

Tips/Råd

@activity('activity_name').exitValue() Använd uttrycket i fältet args för att skicka resultat från en aktivitet till en annan i en DAG.

Skapa en dynamisk DAG

Du kan generera DAG-strukturer programmatiskt för scenarier som utspridd bearbetning över flera partitioner:

def create_fan_out_dag(partitions):
    activities = []

    for partition in partitions:
        activities.append({
            "name": f"Process_{partition}",
            "path": "ProcessPartition",
            "timeoutPerCellInSeconds": 180,
            "args": {"partition": partition}
        })

    activities.append({
        "name": "Aggregate",
        "path": "AggregateResults",
        "timeoutPerCellInSeconds": 120,
        "dependencies": [f"Process_{p}" for p in partitions]
    })

    return {"activities": activities, "concurrency": 25}

partitions = ["2024-01", "2024-02", "2024-03", "2024-04"]
dag = create_fan_out_dag(partitions)

results = notebookutils.notebook.runMultiple(dag)

Verifiera en DAG

Använd validateDAG() för att kontrollera att DAG-strukturen är giltig före körning. Den fångar upp problem som duplicerade aktivitetsnamn, saknade beroenden och cirkelreferenser.

notebookutils.notebook.validateDAG(DAG)

Returvärde

Metoden validateDAG() returnerar True om DAG-strukturen är giltig eller genererar ett undantag om verifieringen misslyckas.

Tips/Råd

Anropa validateDAG() alltid tidigare runMultiple() i produktionsarbetsflöden för att fånga upp strukturella fel tidigt.

Hantera runMultiple-fel

Metoden runMultiple() returnerar en ordlista där varje nyckel är aktivitetsnamnet och varje värde innehåller en exitVal (sträng) och ett exception (felobjekt eller None). Du kan granska partiella resultat även när vissa aktiviteter misslyckas:

from notebookutils.common.exceptions import RunMultipleFailedException

try:
    results = notebookutils.notebook.runMultiple(DAG)
except RunMultipleFailedException as ex:
    results = ex.result

for activity_name, result in results.items():
    if result["exception"]:
        print(f"{activity_name} failed: {result['exception']}")
    else:
        print(f"{activity_name} succeeded: {result['exitVal']}")

Överväganden

  • Parallellitetsgraden för flera notebook-körningar är begränsad till den totala tillgängliga beräkningsresursen för en Spark-session.
  • Standardantal samtidiga notebooks är 3 gånger tillgängliga processorkärnor. Du kan anpassa det här värdet, men överdriven parallellitet kan leda till stabilitets- och prestandaproblem på grund av hög beräkningsresursanvändning. Om det uppstår problem kan du överväga att separera notebook-filer i flera runMultiple anrop eller minska samtidigheten genom att justera samtidighetsfältet i DAG-parametern.
  • Standardtimeouten för hela DAG är 12 timmar och standardtimeouten för varje cell i en underordnad notebook-fil är 90 sekunder. Du kan ändra tidsgränsen genom att ange fälten timeoutInSeconds och timeoutPerCellInSeconds i parametern dag.
  • Konfigurera retry och retryIntervalInSeconds för aktiviteter som kan misslyckas på grund av tillfälliga problem, till exempel tidsgränser för nätverk eller tillfällig tjänst som inte är tillgängliga.
  • Parallella notebooks delar beräkningsresurserna inom en enda Spark-session. Övervaka resursanvändningen för att undvika minnesbelastning och cpu-konkurrens.

Stäng en notebook

Metoden exit() avslutar en notebook-fil med ett värde. Du kan köra kapslade funktionsanrop i en anteckningsbok interaktivt eller i en pipeline.

  • När du anropar en exit() funktion från en notebook-fil interaktivt genererar Fabric-notebook-filen ett undantag, hoppar över efterföljande celler och håller Spark-sessionen vid liv.

  • När du orkestrerar en notebook-fil i en pipeline som anropar en exit()-funktion, returneras notebook-aktiviteten med ett slutvärde. Detta slutför pipelinekörningen och stoppar Spark-sessionen.

  • När du anropar en exit() funktion i en anteckningsbok som refereras, stoppar Fabric Spark ytterligare körning av den refererade anteckningsboken och fortsätter att köra nästa celler i huvudanteckningsboken, vilken anropar run() funktionen. Till exempel: Notebook1 har tre celler och anropar en exit() funktion i den andra cellen. Notebook2 har fem celler och anropar run(notebook1) i den tredje cellen. När du kör Notebook2 stoppas Notebook1 vid den andra cellen när du trycker på exit() funktionen. Notebook2 fortsätter att köra sin fjärde och femte cell.

notebookutils.notebook.exit("value string")

Returbeteende

Metoden exit() returnerar inte något värde. Den avslutar den aktuella notebook-filen och skickar den angivna strängen till den anropande notebook-filen eller pipelinen.

Anmärkning

Funktionen exit() skriver över den aktuella cellens utdata. Om du vill undvika att förlora utdata från andra kodinstruktioner anropar du notebookutils.notebook.exit() i en separat cell.

Viktigt!

Anropa inte notebookutils.notebook.exit() inne i ett try-catch block. Utgången har ingen effekt när den omsluts av undantagshantering. Anropet exit() måste vara på den översta nivån i koden för att fungera korrekt.

Som exempel:

Notebook-filen Sample1 har följande två celler:

  • Cell 1 definierar en indataparameter med standardvärdet inställt på 10.

  • Cell 2 lämnar notebooken med indata som utgångsvärde.

Skärmbild som visar en exempelanteckningsbok med avsluta-funktionen.

Du kan köra Sample1 i en annan notebook-fil med standardvärden:

exitVal = notebookutils.notebook.run("Sample1")
print (exitVal)

Resultat:

10

Du kan köra Sample1 i en annan notebook-fil och ange indatavärdet som 20:

exitVal = notebookutils.notebook.run("Sample1", 90, {"input": 20 })
print (exitVal)

Resultat:

20