Flujos de trabajo de Microsoft Agent Framework: uso de flujos de trabajo como agentes

En este documento se proporciona información general sobre cómo usar flujos de trabajo como agentes en Microsoft Agent Framework.

Información general

A veces ha creado un flujo de trabajo sofisticado con varios agentes, ejecutores personalizados y lógica compleja, pero quiere usarlo igual que cualquier otro agente. Eso es exactamente lo que los agentes de flujo de trabajo le permiten hacer. Al envolver el flujo de trabajo como un Agent, puede interactuar con él mediante la misma API familiar que se usaría para un agente de chat simple.

Ventajas clave

  • Interfaz unificada: interacción con flujos de trabajo complejos mediante la misma API que agentes simples
  • Compatibilidad de API: integración de flujos de trabajo con sistemas existentes que admiten la interfaz del agente
  • Composición: Use agentes de flujo de trabajo como bloques de construcción en sistemas de agentes más grandes o en otros flujos de trabajo.
  • Administración de sesiones: aprovechar las sesiones del agente para el estado y reanudación de la conversación
  • Compatibilidad con streaming: obtener actualizaciones en tiempo real a medida que se ejecuta el flujo de trabajo

Funcionamiento

Al convertir un flujo de trabajo en un agente:

  1. El flujo de trabajo se valida para asegurarse de que su ejecutor de inicio puede aceptar los tipos de entrada necesarios.
  2. Se crea una sesión para administrar el estado de la conversación.
  3. Los mensajes de entrada se enrutan al ejecutor de inicio del flujo de trabajo.
  4. Los eventos de flujo de trabajo se convierten en actualizaciones de respuesta del agente
  5. Las solicitudes de entrada externas (de RequestInfoExecutor) se muestran como llamadas de función

Requisitos

Para usar un flujo de trabajo como agente, el ejecutor de inicio del flujo de trabajo debe poder manejar IEnumerable<ChatMessage> como entrada. Esto se satisface automáticamente cuando se usan ejecutores basados en agente creados con AsAIAgent.

Creación de un agente de flujo de trabajo

Use el método de AsAIAgent() extensión para convertir cualquier flujo de trabajo compatible en un agente:

using Microsoft.Agents.AI;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;

// Create agents
AIAgent researchAgent = chatClient.AsAIAgent("You are a researcher. Research and gather information on the given topic.");
AIAgent writerAgent = chatClient.AsAIAgent("You are a writer. Write clear, engaging content based on research.");
AIAgent reviewerAgent = chatClient.AsAIAgent("You are a reviewer. Review the content and provide a final polished version.");

// Build a sequential workflow
var workflow = new WorkflowBuilder(researchAgent)
    .AddEdge(researchAgent, writerAgent)
    .AddEdge(writerAgent, reviewerAgent)
    .Build();

// Convert the workflow to an agent
AIAgent workflowAgent = workflow.AsAIAgent(
    id: "content-pipeline",
    name: "Content Pipeline Agent",
    description: "A multi-agent workflow that researches, writes, and reviews content"
);

Parámetros de AsAIAgent

Parámetro Tipo Description
id string? Identificador único opcional para el agente. Se genera automáticamente si no se proporciona.
name string? Nombre para mostrar opcional para el agente.
description string? Descripción opcional del propósito del agente.
executionEnvironment IWorkflowExecutionEnvironment? Entorno de ejecución opcional. El valor predeterminado es InProcessExecution.OffThread o InProcessExecution.Concurrent, según la configuración del flujo de trabajo.
includeExceptionDetails bool Si true, incluye los mensajes de excepción en el contenido de error. Tiene como valor predeterminado false.
includeWorkflowOutputsInResponse bool Si true transforma las salidas de flujo de trabajo en contenido en las respuestas del agente. Tiene como valor predeterminado false.

Uso de agentes de flujo de trabajo

Creación de una sesión

Cada conversación con un agente de flujo de trabajo requiere una sesión para administrar el estado:

// Create a new session for the conversation
AgentSession session = await workflowAgent.CreateSessionAsync();

Ejecución sin streaming

Para casos de uso sencillos en los que desee la respuesta completa:

var messages = new List<ChatMessage>
{
    new(ChatRole.User, "Write an article about renewable energy trends in 2025")
};

AgentResponse response = await workflowAgent.RunAsync(messages, session);

foreach (ChatMessage message in response.Messages)
{
    Console.WriteLine($"{message.AuthorName}: {message.Text}");
}

Ejecución de streaming

Para las actualizaciones en tiempo real a medida que se ejecuta el flujo de trabajo:

