次の方法で共有


NotebookUtils ノートブックの実行とオーケストレーション

ノートブック ユーティリティを使用して、ノートブックを実行したり、複数のノートブックを並列で実行したり、値を指定してノートブックを終了したりします。 次のコマンドを実行して、使用可能なメソッドの概要を取得します。

notebookutils.notebook.help()

次の表に、使用可能なノートブックの実行方法とオーケストレーション メソッドを示します。

メソッド 署名 説明
run run(path: str, timeout_seconds: int = 90, arguments: dict = None, workspace: str = ""): str ノートブックを実行し、その終了値を返します。
runMultiple runMultiple(dag: Any, config: dict = None): dict[str, dict[str, Any]] 依存関係をサポートして複数のノートブックを同時に実行します。
validateDAG validateDAG(dag: Any): bool DAG 定義が正しく構造化されているかどうかを検証します。
exit exit(value: str): None 現在のノートブックを特定の値と共に終了します。

ノートブック CRUD 操作 (作成、取得、更新、削除、一覧表示) については、「 ノートブック成果物の管理」を参照してください。

configrunMultiple() パラメーターは Python でのみ使用できます。 Scala と R では、このパラメーターはサポートされていません。

Notebook ユーティリティは、Apache Spark ジョブ定義 (SJD) には適用されません。

ノートを参照する

run() メソッドはノートブックを参照し、その終了値を返します。 入れ子になった関数呼び出しは、ノートブックで対話的に、またはパイプライン内で実行できます。 参照されたNotebookは、この関数を呼び出すNotebookの Spark プールで実行されます。

notebookutils.notebook.run("notebook name", <timeout_seconds>, <arguments>, <workspace>)

例えば次が挙げられます。

notebookutils.notebook.run("Sample1", 90, {"input": 20 })

値を返す

run() メソッドは、子ノートブックのnotebookutils.notebook.exit(value)に渡された正確な文字列を返します。 子ノートブックで exit() が呼び出されない場合は、空の文字列 ("") が返されます。

ファブリック ノートブックでは、ワークスペース ID を指定することで、ワークスペース間でのノートブックの参照もサポートされます。

notebookutils.notebook.run("Sample1", 90, {"input": 20 }, "fe0a6e2a-a909-4aa3-a698-0a651de790aa")

セル出力でスナップショット リンクを開き、参照実行を調べます。 スナップショットは実行結果をキャプチャし、参照先のノートブックをデバッグするのに役立ちます。

参照実行の結果を示すスクリーンショット。

スナップショットの例を示すスクリーンショット。

パラメーターを受け取る子ノートブックを設定する

run()またはrunMultiple()を使用して呼び出される子ノートブックを作成する場合は、ノートブックが親から引数を受け取ることができるようにパラメーター セルを設定します。

  1. 既定のパラメーター値を持つコード セルを作成します。
  2. ノートブック UI で [セルをパラメーターとしてマーク] を選択して 、セルをパラメーター セルとしてマーク します。
  3. 実行中、パラメーター セルの値は、親から渡された引数に置き換えられます。
# This cell should be marked as "parameters" cell
# Default values are overridden when the notebook is called
date = "2024-01-01"
region = "US"

ヒント

終了値は常に文字列です。 親ノートブックに数値が必要な場合は、取得後に結果を変換します (たとえば、 int(result))。

考慮事項

  • ワークスペース間参照Notebookは、ランタイム バージョン 1.2 以降 でサポートされています。
  • [Notebook リソース]の下のファイルを使用する場合は、参照先のNotebookで notebookutils.nbResPath を使用して、対話型実行と同じフォルダーを指していることを確認します。
  • 参照実行を使用すると、子ノートブックは、親と同じレイクハウスを使用する場合、親のレイクハウスを継承する場合、またはどちらも定義していない場合にのみ実行できます。 子が親ノートブックとは異なるレイクハウスを指定した場合、実行はブロックされます。 このチェックをバイパスするには、引数に useRootDefaultLakehouse: True を設定します。
  • notebookutils.notebook.exit(value) ブロック内でtry-catchを呼び出さないでください。 例外処理でラップされた場合、終了呼び出しは有効になりません。

複数のノートブックを参照して並列実行する

notebookutils.notebook.runMultiple()を使用して、複数のノートブックを並列または定義済みのトポロジ構造で実行します。 この API は、Spark セッション内でマルチスレッド実装を使用します。つまり、参照されているノートブックの実行でコンピューティング リソースが共有されます。

notebookutils.notebook.runMultiple() を使用すると、以下のことができます。

  • 各Notebookが完了するのを待たずに、複数のNotebookを同時に実行することができます。

  • 単純な JSON 形式を使用して、Notebookの依存関係と実行順序を指定します。

  • Spark コンピューティング リソースの使用を最適化し、Fabric プロジェクトのコストを削減します。

  • 出力内の各Notebook実行レコードのスナップショットを表示し、Notebook タスクを容易にデバッグ/監視します。

  • 各エグゼクティブ アクティビティの終了値を取得し、ダウンストリーム タスクで使用します。

notebookutils.notebook.help("runMultiple")を実行して、その他の例と使用状況の詳細を表示します。

