Orkestraties van Microsoft Agent Framework-werkstromen - opeenvolgend

Bij sequentiële orkestratie worden agenten geordend in een pijplijn. Elke agent verwerkt de taak op zijn beurt en geeft de uitvoer door aan de volgende agent in de reeks. Dit is ideaal voor werkstromen waarbij elke stap voortbouwt op de vorige stap, zoals documentbeoordeling, pijplijnen voor gegevensverwerking of redenering in meerdere fasen.

Sequentiële orkestratie

Belangrijk

Standaard verbruikt elke agent in de reeks het volledige gesprek van de vorige agent, zowel de invoerberichten die aan de vorige agent zijn verstrekt als de antwoordberichten. U kunt agents configureren om alleen de antwoordberichten van de vorige agent te gebruiken. Zie Beheren van context tussen agents voor meer informatie.

Wat u leert

  • Een sequentiële pijplijn van agenten maken
  • Agents koppelen waarbij elk van deze agents voortbouwt op de vorige uitvoer
  • Human-in-the-loop goedkeuring toevoegen voor aanroepen van gevoelige hulpprogramma's
  • Agents mixen met aangepaste executors voor gespecialiseerde taken
  • De gespreksstroom bijhouden via de pijplijn

Uw agents definiëren

In sequentiële indeling worden agents geordend in een pijplijn waarbij elke agent de taak op zijn beurt verwerkt en uitvoer doorgeeft aan de volgende agent in de reeks.

De Azure OpenAI-client instellen

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.AI.Projects;
using Azure.Identity;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
using Microsoft.Agents.AI;

// 1) Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ??
    throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var client = new AIProjectClient(new Uri(endpoint), new DefaultAzureCredential())
    .GetProjectOpenAIClient()
    .GetProjectResponsesClient()
    .AsIChatClient(deploymentName);

Waarschuwing

DefaultAzureCredential is handig voor ontwikkeling, maar vereist zorgvuldige overwegingen in de productieomgeving. Overweeg in productie een specifieke referentie te gebruiken (bijvoorbeeld ManagedIdentityCredential) om latentieproblemen, onbedoelde referentieprobing en potentiële beveiligingsrisico's van terugvalmechanismen te voorkomen.

Maak gespecialiseerde agents die op volgorde werken:

// 2) Helper method to create translation agents
static ChatClientAgent GetTranslationAgent(string targetLanguage, IChatClient chatClient) =>
    new(chatClient,
        $"You are a translation assistant who only responds in {targetLanguage}. Respond to any " +
        $"input by outputting the name of the input language and then translating the input to {targetLanguage}.");

// Create translation agents for sequential processing
var translationAgents = (from lang in (string[])["French", "Spanish", "English"]
                         select GetTranslationAgent(lang, client));

De sequentiële orkestratie instellen

Bouw de werkstroom met behulp van AgentWorkflowBuilder:

// 3) Build sequential workflow
var workflow = AgentWorkflowBuilder.BuildSequential(translationAgents);

De sequentiële werkstroom uitvoeren

Voer de werkstroom uit en verwerkt de gebeurtenissen:

// 4) Run the workflow
var messages = new List<ChatMessage> { new(ChatRole.User, "Hello, world!") };

await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, messages);
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));

string? lastExecutorId = null;
List<ChatMessage> result = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    if (evt is AgentResponseUpdateEvent e)
    {
        if (e.ExecutorId != lastExecutorId)
        {
            lastExecutorId = e.ExecutorId;
            Console.WriteLine();
            Console.Write($"{e.ExecutorId}: ");
        }

        Console.Write(e.Update.Text);
    }
    else if (evt is WorkflowOutputEvent outputEvt)
    {
        result = outputEvt.As<List<ChatMessage>>()!;
        break;
    }
}

// Display final result
Console.WriteLine();
foreach (var message in result)
{
    Console.WriteLine($"{message.Role}: {message.Text}");
}

Voorbeelduitvoer

French_Translation: User: Hello, world!
French_Translation: Assistant: English detected. Bonjour, le monde !
Spanish_Translation: Assistant: French detected. ¡Hola, mundo!
English_Translation: Assistant: Spanish detected. Hello, world!

Sequentiële indeling met Human-in-the-Loop

Sequentiële orkestraties ondersteunen menselijke tussenkomst bij interacties door middel van goedkeuring van hulpmiddelen. Wanneer agents tools gebruiken die zijn geïntegreerd met ApprovalRequiredAIFunction, wordt de workflow onderbroken en wordt er een RequestInfoEvent uitgezonden die een ToolApprovalRequestContent bevat. Externe systemen (zoals een menselijke operator) kunnen de aanroep van het hulpprogramma inspecteren, goedkeuren of afwijzen, en de werkstroom wordt dienovereenkomstig hervat.

Sequentiële indeling met Human-in-the-Loop

