API del flusso di lavoro funzionale

Avvertimento

L'API del flusso di lavoro funzionale è sperimentale e soggetta a modifiche o rimozione nelle versioni future senza preavviso.

L'API del flusso di lavoro funzionale consente di scrivere flussi di lavoro come funzioni asincrone Python semplici. Invece di definire classi executor, configurare i collegamenti e usare WorkflowBuilder, si applica un decoratore a una funzione async con @workflow e si utilizza il flusso di controllo nativo di Python — if/else, cicli for, asyncio.gather — per esprimere la logica.

Per un confronto affiancato con l'API graph, vedere API del flusso di lavoro nella panoramica dei flussi di lavoro.

@workflow decoratore

Applicare @workflow a una async funzione per convertirla in un FunctionalWorkflow oggetto :

from agent_framework import workflow

@workflow
async def text_pipeline(text: str) -> str:
    upper = await to_upper_case(text)
    return await reverse_text(upper)

L'elemento @workflow decorator supporta una forma parametrizzabile con argomenti facoltativi:

from agent_framework import InMemoryCheckpointStorage, workflow

storage = InMemoryCheckpointStorage()

@workflow(name="my_pipeline", description="Uppercase then reverse", checkpoint_storage=storage)
async def text_pipeline(text: str) -> str:
    ...

@workflow Parametri

Parametro Type Description
name str | None Nome visualizzato per il flusso di lavoro. Imposta il valore predefinito della funzione __name__.
description str | None Descrizione leggibile facoltativa.
checkpoint_storage CheckpointStorage | None Archiviazione predefinita per rendere persistenti i risultati dei passaggi tra le esecuzioni. Può essere sottoposto a override per ogni chiamata in run().

Firma della funzione flusso di lavoro

Il primo parametro della funzione del flusso di lavoro riceve l'input passato a .run(). Aggiungere un ctx: WorkflowRunContext parametro solo quando è necessario HITL, lo stato key/value o eventi personalizzati — altrimenti è facoltativo:

# No ctx needed — just a plain pipeline
@workflow
async def simple_pipeline(data: str) -> str:
    result = await process(data)
    return result

# ctx needed for HITL, state, or custom events
@workflow
async def hitl_pipeline(data: str, ctx: WorkflowRunContext) -> str:
    feedback = await ctx.request_info({"draft": data}, response_type=str)
    return feedback

WorkflowRunContext viene rilevato prima dall'annotazione di tipo, quindi dal nome del parametro ctx, quindi funzionano sia ctx: WorkflowRunContext che un parametro semplice ctx.

Esecuzione di un flusso di lavoro

Chiamare .run() sull'oggetto FunctionalWorkflow restituito da @workflow:

# Calling the decorated function directly returns the raw return value
raw = await text_pipeline("hello world")   # str — the raw return value

# .run() wraps the result in a WorkflowRunResult with events and state
result = await text_pipeline.run("hello world")
print(result.text)                # first output as a string
print(result.get_outputs())       # list of all outputs
print(result.get_final_state())   # WorkflowRunState.IDLE

run() Parametri

Parametro Type Description
message Any | None Input passato alla funzione del flusso di lavoro come primo argomento.
stream bool Se True, fornisce un ResponseStream che restituisce oggetti WorkflowEvent. Il valore predefinito è False.
responses dict[str, Any] | None Risposte HITL indicizzate da request_id. Utilizzato per riprendere un flusso di lavoro sospeso.
checkpoint_id str | None Checkpoint da cui eseguire il ripristino. checkpoint_storage deve essere impostato.
checkpoint_storage CheckpointStorage | None Esegue l'override del set di archiviazione predefinito nell'elemento Decorator per questa esecuzione.
include_status_events bool Includere eventi di modifica dello stato nel risultato non di streaming.

È necessario specificare esattamente uno di message, responseso checkpoint_id per ogni chiamata.

WorkflowRunResult

run() (non in streaming) restituisce un oggetto WorkflowRunResult. Metodi chiave:

Metodo/proprietà Returns Description
.text str Primo output come stringa. Stringa vuota se non viene restituito alcun output di stringa.
.get_outputs() list[Any] Tutti gli output generati dal flusso di lavoro.
.get_final_state() WorkflowRunState Stato di esecuzione finale (IDLE, IDLE_WITH_PENDING_REQUESTS, FAILED, ...).
.get_request_info_events() list[WorkflowEvent] Richieste HITL in sospeso quando lo stato è IDLE_WITH_PENDING_REQUESTS.

Trasmissione in diretta

Passare stream=True per ricevere gli eventi man mano che vengono generati:

from agent_framework import workflow

@workflow
async def data_pipeline(url: str) -> str:
    raw = await fetch_data(url)
    return await transform_data(raw)

# stream=True returns a ResponseStream you iterate with async for
stream = data_pipeline.run("https://example.com/api/data", stream=True)
async for event in stream:
    if event.type == "output":
        print(f"Output: {event.data}")

# After iteration, get_final_response() returns the WorkflowRunResult
result = await stream.get_final_response()
print(f"Final state: {result.get_final_state()}")

Vedere python/samples/03-workflows/functional/basic_streaming_pipeline.py per un esempio completo.

@step decoratore

@step è un decoratore opzionale che aggiunge la memorizzazione nella cache dei risultati, l'emissione di eventi e la creazione di punti di controllo alle singole funzioni asincrone.

from agent_framework import step, workflow

@step
async def fetch_data(url: str) -> dict:
    # expensive — hits a real API
    return await http_get(url)

@workflow
async def pipeline(url: str) -> str:
    raw = await fetch_data(url)
    return process(raw)

Operazioni @step all'interno di un flusso di lavoro

  • Memorizza nella cache i risultati : il risultato viene archiviato da (step_name, call_index). In caso di riprendimento o ripristino del checkpoint HITL, un passaggio completato restituisce immediatamente il risultato salvato anziché eseguire di nuovo.
  • Genera eventi : executor_invoked / executor_completed / executor_failed vengono generati per l'osservabilità. In caso di riscontri nella cache, executor_bypassed viene invece generato.
  • Salva i checkpoint : se il flusso di lavoro ha checkpoint_storage, viene salvato un checkpoint al termine di ogni passaggio.
  • Inietta WorkflowRunContext — se la funzione step dichiara un ctx: WorkflowRunContext parametro, il contesto attivo viene inserito automaticamente.

All'esterno di un flusso di lavoro in esecuzione, @step è trasparente: la funzione si comporta in modo identico alla versione non decorata, rendendola completamente testabile in isolamento.

Quando usare @step

Usare @step sulle funzioni che sono costose da eseguire di nuovo: chiamate di agenti, richieste API esterne o qualsiasi operazione in cui la ripetizione dell'esecuzione al curriculum sarebbe costosa o ha effetti collaterali. Le funzioni semplici (senza @step) funzionano ancora all'interno @workflowdi ; vengono semplicemente eseguite nuovamente quando il flusso di lavoro riprende.

from agent_framework import InMemoryCheckpointStorage, step, workflow

storage = InMemoryCheckpointStorage()

@step  # cached — won't re-run on resume
async def call_llm(prompt: str) -> str:
    return (await agent.run(prompt)).text

# No @step — cheap, fine to re-run
async def validate(text: str) -> bool:
    return len(text) > 0

@workflow(checkpoint_storage=storage)
async def pipeline(topic: str) -> str:
    draft = await call_llm(f"Write about: {topic}")
    ok = await validate(draft)
    return draft if ok else ""

@step accetta anche un name parametro:

@step(name="transform")
async def transform_data(raw: dict) -> str:
    ...

Vedere python/samples/03-workflows/functional/steps_and_checkpointing.py per un esempio completo.

WorkflowRunContext

WorkflowRunContext (alias breve: RunContext) è il contesto di esecuzione iniettato nelle funzioni di flusso di lavoro e di passo. È necessario solo quando si usa HITL, stato chiave-valore o eventi personalizzati.

Importarlo da agent_framework:

from agent_framework import WorkflowRunContext, workflow

ctx.request_info() — Interazione umana nel processo

ctx.request_info() sospende il flusso di lavoro per attendere l'input esterno:

@workflow
async def review_pipeline(topic: str, ctx: WorkflowRunContext) -> str:
    draft = await write_draft(topic)
    feedback = await ctx.request_info(
        {"draft": draft, "instructions": "Please review this draft"},
        response_type=str,
        request_id="review_request",
    )
    return await revise_draft(draft, feedback)

Parametri:

Parametro Type Description
request_data Any Payload che descrive l'input necessario (dict, modello Pydantic, stringa, ...).
response_type type Tipo Python previsto della risposta.
request_id str | None Identificatore stabile per questa richiesta. Se omesso, viene generato un UUID casuale.

Semantica di riproduzione: Alla prima esecuzione request_info() genera un segnale interno (mai visibile al tuo codice) che sospende il workflow. Il chiamante riceve un WorkflowRunResult con get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS. Riprendere chiamando .run(responses={request_id: value}) : il flusso di lavoro viene eseguito nuovamente dall'inizio e request_info() restituisce immediatamente il valore specificato.

Le funzioni decorate con @step, che sono state eseguite prima della sospensione, restituiscono i loro risultati memorizzati nella cache al riavvio invece di essere rieseguite.

