次の方法で共有


Microsoft Agent Framework ワークフロー - ヒューマンインザループ (HITL)

このページでは、Microsoft Agent Framework ワークフロー システム での Human-in-the-loop (HITL) の相互作用の概要を示します。 HITL は、ワークフローの 要求と応答 の処理メカニズムによって実現されます。これにより、Executor は外部システム (人間のオペレーターなど) に要求を送信し、応答を待ってからワークフローの実行を続行できます。

概要

ワークフロー内の Executor は、ワークフローの外部に要求を送信し、応答を待機できます。 これは、Executor が人間とループ内の相互作用などの外部システムや他の非同期操作と対話する必要があるシナリオに役立ちます。

人間のオペレーターに数値の推測を求め、Executor を使用して推測が正しいかどうかを判断するワークフローを構築しましょう。

ワークフローで要求と応答の処理を有効にする

要求と応答は、 RequestPortと呼ばれる特殊な型を使用して処理されます。

RequestPortは、Executor が要求を送信して応答を受信できるようにする通信チャネルです。 Executor がメッセージを RequestPortに送信すると、要求ポートは要求の詳細を含む RequestInfoEvent を出力します。 外部システムは、これらのイベントをリッスンし、要求を処理し、応答をワークフローに送り返すことができます。 フレームワークは、元の要求に基づいて、応答を適切な Executor に自動的にルーティングします。

// Create a request port that receives requests of type NumberSignal and responses of type int.
var numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");

ワークフローに入力ポートを追加します。

JudgeExecutor judgeExecutor = new(42);
var workflow = new WorkflowBuilder(numberRequestPort)
    .AddEdge(numberRequestPort, judgeExecutor)
    .AddEdge(judgeExecutor, numberRequestPort)
    .WithOutputFrom(judgeExecutor)
    .Build();

JudgeExecutorの定義にはターゲット番号が必要であり、推測が正しいかどうかを判断できます。 正しくない場合は、 RequestPortを介して新しい推測を要求する別の要求が送信されます。

internal enum NumberSignal
{
    Init,
    Above,
    Below,
}

internal sealed class JudgeExecutor() : Executor<int>("Judge")
{
    private readonly int _targetNumber;
    private int _tries;

    public JudgeExecutor(int targetNumber) : this()
    {
        this._targetNumber = targetNumber;
    }

    public override async ValueTask HandleAsync(int message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        this._tries++;
        if (message == this._targetNumber)
        {
            await context.YieldOutputAsync($"{this._targetNumber} found in {this._tries} tries!", cancellationToken);
        }
        else if (message < this._targetNumber)
        {
            await context.SendMessageAsync(NumberSignal.Below, cancellationToken: cancellationToken);
        }
        else
        {
            await context.SendMessageAsync(NumberSignal.Above, cancellationToken: cancellationToken);
        }
    }
}

Python では、Executor は ctx.request_info() を使用して要求を送信し、 @response_handler デコレーターで応答を処理します。

人間のオペレーターに数値の推測を求め、Executor を使用して推測が正しいかどうかを判断するワークフローを構築しましょう。

ワークフローで要求と応答の処理を有効にする

from dataclasses import dataclass

from agent_framework import (
    Executor,
    WorkflowBuilder,
    WorkflowContext,
    handler,
    response_handler,
)


@dataclass
class NumberSignal:
    hint: str  # "init", "above", or "below"


class JudgeExecutor(Executor):
    def __init__(self, target_number: int):
        super().__init__(id="judge")
        self._target_number = target_number
        self._tries = 0

    @handler
    async def handle_guess(self, guess: int, ctx: WorkflowContext[int, str]) -> None:
        self._tries += 1
        if guess == self._target_number:
            await ctx.yield_output(f"{self._target_number} found in {self._tries} tries!")
        elif guess < self._target_number:
            await ctx.request_info(request_data=NumberSignal(hint="below"), response_type=int)
        else:
            await ctx.request_info(request_data=NumberSignal(hint="above"), response_type=int)

    @response_handler
    async def on_human_response(
        self,
        original_request: NumberSignal,
        response: int,
        ctx: WorkflowContext[int, str],
    ) -> None:
        await self.handle_guess(response, ctx)


judge = JudgeExecutor(target_number=42)
workflow = WorkflowBuilder(start_executor=judge).build()