ノートブックの簡単な一覧を実行する

次の例では、ノートブックの一覧を並列で実行します。

notebookutils.notebook.runMultiple(["NotebookSimple", "NotebookSimple2"])

ルート Notebookからの実行結果は次のとおりです。

ノートブックの一覧を参照するスクリーンショット。

値を返す

runMultiple() メソッドは、各キーがアクティビティ名であり、各値が次のキーを持つディクショナリであるディクショナリを返します。

  • exitVal: 子ノートブックの exit() 呼び出しによって返される文字列。または、 exit() が呼び出されなかった場合は空の文字列。
  • exception: アクティビティが失敗した場合はエラー オブジェクト、成功した場合は None

DAG 構造を使用してノートブックを実行する

次の例では、 notebookutils.notebook.runMultiple()を使用して DAG 構造でノートブックを実行します。

# run multiple notebooks with parameters
DAG = {
    "activities": [
        {
            "name": "Process_1", # activity name, must be unique
            "path": "NotebookSimple", # notebook item name
            "timeoutPerCellInSeconds": 90, # max timeout for each cell, default to 90 seconds
            "args": {"p1": "changed value", "p2": 100}, # notebook parameters
            "workspace":"WorkspaceName" # both name and id are supported
        },
        {
            "name": "Process_2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 2", "p2": 200},
            "workspace":"id" # both name and id are supported
        },
        {
            "name": "Process_1.1",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 3", "p2": 300},
            "retry": 1,
            "retryIntervalInSeconds": 10,
            "dependencies": ["Process_1"] # list of activity names that this activity depends on
        }
    ],
    "timeoutInSeconds": 43200, # max timeout for the entire DAG, default to 12 hours
    "concurrency": 12 # max number of notebooks to run concurrently, default to 3x CPU cores, 0 means unlimited
}
notebookutils.notebook.runMultiple(DAG, {"displayDAGViaGraphviz": False})

ルート Notebookからの実行結果は次のとおりです。

パラメーターを含むノートブック一覧を参照するためのスクリーンショット。

DAG パラメーターリファレンス

次の表では、DAG 定義で使用できる各フィールドについて説明します。

フィールド レベル 必須 説明
activities はい 実行するノートブックを定義するアクティビティ オブジェクトの一覧。
timeoutInSeconds いいえ DAG 全体で許可される最大のタイムアウト。 既定値は 43200 (12 時間) です。
concurrency いいえ 同時に実行するノートブックの最大数。 既定値は、使用可能な CPU コア数の 3 倍です。 より厳密な制御が必要な場合は、この値を明示的に設定するか、 0 を使用して無制限のコンカレンシーを行います。
name アクティビティ はい アクティビティの一意の名前。 結果を識別し、依存関係を定義するために使用されます。
path アクティビティ はい 実行するノートブック項目の名前またはパス。
timeoutPerCellInSeconds アクティビティ いいえ 子ノートブック内の各セルの最大タイムアウト。 既定値は 90 秒です。
args アクティビティ いいえ 子ノートブックに渡すパラメーターのディクショナリ。
workspace アクティビティ いいえ ノートブックが存在するワークスペース名または ID。 既定では、子ノートブックは呼び出し元と同じワークスペースで実行されます。
retry アクティビティ いいえ アクティビティが失敗した場合の再試行回数。 既定値は 0 です。
retryIntervalInSeconds アクティビティ いいえ 再試行の間の待機時間 (秒単位)。 既定値は 0 です。
dependencies アクティビティ いいえ このアクティビティを開始する前に完了する必要があるアクティビティ名の一覧。

アクティビティ間の終了値の参照

args式を使用して、@activity() フィールドで依存関係アクティビティの終了値を参照できます。 このパターンでは、DAG 内のノートブック間でデータを渡すことができます。

DAG = {
    "activities": [
        {
            "name": "Extract",
            "path": "ExtractData",
            "timeoutPerCellInSeconds": 120,
            "args": {"source": "prod_db"}
        },
        {
            "name": "Transform",
            "path": "TransformData",
            "timeoutPerCellInSeconds": 180,
            "args": {
                "data_path": "@activity('Extract').exitValue()"
            },
            "dependencies": ["Extract"]
        }
    ]
}

results = notebookutils.notebook.runMultiple(DAG)

ヒント

@activity('activity_name').exitValue() フィールドのargs式を使用して、DAG 内のあるアクティビティから別のアクティビティに結果を渡します。

動的 DAG を構築する

複数のパーティションにわたるファンアウト処理などのシナリオでは、DAG 構造をプログラムで生成できます。

def create_fan_out_dag(partitions):
    activities = []

    for partition in partitions:
        activities.append({
            "name": f"Process_{partition}",
            "path": "ProcessPartition",
            "timeoutPerCellInSeconds": 180,
            "args": {"partition": partition}
        })

    activities.append({
        "name": "Aggregate",
        "path": "AggregateResults",
        "timeoutPerCellInSeconds": 120,
        "dependencies": [f"Process_{p}" for p in partitions]
    })

    return {"activities": activities, "concurrency": 25}