Aanbeveling

Zie Human-in-the-Loop voor meer informatie over het aanvraag- en antwoordmodel.

Agents definiëren met Approval-Required-hulpprogramma's

Maak agents waarbij gevoelige hulpprogramma's worden verpakt met ApprovalRequiredAIFunction:

ChatClientAgent deployAgent = new(
    client,
    "You are a DevOps engineer. Check staging status first, then deploy to production.",
    "DeployAgent",
    "Handles deployments",
    [
        AIFunctionFactory.Create(CheckStagingStatus),
        new ApprovalRequiredAIFunction(AIFunctionFactory.Create(DeployToProduction))
    ]);

ChatClientAgent verifyAgent = new(
    client,
    "You are a QA engineer. Verify that the deployment was successful and summarize the results.",
    "VerifyAgent",
    "Verifies deployments");

Bouwen en uitvoeren met goedkeuringsafhandeling

Bouw normaal de opeenvolgende werkstroom. De goedkeuringsstroom wordt verwerkt via de gebeurtenisstroom:

var workflow = AgentWorkflowBuilder.BuildSequential([deployAgent, verifyAgent]);

await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    if (evt is RequestInfoEvent e &&
        e.Request.TryGetDataAs(out ToolApprovalRequestContent? approvalRequest))
    {
        await run.SendResponseAsync(
            e.Request.CreateResponse(approvalRequest.CreateResponse(approved: true)));
    }
}

Opmerking

AgentWorkflowBuilder.BuildSequential() ondersteunt standaard de goedkeuring van hulpprogramma's. Er is geen extra configuratie nodig. Wanneer een agent een hulpprogramma aanroept dat is omhuld met ApprovalRequiredAIFunction, onderbreekt de werkstroom automatisch en zendt een RequestInfoEvent uit.

Aanbeveling

Zie het GroupChatToolApproval voorbeeld voor een uitvoerbaar voorbeeld van deze goedkeuringsstroom. Hetzelfde afhandelingspatroon is van RequestInfoEvent toepassing op andere orkestraties.

Sleutelbegrippen

  • Sequentiële verwerking: elke agent verwerkt de uitvoer van de vorige agent in volgorde
  • AgentWorkflowBuilder.BuildSequential(): Hiermee maakt u een pijplijnwerkstroom op basis van een verzameling agents
  • ChatClientAgent: vertegenwoordigt een agent die wordt ondersteund door een chatclient met specifieke instructies
  • InProcessExecution.RunStreamingAsync(): Hiermee wordt de werkstroom uitgevoerd en wordt een StreamingRun streaming van realtime gebeurtenissen geretourneerd
  • Gebeurtenisafhandeling: de voortgang van de agent volgen via AgentResponseUpdateEvent en de voltooiing via WorkflowOutputEvent
  • Goedkeuring van hulpprogramma' s: gevoelige hulpprogramma's verpakken om ApprovalRequiredAIFunction menselijke goedkeuring te vereisen voordat de uitvoering wordt uitgevoerd
  • RequestInfoEvent: verzonden wanneer een hulpprogramma goedkeuring vereist; bevat ToolApprovalRequestContent met de details van de aanroep van het hulpprogramma

Bij sequentiële orkestratie verwerkt elke agent de taak op zijn beurt, waarbij de uitvoer van het ene naar het andere stroomt. Begin met het definiëren van agents voor een proces in twee fasen:

import os
from agent_framework.foundry import FoundryChatClient
from azure.identity import AzureCliCredential

# 1) Create agents using FoundryChatClient
chat_client = FoundryChatClient(
    project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"],
    model=os.environ["FOUNDRY_MODEL"],
    credential=AzureCliCredential(),
)

writer = chat_client.as_agent(
    instructions=(
        "You are a concise copywriter. Provide a single, punchy marketing sentence based on the prompt."
    ),
    name="writer",
)

reviewer = chat_client.as_agent(
    instructions=(
        "You are a thoughtful reviewer. Give brief feedback on the previous assistant message."
    ),
    name="reviewer",
)

De sequentiële orkestratie instellen

De SequentialBuilder klasse maakt een pijplijn waarin agents taken in volgorde verwerken. Elke agent ziet de volledige gespreksgeschiedenis en voegt hun antwoord toe:

from agent_framework.orchestrations import SequentialBuilder

# 2) Build sequential workflow: writer -> reviewer
workflow = SequentialBuilder(participants=[writer, reviewer]).build()

De sequentiële werkstroom uitvoeren

Voer de werkstroom uit en verzamel het eindresultaat. De terminaluitvoer bevat de AgentResponse antwoordberichten van de laatste agent:

from agent_framework import AgentResponse

# 3) Run and print the last agent's response
events = await workflow.run("Write a tagline for a budget-friendly eBike.")
outputs = events.get_outputs()

