Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Met gelijktijdige orkestratie kunnen meerdere agents parallel aan dezelfde taak werken. Elke agent verwerkt de invoer onafhankelijk en de bijbehorende resultaten worden verzameld en samengevoegd. Deze benadering is zeer geschikt voor scenario's waarbij diverse perspectieven of oplossingen waardevol zijn, zoals brainstormen, ensembleredenering of stemsystemen.
Wat u leert
- Meerdere agents met verschillende expertise definiëren
- Deze agents indelen om gelijktijdig te werken aan één taak
- De resultaten verzamelen en verwerken
Bij gelijktijdige orkestratie werken meerdere agenten tegelijkertijd en onafhankelijk aan dezelfde taak, waardoor ze verschillende perspectieven op dezelfde invoer bieden.
De Azure OpenAI-client instellen
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.AI.Projects;
using Azure.Identity;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
using Microsoft.Agents.AI;
// 1) Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ??
throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var client = new AIProjectClient(new Uri(endpoint), new DefaultAzureCredential())
.GetProjectOpenAIClient()
.GetProjectResponsesClient()
.AsIChatClient(deploymentName);
Waarschuwing
DefaultAzureCredential is handig voor ontwikkeling, maar vereist zorgvuldige overwegingen in de productieomgeving. Overweeg in productie een specifieke referentie te gebruiken (bijvoorbeeld ManagedIdentityCredential) om latentieproblemen, onbedoelde referentieprobing en potentiële beveiligingsrisico's van terugvalmechanismen te voorkomen.
Uw agents definiëren
Maak meerdere gespecialiseerde agents die gelijktijdig aan dezelfde taak werken:
// 2) Helper method to create translation agents
static ChatClientAgent GetTranslationAgent(string targetLanguage, IChatClient chatClient) =>
new(chatClient,
$"You are a translation assistant who only responds in {targetLanguage}. Respond to any " +
$"input by outputting the name of the input language and then translating the input to {targetLanguage}.");
// Create translation agents for concurrent processing
var translationAgents = (from lang in (string[])["French", "Spanish", "English"]
select GetTranslationAgent(lang, client));
De concurrerende orkestratie instellen
Bouw de werkstroom met behulp van AgentWorkflowBuilder om agents parallel uit te voeren.
// 3) Build concurrent workflow
var workflow = AgentWorkflowBuilder.BuildConcurrent(translationAgents);
De gelijktijdige werkstroom uitvoeren en resultaten verzamelen
Voer de workflow uit en verwerk gebeurtenissen van alle agents die tegelijkertijd actief zijn.
// 4) Run the workflow
var messages = new List<ChatMessage> { new(ChatRole.User, "Hello, world!") };
await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, messages);
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
List<ChatMessage> result = new();
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is AgentResponseUpdateEvent e)
{
Console.WriteLine($"{e.ExecutorId}: {e.Update.Text}");
}
else if (evt is WorkflowOutputEvent outputEvt)
{
result = outputEvt.As<List<ChatMessage>>()!;
break;
}
}
// Display aggregated results from all agents
Console.WriteLine("===== Final Aggregated Results =====");
foreach (var message in result)
{
Console.WriteLine($"{message.Role}: {message.Text}");
}
Voorbeelduitvoer
French_Agent: English detected. Bonjour, le monde !
Spanish_Agent: English detected. ¡Hola, mundo!
English_Agent: English detected. Hello, world!
===== Final Aggregated Results =====
User: Hello, world!
Assistant: English detected. Bonjour, le monde !
Assistant: English detected. ¡Hola, mundo!
Assistant: English detected. Hello, world!
Sleutelbegrippen
- Parallelle uitvoering: alle agents verwerken de invoer tegelijkertijd en onafhankelijk
- AgentWorkflowBuilder.BuildConcurrent(): Hiermee maakt u een gelijktijdige werkstroom op basis van een verzameling agents
- Automatische aggregatie: resultaten van alle agents worden automatisch verzameld in het uiteindelijke resultaat
-
Gebeurtenisstreaming: realtime bewaking van de voortgang van de agent via
AgentResponseUpdateEvent - Diverse perspectieven: elke agent brengt zijn unieke expertise naar hetzelfde probleem
Agents zijn gespecialiseerde entiteiten die taken kunnen verwerken. De volgende code definieert drie agenten: een onderzoeksexpert, een marketingexpert en een juridische expert.
import os
from agent_framework.foundry import FoundryChatClient
from azure.identity import AzureCliCredential
# 1) Create three domain agents using FoundryChatClient
chat_client = FoundryChatClient(
project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"],
model=os.environ["FOUNDRY_MODEL"],
credential=AzureCliCredential(),
)
researcher = chat_client.as_agent(
instructions=(
"You're an expert market and product researcher. Given a prompt, provide concise, factual insights,"
" opportunities, and risks."
),
name="researcher",
)
marketer = chat_client.as_agent(
instructions=(
"You're a creative marketing strategist. Craft compelling value propositions and target messaging"
" aligned to the prompt."
),
name="marketer",
)
legal = chat_client.as_agent(
instructions=(
"You're a cautious legal/compliance reviewer. Highlight constraints, disclaimers, and policy concerns"
" based on the prompt."
),
name="legal",
)
De concurrerende orkestratie instellen
Met de ConcurrentBuilder klasse kunt u een werkstroom maken om meerdere agents parallel uit te voeren. U geeft de lijst met agenten door als deelnemers.
from agent_framework.orchestrations import ConcurrentBuilder
# 2) Build a concurrent workflow
# Participants are either Agents (type of SupportsAgentRun) or Executors
workflow = ConcurrentBuilder(participants=[researcher, marketer, legal]).build()
De gelijktijdige werkstroom uitvoeren en de resultaten verzamelen
De standaardaggregator produceert een enkele AgentResponse met één assistentbericht voor elke deelnemer:
from agent_framework import AgentResponse
# 3) Run with a single prompt and print the aggregated agent responses
events = await workflow.run("We are launching a new budget-friendly electric bike for urban commuters.")
outputs = events.get_outputs()
if outputs:
print("===== Final Aggregated Results =====")
final: AgentResponse = outputs[0]
for msg in final.messages:
name = msg.author_name or "assistant"
print(f"{'-' * 60}\n\n[{name}]:\n{msg.text}")
Voorbeelduitvoer
===== Final Aggregated Results =====
------------------------------------------------------------
[researcher]:
**Insights:**
- **Target Demographic:** Urban commuters seeking affordable, eco-friendly transport;
likely to include students, young professionals, and price-sensitive urban residents.
- **Market Trends:** E-bike sales are growing globally, with increasing urbanization,
higher fuel costs, and sustainability concerns driving adoption.
...
------------------------------------------------------------
[marketer]:
**Value Proposition:**
"Empowering your city commute: Our new electric bike combines affordability, reliability, and
sustainable design—helping you conquer urban journeys without breaking the bank."
...
------------------------------------------------------------
[legal]:
**Constraints, Disclaimers, & Policy Concerns for Launching a Budget-Friendly Electric Bike for Urban Commuters:**
**1. Regulatory Compliance**
- Verify that the electric bike meets all applicable federal, state, and local regulations
regarding e-bike classification, speed limits, power output, and safety features.
Geavanceerd: Aangepaste Agent Executors
Gelijktijdige orkestratie ondersteunt aangepaste uitvoerders die agents verpakken met aanvullende logica. Dit is handig wanneer u meer controle nodig hebt over hoe agents worden geïnitialiseerd en hoe aanvragen worden verwerkt:
Aangepaste agentuitvoerders definiëren
from agent_framework import (
AgentExecutorRequest,
AgentExecutorResponse,
Agent,
Executor,
WorkflowContext,
handler,
)
class ResearcherExec(Executor):
def __init__(self, chat_client: FoundryChatClient, id: str = "researcher"):
self.agent = chat_client.as_agent(
instructions=(
"You're an expert market and product researcher. Given a prompt, provide concise, factual insights,"
" opportunities, and risks."
),
name=id,
)
super().__init__(id=id)
@handler
async def run(self, request: AgentExecutorRequest, ctx: WorkflowContext[AgentExecutorResponse]) -> None:
response = await self.agent.run(request.messages)
full_conversation = list(request.messages) + list(response.messages)
await ctx.send_message(AgentExecutorResponse(self.id, response, full_conversation=full_conversation))
class MarketerExec(Executor):
def __init__(self, chat_client: FoundryChatClient, id: str = "marketer"):
self.agent = chat_client.as_agent(
instructions=(
"You're a creative marketing strategist. Craft compelling value propositions and target messaging"
" aligned to the prompt."
),
name=id,
)
super().__init__(id=id)
@handler
async def run(self, request: AgentExecutorRequest, ctx: WorkflowContext[AgentExecutorResponse]) -> None:
response = await self.agent.run(request.messages)
full_conversation = list(request.messages) + list(response.messages)
await ctx.send_message(AgentExecutorResponse(self.id, response, full_conversation=full_conversation))
Een werkstroom bouwen met aangepaste uitvoerders
chat_client = FoundryChatClient(
project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"],
model=os.environ["FOUNDRY_MODEL"],
credential=AzureCliCredential(),
)
researcher = ResearcherExec(chat_client)
marketer = MarketerExec(chat_client)
legal = LegalExec(chat_client)
workflow = ConcurrentBuilder(participants=[researcher, marketer, legal]).build()
Geavanceerd: Aangepaste Aggregator
Standaard worden met gelijktijdige orkestratie alle agentreacties samengevoegd in één AgentResponse met één assistentbericht per deelnemer. U kunt dit gedrag overschrijven met een aangepaste aggregator waarmee de resultaten op een specifieke manier worden verwerkt:
Een aangepaste Aggregator definiëren
from agent_framework import AgentExecutorResponse
# Create a summarizer agent for the aggregator
summarizer_agent = chat_client.as_agent(
instructions=(
"You are a helpful assistant that consolidates multiple domain expert outputs "
"into one cohesive, concise summary with clear takeaways. Keep it under 200 words."
),
name="summarizer",
)
# Define a custom aggregator callback
async def summarize_results(results: list[AgentExecutorResponse]) -> str:
# Extract one final assistant message per agent
expert_sections: list[str] = []
for r in results:
try:
messages = getattr(r.agent_response, "messages", [])
final_text = messages[-1].text if messages and hasattr(messages[-1], "text") else "(no content)"
expert_sections.append(f"{r.executor_id}:\n{final_text}")
except Exception as e:
expert_sections.append(f"{r.executor_id}: (error: {type(e).__name__}: {e})")
# Ask the model to synthesize a concise summary of the experts' outputs
prompt = "\n\n".join(expert_sections)
response = await summarizer_agent.run(prompt)
# Return the model's final assistant text as the completion result
return response.messages[-1].text if response.messages else ""
Een werkstroom bouwen met aangepaste Aggregator
workflow = (
ConcurrentBuilder(participants=[researcher, marketer, legal])
.with_aggregator(summarize_results)
.build()
)
output = None
async for event in workflow.run("We are launching a new budget-friendly electric bike for urban commuters.", stream=True):
if event.type == "output":
output = event.data
if output:
print("===== Final Consolidated Output =====")
print(output)
Voorbeelduitvoer Met Aangepaste Aggregator
===== Final Consolidated Output =====
Urban e-bike demand is rising rapidly due to eco-awareness, urban congestion, and high fuel costs,
with market growth projected at a ~10% CAGR through 2030. Key customer concerns are affordability,
easy maintenance, convenient charging, compact design, and theft protection. Differentiation opportunities
include integrating smart features (GPS, app connectivity), offering subscription or leasing options, and
developing portable, space-saving designs. Partnering with local governments and bike shops can boost visibility.
Risks include price wars eroding margins, regulatory hurdles, battery quality concerns, and heightened expectations
for after-sales support. Accurate, substantiated product claims and transparent marketing (with range disclaimers)
are essential. All e-bikes must comply with local and federal regulations on speed, wattage, safety certification,
and labeling. Clear warranty, safety instructions (especially regarding batteries), and inclusive, accessible
marketing are required. For connected features, data privacy policies and user consents are mandatory.
Effective messaging should target young professionals, students, eco-conscious commuters, and first-time buyers,
emphasizing affordability, convenience, and sustainability. Slogan suggestion: "Charge Ahead—City Commutes Made
Affordable." Legal review in each target market, compliance vetting, and robust customer support policies are
critical before launch.
Tussenliggende uitvoer
Standaard wordt alleen de uitvoer van de aggregator weergegeven als een werkstroom output gebeurtenis. Stel intermediate_outputs=True in om ook de afzonderlijke uitvoer van elke deelnemer zichtbaar te maken.
workflow = ConcurrentBuilder(
participants=[researcher, marketer, legal],
intermediate_outputs=True,
).build()
U kunt deze gebeurtenissen in realtime afhandelen in de streamingmodus:
from agent_framework import AgentResponseUpdate
# Track the last author to format streaming output.
last_author: str | None = None
async for event in workflow.run("Analyze our new product launch strategy.", stream=True):
if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
update = event.data
author = update.author_name
if author != last_author:
if last_author is not None:
print() # Newline between different authors
print(f"{author}: {update.text}", end="", flush=True)
last_author = author
else:
print(update.text, end="", flush=True)
Sleutelbegrippen
- Parallelle uitvoering: alle agents werken tegelijkertijd en onafhankelijk aan de taak
-
AgentResponse-uitvoer: de standaardaggregator levert één
AgentResponsemet één assistentbericht per deelnemer (geen gebruikersprompt inbegrepen) - Diverse perspectieven: elke agent brengt zijn unieke expertise naar hetzelfde probleem
- Flexibele deelnemers: U kunt agents rechtstreeks gebruiken of inpakken in aangepaste uitvoerders
- Aangepaste verwerking: de standaardaggregator overschrijven om resultaten op domeinspecifieke manieren te synthetiseren
-
Tussenliggende uitvoer: ingesteld
intermediate_outputs=Trueom de afzonderlijke uitvoer van elke deelnemer weer te geven, naast de uiteindelijke uitvoer van de aggregator