Há duas abordagens gerais para criar aplicativos agente com IA:
-
Fluxos de trabalho determinísticos — seu código define o fluxo de controle. Você escreve a sequência de etapas, ramificação, paralelismo e tratamento de erros usando constructos de programação padrão. O LLM executa o trabalho dentro de cada etapa, mas não controla o fluxo geral.
-
Fluxos de trabalho direcionados por agentes (loops de agente) – o LLM conduz o fluxo de controle. O agente decide quais ferramentas chamar, em que ordem e quando a tarefa é concluída. Você fornece ferramentas e instruções, mas o agente determina o caminho de execução em runtime.
Ambas as abordagens se beneficiam da execução durável e podem ser implementadas usando o modelo de programação Da Tarefa Durável. Este artigo mostra como criar cada padrão usando exemplos de código.
Dica
Esses padrões se alinham com os designs de fluxo de trabalho orientado a agentes descritos no Building Effective Agents da Anthropic. O modelo de programação Tarefa Durável é mapeado naturalmente para esses padrões: as orquestrações definem o controle do fluxo de trabalho e são automaticamente registradas, enquanto as atividades encapsulam operações não determinísticas, como chamadas LLM, invocações de ferramentas e solicitações de API.
Escolher uma abordagem
A tabela a seguir ajuda você a decidir quando usar cada abordagem.
| Use fluxos de trabalho determinísticos quando... |
Use loops de agente quando... |
| A sequência de etapas é conhecida com antecedência. |
A tarefa é aberta e as etapas não podem ser previstas. |
| Você precisa de guardrails explícitos para o comportamento do agente. |
Você deseja que a LLM decida quais ferramentas usar e quando. |
| A conformidade ou a auditoria exigem um fluxo de controle revisível. |
O agente precisa adaptar sua abordagem com base em resultados intermediários. |
| Você deseja combinar várias estruturas de IA em um único fluxo de trabalho. |
Você está criando um agente conversacional com recursos de chamada de ferramentas. |
Ambas as abordagens fornecem ponto de verificação automático, políticas de repetição, dimensionamento distribuído e suporte humano no loop por meio da execução durável.
Padrões de fluxo de trabalho determinísticos
Em um fluxo de trabalho determinístico, seu código controla o caminho de execução. O LLM é chamado como uma etapa dentro do fluxo de trabalho, mas não decide o que acontece a seguir. O modelo de programação de Tarefa Durável é naturalmente mapeado para essa abordagem.
-
As orquestrações definem o fluxo de controle de fluxo de trabalho (sequência, ramificação, paralelismo, tratamento de erros) e são automaticamente colocadas em ponto de verificação.
-
As atividades encapsulam operações não determinísticas, como chamadas LLM, invocações de ferramentas e solicitações de API. As atividades podem ser executadas em qualquer instância de computação disponível.
Os exemplos a seguir usam Durable Functions, que é executado em Azure Functions com hospedagem sem servidor.
Os exemplos a seguir usam os portáteis SDKs de Tarefa Durable, que são executados em qualquer host de computação, incluindo Aplicativos de Contêiner do Azure, Kubernetes, máquinas virtuais ou até mesmo localmente.
Encadeamento de prompts
O encadeamento de prompt é o padrão agente mais simples. Você divide uma tarefa complexa em uma série de interações sequenciais de LLM, em que a saída de cada etapa se alimenta da entrada da próxima etapa. Como cada atividade chamada é automaticamente checada, uma falha no meio do pipeline não força você a reiniciar desde o início e consumir de novo tokens LLM caros — a execução é retomada da última etapa concluída.
Você também pode inserir portões de validação programática entre as etapas. Por exemplo, depois de gerar uma estrutura de tópicos, você pode verificar se ela atende a uma restrição de comprimento ou tópico antes de passá-la para a etapa de redação.
Esse padrão mapeia diretamente para o padrão de encadeamento de funções no modelo de programação de Tarefas Duráveis.
Quando usar: Pipelines de geração de conteúdo, processamento de documentos em várias etapas, enriquecimento sequencial de dados, fluxos de trabalho que exigem portões de validação intermediários.
[Function(nameof(PromptChainingOrchestration))]
public async Task<string> PromptChainingOrchestration(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
var topic = context.GetInput<string>();
// Step 1: Generate research outline
string outline = await context.CallActivityAsync<string>(
nameof(GenerateOutlineAgent), topic);
// Step 2: Write first draft from outline
string draft = await context.CallActivityAsync<string>(
nameof(WriteDraftAgent), outline);
// Step 3: Refine and polish the draft
string finalContent = await context.CallActivityAsync<string>(
nameof(RefineDraftAgent), draft);
return finalContent;
}
Observação
O estado da orquestração é automaticamente verificado em cada await instrução. Se o processo de host falhar ou a VM for reciclada, a orquestração será retomada automaticamente da última etapa concluída em vez de recomeçar.
@app.orchestration_trigger(context_name="context")
def prompt_chaining_orchestration(context: df.DurableOrchestrationContext):
topic = context.get_input()
# Step 1: Generate research outline
outline = yield context.call_activity("generate_outline_agent", topic)
# Step 2: Write first draft from outline
draft = yield context.call_activity("write_draft_agent", outline)
# Step 3: Refine and polish the draft
final_content = yield context.call_activity("refine_draft_agent", draft)
return final_content
Observação
O estado da orquestração é automaticamente verificado em cada yield instrução. Se o processo de host falhar ou a VM for reciclada, a orquestração será retomada automaticamente da última etapa concluída em vez de recomeçar.
const df = require("durable-functions");
df.app.orchestration("promptChainingOrchestration", function* (context) {
const topic = context.df.getInput();
// Step 1: Generate research outline
const outline = yield context.df.callActivity("generateOutlineAgent", topic);
// Step 2: Write first draft from outline
const draft = yield context.df.callActivity("writeDraftAgent", outline);
// Step 3: Refine and polish the draft
const finalContent = yield context.df.callActivity("refineDraftAgent", draft);
return finalContent;
});
Observação
O estado da orquestração é automaticamente verificado em cada yield instrução. Ao invés de recomeçar, a orquestração será retomada automaticamente da última etapa concluída se o processo de host falhar ou a VM for reiniciada.
@FunctionName("PromptChainingOrchestration")
public String promptChainingOrchestration(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
String topic = ctx.getInput(String.class);
// Step 1: Generate research outline
String outline = ctx.callActivity(
"GenerateOutlineAgent", topic, String.class).await();
// Step 2: Write first draft from outline
String draft = ctx.callActivity(
"WriteDraftAgent", outline, String.class).await();
// Step 3: Refine and polish the draft
String finalContent = ctx.callActivity(
"RefineDraftAgent", draft, String.class).await();
return finalContent;
}
Observação
O estado da orquestração é automaticamente verificado em cada await() invocação. Se o processo de host falhar ou a VM for reciclada, a orquestração será retomada automaticamente da última etapa concluída em vez de recomeçar.
[DurableTask]
public class PromptChainingOrchestration : TaskOrchestrator<string, string>
{
public override async Task<string> RunAsync(
TaskOrchestrationContext context, string topic)
{
// Step 1: Generate research outline
string outline = await context.CallActivityAsync<string>(
nameof(GenerateOutlineAgent), topic);
// Step 2: Write first draft from outline
string draft = await context.CallActivityAsync<string>(
nameof(WriteDraftAgent), outline);
// Step 3: Refine and polish the draft
string finalContent = await context.CallActivityAsync<string>(
nameof(RefineDraftAgent), draft);
return finalContent;
}
}
Observação
O estado da orquestração é automaticamente verificado em cada await instrução. Se o processo hospedeiro falhar ou a VM for reciclada, a orquestração será retomada automaticamente da última etapa concluída, em vez de começar do zero.
def prompt_chaining_orchestration(ctx: task.OrchestrationContext, topic: str) -> str:
# Step 1: Generate research outline
outline = yield ctx.call_activity(generate_outline_agent, input=topic)
# Step 2: Write first draft from outline
draft = yield ctx.call_activity(write_draft_agent, input=outline)
# Step 3: Refine and polish the draft
final_content = yield ctx.call_activity(refine_draft_agent, input=draft)
return final_content
Observação
O estado da orquestração é automaticamente verificado em cada yield instrução. Se o processo de host falhar ou a máquina virtual for reciclada, a orquestração será retomada automaticamente da última etapa concluída em vez de recomeçar.
const promptChainingOrchestration: TOrchestrator = async function* (
ctx: OrchestrationContext, topic: string): any {
// Step 1: Generate research outline
const outline: string = yield ctx.callActivity(generateOutlineAgent, topic);
// Step 2: Write first draft from outline
const draft: string = yield ctx.callActivity(writeDraftAgent, outline);
// Step 3: Refine and polish the draft
const finalContent: string = yield ctx.callActivity(refineDraftAgent, draft);
return finalContent;
};
Observação
O estado da orquestração é automaticamente verificado em cada yield instrução. Se o processo anfitrião falhar ou a VM for reciclada, a orquestração será retomada automaticamente da última etapa concluída em vez de reiniciar.
ctx -> {
String topic = ctx.getInput(String.class);
// Step 1: Generate research outline
String outline = ctx.callActivity(
"GenerateOutlineAgent", topic, String.class).await();
// Step 2: Write first draft from outline
String draft = ctx.callActivity(
"WriteDraftAgent", outline, String.class).await();
// Step 3: Refine and polish the draft
String finalContent = ctx.callActivity(
"RefineDraftAgent", draft, String.class).await();
ctx.complete(finalContent);
}
Observação
O estado da orquestração é automaticamente verificado em cada await() invocação. Se o processo de host travar ou a Máquina Virtual (VM) for reciclada, a orquestração será retomada automaticamente da última etapa concluída, em vez de começar do zero novamente.
Routing
O roteamento usa uma etapa de classificação para determinar qual agente ou modelo downstream deve lidar com uma solicitação. A orquestração chama uma atividade de classificador primeiro e, em seguida, ramifica para o manipulador apropriado com base no resultado. Essa abordagem permite que você adapte o prompt, o modelo e o conjunto de ferramentas de cada manipulador de forma independente, por exemplo, direcionando perguntas de cobrança para um agente especializado com acesso a APIs de pagamento ao enviar perguntas gerais para um modelo mais leve.
Quando usar: Triagem de suporte ao cliente, classificação de intenção para agentes especializados, seleção de modelo dinâmico com base na complexidade da tarefa.
[Function(nameof(RoutingOrchestration))]
public async Task<string> RoutingOrchestration(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
var request = context.GetInput<SupportRequest>();
// Classify the request type
string category = await context.CallActivityAsync<string>(
nameof(ClassifyRequestAgent), request.Message);
// Route to the appropriate specialized agent
return category switch
{
"billing" => await context.CallActivityAsync<string>(
nameof(BillingAgent), request),
"technical" => await context.CallActivityAsync<string>(
nameof(TechnicalSupportAgent), request),
"general" => await context.CallActivityAsync<string>(
nameof(GeneralInquiryAgent), request),
_ => await context.CallActivityAsync<string>(
nameof(GeneralInquiryAgent), request),
};
}
@app.orchestration_trigger(context_name="context")
def routing_orchestration(context: df.DurableOrchestrationContext):
request = context.get_input()
# Classify the request type
category = yield context.call_activity("classify_request_agent", request["message"])
# Route to the appropriate specialized agent
if category == "billing":
return (yield context.call_activity("billing_agent", request))
elif category == "technical":
return (yield context.call_activity("technical_support_agent", request))
else:
return (yield context.call_activity("general_inquiry_agent", request))
const df = require("durable-functions");
df.app.orchestration("routingOrchestration", function* (context) {
const request = context.df.getInput();
// Classify the request type
const category = yield context.df.callActivity("classifyRequestAgent", request.message);
// Route to the appropriate specialized agent
switch (category) {
case "billing":
return yield context.df.callActivity("billingAgent", request);
case "technical":
return yield context.df.callActivity("technicalSupportAgent", request);
default:
return yield context.df.callActivity("generalInquiryAgent", request);
}
});
@FunctionName("RoutingOrchestration")
public String routingOrchestration(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
SupportRequest request = ctx.getInput(SupportRequest.class);
// Classify the request type
String category = ctx.callActivity(
"ClassifyRequestAgent", request.getMessage(), String.class).await();
// Route to the appropriate specialized agent
return switch (category) {
case "billing" -> ctx.callActivity(
"BillingAgent", request, String.class).await();
case "technical" -> ctx.callActivity(
"TechnicalSupportAgent", request, String.class).await();
default -> ctx.callActivity(
"GeneralInquiryAgent", request, String.class).await();
};
}
[DurableTask]
public class RoutingOrchestration : TaskOrchestrator<SupportRequest, string>
{
public override async Task<string> RunAsync(
TaskOrchestrationContext context, SupportRequest request)
{
// Classify the request type
string category = await context.CallActivityAsync<string>(
nameof(ClassifyRequestAgent), request.Message);
// Route to the appropriate specialized agent
return category switch
{
"billing" => await context.CallActivityAsync<string>(
nameof(BillingAgent), request),
"technical" => await context.CallActivityAsync<string>(
nameof(TechnicalSupportAgent), request),
_ => await context.CallActivityAsync<string>(
nameof(GeneralInquiryAgent), request),
};
}
}
def routing_orchestration(ctx: task.OrchestrationContext, request: dict) -> str:
# Classify the request type
category = yield ctx.call_activity(classify_request_agent, input=request["message"])
# Route to the appropriate specialized agent
if category == "billing":
return (yield ctx.call_activity(billing_agent, input=request))
elif category == "technical":
return (yield ctx.call_activity(technical_support_agent, input=request))
else:
return (yield ctx.call_activity(general_inquiry_agent, input=request))
const routingOrchestration: TOrchestrator = async function* (
ctx: OrchestrationContext, request: SupportRequest): any {
// Classify the request type
const category: string = yield ctx.callActivity(classifyRequestAgent, request.message);
// Route to the appropriate specialized agent
switch (category) {
case "billing":
return yield ctx.callActivity(billingAgent, request);
case "technical":
return yield ctx.callActivity(technicalSupportAgent, request);
default:
return yield ctx.callActivity(generalInquiryAgent, request);
}
};
ctx -> {
SupportRequest request = ctx.getInput(SupportRequest.class);
// Classify the request type
String category = ctx.callActivity(
"ClassifyRequestAgent", request.getMessage(), String.class).await();
// Route to the appropriate specialized agent
String result = switch (category) {
case "billing" -> ctx.callActivity(
"BillingAgent", request, String.class).await();
case "technical" -> ctx.callActivity(
"TechnicalSupportAgent", request, String.class).await();
default -> ctx.callActivity(
"GeneralInquiryAgent", request, String.class).await();
};
ctx.complete(result);
}
Paralelização
Quando você tiver várias subtarefas independentes, poderá expedi-las como chamadas de atividade paralela e aguardar todos os resultados antes de prosseguir. O Agendador de Tarefas Duráveis distribui essas atividades em todas as instâncias de computação disponíveis automaticamente, o que significa que adicionar mais trabalhadores reduz diretamente o tempo total do relógio de parede.
Uma variante comum é a votação de vários modelos: você envia o mesmo prompt para vários modelos (ou o mesmo modelo com temperaturas diferentes) em paralelo e, em seguida, agrega ou seleciona entre as respostas. Como cada ramo paralelo é verificado de forma independente, uma falha transitória em um ramo não afeta os outros.
Esse padrão se alinha diretamente com o padrão fan-out/fan-in no Durable Task.
Quando usar: Análise em lote de documentos, chamadas de ferramenta paralela, avaliação de vários modelos, moderação de conteúdo com vários revisores.
[Function(nameof(ParallelResearchOrchestration))]
public async Task<string> ParallelResearchOrchestration(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
var request = context.GetInput<ResearchRequest>();
// Fan-out: research multiple subtopics in parallel
var researchTasks = request.Subtopics
.Select(subtopic => context.CallActivityAsync<string>(
nameof(ResearchSubtopicAgent), subtopic))
.ToList();
string[] researchResults = await Task.WhenAll(researchTasks);
// Aggregate: synthesize all research into a single summary
string summary = await context.CallActivityAsync<string>(
nameof(SynthesizeAgent),
new { request.Topic, Research = researchResults });
return summary;
}
@app.orchestration_trigger(context_name="context")
def parallel_research_orchestration(context: df.DurableOrchestrationContext):
request = context.get_input()
# Fan-out: research multiple subtopics in parallel
research_tasks = []
for subtopic in request["subtopics"]:
research_tasks.append(
context.call_activity("research_subtopic_agent", subtopic)
)
research_results = yield context.task_all(research_tasks)
# Aggregate: synthesize all research into a single summary
summary = yield context.call_activity("synthesize_agent", {
"topic": request["topic"],
"research": research_results
})
return summary
const df = require("durable-functions");
df.app.orchestration("parallelResearchOrchestration", function* (context) {
const request = context.df.getInput();
// Fan-out: research multiple subtopics in parallel
const tasks = request.subtopics.map((subtopic) =>
context.df.callActivity("researchSubtopicAgent", subtopic)
);
const researchResults = yield context.df.Task.all(tasks);
// Aggregate: synthesize all research into a single summary
const summary = yield context.df.callActivity("synthesizeAgent", {
topic: request.topic,
research: researchResults,
});
return summary;
});
@FunctionName("ParallelResearchOrchestration")
public String parallelResearchOrchestration(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
ResearchRequest request = ctx.getInput(ResearchRequest.class);
// Fan-out: research multiple subtopics in parallel
List<Task<String>> tasks = request.getSubtopics().stream()
.map(subtopic -> ctx.callActivity(
"ResearchSubtopicAgent", subtopic, String.class))
.collect(Collectors.toList());
List<String> researchResults = ctx.allOf(tasks).await();
// Aggregate: synthesize all research into a single summary
String summary = ctx.callActivity(
"SynthesizeAgent", researchResults, String.class).await();
return summary;
}
[DurableTask]
public class ParallelResearchOrchestration : TaskOrchestrator<ResearchRequest, string>
{
public override async Task<string> RunAsync(
TaskOrchestrationContext context, ResearchRequest request)
{
// Fan-out: research multiple subtopics in parallel
var researchTasks = request.Subtopics
.Select(subtopic => context.CallActivityAsync<string>(
nameof(ResearchSubtopicAgent), subtopic))
.ToList();
string[] researchResults = await Task.WhenAll(researchTasks);
// Aggregate: synthesize all research into a single summary
string summary = await context.CallActivityAsync<string>(
nameof(SynthesizeAgent),
new { request.Topic, Research = researchResults });
return summary;
}
}
def parallel_research_orchestration(ctx: task.OrchestrationContext, request: dict) -> str:
# Fan-out: research multiple subtopics in parallel
research_tasks = []
for subtopic in request["subtopics"]:
research_tasks.append(
ctx.call_activity(research_subtopic_agent, input=subtopic)
)
research_results = yield task.when_all(research_tasks)
# Aggregate: synthesize all research into a single summary
summary = yield ctx.call_activity(synthesize_agent, input={
"topic": request["topic"],
"research": research_results
})
return summary
const parallelResearchOrchestration: TOrchestrator = async function* (
ctx: OrchestrationContext,
request: { topic: string; subtopics: string[] }): any {
// Fan-out: research multiple subtopics in parallel
const tasks = request.subtopics.map((subtopic) =>
ctx.callActivity(researchSubtopicAgent, subtopic)
);
const researchResults: string[] = yield whenAll(tasks);
// Aggregate: synthesize all research into a single summary
const summary: string = yield ctx.callActivity(synthesizeAgent, {
topic: request.topic,
research: researchResults,
});
return summary;
};
ctx -> {
ResearchRequest request = ctx.getInput(ResearchRequest.class);
// Fan-out: research multiple subtopics in parallel
List<Task<String>> tasks = request.getSubtopics().stream()
.map(subtopic -> ctx.callActivity(
"ResearchSubtopicAgent", subtopic, String.class))
.collect(Collectors.toList());
List<String> researchResults = ctx.allOf(tasks).await();
// Aggregate: synthesize all research into a single summary
String summary = ctx.callActivity(
"SynthesizeAgent", researchResults, String.class).await();
ctx.complete(summary);
}
Orquestradores-trabalhadores
Nesse padrão, um orquestrador central inicialmente chama uma LLM (por meio de uma atividade) para planejar o trabalho. Com base na saída do LLM, o orquestrador determina quais subtarefas são necessárias. O orquestrador então envia essas subtarefas para orquestrações de trabalho especializadas. A principal diferença da paralelização é que o conjunto de subtarefas não é corrigido em tempo de design; o orquestrador determina-os dinamicamente em runtime.
Esse padrão usa sub-orquestrações, que são subfluxos de trabalho com pontos de verificação realizados de forma independente. Cada orquestração de trabalho pode conter várias etapas, novas tentativas e paralelismo aninhado.
Quando usar: Pipelines de pesquisa aprofundada, fluxos de trabalho de agente de codificação que alteram vários arquivos, colaboração entre múltiplos agentes em que cada agente tem uma função distinta.
[Function(nameof(OrchestratorWorkersOrchestration))]
public async Task<string> OrchestratorWorkersOrchestration(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
var request = context.GetInput<ResearchRequest>();
// Central orchestrator: determine what research is needed
string[] subtasks = await context.CallActivityAsync<string[]>(
nameof(PlanResearchAgent), request.Topic);
// Delegate to worker orchestrations in parallel
var workerTasks = subtasks
.Select(subtask => context.CallSubOrchestratorAsync<string>(
nameof(ResearchWorkerOrchestration), subtask))
.ToList();
string[] results = await Task.WhenAll(workerTasks);
// Synthesize results
string finalReport = await context.CallActivityAsync<string>(
nameof(SynthesizeAgent),
new { request.Topic, Research = results });
return finalReport;
}
@app.orchestration_trigger(context_name="context")
def orchestrator_workers_orchestration(context: df.DurableOrchestrationContext):
request = context.get_input()
# Central orchestrator: determine what research is needed
subtasks = yield context.call_activity("plan_research_agent", request["topic"])
# Delegate to worker orchestrations in parallel
worker_tasks = []
for subtask in subtasks:
worker_tasks.append(
context.call_sub_orchestrator("research_worker_orchestration", subtask)
)
results = yield context.task_all(worker_tasks)
# Synthesize results
final_report = yield context.call_activity("synthesize_agent", {
"topic": request["topic"],
"research": results
})
return final_report
const df = require("durable-functions");
df.app.orchestration("orchestratorWorkersOrchestration", function* (context) {
const request = context.df.getInput();
// Central orchestrator: determine what research is needed
const subtasks = yield context.df.callActivity("planResearchAgent", request.topic);
// Delegate to worker orchestrations in parallel
const workerTasks = subtasks.map((subtask) =>
context.df.callSubOrchestrator("researchWorkerOrchestration", subtask)
);
const results = yield context.df.Task.all(workerTasks);
// Synthesize results
const finalReport = yield context.df.callActivity("synthesizeAgent", {
topic: request.topic,
research: results,
});
return finalReport;
});
@FunctionName("OrchestratorWorkersOrchestration")
public String orchestratorWorkersOrchestration(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
ResearchRequest request = ctx.getInput(ResearchRequest.class);
// Central orchestrator: determine what research is needed
List<String> subtasks = ctx.callActivity(
"PlanResearchAgent", request.getTopic(), List.class).await();
// Delegate to worker orchestrations in parallel
List<Task<String>> workerTasks = subtasks.stream()
.map(subtask -> ctx.callSubOrchestrator(
"ResearchWorkerOrchestration", subtask, String.class))
.collect(Collectors.toList());
List<String> results = ctx.allOf(workerTasks).await();
// Synthesize results
String finalReport = ctx.callActivity(
"SynthesizeAgent", results, String.class).await();
return finalReport;
}
[DurableTask]
public class OrchestratorWorkersOrchestration : TaskOrchestrator<ResearchRequest, string>
{
public override async Task<string> RunAsync(
TaskOrchestrationContext context, ResearchRequest request)
{
// Central orchestrator: determine what research is needed
string[] subtasks = await context.CallActivityAsync<string[]>(
nameof(PlanResearchAgent), request.Topic);
// Delegate to worker orchestrations in parallel
var workerTasks = subtasks
.Select(subtask => context.CallSubOrchestratorAsync<string>(
nameof(ResearchWorkerOrchestration), subtask))
.ToList();
string[] results = await Task.WhenAll(workerTasks);
// Synthesize results
string finalReport = await context.CallActivityAsync<string>(
nameof(SynthesizeAgent),
new { request.Topic, Research = results });
return finalReport;
}
}
def orchestrator_workers_orchestration(ctx: task.OrchestrationContext, request: dict) -> str:
# Central orchestrator: determine what research is needed
subtasks = yield ctx.call_activity(plan_research_agent, input=request["topic"])
# Delegate to worker orchestrations in parallel
worker_tasks = []
for subtask in subtasks:
worker_tasks.append(
ctx.call_sub_orchestrator(research_worker_orchestration, input=subtask)
)
results = yield task.when_all(worker_tasks)
# Synthesize results
final_report = yield ctx.call_activity(synthesize_agent, input={
"topic": request["topic"],
"research": results
})
return final_report
const orchestratorWorkersOrchestration: TOrchestrator = async function* (
ctx: OrchestrationContext, request: ResearchRequest): any {
// Central orchestrator: determine what research is needed
const subtasks: string[] = yield ctx.callActivity(planResearchAgent, request.topic);
// Delegate to worker orchestrations in parallel
const workerTasks = subtasks.map((subtask) =>
ctx.callSubOrchestrator(researchWorkerOrchestration, subtask)
);
const results: string[] = yield whenAll(workerTasks);
// Synthesize results
const finalReport: string = yield ctx.callActivity(synthesizeAgent, {
topic: request.topic,
research: results,
});
return finalReport;
};
ctx -> {
ResearchRequest request = ctx.getInput(ResearchRequest.class);
// Central orchestrator: determine what research is needed
List<String> subtasks = ctx.callActivity(
"PlanResearchAgent", request.getTopic(), List.class).await();
// Delegate to worker orchestrations in parallel
List<Task<String>> workerTasks = subtasks.stream()
.map(subtask -> ctx.callSubOrchestrator(
"ResearchWorkerOrchestration", subtask, String.class))
.collect(Collectors.toList());
List<String> results = ctx.allOf(workerTasks).await();
// Synthesize results
String finalReport = ctx.callActivity(
"SynthesizeAgent", results, String.class).await();
ctx.complete(finalReport);
}
Avaliador-otimizador
O padrão avaliador-otimizador emparelha um agente gerador com um agente avaliador em um loop de refinamento. O gerador produz a saída, o avaliador pontua-a em relação aos critérios de qualidade e fornece comentários e o loop se repete até que a saída passe ou uma contagem máxima de iteração seja atingida. Como cada iteração do loop é registrada em um ponto de verificação de progresso, uma falha após três rodadas bem-sucedidas de refinamento não resultará em perda desse progresso.
Esse padrão é especialmente útil quando a qualidade pode ser medida programaticamente — por exemplo, validar que o código gerado compila ou que uma tradução preserva entidades nomeadas.
Quando usar: Geração de código com revisão automatizada, tradução literária, refinamento de conteúdo iterativo, tarefas de pesquisa complexas que exigem várias rodadas de análise.
[Function(nameof(EvaluatorOptimizerOrchestration))]
public async Task<string> EvaluatorOptimizerOrchestration(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
var request = context.GetInput<ContentRequest>();
int maxIterations = 5;
string content = "";
string feedback = "";
for (int i = 0; i < maxIterations; i++)
{
// Generate or refine content
content = await context.CallActivityAsync<string>(
nameof(GenerateContentAgent),
new { request.Prompt, PreviousContent = content, Feedback = feedback });
// Evaluate quality
var evaluation = await context.CallActivityAsync<EvaluationResult>(
nameof(EvaluateContentAgent), content);
if (evaluation.MeetsQualityBar)
return content;
feedback = evaluation.Feedback;
}
return content; // Return best effort after max iterations
}
@app.orchestration_trigger(context_name="context")
def evaluator_optimizer_orchestration(context: df.DurableOrchestrationContext):
request = context.get_input()
max_iterations = 5
content = ""
feedback = ""
for i in range(max_iterations):
# Generate or refine content
content = yield context.call_activity("generate_content_agent", {
"prompt": request["prompt"],
"previous_content": content,
"feedback": feedback
})
# Evaluate quality
evaluation = yield context.call_activity("evaluate_content_agent", content)
if evaluation["meets_quality_bar"]:
return content
feedback = evaluation["feedback"]
return content # Return best effort after max iterations
const df = require("durable-functions");
df.app.orchestration("evaluatorOptimizerOrchestration", function* (context) {
const request = context.df.getInput();
const maxIterations = 5;
let content = "";
let feedback = "";
for (let i = 0; i < maxIterations; i++) {
// Generate or refine content
content = yield context.df.callActivity("generateContentAgent", {
prompt: request.prompt,
previousContent: content,
feedback: feedback,
});
// Evaluate quality
const evaluation = yield context.df.callActivity("evaluateContentAgent", content);
if (evaluation.meetsQualityBar) {
return content;
}
feedback = evaluation.feedback;
}
return content; // Return best effort after max iterations
});
@FunctionName("EvaluatorOptimizerOrchestration")
public String evaluatorOptimizerOrchestration(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
ContentRequest request = ctx.getInput(ContentRequest.class);
int maxIterations = 5;
String content = "";
String feedback = "";
for (int i = 0; i < maxIterations; i++) {
// Generate or refine content
content = ctx.callActivity("GenerateContentAgent",
new GenerateInput(request.getPrompt(), content, feedback),
String.class).await();
// Evaluate quality
EvaluationResult evaluation = ctx.callActivity(
"EvaluateContentAgent", content, EvaluationResult.class).await();
if (evaluation.meetsQualityBar()) {
return content;
}
feedback = evaluation.getFeedback();
}
return content; // Return best effort after max iterations
}
[DurableTask]
public class EvaluatorOptimizerOrchestration : TaskOrchestrator<ContentRequest, string>
{
public override async Task<string> RunAsync(
TaskOrchestrationContext context, ContentRequest request)
{
int maxIterations = 5;
string content = "";
string feedback = "";
for (int i = 0; i < maxIterations; i++)
{
// Generate or refine content
content = await context.CallActivityAsync<string>(
nameof(GenerateContentAgent),
new { request.Prompt, PreviousContent = content, Feedback = feedback });
// Evaluate quality
var evaluation = await context.CallActivityAsync<EvaluationResult>(
nameof(EvaluateContentAgent), content);
if (evaluation.MeetsQualityBar)
return content;
feedback = evaluation.Feedback;
}
return content; // Return best effort after max iterations
}
}
def evaluator_optimizer_orchestration(ctx: task.OrchestrationContext, request: dict) -> str:
max_iterations = 5
content = ""
feedback = ""
for i in range(max_iterations):
# Generate or refine content
content = yield ctx.call_activity(generate_content_agent, input={
"prompt": request["prompt"],
"previous_content": content,
"feedback": feedback
})
# Evaluate quality
evaluation = yield ctx.call_activity(evaluate_content_agent, input=content)
if evaluation["meets_quality_bar"]:
return content
feedback = evaluation["feedback"]
return content # Return best effort after max iterations
const evaluatorOptimizerOrchestration: TOrchestrator = async function* (
ctx: OrchestrationContext, request: ContentRequest): any {
const maxIterations = 5;
let content = "";
let feedback = "";
for (let i = 0; i < maxIterations; i++) {
// Generate or refine content
content = yield ctx.callActivity(generateContentAgent, {
prompt: request.prompt,
previousContent: content,
feedback: feedback,
});
// Evaluate quality
const evaluation = yield ctx.callActivity(evaluateContentAgent, content);
if (evaluation.meetsQualityBar) {
return content;
}
feedback = evaluation.feedback;
}
return content; // Return best effort after max iterations
};
ctx -> {
ContentRequest request = ctx.getInput(ContentRequest.class);
int maxIterations = 5;
String content = "";
String feedback = "";
for (int i = 0; i < maxIterations; i++) {
// Generate or refine content
content = ctx.callActivity("GenerateContentAgent",
new GenerateInput(request.getPrompt(), content, feedback),
String.class).await();
// Evaluate quality
EvaluationResult evaluation = ctx.callActivity(
"EvaluateContentAgent", content, EvaluationResult.class).await();
if (evaluation.meetsQualityBar()) {
ctx.complete(content);
return;
}
feedback = evaluation.getFeedback();
}
ctx.complete(content); // Return best effort after max iterations
}
Loops do agente
Em uma implementação típica de agente de IA, uma LLM é invocada em um loop, chamando ferramentas e tomando decisões até que a tarefa seja concluída ou uma condição de parada seja atingida. Ao contrário dos fluxos de trabalho determinísticos, o caminho de execução não é predefinido. O agente determina o que fazer em cada etapa com base nos resultados das etapas anteriores.
Os loops do agente são adequados para tarefas em que o número ou a ordem das etapas não podem ser previstas. Exemplos comuns incluem agentes de codificação abertos, pesquisas autônomas e bots de conversa com recursos de chamada de ferramentas.
Há duas abordagens recomendadas para implementar loops de agente com o modelo de programação da Tarefa Durável:
| Abordagem |
Descrição |
Quando usar |
|
Baseado em orquestração |
Escreva o loop do agente como uma orquestração durável. As chamadas de ferramenta são implementadas como atividades e a entrada humana usa eventos externos. A orquestração controla a estrutura do loop enquanto o LLM controla as decisões dentro dele. |
Você precisa de controle refinado sobre o loop, políticas de repetição por ferramenta, balanceamento de carga distribuído de chamadas de ferramenta ou a capacidade de depurar o loop em seu IDE com pontos de interrupção. |
|
Baseado em entidade |
Cada instância de agente é uma entidade durável. A estrutura do agente gerencia o loop internamente, e a entidade fornece persistência de estado duradouro e de sessão. |
Você está usando uma estrutura de agente (como Microsoft Agent Framework) que já implementa o loop do agente e deseja adicionar durabilidade com alterações mínimas de código. |
Loops de agente baseados em orquestração
Um loop de agente baseado em orquestração combina várias funcionalidades de Tarefa Durável: orquestrações eternas (continue-as-new) para manter a memória controlada, fan-out/fan-in para execução simultânea de ferramenta e eventos externos para interações com humanos no circuito. Cada iteração do loop:
- Envia o contexto de conversa atual para o LLM por meio de uma atividade ou entidade com estado.
- Recebe a resposta da LLM, que pode incluir chamadas de ferramenta.
- Executa quaisquer chamadas de ferramenta como atividades (distribuídas entre os recursos computacionais disponíveis).
- Opcionalmente, aguarda a entrada humana usando eventos externos.
- Continua o loop com o estado atualizado, ou se conclui quando o agente sinaliza que terminou.
[Function(nameof(AgentLoopOrchestration))]
public async Task<string> AgentLoopOrchestration(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
// Get state from input (supports continue-as-new)
var state = context.GetInput<AgentState>() ?? new AgentState();
int maxIterations = 100;
while (state.Iteration < maxIterations)
{
// Send conversation history to the LLM
var llmResponse = await context.CallActivityAsync<LlmResponse>(
nameof(CallLlmAgent), state.Messages);
state.Messages.Add(llmResponse.Message);
// If the LLM returned tool calls, execute them in parallel
if (llmResponse.ToolCalls is { Count: > 0 })
{
var toolTasks = llmResponse.ToolCalls
.Select(tc => context.CallActivityAsync<ToolResult>(
nameof(ExecuteTool), tc))
.ToList();
ToolResult[] toolResults = await Task.WhenAll(toolTasks);
foreach (var result in toolResults)
state.Messages.Add(result.ToMessage());
}
// If the LLM needs human input, wait for it
else if (llmResponse.NeedsHumanInput)
{
string humanInput = await context.WaitForExternalEvent<string>("HumanInput");
state.Messages.Add(new Message("user", humanInput));
}
// LLM is done
else
{
return llmResponse.FinalAnswer;
}
state.Iteration++;
// Periodically continue-as-new to keep the history bounded
if (state.Iteration % 10 == 0)
{
context.ContinueAsNew(state);
return null!; // Orchestration will restart with updated state
}
}
return "Max iterations reached.";
}
@app.orchestration_trigger(context_name="context")
def agent_loop_orchestration(context: df.DurableOrchestrationContext):
# Get state from input (supports continue-as-new)
state = context.get_input() or {"messages": [], "iteration": 0}
max_iterations = 100
while state["iteration"] < max_iterations:
# Send conversation history to the LLM
llm_response = yield context.call_activity("call_llm_agent", state["messages"])
state["messages"].append(llm_response["message"])
# If the LLM returned tool calls, execute them
if llm_response.get("tool_calls"):
tool_tasks = [
context.call_activity("execute_tool", tc)
for tc in llm_response["tool_calls"]
]
tool_results = yield context.task_all(tool_tasks)
for result in tool_results:
state["messages"].append(result)
# If the LLM needs human input, wait for it
elif llm_response.get("needs_human_input"):
human_input = yield context.wait_for_external_event("HumanInput")
state["messages"].append({"role": "user", "content": human_input})
# LLM is done
else:
return llm_response["final_answer"]
state["iteration"] += 1
# Periodically continue-as-new to keep the history bounded
if state["iteration"] % 10 == 0:
context.continue_as_new(state)
return
return "Max iterations reached."
const df = require("durable-functions");
df.app.orchestration("agentLoopOrchestration", function* (context) {
// Get state from input (supports continue-as-new)
const state = context.df.getInput() || { messages: [], iteration: 0 };
const maxIterations = 100;
while (state.iteration < maxIterations) {
// Send conversation history to the LLM
const llmResponse = yield context.df.callActivity("callLlmAgent", state.messages);
state.messages.push(llmResponse.message);
// If the LLM returned tool calls, execute them
if (llmResponse.toolCalls && llmResponse.toolCalls.length > 0) {
const toolTasks = llmResponse.toolCalls.map((tc) =>
context.df.callActivity("executeTool", tc)
);
const toolResults = yield context.df.Task.all(toolTasks);
for (const result of toolResults) {
state.messages.push(result);
}
// If the LLM needs human input, wait for it
} else if (llmResponse.needsHumanInput) {
const humanInput = yield context.df.waitForExternalEvent("HumanInput");
state.messages.push({ role: "user", content: humanInput });
// LLM is done
} else {
return llmResponse.finalAnswer;
}
state.iteration++;
// Periodically continue-as-new to keep the history bounded
if (state.iteration % 10 === 0) {
context.df.continueAsNew(state);
return;
}
}
return "Max iterations reached.";
});
@FunctionName("AgentLoopOrchestration")
public String agentLoopOrchestration(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
// Get state from input (supports continue-as-new)
AgentState state = ctx.getInput(AgentState.class);
if (state == null) state = new AgentState();
int maxIterations = 100;
while (state.getIteration() < maxIterations) {
// Send conversation history to the LLM
LlmResponse llmResponse = ctx.callActivity(
"CallLlmAgent", state.getMessages(), LlmResponse.class).await();
state.getMessages().add(llmResponse.getMessage());
// If the LLM returned tool calls, execute them
if (llmResponse.getToolCalls() != null && !llmResponse.getToolCalls().isEmpty()) {
List<Task<ToolResult>> toolTasks = llmResponse.getToolCalls().stream()
.map(tc -> ctx.callActivity("ExecuteTool", tc, ToolResult.class))
.collect(Collectors.toList());
List<ToolResult> toolResults = ctx.allOf(toolTasks).await();
for (ToolResult result : toolResults) {
state.getMessages().add(result.toMessage());
}
// If the LLM needs human input, wait for it
} else if (llmResponse.needsHumanInput()) {
String humanInput = ctx.waitForExternalEvent("HumanInput", String.class).await();
state.getMessages().add(new Message("user", humanInput));
// LLM is done
} else {
return llmResponse.getFinalAnswer();
}
state.incrementIteration();
// Periodically continue-as-new to keep the history bounded
if (state.getIteration() % 10 == 0) {
ctx.continueAsNew(state);
return null;
}
}
return "Max iterations reached.";
}
[DurableTask]
public class AgentLoopOrchestration : TaskOrchestrator<AgentState, string>
{
public override async Task<string> RunAsync(
TaskOrchestrationContext context, AgentState? state)
{
state ??= new AgentState();
int maxIterations = 100;
while (state.Iteration < maxIterations)
{
// Send conversation history to the LLM
var llmResponse = await context.CallActivityAsync<LlmResponse>(
nameof(CallLlmAgent), state.Messages);
state.Messages.Add(llmResponse.Message);
// If the LLM returned tool calls, execute them
if (llmResponse.ToolCalls is { Count: > 0 })
{
var toolTasks = llmResponse.ToolCalls
.Select(tc => context.CallActivityAsync<ToolResult>(
nameof(ExecuteTool), tc))
.ToList();
ToolResult[] toolResults = await Task.WhenAll(toolTasks);
foreach (var result in toolResults)
state.Messages.Add(result.ToMessage());
}
// If the LLM needs human input, wait for it
else if (llmResponse.NeedsHumanInput)
{
string humanInput = await context.WaitForExternalEvent<string>("HumanInput");
state.Messages.Add(new Message("user", humanInput));
}
// LLM is done
else
{
return llmResponse.FinalAnswer;
}
state.Iteration++;
// Periodically continue-as-new to keep the history bounded
if (state.Iteration % 10 == 0)
{
context.ContinueAsNew(state);
return null!;
}
}
return "Max iterations reached.";
}
}
def agent_loop_orchestration(ctx: task.OrchestrationContext, state: dict | None) -> str:
if state is None:
state = {"messages": [], "iteration": 0}
max_iterations = 100
while state["iteration"] < max_iterations:
# Send conversation history to the LLM
llm_response = yield ctx.call_activity(call_llm_agent, input=state["messages"])
state["messages"].append(llm_response["message"])
# If the LLM returned tool calls, execute them
if llm_response.get("tool_calls"):
tool_tasks = [
ctx.call_activity(execute_tool, input=tc)
for tc in llm_response["tool_calls"]
]
tool_results = yield task.when_all(tool_tasks)
for result in tool_results:
state["messages"].append(result)
# If the LLM needs human input, wait for it
elif llm_response.get("needs_human_input"):
human_input = yield ctx.wait_for_external_event("HumanInput")
state["messages"].append({"role": "user", "content": human_input})
# LLM is done
else:
return llm_response["final_answer"]
state["iteration"] += 1
# Periodically continue-as-new to keep the history bounded
if state["iteration"] % 10 == 0:
ctx.continue_as_new(state)
return
return "Max iterations reached."
const agentLoopOrchestration: TOrchestrator = async function* (
ctx: OrchestrationContext, state: AgentState | null): any {
if (!state) state = { messages: [], iteration: 0 };
const maxIterations = 100;
while (state.iteration < maxIterations) {
// Send conversation history to the LLM
const llmResponse = yield ctx.callActivity(callLlmAgent, state.messages);
state.messages.push(llmResponse.message);
// If the LLM returned tool calls, execute them
if (llmResponse.toolCalls && llmResponse.toolCalls.length > 0) {
const toolTasks = llmResponse.toolCalls.map((tc: any) =>
ctx.callActivity(executeTool, tc)
);
const toolResults = yield whenAll(toolTasks);
for (const result of toolResults) {
state.messages.push(result);
}
// If the LLM needs human input, wait for it
} else if (llmResponse.needsHumanInput) {
const humanInput: string = yield ctx.waitForExternalEvent("HumanInput");
state.messages.push({ role: "user", content: humanInput });
// LLM is done
} else {
return llmResponse.finalAnswer;
}
state.iteration++;
// Periodically continue-as-new to keep the history bounded
if (state.iteration % 10 === 0) {
ctx.continueAsNew(state);
return;
}
}
return "Max iterations reached.";
};
ctx -> {
AgentState state = ctx.getInput(AgentState.class);
if (state == null) state = new AgentState();
int maxIterations = 100;
while (state.getIteration() < maxIterations) {
// Send conversation history to the LLM
LlmResponse llmResponse = ctx.callActivity(
"CallLlmAgent", state.getMessages(), LlmResponse.class).await();
state.getMessages().add(llmResponse.getMessage());
// If the LLM returned tool calls, execute them
if (llmResponse.getToolCalls() != null && !llmResponse.getToolCalls().isEmpty()) {
List<Task<ToolResult>> toolTasks = llmResponse.getToolCalls().stream()
.map(tc -> ctx.callActivity("ExecuteTool", tc, ToolResult.class))
.collect(Collectors.toList());
List<ToolResult> toolResults = ctx.allOf(toolTasks).await();
for (ToolResult result : toolResults) {
state.getMessages().add(result.toMessage());
}
// If the LLM needs human input, wait for it
} else if (llmResponse.needsHumanInput()) {
String humanInput = ctx.waitForExternalEvent("HumanInput", String.class).await();
state.getMessages().add(new Message("user", humanInput));
// LLM is done
} else {
ctx.complete(llmResponse.getFinalAnswer());
return;
}
state.incrementIteration();
// Periodically continue-as-new to keep the history bounded
if (state.getIteration() % 10 == 0) {
ctx.continueAsNew(state);
return;
}
}
ctx.complete("Max iterations reached.");
}
Loops de agente baseados em entidade
Se você estiver usando uma estrutura de agente que já implementa seu próprio loop de agente, poderá encapsule-o em uma entidade durável para adicionar durabilidade sem reescrever a lógica de loop. Cada instância de entidade representa uma única sessão de agente. A entidade recebe mensagens, delega internamente ao framework do agente e mantém o estado da conversa ao longo das interações.
A principal vantagem dessa abordagem é a simplicidade: você escreve seu agente usando sua ferramenta preferida e integra a durabilidade como uma questão de hospedagem em vez de redesenhar o fluxo de controle do agente. A entidade atua como um encapsulador robusto, tratando a persistência e a recuperação da sessão automaticamente.
Os exemplos a seguir mostram como encapsular um SDK de agente existente como uma entidade durável. A entidade expõe uma message operação que clientes invocam para enviar entrada de usuário. Internamente, a entidade delega à estrutura do agente, que gerencia seu próprio loop de chamada de ferramentas.
// Define the entity that wraps an existing agent SDK
public class ChatAgentEntity : TaskEntity<ChatAgentState>
{
private readonly IChatClient _chatClient;
public ChatAgentEntity(IChatClient chatClient)
{
_chatClient = chatClient;
}
// Called by clients to send a message to the agent
public async Task<string> Message(string userMessage)
{
// Add the user message to the conversation history
State.Messages.Add(new ChatMessage(ChatRole.User, userMessage));
// Delegate to the agent SDK for the LLM call (with tool loop)
ChatResponse response = await _chatClient.GetResponseAsync(
State.Messages, State.Options);
// Persist the response in the entity state
State.Messages.AddRange(response.Messages);
return response.Text;
}
// Azure Functions entry point for the entity
[Function(nameof(ChatAgentEntity))]
public Task RunEntityAsync([EntityTrigger] TaskEntityDispatcher dispatcher)
{
return dispatcher.DispatchAsync<ChatAgentEntity>();
}
}
# Define the entity that wraps an existing agent SDK
@app.entity_trigger(context_name="context")
def chat_agent_entity(context):
# Load persisted conversation state
state = context.get_state(lambda: {"messages": []})
if context.operation_name == "message":
user_message = context.get_input()
# Add the user message to the conversation history
state["messages"].append({"role": "user", "content": user_message})
# Delegate to the agent SDK for the LLM call (with tool loop)
response = call_agent_sdk(state["messages"])
# Persist the response in the entity state
state["messages"].append({"role": "assistant", "content": response})
context.set_state(state)
context.set_result(response)
const df = require("durable-functions");
// Define the entity that wraps an existing agent SDK
const chatAgentEntity = async function (context) {
// Load persisted conversation state
let state = context.df.getState(() => ({ messages: [] }));
switch (context.df.operationName) {
case "message":
const userMessage = context.df.getInput();
// Add the user message to the conversation history
state.messages.push({ role: "user", content: userMessage });
// Delegate to the agent SDK for the LLM call (with tool loop)
const response = await callAgentSdk(state.messages);
// Persist the response in the entity state
state.messages.push({ role: "assistant", content: response });
context.df.setState(state);
context.df.return(response);
break;
}
};
df.app.entity("ChatAgent", chatAgentEntity);
Observação
As entidades duráveis no Java exigem a versão 1.9.0 ou posterior dos pacotes durabletask-azure-functions e durabletask-client.
// Define the entity that wraps an existing agent SDK
public class ChatAgentEntity extends AbstractTaskEntity<ChatAgentState> {
// Called by clients to send a message to the agent
public String message(String userMessage) {
// Add the user message to the conversation history
this.state.getMessages().add(new ChatMessage("user", userMessage));
// Delegate to the agent SDK for the LLM call (with tool loop)
String response = callAgentSdk(this.state.getMessages());
// Persist the response in the entity state
this.state.getMessages().add(new ChatMessage("assistant", response));
return response;
}
@Override
protected ChatAgentState initializeState(TaskEntityOperation operation) {
return new ChatAgentState();
}
}
// Register the entity with Azure Functions
@FunctionName("ChatAgent")
public String chatAgentEntity(
@DurableEntityTrigger(name = "req") String req) {
return EntityRunner.loadAndRun(req, ChatAgentEntity::new);
}
// Define the entity that wraps an existing agent SDK
[DurableTask(Name = "ChatAgent")]
public class ChatAgentEntity : TaskEntity<ChatAgentState>
{
private readonly IChatClient _chatClient;
public ChatAgentEntity(IChatClient chatClient)
{
_chatClient = chatClient;
}
// Called by clients to send a message to the agent
public async Task<string> Message(string userMessage)
{
// Add the user message to the conversation history
State.Messages.Add(new ChatMessage(ChatRole.User, userMessage));
// Delegate to the agent SDK for the LLM call (with tool loop)
ChatResponse response = await _chatClient.GetResponseAsync(
State.Messages, State.Options);
// Persist the response in the entity state
State.Messages.AddRange(response.Messages);
return response.Text;
}
}
from durabletask.entities.durable_entity import DurableEntity
# Define the entity that wraps an existing agent SDK
class ChatAgentEntity(DurableEntity):
"""Durable entity wrapping an agent SDK."""
def message(self, user_message: str) -> str:
# Load persisted conversation state
state = self.get_state(default={"messages": []})
# Add the user message to the conversation history
state["messages"].append({"role": "user", "content": user_message})
# Delegate to the agent SDK for the LLM call (with tool loop)
response = call_agent_sdk(state["messages"])
# Persist the response in the entity state
state["messages"].append({"role": "assistant", "content": response})
self.set_state(state)
return response
import { TaskEntity } from "@microsoft/durabletask-js";
// Define the entity that wraps an existing agent SDK
class ChatAgentEntity extends TaskEntity<ChatAgentState> {
// Called by clients to send a message to the agent
async message(userMessage: string): Promise<string> {
// Add the user message to the conversation history
this.state.messages.push({ role: "user", content: userMessage });
// Delegate to the agent SDK for the LLM call (with tool loop)
const response = await callAgentSdk(this.state.messages);
// Persist the response in the entity state
this.state.messages.push({ role: "assistant", content: response });
return response;
}
initializeState(): ChatAgentState {
return { messages: [] };
}
}
Observação
Entidades duráveis em Java exigem a versão 1.9.0 ou posterior do pacote durabletask-client.
// Define the entity that wraps an existing agent SDK
public class ChatAgentEntity extends AbstractTaskEntity<ChatAgentState> {
// Called by clients to send a message to the agent
public String message(String userMessage) {
// Add the user message to the conversation history
this.state.getMessages().add(new ChatMessage("user", userMessage));
// Delegate to the agent SDK for the LLM call (with tool loop)
String response = callAgentSdk(this.state.getMessages());
// Persist the response in the entity state
this.state.getMessages().add(new ChatMessage("assistant", response));
return response;
}
@Override
protected ChatAgentState initializeState(TaskEntityOperation operation) {
return new ChatAgentState();
}
}
A extensão Durable Task para Microsoft Agent Framework usa essa abordagem. Ele encapsula agentes do Microsoft Agent Framework como entidades duráveis, fornecendo sessões persistentes, checkpointing automático e endpoints de API incorporados com uma única linha de configuração.
Próximas Etapas