Gestione della risposta:

# Phase 1 — run until the workflow pauses
result1 = await review_pipeline.run("AI Safety")
assert result1.get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS

requests = result1.get_request_info_events()
print(requests[0].request_id)  # "review_request"

# Phase 2 — resume with the human's answer
result2 = await review_pipeline.run(
    responses={"review_request": "Add more details about alignment research"}
)
print(result2.text)

Vedere python/samples/03-workflows/functional/hitl_review.py per un esempio completo.

ctx.request_info() è supportato anche all'interno delle funzioni @step.

ctx.add_event() — Eventi personalizzati

Usare ctx.add_event() per generare eventi specifici dell'applicazione insieme agli eventi del ciclo di vita del framework. Per informazioni dettagliate ed esempi, vedere Creazione di eventi personalizzati.

ctx.get_state() / ctx.set_state() — Stato chiave/valore

Usare ctx.get_state() e ctx.set_state() per archiviare i valori che vengono mantenuti tra le interruzioni HITL e sono inclusi nei checkpoint. Per informazioni dettagliate, vedere Stato del flusso di lavoro.

I valori di stato devono essere serializzabili in JSON quando è configurata l'archiviazione del checkpoint.

ctx.is_streaming()

Restituisce True quando l'esecuzione corrente è stata avviata con stream=True. Funzioni di transizione utili che vogliono adattare il loro comportamento in base alla modalità di streaming.

get_run_context()

Recupera l'oggetto attivo WorkflowRunContext da qualsiasi posizione all'interno di un flusso di lavoro in esecuzione, utile nelle funzioni helper che non dichiarano un ctx parametro:

from agent_framework import get_run_context

async def helper():
    ctx = get_run_context()
    if ctx is not None:
        ctx.set_state("helper_ran", True)

Restituisce None quando viene chiamato all'esterno di un flusso di lavoro in esecuzione.

Parallelismo con asyncio.gather

Usare la concorrenza standard di Python per operazioni di fan-out/fan-in, senza bisogno di primitive del framework.

import asyncio
from agent_framework import workflow

@workflow
async def research_pipeline(topic: str) -> str:
    web, papers, news = await asyncio.gather(
        research_web(topic),
        research_papers(topic),
        research_news(topic),
    )
    return await synthesize([web, papers, news])

asyncio.gather funziona anche quando le funzioni sono decorate con @step.

Vedere python/samples/03-workflows/functional/parallel_pipeline.py per un esempio completo.

Chiamata di agenti all'interno dei flussi di lavoro

Le chiamate di Agent funzionano come chiamate di funzione normali all'interno di @workflow.

from agent_framework import Agent, workflow

writer = Agent(name="WriterAgent", instructions="Write a short poem.", client=client)
reviewer = Agent(name="ReviewerAgent", instructions="Review the poem.", client=client)

@workflow
async def poem_workflow(topic: str) -> str:
    poem = (await writer.run(f"Write a poem about: {topic}")).text
    review = (await reviewer.run(f"Review this poem: {poem}")).text
    return f"Poem:\n{poem}\n\nReview: {review}"

Aggiungere @step alle funzioni di chiamata tramite agente quando si desidera che i risultati siano memorizzati nella cache tra riprese HITL o ripristini dei checkpoint:

from agent_framework import step

@step
async def write_poem(topic: str) -> str:
    return (await writer.run(f"Write a poem about: {topic}")).text

Vedere python/samples/03-workflows/functional/agent_integration.py per un esempio completo.

.as_agent() — Uso di un flusso di lavoro come agente

Avvolgere un oggetto FunctionalWorkflow come oggetto compatibile con l'agente utilizzando .as_agent():

from agent_framework import workflow

@workflow
async def poem_workflow(topic: str) -> str:
    ...

# Wrap as an agent
agent = poem_workflow.as_agent(name="PoemAgent")

# Use with the standard agent interface
response = await agent.run("Write a poem about the ocean")
print(response.text)

# Or use in a larger workflow or orchestration

.as_agent() restituisce un FunctionalWorkflowAgent oggetto che espone la stessa run() interfaccia di altri oggetti agente, rendendo componibili flussi di lavoro funzionali con qualsiasi sistema che accetta gli agenti.

Parametro Type Description
name str | None Mostra il nome dell'agente. Il nome predefinito è quello del flusso di lavoro.

Vedere python/samples/03-workflows/functional/agent_integration.py per un esempio.

Samples

Gli esempi eseguibili sono disponibili nelle cartelle di esempio seguenti:

Passaggi successivi

Argomenti correlati:

L'API del flusso di lavoro funzionale non è attualmente disponibile per C#.