ワークフローに AI エージェントを追加する場合は、ワークフロー エンジンがメッセージをルーティングし、セッション状態を管理し、その出力を処理できるように、そのエージェントを Executor にラップする必要があります。 エージェント Executor は、この適応を処理する組み込みの Executor です。
概要
エージェント Executor は、エージェントの抽象化とワークフロー実行モデルの間のギャップを埋めます。 それ:
- ワークフロー グラフから型指定されたメッセージを受信し、基になるエージェントに転送します。
- 実行間のエージェントのセッションと会話の状態を管理します。
- ワークフロー実行モード (ストリーミングまたは非ストリーミング) に基づいて動作を調整します。
- 監視のためにワークフロー呼び出し元に出力イベント (
AgentResponseまたはAgentResponseUpdate) を生成します。 - 接続されたダウンストリーム Executor にメッセージを送信して、グラフ内で処理を継続します。
- 実行時間の長いワークフローのチェックポイント処理をサポートします。
しくみ
C# では、ワークフロー エンジンは、ワークフローに追加される各AIAgentHostExecutorごとに内部的にAIAgentを作成します。 この特殊な Executor は、 ChatProtocolExecutor を拡張し、 ターン トークン パターンを使用します。
-
メッセージ キャッシュ — メッセージが他の Executor から到着すると、エージェント Executor によって収集されます。
ForwardIncomingMessagesが有効になっている場合 (既定値)、受信メッセージもダウンストリーム Executor に転送されます。 -
ターン トークン トリガー — エージェントは、
TurnTokenを受信した後にのみ、キャッシュされたメッセージを処理します。 -
エージェント呼び出し — Executor は、基になるエージェントの
RunAsync(非ストリーミング) またはRunStreamingAsync(ストリーミング) を呼び出します。 -
出力の生成 — ストリーミング イベントが有効になっている場合、各増分
AgentResponseUpdateはワークフロー出力として生成されます。EmitAgentResponseEventsが有効になっている場合、集計されたAgentResponseもワークフロー出力として生成されます。 - ダウンストリーム メッセージング — エージェントの応答メッセージは、接続されたダウンストリーム Executor に送信されます。
-
ターン トークンパススルー — ターン が完了すると、Executor は新しい
TurnTokenダウンストリームを送信して、チェーン内の次のエージェントが処理を開始できるようにします。
ヒント
一部のシナリオでは、より特殊なエージェント Executor が必要になる場合があります。たとえば、 ハンドオフ オーケストレーションでは 、カスタム ルーティング ロジックで専用の HandoffAgentExecutor が使用されます。
暗黙的な作成と明示的な作成
AIAgentにWorkflowBuilderを渡すと、フレームワークによって自動的にAIAgentBindingにラップされ、基になるAIAgentHostExecutorが作成されます。 エージェント Executor を直接インスタンス化する必要はありません。
AIAgent writerAgent = /* create your agent */;
AIAgent reviewerAgent = /* create your agent */;
// Agents are automatically wrapped — no manual executor creation required
var workflow = new WorkflowBuilder(writerAgent)
.AddEdge(writerAgent, reviewerAgent)
.Build();
また、一般的なパターンの AgentWorkflowBuilder でヘルパー メソッドを使用することもできます。
// Build a sequential pipeline of agents
var workflow = AgentWorkflowBuilder.BuildSequential(writerAgent, reviewerAgent);
カスタム構成
エージェント Executor の動作をカスタマイズするには、BindAsExecutorでAIAgentHostOptionsを使用します。
var options = new AIAgentHostOptions
{
EmitAgentUpdateEvents = true,
EmitAgentResponseEvents = true,
ReassignOtherAgentsAsUsers = true,
ForwardIncomingMessages = true,
};
ExecutorBinding writerBinding = writerAgent.BindAsExecutor(options);
var workflow = new WorkflowBuilder(writerBinding)
.AddEdge(writerBinding, reviewerAgent)
.Build();
入力の種類
C# のエージェント 実行プログラムは、 string、 ChatMessage、 IEnumerable<ChatMessage>の複数の入力の種類を受け入れます。 文字列入力は、ChatMessage ロールを持つUserインスタンスに自動的に変換されます。 すべての受信メッセージは、 TurnToken が受信されるまで蓄積され、その時点で Executor がバッチを処理します。
ReassignOtherAgentsAsUsersが有効になっている場合 (既定)、他のエージェントからのメッセージはUser ロールに再割り当てされるため、基になるモデルではユーザー入力として扱われますが、現在のエージェントからのメッセージはAssistant ロールを保持します。
出力とチェーン
エージェントのターンが完了すると、Executor は次の手順を実行します。
- 接続されているすべてのダウンストリーム 実行プログラムにエージェントの応答メッセージを送信します。
- チェーン内の次のエージェントが処理を開始できるように、新しい
TurnTokenを転送します。
これにより、エージェントをチェーンできるので、エッジで接続するだけです。
var workflow = new WorkflowBuilder(frenchTranslator)
.AddEdge(frenchTranslator, spanishTranslator)
.AddEdge(spanishTranslator, englishTranslator)
.Build();
ストリーミング動作
ストリーミング動作は、EmitAgentUpdateEventsのAIAgentHostOptions オプションによって制御されるか、TurnTokenを介して動的に制御されます。
-
有効にすると 、Executor はエージェントで
RunStreamingAsyncを呼び出し、各AgentResponseUpdateをワークフロー出力イベントとして生成します。 これにより、トークンごとのリアルタイム更新が提供されます。 -
無効にすると 、Executor は
RunAsyncを呼び出し、1 つの完全な応答を生成します。
// Enable streaming events at the configuration level
var options = new AIAgentHostOptions
{
EmitAgentUpdateEvents = true,
};
// Or enable streaming dynamically via TurnToken
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
共有セッション
各エージェント実行プログラムは、既定で独自のセッションを維持します。 エージェント間でセッションを共有するには、エージェントをワークフローに追加する前に、共通のセッション プロバイダーでエージェントを構成します。
構成オプション
AIAgentHostOptions は、エージェント実行プログラムの動作を制御します。
| オプション | デフォルト | 説明 |
|---|---|---|
EmitAgentUpdateEvents |
null |
実行中にストリーミング更新イベントを出力します。
TurnToken が設定されている場合は優先されます。 両方が nullされている場合、ストリーミングは無効になります。 |
EmitAgentResponseEvents |
false |
集約されたエージェント応答をワークフロー出力イベントとして出力します。 |
InterceptUserInputRequests |
false |
UserInputRequestContentをインターセプトし、処理用のワークフロー メッセージとしてルーティングします。 |
InterceptUnterminatedFunctionCalls |
false |
対応する結果なしで FunctionCallContent をインターセプトし、ワークフロー メッセージとしてルーティングします。 |
ReassignOtherAgentsAsUsers |
true |
他のエージェントから User ロールにメッセージを再割り当てして、モデルがユーザー入力として扱うようにします。 |
ForwardIncomingMessages |
true |
エージェントによって生成されたメッセージの前に、受信メッセージをダウンストリーム Executor に転送します。 |
チェックポイント処理
エージェント Executor は、実行時間の長いワークフローのチェックポイント処理をサポートします。 チェックポイントが取得されると、Executor は次のコマンドをシリアル化します。
- エージェントのセッション状態 (
SerializeSessionAsync経由)。 - 現在のターンのイベント放出構成 (要求が保留中であり、Executor がまだ受信
TurnTokenを生成していない場合にのみ存在します)。 - 保留中のユーザー入力要求と関数呼び出し要求。
復元時に、Executor はセッションと保留中の要求状態を逆シリアル化し、中断したところからワークフローを再開できるようにします。
しくみ
AgentExecutor クラスは、SupportsAgentRun プロトコルを実装するエージェントをラップします。 Executor がメッセージを受信した場合:
-
メッセージの正規化 — 入力は
Messageオブジェクトのリストに正規化され、Executor の内部キャッシュに追加されます。 Executor は、キャッシュ前に入力を正規化する専用ハンドラーにルーティングされる複数の入力の種類 (str、Message、list[str | Message]、AgentExecutorRequest、およびAgentExecutorResponse) を受け入れます。 -
エージェント呼び出し — Executor はキャッシュされたメッセージで
agent.run()を呼び出し、ワークフロー実行モードに基づいてストリーミング モードまたは非ストリーミング モードを自動的に選択します。 -
出力出力 — ストリーミング モードでは、各
AgentResponseUpdateがワークフロー出力イベントとして生成されます。 非ストリーミング モードでは、1 つのAgentResponseが生成されます。 -
ダウンストリーム ディスパッチ — エージェントが完了すると、Executor は接続されているすべてのダウンストリーム Executor に
AgentExecutorResponseを送信します。 この応答には、全会話履歴が含まれるので、シームレスな連結ができるようになります。 - キャッシュのリセット — エージェントの呼び出し後に Executor の内部メッセージ キャッシュがクリアされ、各エージェント呼び出しで最後の呼び出し以降に受信した新しいメッセージのみが処理されるようにします。
ヒント
一部のシナリオでは、より特殊なエージェント Executor が必要になる場合があります。たとえば、 ハンドオフ オーケストレーションでは 、カスタム ルーティング ロジックを備えた専用の Executor が使用されます。
暗黙的な作成と明示的な作成
エージェントを直接渡すと、WorkflowBuilderはエージェントをAgentExecutorインスタンスで自動的にラップします。 ほとんどのワークフローでは、暗黙的な作成で十分です。
from agent_framework import WorkflowBuilder
writer_agent = client.as_agent(name="Writer", instructions="...")
reviewer_agent = client.as_agent(name="Reviewer", instructions="...")
# Agents are automatically wrapped — no manual AgentExecutor creation required
workflow = (
WorkflowBuilder(start_executor=writer_agent)
.add_edge(writer_agent, reviewer_agent)
.build()
)
明示的な作成
必要に応じて、 AgentExecutor を明示的に作成します。
- 複数のエージェント間でセッションを共有します。
- ルーティング用のカスタム Executor ID とターゲット ランタイム kwargs を指定します。
- 複数のエッジで同じ Executor インスタンスを参照します。
from agent_framework import AgentExecutor
writer_executor = AgentExecutor(writer_agent, id="my-writer")
reviewer_executor = AgentExecutor(reviewer_agent, id="my-reviewer")
workflow = (
WorkflowBuilder(start_executor=writer_executor)
.add_edge(writer_executor, reviewer_executor)
.build()
)
コンストラクター パラメーター:
| パラメーター | タイプ | 説明 |
|---|---|---|
agent |
SupportsAgentRun |
包むエージェント。 |
session |
AgentSession \| None |
エージェントの実行に使用するセッション。
None場合は、エージェントから新しいセッションが作成されます。 |
id |
str \| None |
一意のエグゼキュータID。 既定では、エージェントの名前 (使用可能な場合) が設定されます。 |
ヒント
Executor ID は、個々のエージェントで workflow.run(function_invocation_kwargs=...) または client_kwargs= をターゲットにするときに使用されるキーでもあります。
idを省略すると、ワークフローはラップされたエージェントの名前を使用します。
入力の種類
AgentExecutorは複数のハンドラー メソッドを定義し、それぞれが異なる入力型を受け入れます。 ワークフロー エンジンは、メッセージの種類に基づいて正しいハンドラーを自動的にディスパッチします。
AgentExecutorRequest フラグがエージェントを実行するか、単にメッセージをキャッシュするかを制御するshould_respondを除き、すべての入力の種類によってエージェントがすぐに実行されます。
| 入力の種類 | ハンドラー | トリガーズエージェント | 説明 |
|---|---|---|---|
AgentExecutorRequest |
run |
Conditional | 標準的な入力のタイプ。 メッセージの一覧と、エージェントを実行するかどうかを制御する should_respond フラグが含まれます。 |
str |
from_str |
いつも | 生の文字列プロンプトを受け入れます。 |
Message |
from_message |
いつも | 1 つの Message オブジェクトを受け入れます。 |
list[str \| Message] |
from_messages |
いつも | 文字列または Message オブジェクトの一覧を会話コンテキストとして受け入れます。 |
AgentExecutorResponse |
from_response |
いつも | 以前のエージェント Executor の応答を受け入れ、ダイレクト チェーンを有効にします。 |
AgentExecutorRequest の使用
AgentExecutorRequest は正規の入力型であり、最も多くの制御を提供します。
from agent_framework import AgentExecutorRequest, Message
# Create a request with messages
request = AgentExecutorRequest(
messages=[Message(role="user", contents=["Hello, world!"])],
should_respond=True,
)
# Run the workflow
result = await workflow.run(request)
should_respond フラグは、エージェントがメッセージをすぐに処理するか、後でメッセージをキャッシュするかを制御します。
-
True(既定値) - エージェントが実行され、応答が生成されます。 -
False— メッセージはキャッシュに追加されますが、エージェントは実行されません。 これは、応答をトリガーする前に会話コンテキストを事前に読み込む場合に便利です。
出力とチェーン
エージェントが完了すると、Executor はダウンストリームに AgentExecutorResponse を送信します。 このデータクラスには次のものが含まれます。
| フィールド | タイプ | 説明 |
|---|---|---|
executor_id |
str |
応答を生成した Executor の ID。 |
agent_response |
AgentResponse |
基になるエージェントの応答 (クライアントから変更されていません)。 |
full_conversation |
list[Message] \| None |
チェーン用の完全な会話コンテキスト (以前の入力とエージェントの出力)。 |
エージェント実行プログラムをチェーンする場合、ダウンストリーム Executor は、AgentExecutorResponse ハンドラーを介してfrom_responseを受け取ります。
full_conversation フィールドを使用して完全な会話履歴を保持し、ダウンストリーム エージェントが以前のコンテキストを失うことを防ぎます。
spam_detector = AgentExecutor(create_spam_detector_agent())
email_assistant = AgentExecutor(create_email_assistant_agent())
# The email_assistant receives the spam_detector's full conversation context
workflow = (
WorkflowBuilder(start_executor=spam_detector)
.add_edge(spam_detector, email_assistant)
.build()
)
ストリーミング動作
AgentExecutorは、ワークフロー実行モードに自動的に適応します。
-
stream=True—agent.run(stream=True)を呼び出し、各AgentResponseUpdateをワークフロー出力イベントとして生成します。 ストリーミングが完了すると、更新は集計されて完全なAgentResponseになり、ダウンストリームにディスパッチされます。 -
stream=False(既定値) -agent.run(stream=False)を呼び出し、ワークフロー出力イベントとして 1 つのAgentResponseを生成します。
# Streaming mode — receive incremental updates
events = workflow.run("Write a story about a cat.", stream=True)
async for event in events:
if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
print(event.data.text, end="", flush=True)
# Non-streaming mode — receive complete response
result = await workflow.run("Write a story about a cat.")
# Retrieve AgentResponse objects from the result
outputs = result.get_outputs()
for output in outputs:
if isinstance(output, AgentResponse):
print(output.text)
共有セッション
既定では、各 AgentExecutor は独自のセッションを作成します。 複数のエージェント間でセッションを共有するには (たとえば、共通の会話スレッドを維持するために)、セッションを明示的に作成し、各 Executor に渡します。
from agent_framework import AgentExecutor
# Create a shared session from one agent
shared_session = writer_agent.create_session()
# Both executors share the same session
writer_executor = AgentExecutor(writer_agent, session=shared_session)
reviewer_executor = AgentExecutor(reviewer_agent, session=shared_session)
注
すべてのエージェントが共有セッションをサポートしているわけではありません。 通常、セッションを共有できるのは、同じプロバイダーの種類のエージェントだけです。
チェックポイント処理
AgentExecutorでは、実行時間の長いワークフローで状態を保存および復元するためのチェックポイント処理がサポートされています。 チェックポイントが取得されると、Executor は次のコマンドをシリアル化します。
- 内部メッセージ キャッシュ。
- 完全な会話履歴。
- エージェント セッションの状態。
- 保留中のユーザー入力リクエストとその応答。
復元時に、Executor はこの状態を逆シリアル化し、中断したところからワークフローを再開できるようにします。
Warnung
サーバー側セッション ( FoundryAgent など) を使用するエージェントによるチェックポイント処理には制限があります。 サーバー側のセッション状態はチェックポイントにキャプチャされず、後続の実行で変更できます。 サーバー側セッションで信頼性の高いチェックポイント処理が必要な場合は、カスタム Executor の実装を検討してください。