Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
En este documento se proporciona información general sobre cómo usar flujos de trabajo como agentes en Microsoft Agent Framework.
Información general
A veces ha creado un flujo de trabajo sofisticado con varios agentes, ejecutores personalizados y lógica compleja, pero quiere usarlo igual que cualquier otro agente. Eso es exactamente lo que los agentes de flujo de trabajo le permiten hacer. Al envolver el flujo de trabajo como un Agent, puede interactuar con él mediante la misma API familiar que se usaría para un agente de chat simple.
Ventajas clave
- Interfaz unificada: interacción con flujos de trabajo complejos mediante la misma API que agentes simples
- Compatibilidad de API: integración de flujos de trabajo con sistemas existentes que admiten la interfaz del agente
- Composición: Use agentes de flujo de trabajo como bloques de construcción en sistemas de agentes más grandes o en otros flujos de trabajo.
- Administración de sesiones: aprovechar las sesiones del agente para el estado y reanudación de la conversación
- Compatibilidad con streaming: obtener actualizaciones en tiempo real a medida que se ejecuta el flujo de trabajo
Funcionamiento
Al convertir un flujo de trabajo en un agente:
- El flujo de trabajo se valida para asegurarse de que su ejecutor de inicio puede aceptar los tipos de entrada necesarios.
- Se crea una sesión para administrar el estado de la conversación.
- Los mensajes de entrada se enrutan al ejecutor de inicio del flujo de trabajo.
- Los eventos de flujo de trabajo se convierten en actualizaciones de respuesta del agente
- Las solicitudes de entrada externas (de
RequestInfoExecutor) se muestran como llamadas de función
Requisitos
Para usar un flujo de trabajo como agente, el ejecutor de inicio del flujo de trabajo debe poder manejar IEnumerable<ChatMessage> como entrada. Esto se satisface automáticamente cuando se usan ejecutores basados en agente creados con AsAIAgent.
Creación de un agente de flujo de trabajo
Use el método de AsAIAgent() extensión para convertir cualquier flujo de trabajo compatible en un agente:
using Microsoft.Agents.AI;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
// Create agents
AIAgent researchAgent = chatClient.AsAIAgent("You are a researcher. Research and gather information on the given topic.");
AIAgent writerAgent = chatClient.AsAIAgent("You are a writer. Write clear, engaging content based on research.");
AIAgent reviewerAgent = chatClient.AsAIAgent("You are a reviewer. Review the content and provide a final polished version.");
// Build a sequential workflow
var workflow = new WorkflowBuilder(researchAgent)
.AddEdge(researchAgent, writerAgent)
.AddEdge(writerAgent, reviewerAgent)
.Build();
// Convert the workflow to an agent
AIAgent workflowAgent = workflow.AsAIAgent(
id: "content-pipeline",
name: "Content Pipeline Agent",
description: "A multi-agent workflow that researches, writes, and reviews content"
);
Parámetros de AsAIAgent
| Parámetro | Tipo | Description |
|---|---|---|
id |
string? |
Identificador único opcional para el agente. Se genera automáticamente si no se proporciona. |
name |
string? |
Nombre para mostrar opcional para el agente. |
description |
string? |
Descripción opcional del propósito del agente. |
executionEnvironment |
IWorkflowExecutionEnvironment? |
Entorno de ejecución opcional. El valor predeterminado es InProcessExecution.OffThread o InProcessExecution.Concurrent, según la configuración del flujo de trabajo. |
includeExceptionDetails |
bool |
Si true, incluye los mensajes de excepción en el contenido de error. Tiene como valor predeterminado false. |
includeWorkflowOutputsInResponse |
bool |
Si true transforma las salidas de flujo de trabajo en contenido en las respuestas del agente. Tiene como valor predeterminado false. |
Uso de agentes de flujo de trabajo
Creación de una sesión
Cada conversación con un agente de flujo de trabajo requiere una sesión para administrar el estado:
// Create a new session for the conversation
AgentSession session = await workflowAgent.CreateSessionAsync();
Ejecución sin streaming
Para casos de uso sencillos en los que desee la respuesta completa:
var messages = new List<ChatMessage>
{
new(ChatRole.User, "Write an article about renewable energy trends in 2025")
};
AgentResponse response = await workflowAgent.RunAsync(messages, session);
foreach (ChatMessage message in response.Messages)
{
Console.WriteLine($"{message.AuthorName}: {message.Text}");
}
Ejecución de streaming
Para las actualizaciones en tiempo real a medida que se ejecuta el flujo de trabajo:
var messages = new List<ChatMessage>
{
new(ChatRole.User, "Write an article about renewable energy trends in 2025")
};
await foreach (AgentResponseUpdate update in workflowAgent.RunStreamingAsync(messages, session))
{
// Process streaming updates from each agent in the workflow
if (!string.IsNullOrEmpty(update.Text))
{
Console.Write(update.Text);
}
}
Control de solicitudes de entrada externas
Cuando un flujo de trabajo contiene ejecutores que solicitan entrada externa (mediante RequestInfoExecutor), estas solicitudes se muestran como llamadas de función en la respuesta del agente:
await foreach (AgentResponseUpdate update in workflowAgent.RunStreamingAsync(messages, session))
{
// Check for function call requests
foreach (AIContent content in update.Contents)
{
if (content is FunctionCallContent functionCall)
{
// Handle the external input request
Console.WriteLine($"Workflow requests input: {functionCall.Name}");
Console.WriteLine($"Request data: {functionCall.Arguments}");
// Provide the response in the next message
}
}
}
Serialización y reanudación de sesión
Las sesiones del agente de flujo de trabajo se pueden serializar para la persistencia y reanudarse más adelante:
// Serialize the session state
JsonElement serializedSession = await workflowAgent.SerializeSessionAsync(session);
// Store serializedSession to your persistence layer...
// Later, resume the session
AgentSession resumedSession = await workflowAgent.DeserializeSessionAsync(serializedSession);
// Continue the conversation
await foreach (var update in workflowAgent.RunStreamingAsync(newMessages, resumedSession))
{
Console.Write(update.Text);
}
Requisitos
Para usar un flujo de trabajo como agente, el ejecutor de inicio del flujo de trabajo debe ser capaz de controlar la entrada del mensaje. Esto se cumple automáticamente cuando se usan Agent o ejecutores basados en agente.
Creación de un agente de flujo de trabajo
Llame a as_agent() en cualquier flujo de trabajo compatible para convertirlo en un agente:
from agent_framework.foundry import FoundryChatClient
from agent_framework.orchestrations import SequentialBuilder
from azure.identity import AzureCliCredential
# Create your chat client and agents
client = FoundryChatClient(
project_endpoint="<your-endpoint>",
model="<your-deployment>",
credential=AzureCliCredential(),
)
researcher = client.as_agent(
name="Researcher",
instructions="Research and gather information on the given topic.",
)
writer = client.as_agent(
name="Writer",
instructions="Write clear, engaging content based on research.",
)
# Build a sequential workflow
workflow = SequentialBuilder(participants=[researcher, writer]).build()
# Convert the workflow to an agent
workflow_agent = workflow.as_agent(name="Content Pipeline Agent")
parámetros de as_agent
| Parámetro | Tipo | Description |
|---|---|---|
name |
str | None |
Nombre para mostrar opcional para el agente. Se genera automáticamente si no se proporciona. |
Uso de agentes de flujo de trabajo
Creación de una sesión
Opcionalmente, puede crear una sesión para administrar el estado de la conversación en varios turnos:
# Create a new session for the conversation
session = await workflow_agent.create_session()
Nota:
Las sesiones son opcionales. Si no pasa un session a run(), el agente gestiona el estado internamente.
Si workflow.as_agent() se crea sin context_providers, el framework agrega un InMemoryHistoryProvider() por defecto para que el historial de varios turnos funcione automáticamente.
Si pasa context_providers explícitamente, esa lista se usa tal cual.
Ejecución sin streaming
Para casos de uso sencillos en los que desee la respuesta completa:
# You can pass a plain string as input
response = await workflow_agent.run("Write an article about AI trends")
for message in response.messages:
print(f"{message.author_name}: {message.text}")
Ejecución de streaming
Para las actualizaciones en tiempo real a medida que se ejecuta el flujo de trabajo:
async for update in workflow_agent.run(
"Write an article about AI trends",
stream=True,
):
if update.text:
print(update.text, end="", flush=True)
Control de solicitudes de entrada externas
Cuando un flujo de trabajo contiene ejecutores que solicitan entrada externa (mediante request_info), estas solicitudes se muestran como llamadas de función en la respuesta del agente. La llamada de función usa el nombre WorkflowAgent.REQUEST_INFO_FUNCTION_NAME:
from agent_framework import Content, Message, WorkflowAgent
response = await workflow_agent.run("Process my request")
# Look for function calls in the response
human_review_function_call = None
for message in response.messages:
for content in message.contents:
if content.name == WorkflowAgent.REQUEST_INFO_FUNCTION_NAME:
human_review_function_call = content
Proporcionar respuestas a solicitudes pendientes
Para continuar la ejecución del flujo de trabajo después de una solicitud de entrada externa, cree un resultado de función y vuelva a enviarlo:
if human_review_function_call:
# Parse the request arguments
request = WorkflowAgent.RequestInfoFunctionArgs.from_json(
human_review_function_call.arguments
)
# Create a response (your custom response type)
result_data = MyResponseType(approved=True, feedback="Looks good")
# Create the function call result
function_result = Content.from_function_result(
call_id=human_review_function_call.call_id,
result=result_data,
)
# Send the response back to continue the workflow
response = await workflow_agent.run(Message("tool", [function_result]))
Ejemplo completo
Este es un ejemplo completo que muestra un agente de flujo de trabajo con salida de streaming:
import asyncio
import os
from agent_framework.foundry import FoundryChatClient
from agent_framework.orchestrations import SequentialBuilder
from azure.identity import AzureCliCredential
async def main():
# Set up the chat client
client = FoundryChatClient(
project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"],
model=os.environ["FOUNDRY_MODEL"],
credential=AzureCliCredential(),
)
# Create specialized agents
researcher = client.as_agent(
name="Researcher",
instructions="Research the given topic and provide key facts.",
)
writer = client.as_agent(
name="Writer",
instructions="Write engaging content based on the research provided.",
)
reviewer = client.as_agent(
name="Reviewer",
instructions="Review the content and provide a final polished version.",
)
# Build a sequential workflow
workflow = SequentialBuilder(participants=[researcher, writer, reviewer]).build()
# Convert to a workflow agent
workflow_agent = workflow.as_agent(name="Content Creation Pipeline")
# Run the workflow
print("Starting workflow...")
print("=" * 60)
current_author = None
async for update in workflow_agent.run(
"Write about quantum computing",
stream=True,
):
# Show when different agents are responding
if update.author_name and update.author_name != current_author:
if current_author:
print("\n" + "-" * 40)
print(f"\n[{update.author_name}]:")
current_author = update.author_name
if update.text:
print(update.text, end="", flush=True)
print("\n" + "=" * 60)
print("Workflow completed!")
if __name__ == "__main__":
asyncio.run(main())
Comprensión de la conversión de eventos
Cuando un flujo de trabajo se ejecuta como agente, los eventos de flujo de trabajo se convierten en respuestas del agente. El tipo de respuesta depende de cómo llame a run():
-
run(): devuelve un objetoAgentResponseque contiene el resultado completo una vez finalizado el flujo de trabajo. -
run(..., stream=True): devuelve un iterable asincrónico deAgentResponseUpdateobjetos a medida que se ejecuta el flujo de trabajo, lo que proporciona actualizaciones en tiempo real.
Durante la ejecución, los eventos de flujo de trabajo internos se asignan a las respuestas del agente de la siguiente manera:
| Evento de flujo de trabajo | Respuesta del agente |
|---|---|
event.type == "output" |
Pasado como AgentResponseUpdate (streaming) o agregado a AgentResponse (sin streaming) |
event.type == "request_info" |
Convertido a contenido de invocación de función mediante WorkflowAgent.REQUEST_INFO_FUNCTION_NAME |
| Otros eventos | Se omite (solo en el flujo de trabajo interno) |
Esta conversión le permite usar la interfaz del agente estándar mientras sigue teniendo acceso a información detallada del flujo de trabajo cuando sea necesario.
Casos de uso
1. Canalizaciones de agente complejas
Encapsular un flujo de trabajo de varios agentes como un único agente para su uso en aplicaciones:
User Request --> [Workflow Agent] --> Final Response
|
+-- Researcher Agent
+-- Writer Agent
+-- Reviewer Agent
2. Composición del agente
Use agentes de flujo de trabajo como componentes en sistemas más grandes:
- Otro agente puede usar un agente de flujo de trabajo como herramienta
- Se pueden organizar varios agentes de flujo de trabajo juntos
- Los agentes de flujo de trabajo se pueden anidar dentro de otros flujos de trabajo
3. Integración de API
Exponga flujos de trabajo complejos a través de APIs que esperan la interfaz del Agente estándar, lo que permite:
- Interfaces de chat que usan flujos de trabajo de back-end sofisticados
- Integración con sistemas existentes basados en agentes
- Migración gradual de agentes simples a flujos de trabajo complejos
Pasos siguientes
- Obtenga información sobre cómo controlar solicitudes y respuestas en flujos de trabajo
- Aprenda a administrar el estado en flujos de trabajo
- Obtenga información sobre cómo crear puntos de control y reanudarlos a partir de ellos
- Obtenga información sobre cómo supervisar los flujos de trabajo
- Más información sobre el aislamiento de estado en flujos de trabajo
- Obtenga información sobre cómo visualizar flujos de trabajo