次の方法で共有


Microsoft Agent Framework ワークフロー - チェックポイント

このページでは、Microsoft Agent Framework ワークフロー システムの チェックポイント の概要について説明します。

概要

チェックポイントを使用すると、実行中の特定のポイントでワークフローの状態を保存し、後でそれらのポイントから再開することができます。 この機能は、次のシナリオで特に役立ちます。

  • 失敗した場合の進行状況の損失を回避する、実行時間の長いワークフロー。
  • 実行を一時停止して後で再開する、実行時間の長いワークフロー。
  • 監査またはコンプライアンスの目的で定期的な状態の保存を必要とするワークフロー。
  • 異なる環境またはインスタンス間で移行する必要があるワークフロー。

チェックポイントはいつ作成されますか?

ワークフローは、コア概念に記載されているように、スーパーステップで実行されることを忘れないでください。 チェックポイントは、そのスーパーステップ内のすべての Executor が実行を完了した後、各スーパーステップの最後に作成されます。 チェックポイントは、次のようなワークフローの状態全体をキャプチャします。

  • すべてのエグゼキューターの現在の状態
  • 次のスーパーステップのワークフロー内のすべての保留中のメッセージ
  • 保留中の要求と応答
  • 共有状態

チェックポイントのキャプチャ

チェックポイント処理を有効にするには、ワークフローの実行時に CheckpointManager を指定する必要があります。 その後、チェックポイントには、 SuperStepCompletedEventまたは実行の Checkpoints プロパティを使用してアクセスできます。

using Microsoft.Agents.AI.Workflows;

// Create a checkpoint manager to manage checkpoints
CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();

// Run the workflow with checkpointing enabled
StreamingRun run = await InProcessExecution
    .RunStreamingAsync(workflow, input, checkpointManager)
    .ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is SuperStepCompletedEvent superStepCompletedEvt)
    {
        // Access the checkpoint
        CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo?.Checkpoint;
    }
}

// Checkpoints can also be accessed from the run directly
IReadOnlyList<CheckpointInfo> checkpoints = run.Checkpoints;

チェックポイント処理を有効にするには、ワークフローの作成時に CheckpointStorage を指定する必要があります。 その後、ストレージを介してチェックポイントにアクセスできます。

from agent_framework import (
    InMemoryCheckpointStorage,
    WorkflowBuilder,
)

# Create a checkpoint storage to manage checkpoints
# There are different implementations of CheckpointStorage, such as InMemoryCheckpointStorage and FileCheckpointStorage.
checkpoint_storage = InMemoryCheckpointStorage()

# Build a workflow with checkpointing enabled
builder = WorkflowBuilder(start_executor=start_executor, checkpoint_storage=checkpoint_storage)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
workflow = builder.build()

# Run the workflow
async for event in workflow.run(input, stream=True):
    ...

# Access checkpoints from the storage
checkpoints = await checkpoint_storage.list_checkpoints(workflow_name=workflow.name)

チェックポイントからの再開

同じ実行で特定のチェックポイントからワークフローを直接再開できます。

// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];
// Restore the state directly on the same run instance.
await run.RestoreCheckpointAsync(savedCheckpoint).ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowOutputEvent workflowOutputEvt)
    {
        Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
    }
}

同じワークフロー インスタンス上の特定のチェックポイントからワークフローを直接再開できます。

# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(checkpoint_id=saved_checkpoint.checkpoint_id, stream=True):
    ...

チェックポイントからのリハイドレート

または、チェックポイントから新しい実行インスタンスにワークフローをリハイドレートすることもできます。

// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];
StreamingRun newRun = await InProcessExecution
    .ResumeStreamingAsync(newWorkflow, savedCheckpoint, checkpointManager)
    .ConfigureAwait(false);
await foreach (WorkflowEvent evt in newRun.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowOutputEvent workflowOutputEvt)
    {
        Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
    }
}

または、チェックポイントから新しいワークフロー インスタンスをリハイドレートすることもできます。

from agent_framework import WorkflowBuilder

