Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
A extensão Durable Task para Microsoft Agent Framework traz durable execution diretamente no Microsoft Agent Framework. Você pode registrar agentes com a extensão para torná-los automaticamente duráveis com sessões persistentes, pontos de extremidade de API internos e dimensionamento distribuído , sem alterações na lógica do agente.
A extensão implementa internamente loops de agente baseados em entidade, em que cada sessão de agente é uma entidade durável que gerencia automaticamente o estado da conversa e o ponto de verificação.
A extensão dá suporte a duas abordagens de hospedagem:
- Azure Functions usando o pacote de integração Azure Functions.
- Traga seus próprios recursos computacionais usando o pacote base.
Hospedagem de agente
Defina seu agente usando o padrão padrão do Microsoft Agent Framework e, em seguida, aprimore-o com a extensão Tarefa Durável. A extensão lida com a persistência da sessão, a criação de endpoints e o gerenciamento de estado automaticamente.
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT")
?? throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME")
?? throw new InvalidOperationException("AZURE_OPENAI_DEPLOYMENT_NAME is not set.");
AIAgent agent = new AzureOpenAIClient(new Uri(endpoint), new DefaultAzureCredential())
.GetChatClient(deploymentName)
.AsAIAgent(
instructions: "You are a professional content writer who creates engaging, "
+ "well-structured documents for any given topic.",
name: "DocumentPublisher");
// One line to make the agent durable with serverless hosting
using IHost app = FunctionsApplication
.CreateBuilder(args)
.ConfigureFunctionsWebApplication()
.ConfigureDurableAgents(options => options.AddAIAgent(agent))
.Build();
app.Run();
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT")
?? throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME")
?? throw new InvalidOperationException("AZURE_OPENAI_DEPLOYMENT_NAME is not set.");
AIAgent agent = new AzureOpenAIClient(new Uri(endpoint), new DefaultAzureCredential())
.GetChatClient(deploymentName)
.AsAIAgent(
instructions: "You are a professional content writer who creates engaging, "
+ "well-structured documents for any given topic.",
name: "DocumentPublisher");
// Host the agent with Durable Task Scheduler
string connectionString = "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None";
IHost host = Host.CreateDefaultBuilder(args)
.ConfigureServices(services =>
{
services.ConfigureDurableAgents(
options => options.AddAIAgent(agent),
workerBuilder: builder => builder.UseDurableTaskScheduler(connectionString),
clientBuilder: builder => builder.UseDurableTaskScheduler(connectionString));
})
.Build();
await host.StartAsync();
Orquestração de vários agentes
Você pode coordenar vários agentes especializados como etapas em uma orquestração durável. Cada chamada de agente tem um ponto de verificação, e a orquestração se recupera automaticamente se qualquer passo falhar. As chamadas de agente concluídas não são reexecutadas durante a recuperação.
O exemplo a seguir mostra um fluxo de trabalho sequencial de vários agentes em que um agente de pesquisa coleta informações e um agente gravador produz um documento.
[Function(nameof(DocumentPublishingOrchestration))]
public async Task<string> DocumentPublishingOrchestration(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
var docRequest = context.GetInput<DocumentRequest>();
DurableAIAgent researchAgent = context.GetAgent("ResearchAgent");
DurableAIAgent writerAgent = context.GetAgent("DocumentPublisherAgent");
// Step 1: Research the topic
AgentResponse<ResearchResult> researchResult = await researchAgent
.RunAsync<ResearchResult>(
$"Research the following topic: {docRequest.Topic}");
// Step 2: Write the document using the research findings
AgentResponse<DocumentResponse> document = await writerAgent
.RunAsync<DocumentResponse>(
$"""Create a document about {docRequest.Topic}.
Research findings: {researchResult.Result.Findings}""");
// Step 3: Publish
return await context.CallActivityAsync<string>(
nameof(PublishDocument),
new { docRequest.Topic, document.Result.Text });
}
static async Task<string> DocumentPublishingOrchestration(
TaskOrchestrationContext context, DocumentRequest docRequest)
{
DurableAIAgent researchAgent = context.GetAgent("ResearchAgent");
DurableAIAgent writerAgent = context.GetAgent("DocumentPublisherAgent");
// Step 1: Research the topic
AgentResponse<ResearchResult> researchResult = await researchAgent
.RunAsync<ResearchResult>(
$"Research the following topic: {docRequest.Topic}");
// Step 2: Write the document using the research findings
AgentResponse<DocumentResponse> document = await writerAgent
.RunAsync<DocumentResponse>(
$"""Create a document about {docRequest.Topic}.
Research findings: {researchResult.Result.Findings}""");
// Step 3: Publish
return await context.CallActivityAsync<string>(
nameof(PublishDocument),
new { docRequest.Topic, document.Result.Text });
}
Fluxos de trabalho baseados em grafo
A extensão Durable Task também dá suporte a fluxos de trabalho Microsoft Agent Framework, que usam um modelo de programação declarativo baseado em grafo (WorkflowBuilder) para definir pipelines de múltiplas etapas de executores e agentes. A extensão verifica automaticamente cada etapa no grafo e se recupera de falhas sem alterações na definição do fluxo de trabalho.
Fluxo de trabalho sequencial
O exemplo a seguir encadeia três executores em um fluxo de trabalho de cancelamento de pedidos: pesquisar o pedido, cancelá-lo e, em seguida, enviar um email de confirmação.
OrderLookup orderLookup = new();
OrderCancel orderCancel = new();
SendEmail sendEmail = new();
Workflow cancelOrder = new WorkflowBuilder(orderLookup)
.WithName("CancelOrder")
.WithDescription("Cancel an order and notify the customer")
.AddEdge(orderLookup, orderCancel)
.AddEdge(orderCancel, sendEmail)
.Build();
using IHost app = FunctionsApplication
.CreateBuilder(args)
.ConfigureFunctionsWebApplication()
.ConfigureDurableWorkflows(workflows => workflows.AddWorkflows(cancelOrder))
.Build();
app.Run();
Os executores OrderLookup, OrderCancel e SendEmail são executores padrão Microsoft Agent Framework sem código específico durável. Para implementações completas, consulte as samplas no GitHub.
string dtsConnectionString = Environment.GetEnvironmentVariable("DURABLE_TASK_SCHEDULER_CONNECTION_STRING")
?? "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None";
OrderLookup orderLookup = new();
OrderCancel orderCancel = new();
SendEmail sendEmail = new();
Workflow cancelOrder = new WorkflowBuilder(orderLookup)
.WithName("CancelOrder")
.WithDescription("Cancel an order and notify the customer")
.AddEdge(orderLookup, orderCancel)
.AddEdge(orderCancel, sendEmail)
.Build();
IHost host = Host.CreateDefaultBuilder(args)
.ConfigureServices(services =>
{
services.ConfigureDurableWorkflows(
workflowOptions => workflowOptions.AddWorkflow(cancelOrder),
workerBuilder: builder => builder.UseDurableTaskScheduler(dtsConnectionString),
clientBuilder: builder => builder.UseDurableTaskScheduler(dtsConnectionString));
})
.Build();
await host.StartAsync();
IWorkflowClient workflowClient = host.Services.GetRequiredService<IWorkflowClient>();
IAwaitableWorkflowRun run = (IAwaitableWorkflowRun)await workflowClient.RunAsync(cancelOrder, "ORD-12345");
string? result = await run.WaitForCompletionAsync<string>();
Os executores OrderLookup, OrderCancel e SendEmail são executores padrão Microsoft Agent Framework sem código específico durável. Para implementações completas, consulte as samplas no GitHub.
Fluxo de trabalho de fan-out/fan-in (simultâneo)
Você pode usar vários executores ou agentes que são executados em paralelo e, em seguida, entrar para agregar os resultados. O exemplo a seguir envia uma pergunta científica a um físico e agente químico em paralelo e, em seguida, agrega suas respostas.
ChatClient chatClient = new AzureOpenAIClient(
new Uri(endpoint), new DefaultAzureCredential()).GetChatClient(deploymentName);
AIAgent physicist = chatClient.AsAIAgent(
"You are a physics expert. Be concise (2-3 sentences).", "Physicist");
AIAgent chemist = chatClient.AsAIAgent(
"You are a chemistry expert. Be concise (2-3 sentences).", "Chemist");
ParseQuestionExecutor parseQuestion = new();
AggregatorExecutor aggregator = new();
Workflow workflow = new WorkflowBuilder(parseQuestion)
.WithName("ExpertReview")
.AddFanOutEdge(parseQuestion, [physicist, chemist])
.AddFanInBarrierEdge([physicist, chemist], aggregator)
.Build();
using IHost app = FunctionsApplication
.CreateBuilder(args)
.ConfigureFunctionsWebApplication()
.ConfigureDurableWorkflows(workflows => workflows.AddWorkflows(workflow))
.Build();
app.Run();
Os ParseQuestionExecutor e AggregatorExecutor são executores padrão Microsoft Agent Framework sem código específico de Durable. Para implementações completas, consulte as samplas no GitHub.
string dtsConnectionString = Environment.GetEnvironmentVariable("DURABLE_TASK_SCHEDULER_CONNECTION_STRING")
?? "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None";
ChatClient chatClient = new AzureOpenAIClient(
new Uri(endpoint), new DefaultAzureCredential()).GetChatClient(deploymentName);
ParseQuestionExecutor parseQuestion = new();
AIAgent physicist = chatClient.AsAIAgent(
"You are a physics expert. Be concise (2-3 sentences).", "Physicist");
AIAgent chemist = chatClient.AsAIAgent(
"You are a chemistry expert. Be concise (2-3 sentences).", "Chemist");
AggregatorExecutor aggregator = new();
Workflow workflow = new WorkflowBuilder(parseQuestion)
.WithName("ExpertReview")
.AddFanOutEdge(parseQuestion, [physicist, chemist])
.AddFanInBarrierEdge([physicist, chemist], aggregator)
.Build();
IHost host = Host.CreateDefaultBuilder(args)
.ConfigureServices(services =>
{
services.ConfigureDurableOptions(
options => options.Workflows.AddWorkflow(workflow),
workerBuilder: builder => builder.UseDurableTaskScheduler(dtsConnectionString),
clientBuilder: builder => builder.UseDurableTaskScheduler(dtsConnectionString));
})
.Build();
await host.StartAsync();
IWorkflowClient workflowClient = host.Services.GetRequiredService<IWorkflowClient>();
IWorkflowRun run = await workflowClient.RunAsync(workflow, "Why is the sky blue?");
if (run is IAwaitableWorkflowRun awaitableRun)
{
string? result = await awaitableRun.WaitForCompletionAsync<string>();
Console.WriteLine(result);
}
Os ParseQuestionExecutor e AggregatorExecutor são executores padrão do Microsoft Agent Framework sem código específico do Durable. Para implementações completas, consulte as samplas no GitHub.
Fluxo de trabalho de roteamento condicional
Você pode rotear a execução para diferentes branches com base nos resultados do runtime. O exemplo a seguir usa um agente de detecção de spam para classificar emails de entrada e, em seguida, roteia para um manipulador de spam ou um agente assistente de email.
AIAgent spamDetector = chatClient.AsAIAgent(
"You are a spam detection assistant. Return JSON with is_spam (bool) and reason (string).",
"SpamDetectionAgent");
AIAgent emailAssistant = chatClient.AsAIAgent(
"You are an email assistant. Draft a professional response.",
"EmailAssistantAgent");
SpamHandlerExecutor spamHandler = new();
EmailSenderExecutor emailSender = new();
Workflow workflow = new WorkflowBuilder(spamDetector)
.WithName("EmailClassification")
.AddSwitchCaseEdgeGroup(spamDetector, [
new Case(condition: IsSpamDetected, target: spamHandler),
new Default(target: emailAssistant),
])
.AddEdge(emailAssistant, emailSender)
.Build();
using IHost app = FunctionsApplication
.CreateBuilder(args)
.ConfigureFunctionsWebApplication()
.ConfigureDurableWorkflows(workflows => workflows.AddWorkflows(workflow))
.Build();
app.Run();
Os SpamHandlerExecutor e EmailSenderExecutor são executores padrão do Microsoft Agent Framework sem código específico do Durable. Para implementações completas, consulte as samplas no GitHub.
string dtsConnectionString = Environment.GetEnvironmentVariable("DURABLE_TASK_SCHEDULER_CONNECTION_STRING")
?? "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None";
ChatClient chatClient = new AzureOpenAIClient(
new Uri(endpoint), new DefaultAzureCredential()).GetChatClient(deploymentName);
AIAgent spamDetector = chatClient.AsAIAgent(
"You are a spam detection assistant. Return JSON with is_spam (bool) and reason (string).",
"SpamDetectionAgent");
AIAgent emailAssistant = chatClient.AsAIAgent(
"You are an email assistant. Draft a professional response.",
"EmailAssistantAgent");
SpamHandlerExecutor spamHandler = new();
EmailSenderExecutor emailSender = new();
Workflow workflow = new WorkflowBuilder(spamDetector)
.WithName("EmailClassification")
.AddSwitchCaseEdgeGroup(spamDetector, [
new Case(condition: IsSpamDetected, target: spamHandler),
new Default(target: emailAssistant),
])
.AddEdge(emailAssistant, emailSender)
.Build();
IHost host = Host.CreateDefaultBuilder(args)
.ConfigureServices(services =>
{
services.ConfigureDurableWorkflows(
workflowOptions => workflowOptions.AddWorkflow(workflow),
workerBuilder: builder => builder.UseDurableTaskScheduler(dtsConnectionString),
clientBuilder: builder => builder.UseDurableTaskScheduler(dtsConnectionString));
})
.Build();
await host.StartAsync();
IWorkflowClient workflowClient = host.Services.GetRequiredService<IWorkflowClient>();
IAwaitableWorkflowRun run = (IAwaitableWorkflowRun)await workflowClient.RunAsync(workflow, "Check this email for spam");
string? result = await run.WaitForCompletionAsync<string>();
Os SpamHandlerExecutor e EmailSenderExecutor são executores padrão do Microsoft Agent Framework sem código específico do Durable. Para implementações completas, consulte as samplas no GitHub.
Fluxo de trabalho HITL (human-in-the-loop)
Você pode pausar a execução do fluxo de trabalho em pontos designados para aguardar a entrada externa antes de continuar. O modelo de fluxo de trabalho do Microsoft Agent Framework usa nós RequestPort (em .NET) ou ctx.request_info() (em Python) para definir pontos de pausa. O exemplo a seguir implementa um fluxo de trabalho de reembolso de despesas com uma aprovação do gerente seguida de aprovações paralelas de orçamento e conformidade.
CreateApprovalRequest createRequest = new();
RequestPort<ApprovalRequest, ApprovalResponse> managerApproval =
RequestPort.Create<ApprovalRequest, ApprovalResponse>("ManagerApproval");
PrepareFinanceReview prepareFinanceReview = new();
RequestPort<ApprovalRequest, ApprovalResponse> budgetApproval =
RequestPort.Create<ApprovalRequest, ApprovalResponse>("BudgetApproval");
RequestPort<ApprovalRequest, ApprovalResponse> complianceApproval =
RequestPort.Create<ApprovalRequest, ApprovalResponse>("ComplianceApproval");
ExpenseReimburse reimburse = new();
Workflow expenseApproval = new WorkflowBuilder(createRequest)
.WithName("ExpenseReimbursement")
.WithDescription("Expense reimbursement with manager and parallel finance approvals")
.AddEdge(createRequest, managerApproval)
.AddEdge(managerApproval, prepareFinanceReview)
.AddFanOutEdge(prepareFinanceReview, [budgetApproval, complianceApproval])
.AddFanInBarrierEdge([budgetApproval, complianceApproval], reimburse)
.Build();
using IHost app = FunctionsApplication
.CreateBuilder(args)
.ConfigureFunctionsWebApplication()
.ConfigureDurableWorkflows(workflows =>
workflows.AddWorkflow(expenseApproval, exposeStatusEndpoint: true))
.Build();
app.Run();
A estrutura gera automaticamente três pontos de extremidade HTTP para interação HITL.
-
POST /api/workflows/{name}/run: iniciar o fluxo de trabalho -
GET /api/workflows/{name}/status/{id}: Verifique o Status e as Aprovações Pendentes -
POST /api/workflows/{name}/respond/{id}: enviar resposta de aprovação para retomar
Os seguintes tipos de registro definem os dados que fluem por meio do fluxo de trabalho:
public record ApprovalRequest(string ExpenseId, decimal Amount, string EmployeeName);
public record ApprovalResponse(bool Approved, string? Comments);
Os executores CreateApprovalRequest, PrepareFinanceReview e ExpenseReimburse são executores padrão Microsoft Agent Framework sem código específico durável. Para implementações completas, consulte as samplas no GitHub.
string dtsConnectionString = Environment.GetEnvironmentVariable("DURABLE_TASK_SCHEDULER_CONNECTION_STRING")
?? "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None";
CreateApprovalRequest createRequest = new();
RequestPort<ApprovalRequest, ApprovalResponse> managerApproval =
RequestPort.Create<ApprovalRequest, ApprovalResponse>("ManagerApproval");
PrepareFinanceReview prepareFinanceReview = new();
RequestPort<ApprovalRequest, ApprovalResponse> budgetApproval =
RequestPort.Create<ApprovalRequest, ApprovalResponse>("BudgetApproval");
RequestPort<ApprovalRequest, ApprovalResponse> complianceApproval =
RequestPort.Create<ApprovalRequest, ApprovalResponse>("ComplianceApproval");
ExpenseReimburse reimburse = new();
Workflow expenseApproval = new WorkflowBuilder(createRequest)
.WithName("ExpenseReimbursement")
.AddEdge(createRequest, managerApproval)
.AddEdge(managerApproval, prepareFinanceReview)
.AddFanOutEdge(prepareFinanceReview, [budgetApproval, complianceApproval])
.AddFanInBarrierEdge([budgetApproval, complianceApproval], reimburse)
.Build();
IHost host = Host.CreateDefaultBuilder(args)
.ConfigureServices(services =>
{
services.ConfigureDurableWorkflows(
options => options.AddWorkflow(expenseApproval),
workerBuilder: builder => builder.UseDurableTaskScheduler(dtsConnectionString),
clientBuilder: builder => builder.UseDurableTaskScheduler(dtsConnectionString));
})
.Build();
await host.StartAsync();
IWorkflowClient workflowClient = host.Services.GetRequiredService<IWorkflowClient>();
IStreamingWorkflowRun run = await workflowClient.StreamAsync(expenseApproval, "EXP-2025-001");
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
switch (evt)
{
case DurableWorkflowWaitingForInputEvent requestEvent:
Console.WriteLine($"Workflow paused at: {requestEvent.RequestPort.Id}");
ApprovalResponse approval = new(Approved: true, Comments: "Approved.");
await run.SendResponseAsync(requestEvent, approval);
break;
case DurableWorkflowCompletedEvent completedEvent:
Console.WriteLine($"Workflow completed: {completedEvent.Result}");
break;
}
}
Os seguintes tipos de registro definem os dados que fluem por meio do fluxo de trabalho:
public record ApprovalRequest(string ExpenseId, decimal Amount, string EmployeeName);
public record ApprovalResponse(bool Approved, string? Comments);
Os executores CreateApprovalRequest, PrepareFinanceReview e ExpenseReimburse são executores padrão Microsoft Agent Framework sem código específico durável. Para implementações completas, consulte as samplas no GitHub.
Painel do Agendador de Tarefas Duráveis
Use o painel do Agendador de Tarefas Duráveis para ter visibilidade total sobre seus agentes duráveis, orquestrações e workflows baseados em grafos:
- Exibir o histórico de conversas para cada sessão do agente
- Inspecionar chamadas de ferramenta e saídas estruturadas
- Rastrear a orquestração e a execução de fluxos de trabalho
- Monitorar métricas de desempenho
Tanto o desenvolvimento local (por meio do emulador) quanto as implantações de produção apresentam a mesma experiência de painel.
A captura de tela a seguir mostra uma sessão do agente com seu histórico de conversas e detalhes da sessão:
A captura de tela a seguir mostra uma orquestração determinística com detalhes de execução da atividade:
TTL (tempo de vida útil da sessão)
As sessões de agente duráveis mantêm automaticamente o histórico e o estado da conversa, que podem se acumular indefinidamente. O recurso TTL (vida útil) fornece limpeza automática de sessões ociosas, impedindo o consumo de recursos de armazenamento e o aumento dos custos.
Quando uma sessão de agente fica ociosa por mais tempo do que o período TTL configurado, o estado da sessão é excluído automaticamente. Cada nova interação redefine o temporizador TTL, estendendo o tempo de vida da sessão.
Valores padrão
- TTL padrão: 14 dias
- Atraso mínimo de exclusão de TTL: 5 minutos
Configuração
O TTL pode ser configurado globalmente ou por agente. Quando uma sessão de agente expira, todo o seu estado é excluído, incluindo o histórico da conversa e quaisquer dados de estado personalizados. Se uma mensagem for enviada para a mesma sessão após a exclusão, uma nova sessão será criada com um novo histórico de conversas.
Observação
A configuração de TTL está disponível apenas em .NET.
services.ConfigureDurableAgents(
options =>
{
// Set global default TTL to 7 days
options.DefaultTimeToLive = TimeSpan.FromDays(7);
// Agent with custom TTL of 1 day
options.AddAIAgent(shortLivedAgent, timeToLive: TimeSpan.FromDays(1));
// Agent with custom TTL of 90 days
options.AddAIAgent(longLivedAgent, timeToLive: TimeSpan.FromDays(90));
// Agent using global default (7 days)
options.AddAIAgent(defaultAgent);
// Agent with no TTL (never expires)
options.AddAIAgent(permanentAgent, timeToLive: null);
});
Limitações conhecidas
Tamanho máximo da conversa.
O estado da sessão do agente, incluindo o histórico completo da conversa, está sujeito aos limites de tamanho de estado do back-end durável. Ao usar o Agendador de Tarefas Duráveis, o tamanho máximo do estado da entidade é de 1 MB. Conversas de longa execução com grandes respostas de chamadas de ferramentas podem atingir esse limite. A compactação do histórico de conversas deve ser feita manualmente, por exemplo, iniciando uma nova sessão de agente e resumindo o contexto anterior.Latência.
Todas as interações do agente são roteadas por meio do Agendador de Tarefas Duráveis, que adiciona latência em comparação com a execução do agente na memória. Essa compensação fornece durabilidade e dimensionamento distribuído.Streaming.
Como os agentes duráveis são implementados sobre entidades duráveis, o modelo de comunicação subjacente é solicitação/resposta. Há suporte para streaming por meio de retornos de chamada de resposta (por exemplo, empurrar tokens para um Stream Redis para consumo pelo cliente), enquanto a entidade retorna a resposta completa quando o fluxo termina.Expiração de TTL.
O temporizador TTL baseia-se no tempo do relógio de parede desde a última mensagem, não no tempo de atividade cumulativo. Depois que uma sessão é excluída (via expiração TTL ou exclusão manual), seu histórico de conversa não pode ser recuperado.
Links relacionados
Para obter exemplos de código completos:
Para obter exemplos de código completos: