Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Dans l’orchestration séquentielle, les agents sont organisés dans un pipeline. Chaque agent traite la tâche à son tour, en passant sa sortie à l’agent suivant dans la séquence. Cela est idéal pour les flux de travail où chaque étape s’appuie sur l’étape précédente, comme la révision de document, les pipelines de traitement des données ou le raisonnement à plusieurs étapes.
Important
Par défaut, chaque agent de la séquence utilise la conversation complète de l’agent précédent , à la fois les messages d’entrée fournis à l’agent précédent et ses messages de réponse. Vous pouvez configurer des agents pour qu’ils consomment uniquement les messages de réponse de l’agent précédent. Pour plus d’informations, consultez Contrôle du contexte entre les agents .
Ce que vous allez apprendre
- Comment créer un pipeline séquentiel d’agents
- Guide pratique pour chaîner des agents dans lesquels chacun s’appuie sur la sortie précédente
- Comment ajouter une approbation par un humain pour les appels d'outils critiques.
- Comment combiner des agents avec des exécuteurs personnalisés pour des tâches spécialisées
- Comment suivre le flux de conversation via le pipeline
Définir vos agents
Dans l’orchestration séquentielle, les agents sont organisés dans un pipeline où chaque agent traite la tâche à son tour, en passant la sortie à l’agent suivant dans la séquence.
Configurer le client Azure OpenAI
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.AI.Projects;
using Azure.Identity;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
using Microsoft.Agents.AI;
// 1) Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ??
throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var client = new AIProjectClient(new Uri(endpoint), new DefaultAzureCredential())
.GetProjectOpenAIClient()
.GetProjectResponsesClient()
.AsIChatClient(deploymentName);
Avertissement
DefaultAzureCredential est pratique pour le développement, mais nécessite une considération minutieuse en production. En production, envisagez d’utiliser des informations d’identification spécifiques (par exemple ManagedIdentityCredential) pour éviter les problèmes de latence, la détection involontaire des informations d’identification et les risques de sécurité potentiels liés aux mécanismes de secours.
Créez des agents spécialisés qui fonctionneront en séquence :
// 2) Helper method to create translation agents
static ChatClientAgent GetTranslationAgent(string targetLanguage, IChatClient chatClient) =>
new(chatClient,
$"You are a translation assistant who only responds in {targetLanguage}. Respond to any " +
$"input by outputting the name of the input language and then translating the input to {targetLanguage}.");
// Create translation agents for sequential processing
var translationAgents = (from lang in (string[])["French", "Spanish", "English"]
select GetTranslationAgent(lang, client));
Configurer l’orchestration séquentielle
Créez le flux de travail en utilisant AgentWorkflowBuilder :
// 3) Build sequential workflow
var workflow = AgentWorkflowBuilder.BuildSequential(translationAgents);
Exécuter le flux de travail séquentiel
Exécutez le flux de travail et traitez les événements :
// 4) Run the workflow
var messages = new List<ChatMessage> { new(ChatRole.User, "Hello, world!") };
await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, messages);
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
string? lastExecutorId = null;
List<ChatMessage> result = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is AgentResponseUpdateEvent e)
{
if (e.ExecutorId != lastExecutorId)
{
lastExecutorId = e.ExecutorId;
Console.WriteLine();
Console.Write($"{e.ExecutorId}: ");
}
Console.Write(e.Update.Text);
}
else if (evt is WorkflowOutputEvent outputEvt)
{
result = outputEvt.As<List<ChatMessage>>()!;
break;
}
}
// Display final result
Console.WriteLine();
foreach (var message in result)
{
Console.WriteLine($"{message.Role}: {message.Text}");
}
Exemple de sortie
French_Translation: User: Hello, world!
French_Translation: Assistant: English detected. Bonjour, le monde !
Spanish_Translation: Assistant: French detected. ¡Hola, mundo!
English_Translation: Assistant: Spanish detected. Hello, world!
Orchestration séquentielle avec Human-in-the-Loop
Les orchestrations séquentielles prennent en charge les interactions avec intervention humaine par l'approbation des outils. Lorsque les agents utilisent des outils encapsulés avec ApprovalRequiredAIFunction, le flux de travail se suspend et émet un RequestInfoEvent contenant un ToolApprovalRequestContent. Les systèmes externes (tels qu’un opérateur humain) peuvent inspecter l’appel de l’outil, l’approuver ou le rejeter, et le flux de travail reprend en conséquence.
Conseil / Astuce
Pour plus d’informations sur le modèle de demande et de réponse, consultez Human-in-the-Loop.
Définir des agents utilisant des outils qui nécessitent une approbation
Créez des agents dans lesquels les outils sensibles sont encapsulés avec ApprovalRequiredAIFunction:
ChatClientAgent deployAgent = new(
client,
"You are a DevOps engineer. Check staging status first, then deploy to production.",
"DeployAgent",
"Handles deployments",
[
AIFunctionFactory.Create(CheckStagingStatus),
new ApprovalRequiredAIFunction(AIFunctionFactory.Create(DeployToProduction))
]);
ChatClientAgent verifyAgent = new(
client,
"You are a QA engineer. Verify that the deployment was successful and summarize the results.",
"VerifyAgent",
"Verifies deployments");
Générer et exécuter avec la gestion des approbations
Générez normalement le flux de travail séquentiel. Le flux d’approbation est géré via le flux d’événements :
var workflow = AgentWorkflowBuilder.BuildSequential([deployAgent, verifyAgent]);
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is RequestInfoEvent e &&
e.Request.TryGetDataAs(out ToolApprovalRequestContent? approvalRequest))
{
await run.SendResponseAsync(
e.Request.CreateResponse(approvalRequest.CreateResponse(approved: true)));
}
}
Note
AgentWorkflowBuilder.BuildSequential() prend en charge l’approbation de l’outil d’emblée : aucune configuration supplémentaire n’est nécessaire. Lorsqu’un agent appelle un outil encapsulé avec ApprovalRequiredAIFunction, le flux de travail suspend et émet automatiquement un RequestInfoEvent.
Conseil / Astuce
Pour obtenir un exemple d’exécution complet de ce flux d’approbation, consultez l’exempleGroupChatToolApproval. Le même RequestInfoEvent modèle de gestion s’applique à d’autres orchestrations.
Concepts clés
- Traitement séquentiel : chaque agent traite la sortie de l’agent précédent dans l’ordre
- AgentWorkflowBuilder.BuildSequential() : crée un flux de travail de pipeline à partir d’une collection d’agents
- ChatClientAgent : représente un agent soutenu par un client de conversation avec des instructions spécifiques
-
InProcessExecution.RunStreamingAsync() : exécute le flux de travail et retourne une
StreamingRundiffusion en continu d’événements en temps réel -
Gestion des événements : surveiller la progression de l’agent via
AgentResponseUpdateEventet l’achèvement viaWorkflowOutputEvent -
Approbation de l’outil : Entourez les outils sensibles avec
ApprovalRequiredAIFunctionpour nécessiter une approbation humaine avant l’exécution. -
RequestInfoEvent : émis lorsqu’un outil requiert une approbation ; contient
ToolApprovalRequestContentles détails de l’appel de l’outil
Dans l’orchestration séquentielle, chaque agent traite la tâche à son tour, avec un résultat qui se transfère d’un à l’autre. Commencez par définir des agents pour un processus en deux étapes :
import os
from agent_framework.foundry import FoundryChatClient
from azure.identity import AzureCliCredential
# 1) Create agents using FoundryChatClient
chat_client = FoundryChatClient(
project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"],
model=os.environ["FOUNDRY_MODEL"],
credential=AzureCliCredential(),
)
writer = chat_client.as_agent(
instructions=(
"You are a concise copywriter. Provide a single, punchy marketing sentence based on the prompt."
),
name="writer",
)
reviewer = chat_client.as_agent(
instructions=(
"You are a thoughtful reviewer. Give brief feedback on the previous assistant message."
),
name="reviewer",
)
Configurer l’orchestration séquentielle
La SequentialBuilder classe crée un pipeline dans lequel les agents traitent les tâches dans l’ordre. Chaque agent voit l’historique complet des conversations et ajoute sa réponse :
from agent_framework.orchestrations import SequentialBuilder
# 2) Build sequential workflow: writer -> reviewer
workflow = SequentialBuilder(participants=[writer, reviewer]).build()
Exécuter le flux de travail séquentiel
Exécutez le flux de travail et collectez la sortie finale. La sortie du terminal contient AgentResponse les messages de réponse du dernier agent :
from agent_framework import AgentResponse
# 3) Run and print the last agent's response
events = await workflow.run("Write a tagline for a budget-friendly eBike.")
outputs = events.get_outputs()
if outputs:
print("===== Final Response =====")
final: AgentResponse = outputs[0]
for msg in final.messages:
name = msg.author_name or "assistant"
print(f"[{name}]\n{msg.text}")
Exemple de sortie
===== Final Response =====
[reviewer]
This tagline clearly communicates affordability and the benefit of extended travel, making it
appealing to budget-conscious consumers. It has a friendly and motivating tone, though it could
be slightly shorter for more punch. Overall, a strong and effective suggestion!
Avancé : Combinaison d’agents avec des exécuteurs personnalisés
L’orchestration séquentielle prend en charge la combinaison d’agents avec des exécuteurs personnalisés pour un traitement spécifique. Cela est utile lorsque vous avez besoin d’une logique personnalisée qui ne nécessite pas de LLM :
Définir un exécuteur personnalisé
Note
Lorsqu’un exécuteur personnalisé suit un agent dans la séquence, son gestionnaire reçoit un AgentExecutorResponse (car les agents sont encapsulés en interne par AgentExecutor). Permet agent_response.full_conversation d’accéder à l’historique complet des conversations. Un exécuteur personnalisé utilisé comme dernier participant (terminateur) doit appeler ctx.yield_output(AgentResponse(...)) afin que sa sortie devienne la sortie du terminal du flux de travail.
from agent_framework import AgentExecutorResponse, AgentResponse, Executor, WorkflowContext, handler
from agent_framework import Message
from typing_extensions import Never
class Summarizer(Executor):
"""Terminator custom executor: consumes full conversation and yields a summary as the workflow's final answer."""
@handler
async def summarize(
self,
agent_response: AgentExecutorResponse,
ctx: WorkflowContext[Never, AgentResponse]
) -> None:
if not agent_response.full_conversation:
await ctx.yield_output(AgentResponse(messages=[Message("assistant", ["No conversation to summarize."])]))
return
users = sum(1 for m in agent_response.full_conversation if m.role == "user")
assistants = sum(1 for m in agent_response.full_conversation if m.role == "assistant")
summary = Message("assistant", [f"Summary -> users:{users} assistants:{assistants}"])
await ctx.yield_output(AgentResponse(messages=[summary]))
Créer un flux de travail séquentiel mixte
# Create a content agent
content = chat_client.as_agent(
instructions="Produce a concise paragraph answering the user's request.",
name="content",
)
# Build sequential workflow: content -> summarizer
summarizer = Summarizer(id="summarizer")
workflow = SequentialBuilder(participants=[content, summarizer]).build()
Exemple de sortie avec exécuteur personnalisé
===== Final Summary =====
Summary -> users:1 assistants:1
Contrôle du contexte entre les agents
Par défaut, chaque agent d’un SequentialBuilder flux de travail utilise la conversation complète de l’agent précédent (entrées + messages de réponse). Le paramètre chain_only_agent_responses=True configure tous les agents de la séquence pour qu’ils consomment uniquement les messages de réponse de l’agent précédent à la place :
workflow = SequentialBuilder(
participants=[writer, translator, reviewer],
chain_only_agent_responses=True,
).build()
Cela est utile pour les pipelines de traduction, l’affinement progressif et d’autres scénarios où chaque agent doit se concentrer uniquement sur la transformation de la sortie de l’agent précédent sans être influencé par des tours de conversation antérieurs.
Pour obtenir un exemple complet, consultez sequential_chain_only_agent_responses.py dans le référentiel Agent Framework.
Conseil / Astuce
Pour un contrôle plus précis sur le flux de contexte, y compris les fonctions de filtre personnalisées, consultez les modes de contexte dans la référence de l’exécuteur de l’agent.
Sorties intermédiaires
Par défaut, seule la sortie du dernier participant s’affiche en tant qu’événement de flux de travail output . Configurez intermediate_outputs=True pour afficher la sortie de chaque participant, en plus de la sortie finale.
workflow = SequentialBuilder(
participants=[writer, reviewer, editor],
intermediate_outputs=True,
).build()
Vous pouvez gérer ces événements en temps réel en mode streaming :
from agent_framework import AgentResponseUpdate
# Track the last author to format streaming output.
last_author: str | None = None
async for event in workflow.run("Write a tagline for a budget-friendly eBike.", stream=True):
if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
update = event.data
author = update.author_name
if author != last_author:
if last_author is not None:
print() # Newline between different authors
print(f"{author}: {update.text}", end="", flush=True)
last_author = author
else:
print(update.text, end="", flush=True)
Orchestration séquentielle avec Human-in-the-Loop
Les orchestrations séquentielles prennent en charge les interactions humaines dans la boucle de deux façons : approbation des outils pour contrôler les appels d’outils sensibles et demander des informations pour la suspension après chaque réponse de l’agent pour recueillir des commentaires.
Conseil / Astuce
Pour plus d’informations sur le modèle de demande et de réponse, consultez Human-in-the-Loop.
Approbation des outils dans les flux de travail séquentiels
Permet @tool(approval_mode="always_require") de marquer les outils qui ont besoin d’une approbation humaine avant l’exécution. Le flux de travail suspend et émet un request_info événement lorsque l’agent tente d’appeler l’outil.
@tool(approval_mode="always_require")
def execute_database_query(query: str) -> str:
return f"Query executed successfully: {query}"
database_agent = Agent(
client=chat_client,
name="DatabaseAgent",
instructions="You are a database assistant.",
tools=[execute_database_query],
)
workflow = SequentialBuilder(participants=[database_agent]).build()
Traitez le flux d’événements et gérez les demandes d’approbation :
async def process_event_stream(stream):
responses = {}
async for event in stream:
if event.type == "request_info" and event.data.type == "function_approval_request":
responses[event.request_id] = event.data.to_function_approval_response(approved=True)
return responses if responses else None
stream = workflow.run("Check the schema and update all pending orders", stream=True)
pending_responses = await process_event_stream(stream)
while pending_responses is not None:
stream = workflow.run(stream=True, responses=pending_responses)
pending_responses = await process_event_stream(stream)
Conseil / Astuce
Pour obtenir un exemple exécutable complet, consultez sequential_builder_tool_approval.py. L'approbation de l'outil fonctionne avec SequentialBuilder sans configuration supplémentaire du constructeur.
Demander des informations sur les commentaires de l’agent
Utilisez .with_request_info() pour suspendre après que des agents spécifiques répondent, et permettre l’entrée externe (par exemple, la révision humaine) avant le début de l’agent suivant.
drafter = Agent(
client=chat_client,
name="drafter",
instructions="You are a document drafter. Create a brief draft on the given topic.",
)
editor = Agent(
client=chat_client,
name="editor",
instructions="You are an editor. Review and improve the draft. Incorporate any human feedback.",
)
finalizer = Agent(
client=chat_client,
name="finalizer",
instructions="You are a finalizer. Create a polished final version.",
)
# Enable request info for the editor agent only
workflow = (
SequentialBuilder(participants=[drafter, editor, finalizer])
.with_request_info(agents=["editor"])
.build()
)
async def process_event_stream(stream):
responses = {}
async for event in stream:
if event.type == "request_info":
responses[event.request_id] = AgentRequestInfoResponse.approve()
return responses if responses else None
stream = workflow.run("Write a brief introduction to artificial intelligence.", stream=True)
pending_responses = await process_event_stream(stream)
while pending_responses is not None:
stream = workflow.run(stream=True, responses=pending_responses)
pending_responses = await process_event_stream(stream)
Conseil / Astuce
Consultez les exemples complets : approbation de l’outil séquentiel et informations de demande séquentielle.
Concepts clés
- Contexte partagé : par défaut, chaque agent utilise la conversation complète de l’agent précédent, y compris les messages d’entrée et de réponse
-
Contrôle de contexte : permet
chain_only_agent_responses=Truede configurer des agents pour consommer uniquement les messages de réponse de l’agent précédents -
Sortie agentResponse : la sortie du terminal du flux de travail est une
AgentResponseréponse contenant la dernière réponse de l’agent (et non la conversation complète) -
Ordre important : les agents s’exécutent strictement dans l’ordre spécifié dans la
participantsliste - Participants flexibles : vous pouvez combiner des agents et des exécuteurs personnalisés dans n’importe quel ordre
-
Contrat terminateur personnalisé : exécuteur personnalisé utilisé comme dernier participant doit appeler
ctx.yield_output(AgentResponse(...))pour produire la sortie du terminal -
Sorties intermédiaires : définie
intermediate_outputs=Truepour exposer la sortie de chaque participant en tant qu’événement de flux de travailoutput, et pas seulement le dernier participant -
Approbation de l’outil : Utilisation
@tool(approval_mode="always_require")pour les opérations sensibles nécessitant une révision humaine -
Demander des informations : Utilisez
.with_request_info(agents=[...])pour mettre en pause après certains agents pour les retours externes