@response_handlerデコレーターは、指定された要求と応答の種類の応答を処理するメソッドを自動的に登録します。 フレームワークは、 original_request パラメーターと response パラメーターの型注釈に基づいて、受信応答を正しいハンドラーに照合します。

要求と応答の処理

RequestPortは、要求を受信したときにRequestInfoEventを出力します。 これらのイベントをサブスクライブして、ワークフローからの受信要求を処理できます。 外部システムから応答を受け取ったら、応答メカニズムを使用してワークフローに送り返します。 フレームワークは、元の要求を送信した Executor に応答を自動的にルーティングします。

await using StreamingRun handle = await InProcessExecution.RunStreamingAsync(workflow, NumberSignal.Init);
await foreach (WorkflowEvent evt in handle.WatchStreamAsync())
{
    switch (evt)
    {
        case RequestInfoEvent requestInputEvt:
            // Handle `RequestInfoEvent` from the workflow
            int guess = ...; // Get the guess from the human operator or any external system
            await handle.SendResponseAsync(requestInputEvt.Request.CreateResponse(guess));
            break;

        case WorkflowOutputEvent outputEvt:
            // The workflow has yielded output
            Console.WriteLine($"Workflow completed with result: {outputEvt.Data}");
            return;
    }
}

ヒント

完全な実行可能なプロジェクトの 完全なサンプル を参照してください。

Executor は、個別のコンポーネントを必要とせずに直接要求を送信できます。 Executor がctx.request_info()を呼び出すと、ワークフローはWorkflowEventを含むtype == "request_info"を生成します。 これらのイベントをサブスクライブして、ワークフローからの受信要求を処理できます。 外部システムから応答を受け取ったら、応答メカニズムを使用してワークフローに送り返します。 フレームワークは、Executor の @response_handler メソッドに応答を自動的にルーティングします。

from collections.abc import AsyncIterable

from agent_framework import WorkflowEvent


async def process_event_stream(stream: AsyncIterable[WorkflowEvent]) -> dict[str, int] | None:
    """Process events from the workflow stream to capture requests."""
    requests: list[tuple[str, NumberSignal]] = []
    async for event in stream:
        if event.type == "request_info":
            requests.append((event.request_id, event.data))

    # Handle any pending human feedback requests.
    if requests:
        responses: dict[str, int] = {}
        for request_id, request in requests:
            guess = ...  # Get the guess from the human operator or any external system.
            responses[request_id] = guess
        return responses

    return None

# Initiate the first run of the workflow with an initial guess.
# Runs are not isolated; state is preserved across multiple calls to run.
stream = workflow.run(25, stream=True)

pending_responses = await process_event_stream(stream)
while pending_responses is not None:
    # Run the workflow until there is no more human feedback to provide,
    # in which case this workflow completes.
    stream = workflow.run(stream=True, responses=pending_responses)
    pending_responses = await process_event_stream(stream)

ヒント

完全な実行可能ファイルについては、この 完全なサンプル を参照してください。

エージェント オーケストレーションにおける人間参加型システム

上記の RequestPort パターンは、カスタム Executor と WorkflowBuilder で動作します。 エージェント オーケストレーション (シーケンシャル、コンカレント、グループ チャット ワークフローなど) を使用する場合、ツールの承認は、人間のループ内要求/応答メカニズムによって実現されます。

エージェントは、実行前に人間の承認を必要とするツールを使用できます。 エージェントが承認が必要なツールを呼び出そうとすると、ワークフローは一時停止し、RequestInfoEvent パターンと同様にRequestPortを出力しますが、イベント ペイロードには、カスタム要求の種類ではなく、ToolApprovalRequestContent (C#) または Content (Python) を含むtype == "function_approval_request"が含まれます。

チェックポイントと要求

チェックポイントの詳細については、「 チェックポイント」を参照してください。

チェックポイントが作成されると、保留中の要求もチェックポイント状態の一部として保存されます。 チェックポイントから復元すると、保留中の要求が RequestInfoEvent オブジェクトとして再出力され、それらをキャプチャして応答できるようになります。 再開操作中に直接応答を提供することはできません。代わりに、再出力されたイベントをリッスンし、標準の応答メカニズムを使用して応答する必要があります。

次のステップ