Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
A orquestração Magentic é projetada com base no sistema Magentic-One inventado pela AutoGen. É um padrão multiagente flexível e de uso geral projetado para tarefas complexas e abertas que exigem colaboração dinâmica. Nesse padrão, um gerente Magentic dedicado coordena uma equipe de agentes especializados, selecionando qual agente deve agir em seguida com base no contexto em evolução, no progresso da tarefa e nas capacidades do agente.
O gerente Magentic mantém um contexto compartilhado, acompanha o progresso e adapta o fluxo de trabalho em tempo real. Isso permite que o sistema quebre problemas complexos, delegue subtarefas e refine iterativamente soluções por meio da colaboração de agentes. A orquestração é especialmente adequada para cenários onde o caminho da solução não é conhecido antecipadamente e pode exigir múltiplas rondas de raciocínio, pesquisa e computação.
Sugestão
A orquestração Magentic tem a mesma arquitetura do padrão de orquestração do Group Chat , com um gestor muito poderoso que utiliza o planeamento para coordenar a colaboração entre agentes. Se o seu cenário exigir coordenação mais simples sem planeamento complexo, considere usar o padrão de Chat de Grupo em vez disso.
Observação
No artigo da Magentic-One , 4 agentes altamente especializados são concebidos para resolver um conjunto muito específico de tarefas. Na orquestração Magentic no Agent Framework, pode definir os seus próprios agentes especializados para se adequarem às necessidades específicas da sua aplicação. No entanto, não foi testado o quão bem a orquestração da Magentic funcionará fora do design original da Magentic-One.
O que você vai aprender
- Como configurar um gerente Magentic para coordenar vários agentes especializados
- Como gerir eventos de streaming com
WorkflowEvent - Como implementar a revisão de planos com intervenção humana
- Como acompanhar a colaboração do agente e o progresso em tarefas complexas
Defina seus agentes especializados
Na orquestração Magentic, você define agentes especializados que o gerente pode selecionar dinamicamente com base nos requisitos da tarefa:
#pragma warning disable MAAIW001 // Magentic types are experimental
#pragma warning disable OPENAI001 // HostedCodeInterpreterTool is experimental
using Azure.AI.Projects;
using Azure.Identity;
using Microsoft.Agents.AI;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Agents.AI.Workflows.Specialized.Magentic;
using Microsoft.Extensions.AI;
string endpoint = Environment.GetEnvironmentVariable("AZURE_AI_PROJECT_ENDPOINT")
?? throw new InvalidOperationException("AZURE_AI_PROJECT_ENDPOINT is not set.");
string deploymentName = Environment.GetEnvironmentVariable("AZURE_AI_MODEL_DEPLOYMENT_NAME") ?? "gpt-5.4-mini";
AIProjectClient projectClient = new(new Uri(endpoint), new DefaultAzureCredential());
AIAgent researcherAgent = projectClient.AsAIAgent(
deploymentName,
name: "ResearcherAgent",
description: "Specialist in research and information gathering.",
instructions: "You are a researcher. Find relevant information without doing additional computation or quantitative analysis.");
AIAgent coderAgent = projectClient.AsAIAgent(
deploymentName,
name: "CoderAgent",
description: "A helpful assistant that writes and executes code to analyze data.",
instructions: "You solve quantitative questions by writing and running code. Show the analysis and the computation process clearly.",
tools: [new HostedCodeInterpreterTool()]);
AIAgent managerAgent = projectClient.AsAIAgent(
deploymentName,
name: "MagenticManager",
description: "Orchestrator that coordinates the research and coding workflow.",
instructions: "You coordinate the team to complete complex tasks efficiently.");
import os
from agent_framework import Agent
from agent_framework.foundry import FoundryChatClient
from azure.identity import AzureCliCredential
client = FoundryChatClient(
project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"],
model=os.environ["FOUNDRY_MODEL"],
credential=AzureCliCredential(),
)
researcher_agent = Agent(
name="ResearcherAgent",
description="Specialist in research and information gathering",
instructions=(
"You are a Researcher. You find information without additional computation or quantitative analysis."
),
client=client,
)
coder_agent = Agent(
name="CoderAgent",
description="A helpful assistant that writes and executes code to process and analyze data.",
instructions="You solve questions using code. Please provide detailed analysis and computation process.",
client=client,
tools=client.get_code_interpreter_tool(),
)
# Create a manager agent for orchestration
manager_agent = Agent(
name="MagenticManager",
description="Orchestrator that coordinates the research and coding workflow",
instructions="You coordinate a team to complete complex tasks efficiently.",
client=client,
)
Crie o fluxo de trabalho Magentic
Use o construtor de workflow Magentic para configurar o fluxo de trabalho com um gestor e um conjunto de participantes. O construtor também disponibiliza os limites do ciclo interno (número máximo de rondas de coordenação, número máximo de impasses consecutivos antes do replaneamento, número máximo de reinicializações do plano) e um indicador para revisão do plano com intervenção humana.
Workflow workflow = new MagenticWorkflowBuilder(managerAgent)
.AddParticipants([researcherAgent, coderAgent])
.WithName("Magentic Orchestration Workflow")
.WithDescription("Coordinates a researcher and coder to solve a complex analytical task.")
.RequirePlanSignoff(false)
.WithMaxRounds(10)
.WithMaxStalls(3)
.WithMaxResets(2)
.Build();
from agent_framework.orchestrations import MagenticBuilder
workflow = MagenticBuilder(
participants=[researcher_agent, coder_agent],
intermediate_output_from=[researcher_agent, coder_agent],
manager_agent=manager_agent,
max_round_count=10,
max_stall_count=3,
max_reset_count=2,
).build()
Sugestão
Um gestor padrão é implementado com base no design Magnetic-One, com prompts fixos extraídos do artigo original. Pode personalizar o comportamento do gestor passando os seus próprios prompts através dos MagenticBuilder parâmetros do construtor. Para personalizar ainda mais o gestor, pode também implementar o seu próprio gestor subclassificando a MagenticManagerBase classe.
Saídas Intermédias
Observação
Esta secção aplica-se atualmente apenas ao pivot em Python.
Passar intermediate_output_from=[...] para MagenticBuilder designa participantes específicos como fontes intermédias de saída. As suas yield_output chamadas emitem "intermediate" eventos, enquanto a resposta final sintetizada do gestor permanece um "output" evento (terminal). Sem este parâmetro (o padrão), apenas o terminal AgentResponse do gestor emerge.
Isto é particularmente útil para fluxos de trabalho Magnético porque:
- As tarefas são frequentemente de longa duração, com muitas rondas de colaboração entre agentes.
- Pode mostrar a contribuição de cada agente em tempo real à medida que o fluxo de trabalho avança em modo de streaming
- Proporciona visão sobre as etapas intermédias de raciocínio do fluxo de trabalho
Execute o Fluxo de Trabalho com Streaming de Eventos
Executar uma tarefa complexa e gerir eventos para saída em streaming e atualizações de orquestração. A saída do fluxo de trabalho do terminal contém a resposta final sintetizada do gestor.
const string TaskPrompt =
"I am preparing a report on the energy efficiency of different machine learning model architectures. " +
"Compare the estimated training and inference energy consumption of ResNet-50, BERT-base, and GPT-2 " +
"on standard datasets (for example, ImageNet for ResNet, GLUE for BERT, WebText for GPT-2). " +
"Then, estimate the CO2 emissions associated with each, assuming training on an Azure Standard_NC6s_v3 " +
"VM for 24 hours. Provide tables for clarity, and recommend the most energy-efficient model " +
"per task type (image classification, text classification, and text generation).";
await using StreamingRun run = await InProcessExecution.RunStreamingAsync(
workflow,
new List<ChatMessage> { new(ChatRole.User, TaskPrompt) });
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
string? lastResponseId = null;
WorkflowOutputEvent? finalOutput = null;
await foreach (WorkflowEvent workflowEvent in run.WatchStreamAsync())
{
switch (workflowEvent)
{
case AgentResponseUpdateEvent updateEvent:
// Stream per-participant deltas. Group by ResponseId / MessageId / ExecutorId so
// each new contiguous response prints its executor header once.
string responseId = updateEvent.Update.ResponseId
?? updateEvent.Update.MessageId
?? updateEvent.ExecutorId;
if (!string.Equals(responseId, lastResponseId, StringComparison.Ordinal))
{
if (lastResponseId is not null)
{
Console.WriteLine();
}
Console.Write($"- {updateEvent.ExecutorId}: ");
lastResponseId = responseId;
}
Console.Write(updateEvent.Update.Text);
break;
case MagenticPlanCreatedEvent planCreated:
Console.WriteLine($"\n[Magentic Initial Plan]\n{planCreated.FullTaskLedger.Text}");
break;
case MagenticReplannedEvent replanned:
Console.WriteLine($"\n[Magentic Replanned]\n{replanned.FullTaskLedger.Text}");
break;
case MagenticProgressLedgerUpdatedEvent progressUpdated:
MagenticProgressLedger ledger = progressUpdated.ProgressLedger;
Console.WriteLine(
$"\n[Magentic Progress Ledger] satisfied={ledger.IsRequestSatisfied}, " +
$"inLoop={ledger.IsInLoop}, progressing={ledger.IsProgressBeingMade}, " +
$"nextSpeaker={ledger.NextSpeaker}, instruction={ledger.InstructionOrQuestion}");
break;
case WorkflowOutputEvent outputEvent when outputEvent.Is<List<ChatMessage>>():
finalOutput = outputEvent;
break;
case WorkflowErrorEvent workflowError:
Console.Error.WriteLine(workflowError.Exception?.ToString() ?? "Unknown workflow error.");
break;
case ExecutorFailedEvent executorFailed:
Console.Error.WriteLine(
$"Executor '{executorFailed.ExecutorId}' failed: " +
(executorFailed.Data?.ToString() ?? "unknown error"));
break;
}
}
if (finalOutput?.As<List<ChatMessage>>() is { } transcript)
{
Console.WriteLine("\n\n=== Final Conversation Transcript ===\n");
foreach (ChatMessage message in transcript)
{
Console.WriteLine($"{message.AuthorName ?? message.Role.ToString()}: {message.Text}");
}
}
import json
import asyncio
from typing import cast
from agent_framework import (
AgentResponse,
AgentResponseUpdate,
Message,
WorkflowEvent,
)
from agent_framework.orchestrations import MagenticProgressLedger
task = (
"I am preparing a report on the energy efficiency of different machine learning model architectures. "
"Compare the estimated training and inference energy consumption of ResNet-50, BERT-base, and GPT-2 "
"on standard datasets (for example, ImageNet for ResNet, GLUE for BERT, WebText for GPT-2). "
"Then, estimate the CO2 emissions associated with each, assuming training on an Azure Standard_NC6s_v3 "
"VM for 24 hours. Provide tables for clarity, and recommend the most energy-efficient model "
"per task type (image classification, text classification, and text generation)."
)
# Keep track of the last executor to format output nicely in streaming mode
last_message_id: str | None = None
final_response: AgentResponse | None = None
async for event in workflow.run(task, stream=True):
if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
message_id = event.data.message_id
if message_id != last_message_id:
if last_message_id is not None:
print("\n")
print(f"- {event.executor_id}:", end=" ", flush=True)
last_message_id = message_id
print(event.data, end="", flush=True)
elif event.type == "magentic_orchestrator":
print(f"\n[Magentic Orchestrator Event] Type: {event.data.event_type.name}")
if isinstance(event.data.content, Message):
print(f"Please review the plan:\n{event.data.content.text}")
elif isinstance(event.data.content, MagenticProgressLedger):
print(f"Please review progress ledger:\n{json.dumps(event.data.content.to_dict(), indent=2)}")
else:
print(f"Unknown data type in MagenticOrchestratorEvent: {type(event.data.content)}")
# Block to allow user to read the plan/progress before continuing
# Note: this is for demonstration only and is not the recommended way to handle human interaction.
# Please refer to `with_plan_review` for proper human interaction during planning phases.
await asyncio.get_event_loop().run_in_executor(None, input, "Press Enter to continue...")
elif event.type == "output" and isinstance(event.data, AgentResponse):
final_response = event.data
# The output of the Magentic workflow is an AgentResponse with the manager's final answer
if final_response:
output = final_response.messages[-1].text if final_response.messages else ""
print(output)
O Magentic destaca três eventos orquestradores que assinalam marcos de planeamento e progresso:
- Plano inicial criado — o gestor elaborou o plano inicial da tarefa.
- Replanificado — foi gerado um novo plano, quer devido à deteção de bloqueio, quer porque uma pessoa reviu o plano na revisão do plano.
- Registo de progresso atualizado — emitido uma vez por ronda de coordenação; transporta o registo de progresso atual (se o pedido está satisfeito, se a equipa está num ciclo, se o progresso está a ser feito, o próximo orador e a instrução a enviar-lhes).
Em Python, estes são transportados dentro de um único MagenticOrchestratorEvent cujo event_type enum distingue PLAN_CREATED, REPLANNED e PROGRESS_LEDGER_UPDATED. Em .NET são emitidos em três tipos distintos — MagenticPlanCreatedEvent, MagenticReplannedEvent e MagenticProgressLedgerUpdatedEvent — todos derivados de MagenticOrchestratorEvent.
Avançado: Revisão do plano Human-in-the-Loop
Permitir que o human-in-the-loop (HITL) permita aos utilizadores rever e aprovar o plano proposto pelo gestor antes da execução. Isto é útil para garantir que o plano está alinhado com as expectativas e requisitos dos utilizadores.
Existem duas opções para a revisão do plano:
- Revisão: O utilizador fornece feedback para rever o plano, o que leva o gestor a replanear com base no feedback.
- Aprovar: O utilizador aprova o plano as-is, permitindo que o fluxo de trabalho prossiga.
Ative a revisão de planos ao construir o fluxo de trabalho do Magentic. Os valores predefinidos diferem entre linguagens: em Python, a revisão do plano está desligada por defeito (enable_plan_review=False) e é necessário ativá-la explicitamente; no .NET, a revisão do plano está ativada por defeito (RequirePlanSignoff assume, por defeito, o valor true), e o exemplo básico apresentado anteriormente nesta página desativou-a para poder ser executado do início ao fim sem interação. O código abaixo mostra como optar por aderir e lidar com os pedidos de revisão resultantes.
As pausas na revisão do plano são reveladas através do mecanismo de pedido/resposta do fluxo de trabalho com MagenticPlanReviewRequest dados. Lidas com estes no fluxo de eventos e retomas o fluxo de trabalho com um MagenticPlanReviewResponse assim que a pessoa tiver aprovado ou revisto o plano.
Sugestão
Saiba mais sobre pedidos e respostas no guia de Pedidos e Respostas .
Workflow workflow = new MagenticWorkflowBuilder(managerAgent)
.AddParticipants([researcherAgent, coderAgent])
.RequirePlanSignoff(true)
.WithMaxRounds(10)
.WithMaxStalls(1)
.WithMaxResets(2)
.Build();
CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
InProcessExecutionEnvironment environment = ExecutionEnvironment.InProcess_Lockstep
.ToWorkflowExecutionEnvironment()
.WithCheckpointing(checkpointManager);
await using StreamingRun run = await environment.OpenStreamingAsync(workflow);
await run.TrySendMessageAsync(new List<ChatMessage> { new(ChatRole.User, TaskPrompt) });
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
ExternalRequest? pendingRequest = null;
CheckpointInfo? lastCheckpoint = null;
WorkflowOutputEvent? finalOutput = null;
async Task<WorkflowOutputEvent?> DrainAsync(StreamingRun activeRun)
{
WorkflowOutputEvent? output = null;
await foreach (WorkflowEvent evt in activeRun.WatchStreamAsync(blockOnPendingRequest: false))
{
switch (evt)
{
case AgentResponseUpdateEvent updateEvent:
Console.Write(updateEvent.Update.Text);
break;
case RequestInfoEvent requestInfo
when requestInfo.Request.Data.As<MagenticPlanReviewRequest>() is not null:
pendingRequest = requestInfo.Request;
break;
case SuperStepCompletedEvent stepCompleted:
lastCheckpoint = stepCompleted.CompletionInfo?.Checkpoint ?? lastCheckpoint;
break;
case WorkflowOutputEvent outputEvent when outputEvent.Is<List<ChatMessage>>():
output = outputEvent;
break;
}
}
return output;
}
finalOutput = await DrainAsync(run);
// Loop until the workflow finishes or the user accepts a plan that runs to completion.
while (finalOutput is null && pendingRequest is not null)
{
MagenticPlanReviewRequest reviewRequest = pendingRequest.Data.As<MagenticPlanReviewRequest>()!;
Console.WriteLine("\n\n[Magentic Plan Review Request]");
if (reviewRequest.CurrentProgress is { } progress)
{
Console.WriteLine(
$"Current progress: satisfied={progress.IsRequestSatisfied}, " +
$"inLoop={progress.IsInLoop}, progressing={progress.IsProgressBeingMade}");
}
if (reviewRequest.IsStalled)
{
Console.WriteLine("(Replan triggered by stall detection.)");
}
Console.WriteLine($"Proposed plan:\n{reviewRequest.Plan.Text}\n");
Console.Write("Press Enter to approve, or type feedback to request a revision: ");
string reply = Console.ReadLine() ?? string.Empty;
MagenticPlanReviewResponse reviewResponse = string.IsNullOrWhiteSpace(reply)
? reviewRequest.Approve()
: reviewRequest.Revise(reply);
ExternalResponse response = pendingRequest.CreateResponse(reviewResponse);
pendingRequest = null;
await using StreamingRun resumed = await environment.ResumeStreamingAsync(workflow, lastCheckpoint!);
await resumed.SendResponseAsync(response);
finalOutput = await DrainAsync(resumed);
}
if (finalOutput?.As<List<ChatMessage>>() is { } transcript)
{
Console.WriteLine("\n\n=== Final Conversation Transcript ===\n");
foreach (ChatMessage message in transcript)
{
Console.WriteLine($"{message.AuthorName ?? message.Role.ToString()}: {message.Text}");
}
}
import json
import asyncio
from typing import cast
from agent_framework import (
AgentResponseUpdate,
Agent,
Message,
WorkflowEvent,
)
from agent_framework.orchestrations import (
MagenticBuilder,
MagenticPlanReviewRequest,
MagenticPlanReviewResponse,
)
workflow = MagenticBuilder(
participants=[researcher_agent, coder_agent],
intermediate_output_from=[researcher_agent, coder_agent],
enable_plan_review=True,
manager_agent=manager_agent,
max_round_count=10,
max_stall_count=1,
max_reset_count=2,
).build()
pending_request: WorkflowEvent | None = None
pending_responses: dict[str, MagenticPlanReviewResponse] | None = None
final_response: AgentResponse | None = None
while not final_response:
if pending_responses is not None:
stream = workflow.run(stream=True, responses=pending_responses)
else:
stream = workflow.run(task, stream=True)
last_message_id: str | None = None
async for event in stream:
if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
message_id = event.data.message_id
if message_id != last_message_id:
if last_message_id is not None:
print("\n")
print(f"- {event.executor_id}:", end=" ", flush=True)
last_message_id = message_id
print(event.data, end="", flush=True)
elif event.type == "request_info" and event.request_type is MagenticPlanReviewRequest:
pending_request = event
elif event.type == "output" and isinstance(event.data, AgentResponse):
final_response = event.data
pending_responses = None
# Handle plan review request if any
if pending_request is not None:
event_data = cast(MagenticPlanReviewRequest, pending_request.data)
print("\n\n[Magentic Plan Review Request]")
if event_data.current_progress is not None:
print("Current Progress Ledger:")
print(json.dumps(event_data.current_progress.to_dict(), indent=2))
print()
print(f"Proposed Plan:\n{event_data.plan.text}\n")
print("Please provide your feedback (press Enter to approve):")
reply = await asyncio.get_event_loop().run_in_executor(None, input, "> ")
if reply.strip() == "":
print("Plan approved.\n")
pending_responses = {pending_request.request_id: event_data.approve()}
else:
print("Plan revised by human.\n")
pending_responses = {pending_request.request_id: event_data.revise(reply)}
pending_request = None
A MagenticPlanReviewRequest apresenta o plano proposto, o registo do progresso atual (null / None na revisão inicial e preenchido nos replaneamentos desencadeados por deteção de bloqueio) e um sinalizador que indica se o replaneamento foi desencadeado por deteção de bloqueio. Construa a resposta chamando approve() para aceitar o plano tal como está, ou revise(...) com feedback para pedir ao gestor que replaneie.
Conceitos-chave
- Coordenação Dinâmica: O gestor Magentic seleciona dinamicamente qual agente deve agir a seguir com base no contexto em evolução.
-
Terminal Output: A saída do fluxo de trabalho do terminal transporta a resposta final sintetizada do gestor (um
AgentResponseem Python; umWorkflowOutputEventcom uma carga útilList<ChatMessage>em .NET). -
Orchestrator Events: Os marcos criados pelo plano, replaneados e com o registo de progresso atualizado são expostos através de
MagenticOrchestratorEvent(um evento com uma enumeraçãoevent_typeem Python; três tipos derivados em .NET). Os deltas de transmissão por participante são fornecidos através dos eventos padrão de atualização "agent-response" da framework. - Refinamento Iterativo: O sistema pode decompor problemas complexos e refinar iterativamente soluções ao longo de várias rondas.
- Acompanhamento do Progresso e Deteção de Estalo: O livro de registo de progresso acompanha se o pedido está satisfeito, se a equipa está num ciclo e se o progresso está a ser feito. Rondas consecutivas sem progresso aumentam um contador de bloqueio e, quando o máximo configurado é excedido, é acionada uma reinicialização automática e um novo replaneamento.
- Colaboração Flexível: Os agentes podem ser chamados várias vezes em qualquer ordem, conforme determinado pelo gestor.
-
Supervisão Humana: Revisão opcional do plano com intervenção humana via
MagenticPlanReviewRequest/MagenticPlanReviewResponse. -
Saídas Intermédias (apenas Python, por agora): Designe participantes cujas chamadas
yield_outputdevem surgir como eventos"intermediate"juntamente com a saída terminal do gestor.
Fluxo de execução do trabalho
A orquestração Magentic segue este padrão de execução:
- Fase de planejamento: O gerente analisa a tarefa e cria um plano inicial
- Revisão Opcional do Plano: Se ativado, os humanos podem rever e aprovar/modificar o plano
- Seleção do agente: o gerente seleciona o agente mais apropriado para cada subtarefa
- Execução: O agente selecionado executa sua parte da tarefa
- Avaliação de progresso: O gestor avalia o progresso e atualiza o plano
- Deteção de Bloqueio: Se o progresso estagnar, replaneio automático com um processo opcional de revisão humana
- Iteração: Os passos 3-6 repetem-se até a tarefa estar concluída ou até serem atingidos limites
- Síntese Final: O gerente sintetiza todas as saídas do agente em um resultado final
Exemplo completo
Consulte os exemplos completos no repositório de Exemplos do Agent Framework.
Consulte os exemplos completos no repositório de Exemplos do Agent Framework.