Freigeben über


Microsoft Agent Framework Workflow-Orchestrierungen - Sequenzielle

In sequenzieller Orchestrierung werden Agents in einer Pipeline organisiert. Jeder Agent verarbeitet die Aufgabe wiederum und übergibt die Ausgabe an den nächsten Agent in der Sequenz. Dies ist ideal für Workflows, bei denen jeder Schritt auf dem vorherigen aufbaut, z. B. Dokumentüberprüfung, Datenverarbeitungspipeline oder mehrstufiges Denken.

Sequenzielle Orchestrierung

Von Bedeutung

Der vollständige Unterhaltungsverlauf von vorhergehenden Agenten wird an den nächsten Agenten in der Sequenz übergeben. Jeder Agent kann alle vorherigen Nachrichten anzeigen, sodass kontextbezogene Verarbeitung möglich ist.

Sie lernen Folgendes

  • Erstellen einer sequenziellen Pipeline von Agenten
  • So verketten Sie Agenten, bei denen jeder auf der vorherigen Ausgabe aufbaut
  • So mischen Sie Agents mit benutzerdefinierten Executoren für spezialisierte Aufgaben
  • Nachverfolgen des Gesprächsverlaufs durch die Pipeline

Definieren Sie Ihre Agenten

In sequenzieller Orchestrierung werden Agents in einer Pipeline organisiert, in der jeder Agent die Aufgabe wiederum verarbeitet und die Ausgabe an den nächsten Agent in der Sequenz übergibt.

Einrichten des Azure OpenAI-Clients

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.AI.OpenAI;
using Azure.Identity;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
using Microsoft.Agents.AI;

// 1) Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ??
    throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var client = new AzureOpenAIClient(new Uri(endpoint), new DefaultAzureCredential())
    .GetChatClient(deploymentName)
    .AsIChatClient();

Warnung

DefaultAzureCredential ist praktisch für die Entwicklung, erfordert aber sorgfältige Überlegungen in der Produktion. Berücksichtigen Sie in der Produktion die Verwendung bestimmter Anmeldeinformationen (z. B. ManagedIdentityCredential), um Latenzprobleme, unbeabsichtigte Abfragen von Anmeldeinformationen und potenzielle Sicherheitsrisiken durch Ausweichmechanismen zu vermeiden.

Erstellen Sie spezielle Agents, die nacheinander arbeiten:

// 2) Helper method to create translation agents
static ChatClientAgent GetTranslationAgent(string targetLanguage, IChatClient chatClient) =>
    new(chatClient,
        $"You are a translation assistant who only responds in {targetLanguage}. Respond to any " +
        $"input by outputting the name of the input language and then translating the input to {targetLanguage}.");

// Create translation agents for sequential processing
var translationAgents = (from lang in (string[])["French", "Spanish", "English"]
                         select GetTranslationAgent(lang, client));

Einrichten der sequenziellen Orchestrierung

Erstellen Sie den Workflow mithilfe von AgentWorkflowBuilder:

// 3) Build sequential workflow
var workflow = AgentWorkflowBuilder.BuildSequential(translationAgents);

Ausführen des sequenziellen Workflows

Führen Sie den Workflow aus, und verarbeiten Sie die Ereignisse:

// 4) Run the workflow
var messages = new List<ChatMessage> { new(ChatRole.User, "Hello, world!") };

await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, messages);
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));

string? lastExecutorId = null;
List<ChatMessage> result = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    if (evt is AgentResponseUpdateEvent e)
    {
        if (e.ExecutorId != lastExecutorId)
        {
            lastExecutorId = e.ExecutorId;
            Console.WriteLine();
            Console.Write($"{e.ExecutorId}: ");
        }

        Console.Write(e.Update.Text);
    }
    else if (evt is WorkflowOutputEvent outputEvt)
    {
        result = outputEvt.As<List<ChatMessage>>()!;
        break;
    }
}

// Display final result
Console.WriteLine();
foreach (var message in result)
{
    Console.WriteLine($"{message.Role}: {message.Content}");
}

Beispielausgabe

French_Translation: User: Hello, world!
French_Translation: Assistant: English detected. Bonjour, le monde !
Spanish_Translation: Assistant: French detected. ¡Hola, mundo!
English_Translation: Assistant: Spanish detected. Hello, world!

Wichtige Konzepte

  • Sequenzielle Verarbeitung: Jeder Agent verarbeitet die Ausgabe des vorherigen Agents in der Reihenfolge
  • AgentWorkflowBuilder.BuildSequential(): Erstellt einen Pipelineworkflow aus einer Sammlung von Agents.
  • ChatClientAgent: Stellt einen Agent dar, der von einem Chatclient mit bestimmten Anweisungen unterstützt wird.
  • InProcessExecution.RunStreamingAsync(): Führt den Workflow aus und gibt einen StreamingRun für Echtzeitereignisstreaming zurück.
  • Ereignisbehandlung: Überwachungen des Agentenfortschritts durch AgentResponseUpdateEvent und Erfassung des Abschlusses durch WorkflowOutputEvent

Bei der sequenziellen Orchestrierung verarbeitet jeder Agent die Aufgabe nacheinander, wobei die Ausgabe von einem zum nächsten fließt. Definieren Sie zunächst Agents für einen zweistufigen Prozess:

import os
from agent_framework.azure import AzureOpenAIResponsesClient
from azure.identity import AzureCliCredential

# 1) Create agents using AzureOpenAIResponsesClient
chat_client = AzureOpenAIResponsesClient(
    project_endpoint=os.environ["AZURE_AI_PROJECT_ENDPOINT"],
    deployment_name=os.environ["AZURE_AI_MODEL_DEPLOYMENT_NAME"],
    credential=AzureCliCredential(),
)