if outputs:
    print("===== Final Response =====")
    final: AgentResponse = outputs[0]
    for msg in final.messages:
        name = msg.author_name or "assistant"
        print(f"[{name}]\n{msg.text}")

Voorbeelduitvoer

===== Final Response =====
[reviewer]
This tagline clearly communicates affordability and the benefit of extended travel, making it
appealing to budget-conscious consumers. It has a friendly and motivating tone, though it could
be slightly shorter for more punch. Overall, a strong and effective suggestion!

Geavanceerd: Agenten combineren met aangepaste executors

Sequentiële orkestratie ondersteunt het combineren van agenten met aangepaste uitvoeringsmechanismen voor gespecialiseerde verwerking. Dit is handig wanneer u aangepaste logica nodig hebt waarvoor geen LLM is vereist:

Een aangepaste uitvoerder definiëren

Opmerking

Wanneer een aangepaste uitvoerder een agent in de reeks volgt, ontvangt de handler een AgentExecutorResponse (omdat agents intern worden verpakt door AgentExecutor). Gebruik agent_response.full_conversation deze functie om toegang te krijgen tot de volledige gespreksgeschiedenis. Een aangepaste uitvoerder die als laatste deelnemer (afsluiter) wordt gebruikt, moet ctx.yield_output(AgentResponse(...)) aanroepen zodat de uitvoer de terminaluitvoer van de werkstroom wordt.

from agent_framework import AgentExecutorResponse, AgentResponse, Executor, WorkflowContext, handler
from agent_framework import Message
from typing_extensions import Never

class Summarizer(Executor):
    """Terminator custom executor: consumes full conversation and yields a summary as the workflow's final answer."""

    @handler
    async def summarize(
        self,
        agent_response: AgentExecutorResponse,
        ctx: WorkflowContext[Never, AgentResponse]
    ) -> None:
        if not agent_response.full_conversation:
            await ctx.yield_output(AgentResponse(messages=[Message("assistant", ["No conversation to summarize."])]))
            return

        users = sum(1 for m in agent_response.full_conversation if m.role == "user")
        assistants = sum(1 for m in agent_response.full_conversation if m.role == "assistant")
        summary = Message("assistant", [f"Summary -> users:{users} assistants:{assistants}"])
        await ctx.yield_output(AgentResponse(messages=[summary]))

Een gemengde opeenvolgende werkstroom bouwen

# Create a content agent
content = chat_client.as_agent(
    instructions="Produce a concise paragraph answering the user's request.",
    name="content",
)

# Build sequential workflow: content -> summarizer
summarizer = Summarizer(id="summarizer")
workflow = SequentialBuilder(participants=[content, summarizer]).build()

Voorbeelduitvoer met aangepaste uitvoerder

===== Final Summary =====
Summary -> users:1 assistants:1

Context tussen agenten beheersen

Standaard verbruikt elke agent in een SequentialBuilder werkstroom het volledige gesprek van de vorige agent (invoer- en antwoordberichten). Met de instelling chain_only_agent_responses=True worden alle agents in de reeks geconfigureerd om alleen de antwoordberichten van de vorige agent te gebruiken:

workflow = SequentialBuilder(
    participants=[writer, translator, reviewer],
    chain_only_agent_responses=True,
).build()

Dit is handig voor vertaalpijplijnen, progressieve verfijning en andere scenario's waarin elke agent zich uitsluitend moet richten op het transformeren van de output van de voorafgaande agent, zonder beïnvloed te worden door eerdere gespreksrondes.

Zie sequential_chain_only_agent_responses.py in de opslagplaats agentframework voor een volledig voorbeeld.

Aanbeveling

Zie Contextmodi in de verwijzing Agent Executor voor meer gedetailleerde controle over de contextstroom, inclusief aangepaste filterfuncties.

Tussenliggende uitvoer

Standaard wordt alleen de uitvoer van de laatste deelnemer als workflow-evenement output getoond. Stel intermediate_outputs=True in om de uitvoer van elke deelnemer te tonen, naast de uiteindelijke uitvoer.

workflow = SequentialBuilder(
    participants=[writer, reviewer, editor],
    intermediate_outputs=True,
).build()

U kunt deze gebeurtenissen in realtime afhandelen in de streamingmodus:

from agent_framework import AgentResponseUpdate

# Track the last author to format streaming output.
last_author: str | None = None

async for event in workflow.run("Write a tagline for a budget-friendly eBike.", stream=True):
    if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
        update = event.data
        author = update.author_name
        if author != last_author:
            if last_author is not None:
                print()  # Newline between different authors
            print(f"{author}: {update.text}", end="", flush=True)
            last_author = author
        else:
            print(update.text, end="", flush=True)

Sequentiële indeling met Human-in-the-Loop