partitions = ["2024-01", "2024-02", "2024-03", "2024-04"]
dag = create_fan_out_dag(partitions)

results = notebookutils.notebook.runMultiple(dag)

DAG を検証する

validateDAG()を使用して、DAG 構造が実行前に有効であることを確認します。 重複するアクティビティ名、不足している依存関係、循環参照などの問題をキャッチします。

notebookutils.notebook.validateDAG(DAG)

値を返す

validateDAG() メソッドは、DAG 構造体が有効な場合はTrueを返し、検証に失敗した場合は例外を発生させます。

ヒント

運用環境のワークフローでvalidateDAG()する前に、常にrunMultiple()を呼び出して、構造エラーを早期にキャッチします。

runMultiple エラーの処理

runMultiple() メソッドはディクショナリを返します。各キーはアクティビティ名であり、各値にはexitVal (文字列) とexception (エラー オブジェクトまたはNone) が含まれています。 一部のアクティビティが失敗した場合でも、部分的な結果を検査できます。

from notebookutils.common.exceptions import RunMultipleFailedException

try:
    results = notebookutils.notebook.runMultiple(DAG)
except RunMultipleFailedException as ex:
    results = ex.result

for activity_name, result in results.items():
    if result["exception"]:
        print(f"{activity_name} failed: {result['exception']}")
    else:
        print(f"{activity_name} succeeded: {result['exitVal']}")

考慮事項

  • 複数のNotebook実行での並列処理の次数は、Spark セッションの使用可能なコンピューティング リソースの合計に制限されます。
  • 同時実行ノートブックの既定の数は、 使用可能な CPU コア数の 3 倍です。 この値はカスタマイズできますが、過剰な並列処理は、コンピューティング リソースの使用率が高いため、安定性とパフォーマンスの問題につながる可能性があります。 問題が発生した場合は、ノートブックを複数のrunMultiple呼び出しに分離するか、DAG パラメーターのconcurrencyフィールドを調整してコンカレンシーを減らすことを検討してください。
  • DAG 全体の既定のタイムアウトは 12 時間で、子ノートブックの各セルの既定のタイムアウトは 90 秒です。 タイムアウトを変更するには、DAG パラメーターの timeoutInSeconds フィールドと timeoutPerCellInSeconds フィールドを設定します。
  • ネットワーク タイムアウトや一時的なサービスの使用不能などの一時的な問題が原因で失敗する可能性があるアクティビティの retryretryIntervalInSeconds を構成します。
  • 並列ノートブックは、1 つの Spark セッション内でコンピューティング リソースを共有します。 リソース使用率を監視して、メモリ不足と CPU の競合を回避します。

ノートブックを終了する

exit() メソッドは、ノートブックを終了して値を返します。 入れ子になった関数呼び出しは、ノートブックで対話的に、またはパイプライン内で実行できます。

  • ノートブックから exit() 関数を対話形式で呼び出すと、Fabric ノートブックは例外をスローし、後続のセルの実行をスキップし、Spark セッションを維持します。

  • exit()関数を呼び出すパイプラインでノートブックを調整すると、ノートブック アクティビティは終了値で返されます。 これでパイプラインの実行が完了し、Spark セッションが停止します。

  • 参照されているノートブックで exit() 関数を呼び出すと、Fabric Spark は参照先のノートブックのそれ以降の実行を停止し、 run() 関数を呼び出すメイン ノートブック内の次のセルを実行し続けます。 たとえば、Notebook1 には 3 つのセルがあり、 2 番目のセルで exit() 関数を呼び出します。 Notebook2 は 5 つのセルを持ち 、3 番目のセルに run(notebook1) を呼び出します。 Notebook2 を実行すると、notebook1 は、 exit() 関数を押すと 2 番目のセルで停止します。 Notebook2 は 4 番目のセルと 5 番目のセルを引き続き実行します。

notebookutils.notebook.exit("value string")

戻り動作

exit() メソッドは値を返しません。 現在のノートブックを終了し、指定された文字列を呼び出し元のノートブックまたはパイプラインに渡します。

exit()関数は、現在のセル出力を上書きします。 他のコード ステートメントの出力が失われないようにするには、別のセルで notebookutils.notebook.exit() を呼び出します。

Important

notebookutils.notebook.exit() ブロック内でtry-catchを呼び出さないでください。 例外処理で囲まれている場合、終了処理は実行されません。 exit()呼び出しが正常に動作するには、コードの最上位レベルにある必要があります。

例えば次が挙げられます。

Sample1 ノートブックには、次の 2 つのセルがあります。

  • セル 1 では、input パラメーターがデフォルト値 10 で定義されます。

  • セル 2 は、 入力 を終了値として使用してノートブックを終了します。

スクリーンショットは exit 関数のサンプルノートブックを示しています。

Sample1 は、既定値を使用して別のノートブックで実行できます。

exitVal = notebookutils.notebook.run("Sample1")
print (exitVal)

アウトプット:

10

別のノートブックで Sample1 を実行し、 入力 値を 20 に設定できます。

exitVal = notebookutils.notebook.run("Sample1", 90, {"input": 20 })
print (exitVal)

アウトプット:

20