writer = chat_client.as_agent(
    instructions=(
        "You are a concise copywriter. Provide a single, punchy marketing sentence based on the prompt."
    ),
    name="writer",
)

reviewer = chat_client.as_agent(
    instructions=(
        "You are a thoughtful reviewer. Give brief feedback on the previous assistant message."
    ),
    name="reviewer",
)

Einrichten der sequenziellen Orchestrierung

Die SequentialBuilder Klasse erstellt eine Pipeline, in der Agents Aufgaben in der Reihenfolge verarbeiten. Jeder Agent sieht den vollständigen Unterhaltungsverlauf und fügt seine Antwort hinzu:

from agent_framework.orchestrations import SequentialBuilder

# 2) Build sequential workflow: writer -> reviewer
workflow = SequentialBuilder(participants=[writer, reviewer]).build()

Ausführen des sequenziellen Workflows

Führen Sie den Workflow aus, und erfassen Sie die endgültige Unterhaltung mit den Beiträgen der einzelnen Agenten:

from typing import Any, cast
from agent_framework import Message, WorkflowEvent

# 3) Run and print final conversation
outputs: list[list[Message]] = []
async for event in workflow.run("Write a tagline for a budget-friendly eBike.", stream=True):
    if event.type == "output":
        outputs.append(cast(list[Message], event.data))

if outputs:
    print("===== Final Conversation =====")
    messages: list[Message] = outputs[-1]
    for i, msg in enumerate(messages, start=1):
        name = msg.author_name or ("assistant" if msg.role == "assistant" else "user")
        print(f"{'-' * 60}\n{i:02d} [{name}]\n{msg.text}")

Beispielausgabe

===== Final Conversation =====
------------------------------------------------------------
01 [user]
Write a tagline for a budget-friendly eBike.
------------------------------------------------------------
02 [writer]
Ride farther, spend less—your affordable eBike adventure starts here.
------------------------------------------------------------
03 [reviewer]
This tagline clearly communicates affordability and the benefit of extended travel, making it
appealing to budget-conscious consumers. It has a friendly and motivating tone, though it could
be slightly shorter for more punch. Overall, a strong and effective suggestion!

Erweitert: Mischen von Agents mit benutzerdefinierten Executoren

Sequenzielle Orchestrierung unterstützt gemischte Agenten mit benutzerdefinierten Ausführungs-Engines für die spezialisierte Verarbeitung. Dies ist nützlich, wenn Sie benutzerdefinierte Logik benötigen, für die keine LLM erforderlich ist:

Definieren eines benutzerdefinierten Executors

Hinweis

Wenn ein benutzerdefinierter Executor einem Agenten in der Sequenz folgt, empfängt sein Handler ein AgentExecutorResponse (da Agenten intern von AgentExecutor umschlossen werden). Verwenden Sie agent_response.full_conversation, um auf den vollständigen Unterhaltungsverlauf zuzugreifen.

from agent_framework import AgentExecutorResponse, Executor, WorkflowContext, handler
from agent_framework import Message

class Summarizer(Executor):
    """Simple summarizer: consumes full conversation and appends an assistant summary."""

    @handler
    async def summarize(
        self,
        agent_response: AgentExecutorResponse,
        ctx: WorkflowContext[list[Message]]
    ) -> None:
        if not agent_response.full_conversation:
            await ctx.send_message([Message("assistant", ["No conversation to summarize."])])
            return

        users = sum(1 for m in agent_response.full_conversation if m.role == "user")
        assistants = sum(1 for m in agent_response.full_conversation if m.role == "assistant")
        summary = Message("assistant", [f"Summary -> users:{users} assistants:{assistants}"])
        await ctx.send_message(list(agent_response.full_conversation) + [summary])

Erstellen eines gemischten sequenziellen Workflows

# Create a content agent
content = chat_client.as_agent(
    instructions="Produce a concise paragraph answering the user's request.",
    name="content",
)

# Build sequential workflow: content -> summarizer
summarizer = Summarizer(id="summarizer")
workflow = SequentialBuilder(participants=[content, summarizer]).build()

Beispielausgabe mit benutzerdefiniertem Executor

------------------------------------------------------------
01 [user]
Explain the benefits of budget eBikes for commuters.
------------------------------------------------------------
02 [content]
Budget eBikes offer commuters an affordable, eco-friendly alternative to cars and public transport.
Their electric assistance reduces physical strain and allows riders to cover longer distances quickly,
minimizing travel time and fatigue. Budget models are low-cost to maintain and operate, making them accessible
for a wider range of people. Additionally, eBikes help reduce traffic congestion and carbon emissions,
supporting greener urban environments. Overall, budget eBikes provide cost-effective, efficient, and
sustainable transportation for daily commuting needs.
------------------------------------------------------------
03 [assistant]
Summary -> users:1 assistants:1

Wichtige Konzepte

  • Freigegebener Kontext: Jeder Teilnehmer empfängt den vollständigen Unterhaltungsverlauf, einschließlich aller vorherigen Nachrichten
  • Order Matters: Agenten führen streng in der participants Liste angegebenen Reihenfolge aus
  • Flexible Teilnehmer: Sie können Agents und benutzerdefinierte Executoren in beliebiger Reihenfolge kombinieren.
  • Unterhaltungsfluss: Jeder Agent/Ausführender fügt der Unterhaltung etwas hinzu und baut einen vollständigen Dialog auf.

Nächste Schritte