Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
Esta página fornece uma visão geral de Checkpoints no sistema de fluxo de trabalho Microsoft Agent Framework.
Visão geral
Os pontos de verificação permitem que você salve o estado de um fluxo de trabalho em pontos específicos durante sua execução e retome a partir desses pontos mais tarde. Esse recurso é particularmente útil para os seguintes cenários:
- Fluxos de trabalho de longa execução onde você deseja evitar a perda de progresso em caso de falhas.
- Fluxos de trabalho de longa execução em que você deseja pausar e retomar a execução posteriormente.
- Fluxos de trabalho que exigem armazenamento de estado periódico para fins de auditoria ou cumprimento.
- Fluxos de trabalho que precisam ser migrados entre diferentes ambientes ou instâncias.
Quando são criados os pontos de verificação?
Lembre-se de que os fluxos de trabalho são executados em superetapas, conforme documentado nos conceitos principais. Os pontos de verificação são criados no final de cada superetapa, depois de todos os executores nessa superetapa terem concluído a sua execução. Um ponto de verificação captura todo o estado do fluxo de trabalho, incluindo:
- O estado atual de todos os executores
- Todas as mensagens pendentes no fluxo de trabalho para a próxima superetapa
- Pedidos e respostas pendentes
- Estados partilhados
Capturando pontos de verificação
Para ativar o checkpointing, é necessário especificar um CheckpointManager ao executar o fluxo de trabalho. Um ponto de controlo pode então ser acedido através de um SuperStepCompletedEvent, ou através da propriedade Checkpoints da execução.
using Microsoft.Agents.AI.Workflows;
// Create a checkpoint manager to manage checkpoints
CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
// Run the workflow with checkpointing enabled
StreamingRun run = await InProcessExecution
.RunStreamingAsync(workflow, input, checkpointManager)
.ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is SuperStepCompletedEvent superStepCompletedEvt)
{
// Access the checkpoint
CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo?.Checkpoint;
}
}
// Checkpoints can also be accessed from the run directly
IReadOnlyList<CheckpointInfo> checkpoints = run.Checkpoints;
Para permitir o checkpointing, é necessário fornecer um CheckpointStorage ao criar um fluxo de trabalho. Um ponto de controlo pode então ser acedido através do armazenamento. O Agent Framework inclui três implementações integradas — escolha aquela que se adequa às suas necessidades de durabilidade e implementação:
| Provider | Package | Durability | Melhor para |
|---|---|---|---|
InMemoryCheckpointStorage |
agent-framework |
Apenas em processo | Testes, demonstrações, fluxos de trabalho de curta duração |
FileCheckpointStorage |
agent-framework |
Disco local | Fluxos de trabalho de máquina única, desenvolvimento local |
CosmosCheckpointStorage |
agent-framework-azure-cosmos |
Azure Cosmos DB | Fluxos de trabalho de produção distribuída e entre processos |
Os três implementam o mesmo CheckpointStorage protocolo, por isso pode trocar de fornecedor sem alterar o fluxo de trabalho ou o código do executor.
InMemoryCheckpointStorage mantém os checkpoints na memória do processo. Ideal para testes, demonstrações e fluxos de trabalho de curta duração onde não precisas de durabilidade em reinícios.
from agent_framework import (
InMemoryCheckpointStorage,
WorkflowBuilder,
)
# Create a checkpoint storage to manage checkpoints
checkpoint_storage = InMemoryCheckpointStorage()
# Build a workflow with checkpointing enabled
builder = WorkflowBuilder(start_executor=start_executor, checkpoint_storage=checkpoint_storage)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
workflow = builder.build()
# Run the workflow
async for event in workflow.run(input, stream=True):
...
# Access checkpoints from the storage
checkpoints = await checkpoint_storage.list_checkpoints(workflow_name=workflow.name)
Retomar a partir de pontos de verificação
Você pode retomar um fluxo de trabalho de um ponto de verificação específico diretamente na mesma execução.
// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];
// Restore the state directly on the same run instance.
await run.RestoreCheckpointAsync(savedCheckpoint).ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is WorkflowOutputEvent workflowOutputEvt)
{
Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
}
}
Você pode retomar um fluxo de trabalho de um ponto de verificação específico diretamente na mesma instância de fluxo de trabalho.
# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(checkpoint_id=saved_checkpoint.checkpoint_id, stream=True):
...
Reidratação a partir de pontos de verificação
Ou você pode reidratar um fluxo de trabalho de um ponto de verificação para uma nova instância de execução.
// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];
StreamingRun newRun = await InProcessExecution
.ResumeStreamingAsync(newWorkflow, savedCheckpoint, checkpointManager)
.ConfigureAwait(false);
await foreach (WorkflowEvent evt in newRun.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is WorkflowOutputEvent workflowOutputEvt)
{
Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
}
}
Ou pode-se reidratar uma nova instância de fluxo de trabalho a partir de um checkpoint.
from agent_framework import WorkflowBuilder
builder = WorkflowBuilder(start_executor=start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
# This workflow instance doesn't require checkpointing enabled.
workflow = builder.build()
# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(
checkpoint_id=saved_checkpoint.checkpoint_id,
checkpoint_storage=checkpoint_storage,
stream=True,
):
...
Salvar Estados do Executor
Para garantir que o estado de um executor seja capturado em um ponto de verificação, o executor deve substituir o OnCheckpointingAsync método e salvar seu estado no contexto do fluxo de trabalho.
using Microsoft.Agents.AI.Workflows;
internal sealed partial class CustomExecutor() : Executor("CustomExecutor")
{
private const string StateKey = "CustomExecutorState";
private List<string> messages = new();
[MessageHandler]
private async ValueTask HandleAsync(string message, IWorkflowContext context)
{
this.messages.Add(message);
// Executor logic...
}
protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
return context.QueueStateUpdateAsync(StateKey, this.messages);
}
}
Além disso, para garantir que o estado seja restaurado corretamente ao retomar a partir de um ponto de verificação, o executor deve substituir o OnCheckpointRestoredAsync método e carregar seu estado do contexto do fluxo de trabalho.
protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
this.messages = await context.ReadStateAsync<List<string>>(StateKey).ConfigureAwait(false);
}
Para garantir que o estado de um executor é capturado num checkpoint, o executor deve redefinir o método on_checkpoint_save e devolver o seu estado como um dicionário.
class CustomExecutor(Executor):
def __init__(self, id: str) -> None:
super().__init__(id=id)
self._messages: list[str] = []
@handler
async def handle(self, message: str, ctx: WorkflowContext):
self._messages.append(message)
# Executor logic...
async def on_checkpoint_save(self) -> dict[str, Any]:
return {"messages": self._messages}
Além disso, para garantir que o estado é corretamente restaurado ao retomar de um ponto de verificação, o executor deve substituir o método on_checkpoint_restore e restaurar o estado a partir do dicionário de estado fornecido.
async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
self._messages = state.get("messages", [])
Considerações de segurança
Importante
O armazenamento de pontos de controlo é uma fronteira de confiança. Quer use implementações de armazenamento incorporadas ou personalizadas, o backend de armazenamento deve ser tratado como infraestrutura privada e de confiança. Nunca carregue pontos de verificação de fontes não seguras ou potencialmente adulteradas.
Certifique-se de que o local de armazenamento utilizado para os pontos de controlo está devidamente protegido. Apenas serviços autorizados e utilizadores devem ter acesso de leitura ou escrita aos dados dos pontos de controlo.
Serialização com Pickle
Tanto FileCheckpointStorage como CosmosCheckpointStorage utilizam o módulo pickle da Python para serializar estados não nativos de JSON, como dataclasses, data-times e objetos personalizados. Para mitigar os riscos de execução arbitrária de código durante a desserialização, ambos os provedores utilizam por padrão um unpickler restrito. Apenas um conjunto incorporado de tipos seguros de Python (primitivas, datetime, uuid, Decimal, coleções comuns, etc.) e todos os tipos internos agent_framework são permitidos durante a deserialização. Qualquer outro tipo encontrado num ponto de controlo faz com que a desserialização falhe com um WorkflowCheckpointException.
Para permitir tipos adicionais específicos de aplicação, passe-os através do allowed_checkpoint_types parâmetro usando "module:qualname" o formato:
from agent_framework import FileCheckpointStorage
storage = FileCheckpointStorage(
"/tmp/checkpoints",
allowed_checkpoint_types=[
"my_app.models:SafeState",
"my_app.models:UserProfile",
],
)
CosmosCheckpointStorage aceita o mesmo parâmetro:
from azure.identity.aio import DefaultAzureCredential
from agent_framework_azure_cosmos import CosmosCheckpointStorage
storage = CosmosCheckpointStorage(
endpoint="https://my-account.documents.azure.com:443/",
credential=DefaultAzureCredential(),
database_name="agent-db",
container_name="checkpoints",
allowed_checkpoint_types=[
"my_app.models:SafeState",
"my_app.models:UserProfile",
],
)
Se o seu modelo de ameaça não permitir de todo a serialização baseada em pickle, use InMemoryCheckpointStorage ou implemente um código personalizado CheckpointStorage com uma estratégia alternativa de serialização.
Responsabilidade do local de armazenamento
FileCheckpointStorage requer um parâmetro explícito storage_path — não existe um diretório predefinido. Embora a estrutura valide contra ataques por percurso de caminhos, a segurança do diretório de armazenamento em si (permissões de ficheiros, encriptação em repouso, controlos de acesso) é responsabilidade do programador. Apenas os processos autorizados devem ter acesso de leitura ou escrita ao diretório do checkpoint.
CosmosCheckpointStorage depende da Azure Cosmos DB para armazenamento. Utilize identidade gerida/RBAC sempre que possível, delimite a base de dados e o contentor ao serviço de workflow, e alterne/revolva as chaves da conta se utilizar autenticação baseada em chaves. Assim como no armazenamento de ficheiros, apenas os principais autorizados devem ter acesso de leitura ou escrita ao contentor do Cosmos DB que contém os documentos de pontos de verificação.