var messages = new List<ChatMessage>
{
    new(ChatRole.User, "Write an article about renewable energy trends in 2025")
};

await foreach (AgentResponseUpdate update in workflowAgent.RunStreamingAsync(messages, session))
{
    // Process streaming updates from each agent in the workflow
    if (!string.IsNullOrEmpty(update.Text))
    {
        Console.Write(update.Text);
    }
}

Control de solicitudes de entrada externas

Cuando un flujo de trabajo contiene ejecutores que solicitan entrada externa (mediante RequestInfoExecutor), estas solicitudes se muestran como llamadas de función en la respuesta del agente:

await foreach (AgentResponseUpdate update in workflowAgent.RunStreamingAsync(messages, session))
{
    // Check for function call requests
    foreach (AIContent content in update.Contents)
    {
        if (content is FunctionCallContent functionCall)
        {
            // Handle the external input request
            Console.WriteLine($"Workflow requests input: {functionCall.Name}");
            Console.WriteLine($"Request data: {functionCall.Arguments}");

            // Provide the response in the next message
        }
    }
}

Serialización y reanudación de sesión

Las sesiones del agente de flujo de trabajo se pueden serializar para la persistencia y reanudarse más adelante:

// Serialize the session state
JsonElement serializedSession = await workflowAgent.SerializeSessionAsync(session);

// Store serializedSession to your persistence layer...

// Later, resume the session
AgentSession resumedSession = await workflowAgent.DeserializeSessionAsync(serializedSession);

// Continue the conversation
await foreach (var update in workflowAgent.RunStreamingAsync(newMessages, resumedSession))
{
    Console.Write(update.Text);
}

Requisitos

Para usar un flujo de trabajo como agente, el ejecutor de inicio del flujo de trabajo debe ser capaz de controlar la entrada del mensaje. Esto se cumple automáticamente cuando se usan Agent o ejecutores basados en agente.

Creación de un agente de flujo de trabajo

Llame a as_agent() en cualquier flujo de trabajo compatible para convertirlo en un agente:

from agent_framework.foundry import FoundryChatClient
from agent_framework.orchestrations import SequentialBuilder
from azure.identity import AzureCliCredential

# Create your chat client and agents
client = FoundryChatClient(
    project_endpoint="<your-endpoint>",
    model="<your-deployment>",
    credential=AzureCliCredential(),
)

researcher = client.as_agent(
    name="Researcher",
    instructions="Research and gather information on the given topic.",
)

writer = client.as_agent(
    name="Writer",
    instructions="Write clear, engaging content based on research.",
)

# Build a sequential workflow
workflow = SequentialBuilder(participants=[researcher, writer]).build()

# Convert the workflow to an agent
workflow_agent = workflow.as_agent(name="Content Pipeline Agent")

parámetros de as_agent

Parámetro Tipo Description
name str | None Nombre para mostrar opcional para el agente. Se genera automáticamente si no se proporciona.

Uso de agentes de flujo de trabajo

Creación de una sesión

Opcionalmente, puede crear una sesión para administrar el estado de la conversación en varios turnos:

# Create a new session for the conversation
session = await workflow_agent.create_session()

Nota:

Las sesiones son opcionales. Si no pasa un session a run(), el agente gestiona el estado internamente. Si workflow.as_agent() se crea sin context_providers, el framework agrega un InMemoryHistoryProvider() por defecto para que el historial de varios turnos funcione automáticamente. Si pasa context_providers explícitamente, esa lista se usa tal cual.

Ejecución sin streaming

Para casos de uso sencillos en los que desee la respuesta completa:

# You can pass a plain string as input
response = await workflow_agent.run("Write an article about AI trends")

for message in response.messages:
    print(f"{message.author_name}: {message.text}")

Ejecución de streaming

Para las actualizaciones en tiempo real a medida que se ejecuta el flujo de trabajo:

async for update in workflow_agent.run(
    "Write an article about AI trends",
    stream=True,
):
    if update.text:
        print(update.text, end="", flush=True)

Control de solicitudes de entrada externas

Cuando un flujo de trabajo contiene ejecutores que solicitan entrada externa (mediante request_info), estas solicitudes se muestran como llamadas de función en la respuesta del agente. La llamada de función usa el nombre WorkflowAgent.REQUEST_INFO_FUNCTION_NAME:

from agent_framework import Content, Message, WorkflowAgent

response = await workflow_agent.run("Process my request")

# Look for function calls in the response
human_review_function_call = None
for message in response.messages:
    for content in message.contents:
        if content.name == WorkflowAgent.REQUEST_INFO_FUNCTION_NAME:
            human_review_function_call = content

