Det finns två allmänna metoder för att skapa agentiska program med AI:
-
Deterministiska arbetsflöden – Koden definierar kontrollflödet. Du skriver sekvensen med steg, förgrening, parallellitet och felhantering med hjälp av standardprogrammeringskonstruktioner. LLM utför arbete i varje steg men styr inte det övergripande flödet.
-
Agentstyrda arbetsflöden (agentloopar) – LLM styr kontrollflödet. Agenten bestämmer vilka verktyg som ska anropas, i vilken ordning och när uppgiften är klar. Du tillhandahåller verktyg och instruktioner, men agenten bestämmer exekveringsvägen.
Båda metoderna drar nytta av varaktig körning och kan implementeras med programmeringsmodellen Durable Task. Den här artikeln visar hur du skapar varje mönster med hjälp av kodexempel.
Tips/Råd
Dessa mönster överensstämmer med de agentiska arbetsflödesdesigner som beskrivs i Anthropics Building Effective Agents. Programmeringsmodellen Durable Task mappar naturligt till dessa mönster: orkestreringar definierar arbetsflödeskontrollflödet och kontrolleras automatiskt, medan aktiviteter omsluter icke-deterministiska åtgärder som LLM-anrop, verktygsanrop och API-begäranden.
Välj en metod
Följande tabell hjälper dig att bestämma när du ska använda varje metod.
| Använd deterministiska arbetsflöden när... |
Använd agentloopar när... |
| Sekvensen med steg är känd i förväg. |
Uppgiften är öppen och stegen kan inte förutsägas. |
| Du behöver explicita skyddsräcken för agentbeteende. |
Du vill att LLM ska bestämma vilka verktyg som ska användas och när. |
| Efterlevnad eller granskning kräver granskningsbart kontrollflöde. |
Agenten måste anpassa sin metod baserat på mellanliggande resultat. |
| Du vill kombinera flera AI-ramverk i ett enda arbetsflöde. |
Du skapar en konversationsagent med möjligheter att använda verktyg. |
Båda metoderna tillhandahåller automatiska kontrollpunkter, återsökningsprinciper, distribuerad skalning och stöd för människa i processen genom varaktig körning.
Deterministiska arbetsflödesmönster
I ett deterministiskt arbetsflöde är det koden som styr exekveringsvägen. LLM anropas som ett steg i arbetsflödet men bestämmer inte vad som händer härnäst. Programmeringsmodellen Durable Task mappar naturligt till den här metoden:
-
Orkestreringar definierar arbetsflödeskontrollflödet (sekvens, förgrening, parallellitet, felhantering) och kontrolleras automatiskt.
-
Aktiviteter omsluter icke-deterministiska åtgärder som LLM-anrop, verktygsanrop och API-begäranden. Aktiviteter kan köras på valfri tillgänglig beräkningsinstans.
I följande exempel används Durable Functions som körs på Azure Functions med serverlös värd.
I följande exempel används portabla Durable Task SDK:er, som körs på valfri värdberäkning, inklusive Azure Container Apps, Kubernetes, virtuella datorer eller lokalt.
Promptkedjning
Kedjning av promptar är det enklaste agentiska mönstret. Du delar upp en komplex uppgift i en serie sekventiella LLM-interaktioner, där varje stegs utdata matas in i nästa stegs indata. Eftersom varje aktivitetsanrop automatiskt kontrollpunktas tvingar en krasch halvvägs genom pipelinen dig inte att starta om från början och återanvända dyra LLM-token – körningen återupptas från det senaste slutförda steget.
Du kan också infoga programmatiska valideringsportar mellan stegen. När du till exempel har genererat en disposition kan du kontrollera att den uppfyller en längd- eller ämnesbegränsning innan du skickar den till utkaststeget.
Det här mönstret mappar direkt till funktionslänkningsmönstret i programmeringsmodellen Durable Task.
När du ska använda: Pipelines för innehållsgenerering, dokumentbearbetning i flera steg, sekventiell databerikning, arbetsflöden som kräver mellanliggande valideringsportar.
[Function(nameof(PromptChainingOrchestration))]
public async Task<string> PromptChainingOrchestration(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
var topic = context.GetInput<string>();
// Step 1: Generate research outline
string outline = await context.CallActivityAsync<string>(
nameof(GenerateOutlineAgent), topic);
// Step 2: Write first draft from outline
string draft = await context.CallActivityAsync<string>(
nameof(WriteDraftAgent), outline);
// Step 3: Refine and polish the draft
string finalContent = await context.CallActivityAsync<string>(
nameof(RefineDraftAgent), draft);
return finalContent;
}
Anmärkning
Orkestreringens tillstånd kontrolleras automatiskt vid varje await-instruktion. Om värdprocessen kraschar eller om den virtuella datorn återanvänds återupptas orkestreringen automatiskt från det senaste slutförda steget i stället för att starta om.
@app.orchestration_trigger(context_name="context")
def prompt_chaining_orchestration(context: df.DurableOrchestrationContext):
topic = context.get_input()
# Step 1: Generate research outline
outline = yield context.call_activity("generate_outline_agent", topic)
# Step 2: Write first draft from outline
draft = yield context.call_activity("write_draft_agent", outline)
# Step 3: Refine and polish the draft
final_content = yield context.call_activity("refine_draft_agent", draft)
return final_content
Anmärkning
Orkestreringens tillstånd kontrolleras automatiskt vid varje yield-instruktion. Om värdprocessen kraschar eller om den virtuella datorn återanvänds återupptas orkestreringen automatiskt från det senaste slutförda steget i stället för att starta om.
const df = require("durable-functions");
df.app.orchestration("promptChainingOrchestration", function* (context) {
const topic = context.df.getInput();
// Step 1: Generate research outline
const outline = yield context.df.callActivity("generateOutlineAgent", topic);
// Step 2: Write first draft from outline
const draft = yield context.df.callActivity("writeDraftAgent", outline);
// Step 3: Refine and polish the draft
const finalContent = yield context.df.callActivity("refineDraftAgent", draft);
return finalContent;
});
Anmärkning
Orkestreringens tillstånd kontrolleras automatiskt vid varje yield-instruktion. Om värdprocessen kraschar eller om den virtuella datorn återanvänds återupptas orkestreringen automatiskt från det senaste slutförda steget i stället för att starta om.
@FunctionName("PromptChainingOrchestration")
public String promptChainingOrchestration(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
String topic = ctx.getInput(String.class);
// Step 1: Generate research outline
String outline = ctx.callActivity(
"GenerateOutlineAgent", topic, String.class).await();
// Step 2: Write first draft from outline
String draft = ctx.callActivity(
"WriteDraftAgent", outline, String.class).await();
// Step 3: Refine and polish the draft
String finalContent = ctx.callActivity(
"RefineDraftAgent", draft, String.class).await();
return finalContent;
}
Anmärkning
Orkestreringens tillstånd styrs automatiskt vid varje await() anrop. Om värdprocessen kraschar eller om den virtuella datorn återanvänds återupptas orkestreringen automatiskt från det senaste slutförda steget i stället för att starta om.
[DurableTask]
public class PromptChainingOrchestration : TaskOrchestrator<string, string>
{
public override async Task<string> RunAsync(
TaskOrchestrationContext context, string topic)
{
// Step 1: Generate research outline
string outline = await context.CallActivityAsync<string>(
nameof(GenerateOutlineAgent), topic);
// Step 2: Write first draft from outline
string draft = await context.CallActivityAsync<string>(
nameof(WriteDraftAgent), outline);
// Step 3: Refine and polish the draft
string finalContent = await context.CallActivityAsync<string>(
nameof(RefineDraftAgent), draft);
return finalContent;
}
}
Anmärkning
Orkestreringens tillstånd kontrolleras automatiskt vid varje await-instruktion. Om värdprocessen kraschar eller om den virtuella datorn återanvänds återupptas orkestreringen automatiskt från det senaste slutförda steget i stället för att starta om.
def prompt_chaining_orchestration(ctx: task.OrchestrationContext, topic: str) -> str:
# Step 1: Generate research outline
outline = yield ctx.call_activity(generate_outline_agent, input=topic)
# Step 2: Write first draft from outline
draft = yield ctx.call_activity(write_draft_agent, input=outline)
# Step 3: Refine and polish the draft
final_content = yield ctx.call_activity(refine_draft_agent, input=draft)
return final_content
Anmärkning
Orkestreringens tillstånd kontrolleras automatiskt vid varje yield-instruktion. Om värdprocessen kraschar eller om den virtuella datorn återanvänds återupptas orkestreringen automatiskt från det senaste slutförda steget i stället för att starta om.
const promptChainingOrchestration: TOrchestrator = async function* (
ctx: OrchestrationContext, topic: string): any {
// Step 1: Generate research outline
const outline: string = yield ctx.callActivity(generateOutlineAgent, topic);
// Step 2: Write first draft from outline
const draft: string = yield ctx.callActivity(writeDraftAgent, outline);
// Step 3: Refine and polish the draft
const finalContent: string = yield ctx.callActivity(refineDraftAgent, draft);
return finalContent;
};
Anmärkning
Orkestreringens tillstånd kontrolleras automatiskt vid varje yield-instruktion. Om värdprocessen kraschar eller om den virtuella datorn återanvänds återupptas orkestreringen automatiskt från det senaste slutförda steget i stället för att starta om.
ctx -> {
String topic = ctx.getInput(String.class);
// Step 1: Generate research outline
String outline = ctx.callActivity(
"GenerateOutlineAgent", topic, String.class).await();
// Step 2: Write first draft from outline
String draft = ctx.callActivity(
"WriteDraftAgent", outline, String.class).await();
// Step 3: Refine and polish the draft
String finalContent = ctx.callActivity(
"RefineDraftAgent", draft, String.class).await();
ctx.complete(finalContent);
}
Anmärkning
Orkestreringens tillstånd styrs automatiskt vid varje await() anrop. Om värdprocessen kraschar eller om den virtuella datorn återanvänds återupptas orkestreringen automatiskt från det senaste slutförda steget i stället för att starta om.
Routing
Routning använder ett klassificeringssteg för att avgöra vilken nedströmsagent eller modell som ska hantera en begäran. Orkestreringen anropar först en klassificeraraktivitet och sedan grenar till lämplig hanterare baserat på resultatet. Med den här metoden kan du skräddarsy varje hanterares fråga, modell och verktyg oberoende av varandra, till exempel genom att dirigera faktureringsfrågor till en specialiserad agent med åtkomst till betalnings-API:er samtidigt som du skickar allmänna frågor till en modell med lägre vikt.
När du ska använda: Kundsupporttriage, avsiktsklassificering för specialiserade agenter, dynamisk modellval baserat på uppgiftskomplexitet.
[Function(nameof(RoutingOrchestration))]
public async Task<string> RoutingOrchestration(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
var request = context.GetInput<SupportRequest>();
// Classify the request type
string category = await context.CallActivityAsync<string>(
nameof(ClassifyRequestAgent), request.Message);
// Route to the appropriate specialized agent
return category switch
{
"billing" => await context.CallActivityAsync<string>(
nameof(BillingAgent), request),
"technical" => await context.CallActivityAsync<string>(
nameof(TechnicalSupportAgent), request),
"general" => await context.CallActivityAsync<string>(
nameof(GeneralInquiryAgent), request),
_ => await context.CallActivityAsync<string>(
nameof(GeneralInquiryAgent), request),
};
}
@app.orchestration_trigger(context_name="context")
def routing_orchestration(context: df.DurableOrchestrationContext):
request = context.get_input()
# Classify the request type
category = yield context.call_activity("classify_request_agent", request["message"])
# Route to the appropriate specialized agent
if category == "billing":
return (yield context.call_activity("billing_agent", request))
elif category == "technical":
return (yield context.call_activity("technical_support_agent", request))
else:
return (yield context.call_activity("general_inquiry_agent", request))
const df = require("durable-functions");
df.app.orchestration("routingOrchestration", function* (context) {
const request = context.df.getInput();
// Classify the request type
const category = yield context.df.callActivity("classifyRequestAgent", request.message);
// Route to the appropriate specialized agent
switch (category) {
case "billing":
return yield context.df.callActivity("billingAgent", request);
case "technical":
return yield context.df.callActivity("technicalSupportAgent", request);
default:
return yield context.df.callActivity("generalInquiryAgent", request);
}
});
@FunctionName("RoutingOrchestration")
public String routingOrchestration(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
SupportRequest request = ctx.getInput(SupportRequest.class);
// Classify the request type
String category = ctx.callActivity(
"ClassifyRequestAgent", request.getMessage(), String.class).await();
// Route to the appropriate specialized agent
return switch (category) {
case "billing" -> ctx.callActivity(
"BillingAgent", request, String.class).await();
case "technical" -> ctx.callActivity(
"TechnicalSupportAgent", request, String.class).await();
default -> ctx.callActivity(
"GeneralInquiryAgent", request, String.class).await();
};
}
[DurableTask]
public class RoutingOrchestration : TaskOrchestrator<SupportRequest, string>
{
public override async Task<string> RunAsync(
TaskOrchestrationContext context, SupportRequest request)
{
// Classify the request type
string category = await context.CallActivityAsync<string>(
nameof(ClassifyRequestAgent), request.Message);
// Route to the appropriate specialized agent
return category switch
{
"billing" => await context.CallActivityAsync<string>(
nameof(BillingAgent), request),
"technical" => await context.CallActivityAsync<string>(
nameof(TechnicalSupportAgent), request),
_ => await context.CallActivityAsync<string>(
nameof(GeneralInquiryAgent), request),
};
}
}
def routing_orchestration(ctx: task.OrchestrationContext, request: dict) -> str:
# Classify the request type
category = yield ctx.call_activity(classify_request_agent, input=request["message"])
# Route to the appropriate specialized agent
if category == "billing":
return (yield ctx.call_activity(billing_agent, input=request))
elif category == "technical":
return (yield ctx.call_activity(technical_support_agent, input=request))
else:
return (yield ctx.call_activity(general_inquiry_agent, input=request))
const routingOrchestration: TOrchestrator = async function* (
ctx: OrchestrationContext, request: SupportRequest): any {
// Classify the request type
const category: string = yield ctx.callActivity(classifyRequestAgent, request.message);
// Route to the appropriate specialized agent
switch (category) {
case "billing":
return yield ctx.callActivity(billingAgent, request);
case "technical":
return yield ctx.callActivity(technicalSupportAgent, request);
default:
return yield ctx.callActivity(generalInquiryAgent, request);
}
};
ctx -> {
SupportRequest request = ctx.getInput(SupportRequest.class);
// Classify the request type
String category = ctx.callActivity(
"ClassifyRequestAgent", request.getMessage(), String.class).await();
// Route to the appropriate specialized agent
String result = switch (category) {
case "billing" -> ctx.callActivity(
"BillingAgent", request, String.class).await();
case "technical" -> ctx.callActivity(
"TechnicalSupportAgent", request, String.class).await();
default -> ctx.callActivity(
"GeneralInquiryAgent", request, String.class).await();
};
ctx.complete(result);
}
Parallellisering
När du har flera oberoende underaktiviteter kan du skicka dem som parallella aktivitetsanrop och vänta på alla resultat innan du fortsätter. Durable Task Scheduler distribuerar dessa aktiviteter automatiskt över alla tillgängliga beräkningsinstanser, vilket innebär att lägga till fler arbetare direkt minskar den totala tiden för wall-clock.
En vanlig variant är röstning med flera modeller: du skickar samma fråga till flera modeller (eller samma modell med olika temperaturer) parallellt och sammanställer eller väljer sedan bland svaren. Eftersom varje parallell gren är oberoende kontrollpunkt påverkas inte de andra av ett tillfälligt fel i en gren.
Det här mönstret mappar direkt till fan-out/fan-in-mönstret i Durable Task.
När du ska använda: Batchanalys av dokument, parallella verktygsanrop, utvärdering av flera modeller, innehållsmoderering med flera granskare.
[Function(nameof(ParallelResearchOrchestration))]
public async Task<string> ParallelResearchOrchestration(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
var request = context.GetInput<ResearchRequest>();
// Fan-out: research multiple subtopics in parallel
var researchTasks = request.Subtopics
.Select(subtopic => context.CallActivityAsync<string>(
nameof(ResearchSubtopicAgent), subtopic))
.ToList();
string[] researchResults = await Task.WhenAll(researchTasks);
// Aggregate: synthesize all research into a single summary
string summary = await context.CallActivityAsync<string>(
nameof(SynthesizeAgent),
new { request.Topic, Research = researchResults });
return summary;
}
@app.orchestration_trigger(context_name="context")
def parallel_research_orchestration(context: df.DurableOrchestrationContext):
request = context.get_input()
# Fan-out: research multiple subtopics in parallel
research_tasks = []
for subtopic in request["subtopics"]:
research_tasks.append(
context.call_activity("research_subtopic_agent", subtopic)
)
research_results = yield context.task_all(research_tasks)
# Aggregate: synthesize all research into a single summary
summary = yield context.call_activity("synthesize_agent", {
"topic": request["topic"],
"research": research_results
})
return summary
const df = require("durable-functions");
df.app.orchestration("parallelResearchOrchestration", function* (context) {
const request = context.df.getInput();
// Fan-out: research multiple subtopics in parallel
const tasks = request.subtopics.map((subtopic) =>
context.df.callActivity("researchSubtopicAgent", subtopic)
);
const researchResults = yield context.df.Task.all(tasks);
// Aggregate: synthesize all research into a single summary
const summary = yield context.df.callActivity("synthesizeAgent", {
topic: request.topic,
research: researchResults,
});
return summary;
});
@FunctionName("ParallelResearchOrchestration")
public String parallelResearchOrchestration(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
ResearchRequest request = ctx.getInput(ResearchRequest.class);
// Fan-out: research multiple subtopics in parallel
List<Task<String>> tasks = request.getSubtopics().stream()
.map(subtopic -> ctx.callActivity(
"ResearchSubtopicAgent", subtopic, String.class))
.collect(Collectors.toList());
List<String> researchResults = ctx.allOf(tasks).await();
// Aggregate: synthesize all research into a single summary
String summary = ctx.callActivity(
"SynthesizeAgent", researchResults, String.class).await();
return summary;
}
[DurableTask]
public class ParallelResearchOrchestration : TaskOrchestrator<ResearchRequest, string>
{
public override async Task<string> RunAsync(
TaskOrchestrationContext context, ResearchRequest request)
{
// Fan-out: research multiple subtopics in parallel
var researchTasks = request.Subtopics
.Select(subtopic => context.CallActivityAsync<string>(
nameof(ResearchSubtopicAgent), subtopic))
.ToList();
string[] researchResults = await Task.WhenAll(researchTasks);
// Aggregate: synthesize all research into a single summary
string summary = await context.CallActivityAsync<string>(
nameof(SynthesizeAgent),
new { request.Topic, Research = researchResults });
return summary;
}
}
def parallel_research_orchestration(ctx: task.OrchestrationContext, request: dict) -> str:
# Fan-out: research multiple subtopics in parallel
research_tasks = []
for subtopic in request["subtopics"]:
research_tasks.append(
ctx.call_activity(research_subtopic_agent, input=subtopic)
)
research_results = yield task.when_all(research_tasks)
# Aggregate: synthesize all research into a single summary
summary = yield ctx.call_activity(synthesize_agent, input={
"topic": request["topic"],
"research": research_results
})
return summary
const parallelResearchOrchestration: TOrchestrator = async function* (
ctx: OrchestrationContext,
request: { topic: string; subtopics: string[] }): any {
// Fan-out: research multiple subtopics in parallel
const tasks = request.subtopics.map((subtopic) =>
ctx.callActivity(researchSubtopicAgent, subtopic)
);
const researchResults: string[] = yield whenAll(tasks);
// Aggregate: synthesize all research into a single summary
const summary: string = yield ctx.callActivity(synthesizeAgent, {
topic: request.topic,
research: researchResults,
});
return summary;
};
ctx -> {
ResearchRequest request = ctx.getInput(ResearchRequest.class);
// Fan-out: research multiple subtopics in parallel
List<Task<String>> tasks = request.getSubtopics().stream()
.map(subtopic -> ctx.callActivity(
"ResearchSubtopicAgent", subtopic, String.class))
.collect(Collectors.toList());
List<String> researchResults = ctx.allOf(tasks).await();
// Aggregate: synthesize all research into a single summary
String summary = ctx.callActivity(
"SynthesizeAgent", researchResults, String.class).await();
ctx.complete(summary);
}
Orchestrator-arbetare
I det här mönstret anropar en central orkestrerare först en LLM (via en aktivitet) för att planera arbetet. Baserat på LLM:s utdata avgör orkestratorn sedan vilka underaktiviteter som behövs. Orchestrator skickar sedan dessa underaktiviteter till specialiserade arbetarorkestreringar. Den viktigaste skillnaden från parallelism är att uppsättningen med underaktiviteter inte har åtgärdats vid designtillfället. Orkestratorn bestämmer dem dynamiskt vid körning.
Det här mönstret använder underorkestreringar, som är oberoende kontrollpunktsbaserade underordnade arbetsflöden. Varje arbetsorkestrering kan själv innehålla flera steg, återförsök och kapslad parallellitet.
När du ska använda: Djupgående forskningspipelines, kodning av agentarbetsflöden som ändrar flera filer, samarbete med flera agenter där varje agent har en distinkt roll.
[Function(nameof(OrchestratorWorkersOrchestration))]
public async Task<string> OrchestratorWorkersOrchestration(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
var request = context.GetInput<ResearchRequest>();
// Central orchestrator: determine what research is needed
string[] subtasks = await context.CallActivityAsync<string[]>(
nameof(PlanResearchAgent), request.Topic);
// Delegate to worker orchestrations in parallel
var workerTasks = subtasks
.Select(subtask => context.CallSubOrchestratorAsync<string>(
nameof(ResearchWorkerOrchestration), subtask))
.ToList();
string[] results = await Task.WhenAll(workerTasks);
// Synthesize results
string finalReport = await context.CallActivityAsync<string>(
nameof(SynthesizeAgent),
new { request.Topic, Research = results });
return finalReport;
}
@app.orchestration_trigger(context_name="context")
def orchestrator_workers_orchestration(context: df.DurableOrchestrationContext):
request = context.get_input()
# Central orchestrator: determine what research is needed
subtasks = yield context.call_activity("plan_research_agent", request["topic"])
# Delegate to worker orchestrations in parallel
worker_tasks = []
for subtask in subtasks:
worker_tasks.append(
context.call_sub_orchestrator("research_worker_orchestration", subtask)
)
results = yield context.task_all(worker_tasks)
# Synthesize results
final_report = yield context.call_activity("synthesize_agent", {
"topic": request["topic"],
"research": results
})
return final_report
const df = require("durable-functions");
df.app.orchestration("orchestratorWorkersOrchestration", function* (context) {
const request = context.df.getInput();
// Central orchestrator: determine what research is needed
const subtasks = yield context.df.callActivity("planResearchAgent", request.topic);
// Delegate to worker orchestrations in parallel
const workerTasks = subtasks.map((subtask) =>
context.df.callSubOrchestrator("researchWorkerOrchestration", subtask)
);
const results = yield context.df.Task.all(workerTasks);
// Synthesize results
const finalReport = yield context.df.callActivity("synthesizeAgent", {
topic: request.topic,
research: results,
});
return finalReport;
});
@FunctionName("OrchestratorWorkersOrchestration")
public String orchestratorWorkersOrchestration(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
ResearchRequest request = ctx.getInput(ResearchRequest.class);
// Central orchestrator: determine what research is needed
List<String> subtasks = ctx.callActivity(
"PlanResearchAgent", request.getTopic(), List.class).await();
// Delegate to worker orchestrations in parallel
List<Task<String>> workerTasks = subtasks.stream()
.map(subtask -> ctx.callSubOrchestrator(
"ResearchWorkerOrchestration", subtask, String.class))
.collect(Collectors.toList());
List<String> results = ctx.allOf(workerTasks).await();
// Synthesize results
String finalReport = ctx.callActivity(
"SynthesizeAgent", results, String.class).await();
return finalReport;
}
[DurableTask]
public class OrchestratorWorkersOrchestration : TaskOrchestrator<ResearchRequest, string>
{
public override async Task<string> RunAsync(
TaskOrchestrationContext context, ResearchRequest request)
{
// Central orchestrator: determine what research is needed
string[] subtasks = await context.CallActivityAsync<string[]>(
nameof(PlanResearchAgent), request.Topic);
// Delegate to worker orchestrations in parallel
var workerTasks = subtasks
.Select(subtask => context.CallSubOrchestratorAsync<string>(
nameof(ResearchWorkerOrchestration), subtask))
.ToList();
string[] results = await Task.WhenAll(workerTasks);
// Synthesize results
string finalReport = await context.CallActivityAsync<string>(
nameof(SynthesizeAgent),
new { request.Topic, Research = results });
return finalReport;
}
}
def orchestrator_workers_orchestration(ctx: task.OrchestrationContext, request: dict) -> str:
# Central orchestrator: determine what research is needed
subtasks = yield ctx.call_activity(plan_research_agent, input=request["topic"])
# Delegate to worker orchestrations in parallel
worker_tasks = []
for subtask in subtasks:
worker_tasks.append(
ctx.call_sub_orchestrator(research_worker_orchestration, input=subtask)
)
results = yield task.when_all(worker_tasks)
# Synthesize results
final_report = yield ctx.call_activity(synthesize_agent, input={
"topic": request["topic"],
"research": results
})
return final_report
const orchestratorWorkersOrchestration: TOrchestrator = async function* (
ctx: OrchestrationContext, request: ResearchRequest): any {
// Central orchestrator: determine what research is needed
const subtasks: string[] = yield ctx.callActivity(planResearchAgent, request.topic);
// Delegate to worker orchestrations in parallel
const workerTasks = subtasks.map((subtask) =>
ctx.callSubOrchestrator(researchWorkerOrchestration, subtask)
);
const results: string[] = yield whenAll(workerTasks);
// Synthesize results
const finalReport: string = yield ctx.callActivity(synthesizeAgent, {
topic: request.topic,
research: results,
});
return finalReport;
};
ctx -> {
ResearchRequest request = ctx.getInput(ResearchRequest.class);
// Central orchestrator: determine what research is needed
List<String> subtasks = ctx.callActivity(
"PlanResearchAgent", request.getTopic(), List.class).await();
// Delegate to worker orchestrations in parallel
List<Task<String>> workerTasks = subtasks.stream()
.map(subtask -> ctx.callSubOrchestrator(
"ResearchWorkerOrchestration", subtask, String.class))
.collect(Collectors.toList());
List<String> results = ctx.allOf(workerTasks).await();
// Synthesize results
String finalReport = ctx.callActivity(
"SynthesizeAgent", results, String.class).await();
ctx.complete(finalReport);
}
Utvärderare-optimerare
Mönstret evaluator-optimizer parar ihop en generatoragent med en utvärderaragent i en förfiningsloop. Generatorn producerar utdata, utvärderaren poängsätter den mot kvalitetskriterier och ger feedback och loopen upprepas tills utdata passerar eller ett maximalt iterationsantal har uppnåtts. Eftersom varje loop-iteration är kontrollpunktsäkrad, kommer en krasch efter tre lyckade förfiningsrundor inte att förlora förloppet.
Det här mönstret är särskilt användbart när kvaliteten kan mätas programmatiskt, till exempel genom att verifiera den genererade koden eller att en översättning bevarar namngivna entiteter.
När du ska använda: Kodgenerering med automatiserad granskning, litterär översättning, iterativ innehållsförfining, komplexa sökuppgifter som kräver flera analysrundor.
[Function(nameof(EvaluatorOptimizerOrchestration))]
public async Task<string> EvaluatorOptimizerOrchestration(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
var request = context.GetInput<ContentRequest>();
int maxIterations = 5;
string content = "";
string feedback = "";
for (int i = 0; i < maxIterations; i++)
{
// Generate or refine content
content = await context.CallActivityAsync<string>(
nameof(GenerateContentAgent),
new { request.Prompt, PreviousContent = content, Feedback = feedback });
// Evaluate quality
var evaluation = await context.CallActivityAsync<EvaluationResult>(
nameof(EvaluateContentAgent), content);
if (evaluation.MeetsQualityBar)
return content;
feedback = evaluation.Feedback;
}
return content; // Return best effort after max iterations
}
@app.orchestration_trigger(context_name="context")
def evaluator_optimizer_orchestration(context: df.DurableOrchestrationContext):
request = context.get_input()
max_iterations = 5
content = ""
feedback = ""
for i in range(max_iterations):
# Generate or refine content
content = yield context.call_activity("generate_content_agent", {
"prompt": request["prompt"],
"previous_content": content,
"feedback": feedback
})
# Evaluate quality
evaluation = yield context.call_activity("evaluate_content_agent", content)
if evaluation["meets_quality_bar"]:
return content
feedback = evaluation["feedback"]
return content # Return best effort after max iterations
const df = require("durable-functions");
df.app.orchestration("evaluatorOptimizerOrchestration", function* (context) {
const request = context.df.getInput();
const maxIterations = 5;
let content = "";
let feedback = "";
for (let i = 0; i < maxIterations; i++) {
// Generate or refine content
content = yield context.df.callActivity("generateContentAgent", {
prompt: request.prompt,
previousContent: content,
feedback: feedback,
});
// Evaluate quality
const evaluation = yield context.df.callActivity("evaluateContentAgent", content);
if (evaluation.meetsQualityBar) {
return content;
}
feedback = evaluation.feedback;
}
return content; // Return best effort after max iterations
});
@FunctionName("EvaluatorOptimizerOrchestration")
public String evaluatorOptimizerOrchestration(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
ContentRequest request = ctx.getInput(ContentRequest.class);
int maxIterations = 5;
String content = "";
String feedback = "";
for (int i = 0; i < maxIterations; i++) {
// Generate or refine content
content = ctx.callActivity("GenerateContentAgent",
new GenerateInput(request.getPrompt(), content, feedback),
String.class).await();
// Evaluate quality
EvaluationResult evaluation = ctx.callActivity(
"EvaluateContentAgent", content, EvaluationResult.class).await();
if (evaluation.meetsQualityBar()) {
return content;
}
feedback = evaluation.getFeedback();
}
return content; // Return best effort after max iterations
}
[DurableTask]
public class EvaluatorOptimizerOrchestration : TaskOrchestrator<ContentRequest, string>
{
public override async Task<string> RunAsync(
TaskOrchestrationContext context, ContentRequest request)
{
int maxIterations = 5;
string content = "";
string feedback = "";
for (int i = 0; i < maxIterations; i++)
{
// Generate or refine content
content = await context.CallActivityAsync<string>(
nameof(GenerateContentAgent),
new { request.Prompt, PreviousContent = content, Feedback = feedback });
// Evaluate quality
var evaluation = await context.CallActivityAsync<EvaluationResult>(
nameof(EvaluateContentAgent), content);
if (evaluation.MeetsQualityBar)
return content;
feedback = evaluation.Feedback;
}
return content; // Return best effort after max iterations
}
}
def evaluator_optimizer_orchestration(ctx: task.OrchestrationContext, request: dict) -> str:
max_iterations = 5
content = ""
feedback = ""
for i in range(max_iterations):
# Generate or refine content
content = yield ctx.call_activity(generate_content_agent, input={
"prompt": request["prompt"],
"previous_content": content,
"feedback": feedback
})
# Evaluate quality
evaluation = yield ctx.call_activity(evaluate_content_agent, input=content)
if evaluation["meets_quality_bar"]:
return content
feedback = evaluation["feedback"]
return content # Return best effort after max iterations
const evaluatorOptimizerOrchestration: TOrchestrator = async function* (
ctx: OrchestrationContext, request: ContentRequest): any {
const maxIterations = 5;
let content = "";
let feedback = "";
for (let i = 0; i < maxIterations; i++) {
// Generate or refine content
content = yield ctx.callActivity(generateContentAgent, {
prompt: request.prompt,
previousContent: content,
feedback: feedback,
});
// Evaluate quality
const evaluation = yield ctx.callActivity(evaluateContentAgent, content);
if (evaluation.meetsQualityBar) {
return content;
}
feedback = evaluation.feedback;
}
return content; // Return best effort after max iterations
};
ctx -> {
ContentRequest request = ctx.getInput(ContentRequest.class);
int maxIterations = 5;
String content = "";
String feedback = "";
for (int i = 0; i < maxIterations; i++) {
// Generate or refine content
content = ctx.callActivity("GenerateContentAgent",
new GenerateInput(request.getPrompt(), content, feedback),
String.class).await();
// Evaluate quality
EvaluationResult evaluation = ctx.callActivity(
"EvaluateContentAgent", content, EvaluationResult.class).await();
if (evaluation.meetsQualityBar()) {
ctx.complete(content);
return;
}
feedback = evaluation.getFeedback();
}
ctx.complete(content); // Return best effort after max iterations
}
Agentloopar
I en typisk AI-agentimplementering anropas en LLM i en loop, anropar verktyg och fattar beslut tills uppgiften har slutförts eller ett stoppvillkor har nåtts. Till skillnad från deterministiska arbetsflöden är körningssökvägen inte fördefinierad. Agenten avgör vad du ska göra i varje steg baserat på resultat från föregående steg.
Agentslingor passar bra för uppgifter där antalet eller stegens ordning inte kan förutsägas. Vanliga exempel är öppna kodningsagenter, autonom forskning och konversationsrobotar med funktioner för verktygssamtal.
Det finns två rekommenderade metoder för att implementera agentloopar med programmeringsmodellen Durable Task:
| Tillvägagångssätt |
Beskrivning |
När det bör användas |
|
Orkestreringsbaserad |
Skriv agentloopen som en varaktig orkestrering. Verktygsanrop implementeras som aktiviteter och mänskliga indata använder externa händelser. Orkestreringen styr loopens struktur, medan LLM fattar besluten inom den. |
Du behöver detaljerad kontroll över loopen, principer för återförsök per verktyg, distribuerad belastningsutjämning av verktygsanrop eller möjlighet att felsöka loopen i din IDE med brytpunkter. |
|
Entitetsbaserad |
Varje agentinstans är en varaktig entitet. Agentramverket hanterar loopen internt och entiteten ger varaktigt tillstånd och sessionspersistence. |
Du använder ett agentramverk (till exempel Microsoft Agent Framework) som redan implementerar agentloopen och du vill lägga till hållbarhet med minimala kodändringar. |
Orkestreringsbaserade agentloopar
En orkestreringsbaserad agentloop kombinerar flera durable task-funktioner: eviga orkestreringar (fortsätt som nya) för att hålla minnet begränsat, fläkta ut/fläkta in för parallell verktygskörning och externa händelser för interaktioner mellan människor i loopen. Varje iteration av loopen:
- Skickar den aktuella konversationskontexten till LLM via en aktivitet eller tillståndskänslig entitet.
- Tar emot LLM:s svar, som kan innehålla verktygsanrop.
- Kör alla verktygsanrop som aktiviteter (distribuerade över tillgänglig beräkning).
- Det är valfritt att vänta på mänsklig indata med hjälp av externa händelser.
- Fortsätter loopen med uppdaterat tillstånd eller slutförs när agenten signalerar att den är klar.
[Function(nameof(AgentLoopOrchestration))]
public async Task<string> AgentLoopOrchestration(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
// Get state from input (supports continue-as-new)
var state = context.GetInput<AgentState>() ?? new AgentState();
int maxIterations = 100;
while (state.Iteration < maxIterations)
{
// Send conversation history to the LLM
var llmResponse = await context.CallActivityAsync<LlmResponse>(
nameof(CallLlmAgent), state.Messages);
state.Messages.Add(llmResponse.Message);
// If the LLM returned tool calls, execute them in parallel
if (llmResponse.ToolCalls is { Count: > 0 })
{
var toolTasks = llmResponse.ToolCalls
.Select(tc => context.CallActivityAsync<ToolResult>(
nameof(ExecuteTool), tc))
.ToList();
ToolResult[] toolResults = await Task.WhenAll(toolTasks);
foreach (var result in toolResults)
state.Messages.Add(result.ToMessage());
}
// If the LLM needs human input, wait for it
else if (llmResponse.NeedsHumanInput)
{
string humanInput = await context.WaitForExternalEvent<string>("HumanInput");
state.Messages.Add(new Message("user", humanInput));
}
// LLM is done
else
{
return llmResponse.FinalAnswer;
}
state.Iteration++;
// Periodically continue-as-new to keep the history bounded
if (state.Iteration % 10 == 0)
{
context.ContinueAsNew(state);
return null!; // Orchestration will restart with updated state
}
}
return "Max iterations reached.";
}
@app.orchestration_trigger(context_name="context")
def agent_loop_orchestration(context: df.DurableOrchestrationContext):
# Get state from input (supports continue-as-new)
state = context.get_input() or {"messages": [], "iteration": 0}
max_iterations = 100
while state["iteration"] < max_iterations:
# Send conversation history to the LLM
llm_response = yield context.call_activity("call_llm_agent", state["messages"])
state["messages"].append(llm_response["message"])
# If the LLM returned tool calls, execute them
if llm_response.get("tool_calls"):
tool_tasks = [
context.call_activity("execute_tool", tc)
for tc in llm_response["tool_calls"]
]
tool_results = yield context.task_all(tool_tasks)
for result in tool_results:
state["messages"].append(result)
# If the LLM needs human input, wait for it
elif llm_response.get("needs_human_input"):
human_input = yield context.wait_for_external_event("HumanInput")
state["messages"].append({"role": "user", "content": human_input})
# LLM is done
else:
return llm_response["final_answer"]
state["iteration"] += 1
# Periodically continue-as-new to keep the history bounded
if state["iteration"] % 10 == 0:
context.continue_as_new(state)
return
return "Max iterations reached."
const df = require("durable-functions");
df.app.orchestration("agentLoopOrchestration", function* (context) {
// Get state from input (supports continue-as-new)
const state = context.df.getInput() || { messages: [], iteration: 0 };
const maxIterations = 100;
while (state.iteration < maxIterations) {
// Send conversation history to the LLM
const llmResponse = yield context.df.callActivity("callLlmAgent", state.messages);
state.messages.push(llmResponse.message);
// If the LLM returned tool calls, execute them
if (llmResponse.toolCalls && llmResponse.toolCalls.length > 0) {
const toolTasks = llmResponse.toolCalls.map((tc) =>
context.df.callActivity("executeTool", tc)
);
const toolResults = yield context.df.Task.all(toolTasks);
for (const result of toolResults) {
state.messages.push(result);
}
// If the LLM needs human input, wait for it
} else if (llmResponse.needsHumanInput) {
const humanInput = yield context.df.waitForExternalEvent("HumanInput");
state.messages.push({ role: "user", content: humanInput });
// LLM is done
} else {
return llmResponse.finalAnswer;
}
state.iteration++;
// Periodically continue-as-new to keep the history bounded
if (state.iteration % 10 === 0) {
context.df.continueAsNew(state);
return;
}
}
return "Max iterations reached.";
});
@FunctionName("AgentLoopOrchestration")
public String agentLoopOrchestration(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
// Get state from input (supports continue-as-new)
AgentState state = ctx.getInput(AgentState.class);
if (state == null) state = new AgentState();
int maxIterations = 100;
while (state.getIteration() < maxIterations) {
// Send conversation history to the LLM
LlmResponse llmResponse = ctx.callActivity(
"CallLlmAgent", state.getMessages(), LlmResponse.class).await();
state.getMessages().add(llmResponse.getMessage());
// If the LLM returned tool calls, execute them
if (llmResponse.getToolCalls() != null && !llmResponse.getToolCalls().isEmpty()) {
List<Task<ToolResult>> toolTasks = llmResponse.getToolCalls().stream()
.map(tc -> ctx.callActivity("ExecuteTool", tc, ToolResult.class))
.collect(Collectors.toList());
List<ToolResult> toolResults = ctx.allOf(toolTasks).await();
for (ToolResult result : toolResults) {
state.getMessages().add(result.toMessage());
}
// If the LLM needs human input, wait for it
} else if (llmResponse.needsHumanInput()) {
String humanInput = ctx.waitForExternalEvent("HumanInput", String.class).await();
state.getMessages().add(new Message("user", humanInput));
// LLM is done
} else {
return llmResponse.getFinalAnswer();
}
state.incrementIteration();
// Periodically continue-as-new to keep the history bounded
if (state.getIteration() % 10 == 0) {
ctx.continueAsNew(state);
return null;
}
}
return "Max iterations reached.";
}
[DurableTask]
public class AgentLoopOrchestration : TaskOrchestrator<AgentState, string>
{
public override async Task<string> RunAsync(
TaskOrchestrationContext context, AgentState? state)
{
state ??= new AgentState();
int maxIterations = 100;
while (state.Iteration < maxIterations)
{
// Send conversation history to the LLM
var llmResponse = await context.CallActivityAsync<LlmResponse>(
nameof(CallLlmAgent), state.Messages);
state.Messages.Add(llmResponse.Message);
// If the LLM returned tool calls, execute them
if (llmResponse.ToolCalls is { Count: > 0 })
{
var toolTasks = llmResponse.ToolCalls
.Select(tc => context.CallActivityAsync<ToolResult>(
nameof(ExecuteTool), tc))
.ToList();
ToolResult[] toolResults = await Task.WhenAll(toolTasks);
foreach (var result in toolResults)
state.Messages.Add(result.ToMessage());
}
// If the LLM needs human input, wait for it
else if (llmResponse.NeedsHumanInput)
{
string humanInput = await context.WaitForExternalEvent<string>("HumanInput");
state.Messages.Add(new Message("user", humanInput));
}
// LLM is done
else
{
return llmResponse.FinalAnswer;
}
state.Iteration++;
// Periodically continue-as-new to keep the history bounded
if (state.Iteration % 10 == 0)
{
context.ContinueAsNew(state);
return null!;
}
}
return "Max iterations reached.";
}
}
def agent_loop_orchestration(ctx: task.OrchestrationContext, state: dict | None) -> str:
if state is None:
state = {"messages": [], "iteration": 0}
max_iterations = 100
while state["iteration"] < max_iterations:
# Send conversation history to the LLM
llm_response = yield ctx.call_activity(call_llm_agent, input=state["messages"])
state["messages"].append(llm_response["message"])
# If the LLM returned tool calls, execute them
if llm_response.get("tool_calls"):
tool_tasks = [
ctx.call_activity(execute_tool, input=tc)
for tc in llm_response["tool_calls"]
]
tool_results = yield task.when_all(tool_tasks)
for result in tool_results:
state["messages"].append(result)
# If the LLM needs human input, wait for it
elif llm_response.get("needs_human_input"):
human_input = yield ctx.wait_for_external_event("HumanInput")
state["messages"].append({"role": "user", "content": human_input})
# LLM is done
else:
return llm_response["final_answer"]
state["iteration"] += 1
# Periodically continue-as-new to keep the history bounded
if state["iteration"] % 10 == 0:
ctx.continue_as_new(state)
return
return "Max iterations reached."
const agentLoopOrchestration: TOrchestrator = async function* (
ctx: OrchestrationContext, state: AgentState | null): any {
if (!state) state = { messages: [], iteration: 0 };
const maxIterations = 100;
while (state.iteration < maxIterations) {
// Send conversation history to the LLM
const llmResponse = yield ctx.callActivity(callLlmAgent, state.messages);
state.messages.push(llmResponse.message);
// If the LLM returned tool calls, execute them
if (llmResponse.toolCalls && llmResponse.toolCalls.length > 0) {
const toolTasks = llmResponse.toolCalls.map((tc: any) =>
ctx.callActivity(executeTool, tc)
);
const toolResults = yield whenAll(toolTasks);
for (const result of toolResults) {
state.messages.push(result);
}
// If the LLM needs human input, wait for it
} else if (llmResponse.needsHumanInput) {
const humanInput: string = yield ctx.waitForExternalEvent("HumanInput");
state.messages.push({ role: "user", content: humanInput });
// LLM is done
} else {
return llmResponse.finalAnswer;
}
state.iteration++;
// Periodically continue-as-new to keep the history bounded
if (state.iteration % 10 === 0) {
ctx.continueAsNew(state);
return;
}
}
return "Max iterations reached.";
};
ctx -> {
AgentState state = ctx.getInput(AgentState.class);
if (state == null) state = new AgentState();
int maxIterations = 100;
while (state.getIteration() < maxIterations) {
// Send conversation history to the LLM
LlmResponse llmResponse = ctx.callActivity(
"CallLlmAgent", state.getMessages(), LlmResponse.class).await();
state.getMessages().add(llmResponse.getMessage());
// If the LLM returned tool calls, execute them
if (llmResponse.getToolCalls() != null && !llmResponse.getToolCalls().isEmpty()) {
List<Task<ToolResult>> toolTasks = llmResponse.getToolCalls().stream()
.map(tc -> ctx.callActivity("ExecuteTool", tc, ToolResult.class))
.collect(Collectors.toList());
List<ToolResult> toolResults = ctx.allOf(toolTasks).await();
for (ToolResult result : toolResults) {
state.getMessages().add(result.toMessage());
}
// If the LLM needs human input, wait for it
} else if (llmResponse.needsHumanInput()) {
String humanInput = ctx.waitForExternalEvent("HumanInput", String.class).await();
state.getMessages().add(new Message("user", humanInput));
// LLM is done
} else {
ctx.complete(llmResponse.getFinalAnswer());
return;
}
state.incrementIteration();
// Periodically continue-as-new to keep the history bounded
if (state.getIteration() % 10 == 0) {
ctx.continueAsNew(state);
return;
}
}
ctx.complete("Max iterations reached.");
}
Entitetsbaserade agentloopar
Om du använder ett agentramverk som redan implementerar en egen agentloop kan du omsluta det i en hållbar entitet för att lägga till hållbarhet utan att skriva om looplogiken. Varje entitetsinstans representerar en enskild agentsession. Entiteten tar emot meddelanden, delegerar till agentramverket internt och bevarar konversationstillståndet mellan interaktioner.
Den största fördelen med den här metoden är enkelhet: du skriver din agent med ditt önskade ramverk och lägger till hållbarhet som ett värdproblem i stället för att designa om agentens kontrollflöde. Entiteten fungerar som ett varaktigt omslag och hanterar sessionsbeständighet och återställning automatiskt.
I följande exempel visas hur du omsluter en befintlig agent-SDK som en varaktig entitet. Entiteten exponerar en message funktion som klienter anropar för att skicka användarinmatning. Internt delegerar entiteten till agentramverket, som hanterar sin egen verktygsanropsloop.
// Define the entity that wraps an existing agent SDK
public class ChatAgentEntity : TaskEntity<ChatAgentState>
{
private readonly IChatClient _chatClient;
public ChatAgentEntity(IChatClient chatClient)
{
_chatClient = chatClient;
}
// Called by clients to send a message to the agent
public async Task<string> Message(string userMessage)
{
// Add the user message to the conversation history
State.Messages.Add(new ChatMessage(ChatRole.User, userMessage));
// Delegate to the agent SDK for the LLM call (with tool loop)
ChatResponse response = await _chatClient.GetResponseAsync(
State.Messages, State.Options);
// Persist the response in the entity state
State.Messages.AddRange(response.Messages);
return response.Text;
}
// Azure Functions entry point for the entity
[Function(nameof(ChatAgentEntity))]
public Task RunEntityAsync([EntityTrigger] TaskEntityDispatcher dispatcher)
{
return dispatcher.DispatchAsync<ChatAgentEntity>();
}
}
# Define the entity that wraps an existing agent SDK
@app.entity_trigger(context_name="context")
def chat_agent_entity(context):
# Load persisted conversation state
state = context.get_state(lambda: {"messages": []})
if context.operation_name == "message":
user_message = context.get_input()
# Add the user message to the conversation history
state["messages"].append({"role": "user", "content": user_message})
# Delegate to the agent SDK for the LLM call (with tool loop)
response = call_agent_sdk(state["messages"])
# Persist the response in the entity state
state["messages"].append({"role": "assistant", "content": response})
context.set_state(state)
context.set_result(response)
const df = require("durable-functions");
// Define the entity that wraps an existing agent SDK
const chatAgentEntity = async function (context) {
// Load persisted conversation state
let state = context.df.getState(() => ({ messages: [] }));
switch (context.df.operationName) {
case "message":
const userMessage = context.df.getInput();
// Add the user message to the conversation history
state.messages.push({ role: "user", content: userMessage });
// Delegate to the agent SDK for the LLM call (with tool loop)
const response = await callAgentSdk(state.messages);
// Persist the response in the entity state
state.messages.push({ role: "assistant", content: response });
context.df.setState(state);
context.df.return(response);
break;
}
};
df.app.entity("ChatAgent", chatAgentEntity);
Anmärkning
Varaktiga entiteter i Java kräver version 1.9.0 eller senare av paketen durabletask-azure-functions och durabletask-client.
// Define the entity that wraps an existing agent SDK
public class ChatAgentEntity extends AbstractTaskEntity<ChatAgentState> {
// Called by clients to send a message to the agent
public String message(String userMessage) {
// Add the user message to the conversation history
this.state.getMessages().add(new ChatMessage("user", userMessage));
// Delegate to the agent SDK for the LLM call (with tool loop)
String response = callAgentSdk(this.state.getMessages());
// Persist the response in the entity state
this.state.getMessages().add(new ChatMessage("assistant", response));
return response;
}
@Override
protected ChatAgentState initializeState(TaskEntityOperation operation) {
return new ChatAgentState();
}
}
// Register the entity with Azure Functions
@FunctionName("ChatAgent")
public String chatAgentEntity(
@DurableEntityTrigger(name = "req") String req) {
return EntityRunner.loadAndRun(req, ChatAgentEntity::new);
}
// Define the entity that wraps an existing agent SDK
[DurableTask(Name = "ChatAgent")]
public class ChatAgentEntity : TaskEntity<ChatAgentState>
{
private readonly IChatClient _chatClient;
public ChatAgentEntity(IChatClient chatClient)
{
_chatClient = chatClient;
}
// Called by clients to send a message to the agent
public async Task<string> Message(string userMessage)
{
// Add the user message to the conversation history
State.Messages.Add(new ChatMessage(ChatRole.User, userMessage));
// Delegate to the agent SDK for the LLM call (with tool loop)
ChatResponse response = await _chatClient.GetResponseAsync(
State.Messages, State.Options);
// Persist the response in the entity state
State.Messages.AddRange(response.Messages);
return response.Text;
}
}
from durabletask.entities.durable_entity import DurableEntity
# Define the entity that wraps an existing agent SDK
class ChatAgentEntity(DurableEntity):
"""Durable entity wrapping an agent SDK."""
def message(self, user_message: str) -> str:
# Load persisted conversation state
state = self.get_state(default={"messages": []})
# Add the user message to the conversation history
state["messages"].append({"role": "user", "content": user_message})
# Delegate to the agent SDK for the LLM call (with tool loop)
response = call_agent_sdk(state["messages"])
# Persist the response in the entity state
state["messages"].append({"role": "assistant", "content": response})
self.set_state(state)
return response
import { TaskEntity } from "@microsoft/durabletask-js";
// Define the entity that wraps an existing agent SDK
class ChatAgentEntity extends TaskEntity<ChatAgentState> {
// Called by clients to send a message to the agent
async message(userMessage: string): Promise<string> {
// Add the user message to the conversation history
this.state.messages.push({ role: "user", content: userMessage });
// Delegate to the agent SDK for the LLM call (with tool loop)
const response = await callAgentSdk(this.state.messages);
// Persist the response in the entity state
this.state.messages.push({ role: "assistant", content: response });
return response;
}
initializeState(): ChatAgentState {
return { messages: [] };
}
}
Anmärkning
Varaktiga entiteter i Java kräver version 1.9.0 eller senare av paketet durabletask-client.
// Define the entity that wraps an existing agent SDK
public class ChatAgentEntity extends AbstractTaskEntity<ChatAgentState> {
// Called by clients to send a message to the agent
public String message(String userMessage) {
// Add the user message to the conversation history
this.state.getMessages().add(new ChatMessage("user", userMessage));
// Delegate to the agent SDK for the LLM call (with tool loop)
String response = callAgentSdk(this.state.getMessages());
// Persist the response in the entity state
this.state.getMessages().add(new ChatMessage("assistant", response));
return response;
}
@Override
protected ChatAgentState initializeState(TaskEntityOperation operation) {
return new ChatAgentState();
}
}
Tillägget Durable Task för Microsoft Agent Framework använder den här metoden. Den omsluter Microsoft Agent Framework-agenter som varaktiga entiteter som tillhandahåller beständiga sessioner, automatiska kontrollpunkter och inbyggda API-slutpunkter med en enda konfigurationsrad.
Nästa steg