次の方法で共有


リセット可能なエグゼキューターズ

概要

ワークフロー内の Executor は、多くの場合、ステートフルです。たとえば、メッセージの蓄積、ターン数の追跡、中間結果のキャッシュなどです。 共有 Executor インスタンスを使用して複数の実行でワークフローを再利用すると、前回の実行の残り状態が後続の実行に漏れ、予期しない動作やデータの破損が発生する可能性があります。

IResettableExecutor インターフェイスは、実行間の内部状態をクリアする Executor のコントラクトを提供することで、これを解決します。 ワークフロー ランタイムは、実行の完了時に共有 Executor インスタンスに対して ResetAsync() を自動的に呼び出し、次の実行のためにクリーンなスレートを確保します。

問題

ワークフローの実行中にメッセージを収集する Executor について考えてみましょう。

internal sealed partial class AggregationExecutor() : Executor("AggregationExecutor")
{
    private readonly List<string> _messages = [];

    [MessageHandler]
    private async ValueTask HandleAsync(string message, IWorkflowContext context)
    {
        this._messages.Add(message);
        // Process aggregated messages...
    }
}

この Executor がワークフロー実行間で共有されている場合、 _messages は前の実行のデータを保持します。 2 回目の実行では、それに属していない古いメッセージが表示されます。

IResettableExecutor インターフェイス

IResettableExecutor は、ワークフロー ランタイムが実行間で呼び出す 1 つのメソッドを定義します。

public interface IResettableExecutor
{
    ValueTask ResetAsync();
}

Executor がこのインターフェイスを実装すると、ランタイムは実行のたびに安全にリセットでき、古い状態なしでワークフローを再利用できます。

IResettableExecutor の実装

ステートフル Executor をリセット可能にするには、インターフェイスを実装し、 ResetAsync()のすべての変更可能な状態をクリアします。

internal sealed partial class AggregationExecutor()
    : Executor("AggregationExecutor"), IResettableExecutor
{
    private readonly List<string> _messages = [];

    [MessageHandler]
    private async ValueTask HandleAsync(string message, IWorkflowContext context)
    {
        this._messages.Add(message);
        // Process aggregated messages...
    }

    public ValueTask ResetAsync()
    {
        this._messages.Clear();
        return default;
    }
}

リセット可能な Executor を使用するワークフローの完全な作業例については、 WorkflowAsAnAgent サンプルを参照してください。

実装するタイミング

すべての Executor が IResettableExecutorを実装する必要があるわけではありません。 この決定ガイドを使用します。

シナリオ 実装しますか? 理由
Executor は変更可能な状態 (リスト、カウンター、キャッシュ) を持ち、実行間で共有されます はい ある実行からの状態が次の実行にリークする
Executor がステートレスである いいえ リセットするものはありません
Executor はワークフローごとに新しく作成されます (ファクトリ メソッドを使用) いいえ 各実行では、クリーンな状態の新しいインスタンスが取得されます
Executor がクロスラン共有可能 (declareCrossRunShareable: true) として宣言されている いいえ クロスランの共有可能な実行プログラムは、リセットせずに同時使用をサポートします

Warnung

ステートフルな共有 Executor が IResettableExecutorを実装していない場合、ワークフローを再利用すると InvalidOperationExceptionがスローされます。

"Cannot reuse Workflow with shared Executor instances that do not implement IResettableExecutor."

ランタイムでの使用方法

ワークフロー ランタイムは、リセット ライフサイクルを自動的に管理します。 自分で ResetAsync() を呼び出す必要はありません。 シーケンスは次のとおりです。

  1. 取得された所有権 — ワークフロー実行が開始されると、ランタイムはワークフロー インスタンスの所有権を取得し、Executor がリセットする必要がある点をメモします。
  2. 実行 - Executor はメッセージを処理し、状態が蓄積される可能性があります。
  3. 所有権が解放される — 実行が完了 (または破棄) されると、ランタイムは所有権を解放し、ResetAsync()を実装するすべての共有 Executor インスタンスでIResettableExecutorを呼び出します。
  4. 再利用の準備 - リセットが成功した後、ワークフローを新しい実行に使用できます。

共有 Executor がリセットに失敗した場合 (インターフェイスが実装されていないため)、ワークフローは非再利用可能としてマークされ、後続の実行はスローされます。

状態分離との関係

IResettableExecutor は、 State Management で説明されているヘルパー メソッド パターンを補完します。 2 つの方法は、さまざまなニーズに対応します。

  • ヘルパー メソッド (実行ごとに新しいインスタンスを作成する) は、最も強力な分離の保証を提供し、既定のアプローチとして推奨されます。
  • IResettableExecutor は、実行全体で Executor インスタンスを共有する必要がある場合 (たとえば、Executor の構築にコストがかかる場合や、ワークフローがエージェントとして公開され、複数の呼び出しで再利用される場合など) に便利です。

シナリオに最も適したアプローチを選択します。 ほとんどのワークフローでは、ヘルパー メソッドで十分です。 インスタンスを共有することが意図的な設計上の選択である場合は、 IResettableExecutor を使用します。

この概念は Python には適用されません。 完全な状態を分離するには、独立した実行ごとに新しいワークフローと Executor インスタンスを構築します。 パターンと例については、「 State Management 」を参照してください。

次のステップ