Proporcionar respuestas a solicitudes pendientes

Para continuar la ejecución del flujo de trabajo después de una solicitud de entrada externa, cree un resultado de función y vuelva a enviarlo:

if human_review_function_call:
    # Parse the request arguments
    request = WorkflowAgent.RequestInfoFunctionArgs.from_json(
        human_review_function_call.arguments
    )

    # Create a response (your custom response type)
    result_data = MyResponseType(approved=True, feedback="Looks good")

    # Create the function call result
    function_result = Content.from_function_result(
        call_id=human_review_function_call.call_id,
        result=result_data,
    )

    # Send the response back to continue the workflow
    response = await workflow_agent.run(Message("tool", [function_result]))

Ejemplo completo

Este es un ejemplo completo que muestra un agente de flujo de trabajo con salida de streaming:

import asyncio
import os

from agent_framework.foundry import FoundryChatClient
from agent_framework.orchestrations import SequentialBuilder
from azure.identity import AzureCliCredential


async def main():
    # Set up the chat client
    client = FoundryChatClient(
        project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"],
        model=os.environ["FOUNDRY_MODEL"],
        credential=AzureCliCredential(),
    )

    # Create specialized agents
    researcher = client.as_agent(
        name="Researcher",
        instructions="Research the given topic and provide key facts.",
    )

    writer = client.as_agent(
        name="Writer",
        instructions="Write engaging content based on the research provided.",
    )

    reviewer = client.as_agent(
        name="Reviewer",
        instructions="Review the content and provide a final polished version.",
    )

    # Build a sequential workflow
    workflow = SequentialBuilder(participants=[researcher, writer, reviewer]).build()

    # Convert to a workflow agent
    workflow_agent = workflow.as_agent(name="Content Creation Pipeline")

    # Run the workflow
    print("Starting workflow...")
    print("=" * 60)

    current_author = None
    async for update in workflow_agent.run(
        "Write about quantum computing",
        stream=True,
    ):
        # Show when different agents are responding
        if update.author_name and update.author_name != current_author:
            if current_author:
                print("\n" + "-" * 40)
            print(f"\n[{update.author_name}]:")
            current_author = update.author_name

        if update.text:
            print(update.text, end="", flush=True)

    print("\n" + "=" * 60)
    print("Workflow completed!")


if __name__ == "__main__":
    asyncio.run(main())

Comprensión de la conversión de eventos

Cuando un flujo de trabajo se ejecuta como agente, los eventos de flujo de trabajo se convierten en respuestas del agente. El tipo de respuesta depende de cómo llame a run():

  • run(): devuelve un objeto AgentResponse que contiene el resultado completo una vez finalizado el flujo de trabajo.
  • run(..., stream=True): devuelve un iterable asincrónico de AgentResponseUpdate objetos a medida que se ejecuta el flujo de trabajo, lo que proporciona actualizaciones en tiempo real.

Durante la ejecución, los eventos de flujo de trabajo internos se asignan a las respuestas del agente de la siguiente manera:

Evento de flujo de trabajo Respuesta del agente
event.type == "output" Pasado como AgentResponseUpdate (streaming) o agregado a AgentResponse (sin streaming)
event.type == "request_info" Convertido a contenido de invocación de función mediante WorkflowAgent.REQUEST_INFO_FUNCTION_NAME
Otros eventos Se omite (solo en el flujo de trabajo interno)

Esta conversión le permite usar la interfaz del agente estándar mientras sigue teniendo acceso a información detallada del flujo de trabajo cuando sea necesario.

Casos de uso

1. Canalizaciones de agente complejas

Encapsular un flujo de trabajo de varios agentes como un único agente para su uso en aplicaciones:

User Request --> [Workflow Agent] --> Final Response
                      |
                      +-- Researcher Agent
                      +-- Writer Agent  
                      +-- Reviewer Agent

2. Composición del agente

Use agentes de flujo de trabajo como componentes en sistemas más grandes:

  • Otro agente puede usar un agente de flujo de trabajo como herramienta
  • Se pueden organizar varios agentes de flujo de trabajo juntos
  • Los agentes de flujo de trabajo se pueden anidar dentro de otros flujos de trabajo

3. Integración de API

Exponga flujos de trabajo complejos a través de APIs que esperan la interfaz del Agente estándar, lo que permite:

  • Interfaces de chat que usan flujos de trabajo de back-end sofisticados
  • Integración con sistemas existentes basados en agentes
  • Migración gradual de agentes simples a flujos de trabajo complejos

Pasos siguientes