builder = WorkflowBuilder(start_executor=start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
# This workflow instance doesn't require checkpointing enabled.
workflow = builder.build()

# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(
    checkpoint_id=saved_checkpoint.checkpoint_id,
    checkpoint_storage=checkpoint_storage,
    stream=True,
):
    ...

Executorのステートを保存する

Executor の状態がチェックポイントに確実にキャプチャされるようにするには、Executor が OnCheckpointingAsync メソッドをオーバーライドし、その状態をワークフロー コンテキストに保存する必要があります。

using Microsoft.Agents.AI.Workflows;

internal sealed partial class CustomExecutor() : Executor("CustomExecutor")
{
    private const string StateKey = "CustomExecutorState";

    private List<string> messages = new();

    [MessageHandler]
    private async ValueTask HandleAsync(string message, IWorkflowContext context)
    {
        this.messages.Add(message);
        // Executor logic...
    }

    protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellation = default)
    {
        return context.QueueStateUpdateAsync(StateKey, this.messages);
    }
}

また、チェックポイントからの再開時に状態が正しく復元されるようにするには、Executor が OnCheckpointRestoredAsync メソッドをオーバーライドし、ワークフロー コンテキストからその状態を読み込む必要があります。

protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
    this.messages = await context.ReadStateAsync<List<string>>(StateKey).ConfigureAwait(false);
}

Executor の状態がチェックポイントに確実にキャプチャされるようにするには、Executor が on_checkpoint_save メソッドをオーバーライドし、その状態をディクショナリとして返す必要があります。

class CustomExecutor(Executor):
    def __init__(self, id: str) -> None:
        super().__init__(id=id)
        self._messages: list[str] = []

    @handler
    async def handle(self, message: str, ctx: WorkflowContext):
        self._messages.append(message)
        # Executor logic...

    async def on_checkpoint_save(self) -> dict[str, Any]:
        return {"messages": self._messages}

また、チェックポイントから再開するときに状態が正しく復元されるようにするには、executor は on_checkpoint_restore メソッドをオーバーライドし、指定された状態ディクショナリからその状態を復元する必要があります。

async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
    self._messages = state.get("messages", [])

セキュリティに関する考慮事項

Important

チェックポイント ストレージは信頼境界です。 組み込みのストレージ実装を使用する場合でも、カスタム実装を使用する場合でも、ストレージ バックエンドは信頼できるプライベート インフラストラクチャとして扱う必要があります。 信頼されていないソースまたは改ざんされた可能性のあるソースからチェックポイントを読み込むことはありません。 悪意のあるチェックポイントを読み込むと、任意のコードが実行される可能性があります。

チェックポイントに使用されるストレージの場所が適切にセキュリティで保護されていることを確認します。 チェックポイント データへの読み取りまたは書き込みアクセス権を持つのは、承認されたサービスとユーザーだけです。

Pickle シリアル化

FileCheckpointStorage では、Python の pickle モジュールを使用して、データクラス、datetime、カスタム オブジェクトなどの非 JSON ネイティブ状態をシリアル化します。 pickle.loads()逆シリアル化中に任意のコードを実行できるため、侵害されたチェックポイント ファイルが読み込まれると、悪意のあるコードが実行される可能性があります。 フレームワークによって実行される逆シリアル化後の型チェックでは、これを防ぐことはできません。

脅威モデルで pickle ベースのシリアル化が許可されていない場合は、 InMemoryCheckpointStorage を使用するか、代替のシリアル化戦略でカスタム CheckpointStorage を実装します。

ストレージの場所の責任

FileCheckpointStorage には明示的な storage_path パラメーターが必要です。既定のディレクトリはありません。 フレームワークはパス トラバーサル攻撃に対して検証しますが、ストレージ ディレクトリ自体 (ファイルのアクセス許可、保存時の暗号化、アクセス制御) をセキュリティで保護することは開発者の責任です。 チェックポイント ディレクトリへの読み取りまたは書き込みアクセス権を持つのは、承認されたプロセスだけです。

次のステップ