Sequentiële indelingen ondersteunen interacties tussen mensen in de lus op twee manieren: goedkeuring van hulpprogramma's voor het beheren van gevoelige toolaanroepen en aanvraaginformatie voor het onderbreken van elk agentantwoord om feedback te verzamelen.

Sequentiële indeling met Human-in-the-Loop

Aanbeveling

Zie Human-in-the-Loop voor meer informatie over het aanvraag- en antwoordmodel.

Goedkeuring van hulpprogramma's in opeenvolgende werkstromen

Gebruik @tool(approval_mode="always_require") dit om hulpprogramma's te markeren die menselijke goedkeuring nodig hebben voordat ze worden uitgevoerd. De werkstroom onderbreekt en verzendt een request_info gebeurtenis wanneer de agent het hulpprogramma probeert aan te roepen.

@tool(approval_mode="always_require")
def execute_database_query(query: str) -> str:
    return f"Query executed successfully: {query}"


database_agent = Agent(
    client=chat_client,
    name="DatabaseAgent",
    instructions="You are a database assistant.",
    tools=[execute_database_query],
)

workflow = SequentialBuilder(participants=[database_agent]).build()

De gebeurtenisstroom verwerken en goedkeuringsaanvragen verwerken:

async def process_event_stream(stream):
    responses = {}
    async for event in stream:
        if event.type == "request_info" and event.data.type == "function_approval_request":
            responses[event.request_id] = event.data.to_function_approval_response(approved=True)
    return responses if responses else None

stream = workflow.run("Check the schema and update all pending orders", stream=True)

pending_responses = await process_event_stream(stream)
while pending_responses is not None:
    stream = workflow.run(stream=True, responses=pending_responses)
    pending_responses = await process_event_stream(stream)

Aanbeveling

Zie sequential_builder_tool_approval.py voor een volledig voorbeeld dat kan worden uitgevoerd. Goedkeuring van tools werkt met SequentialBuilder zonder extra bouwerconfiguratie.

Informatie aanvragen voor feedback van de agent

Gebruik .with_request_info() om een pauze in te lassen nadat een specifieke agent reageert, zodat er externe input, zoals een menselijke beoordeling, kan plaatsvinden voordat de volgende agent begint.

drafter = Agent(
    client=chat_client,
    name="drafter",
    instructions="You are a document drafter. Create a brief draft on the given topic.",
)

editor = Agent(
    client=chat_client,
    name="editor",
    instructions="You are an editor. Review and improve the draft. Incorporate any human feedback.",
)

finalizer = Agent(
    client=chat_client,
    name="finalizer",
    instructions="You are a finalizer. Create a polished final version.",
)

# Enable request info for the editor agent only
workflow = (
    SequentialBuilder(participants=[drafter, editor, finalizer])
    .with_request_info(agents=["editor"])
    .build()
)

async def process_event_stream(stream):
    responses = {}
    async for event in stream:
        if event.type == "request_info":
            responses[event.request_id] = AgentRequestInfoResponse.approve()
    return responses if responses else None

stream = workflow.run("Write a brief introduction to artificial intelligence.", stream=True)

pending_responses = await process_event_stream(stream)
while pending_responses is not None:
    stream = workflow.run(stream=True, responses=pending_responses)
    pending_responses = await process_event_stream(stream)

Aanbeveling

Bekijk de volledige voorbeelden: goedkeuring van sequentiële hulpprogramma's en informatie over sequentiële aanvragen.

Sleutelbegrippen

  • Gedeelde context: standaard gebruikt elke agent het volledige gesprek van de vorige agent, inclusief invoer- en antwoordberichten
  • Contextbeheer: hiermee chain_only_agent_responses=True configureert u agents om alleen de antwoordberichten van de vorige agent te gebruiken
  • Uitvoer van AgentResponse: de terminaluitvoer van de werkstroom bevat AgentResponse het antwoord van de laatste agent (niet het volledige gesprek)
  • Order Matters: Agents functioneren strikt in de volgorde die is opgegeven in de participants lijst
  • Flexibele deelnemers: U kunt agents en aangepaste uitvoerders in elke willekeurige volgorde combineren
  • Aangepast eindcontract: een aangepaste uitvoerder die als laatste deelnemer wordt gebruikt, moet aanroepen ctx.yield_output(AgentResponse(...)) om de terminaluitvoer te produceren
  • Tussenliggende uitvoer: configureer intermediate_outputs=True om de uitvoer van elke deelnemer weer te geven als een werkstroomevenement output niet alleen de laatste deelnemer
  • Goedkeuring van hulpprogramma: gebruiken @tool(approval_mode="always_require") voor gevoelige bewerkingen waarvoor menselijke beoordeling nodig is
  • Informatie aanvragen: gebruik .with_request_info(agents=[...]) deze functie om te onderbreken na specifieke agents voor externe feedback

Volgende stappen