Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Avvertimento
L'API del flusso di lavoro funzionale è sperimentale e soggetta a modifiche o rimozione nelle versioni future senza preavviso.
L'API del flusso di lavoro funzionale consente di scrivere flussi di lavoro come funzioni asincrone Python semplici. Invece di definire classi executor, configurare i collegamenti e usare WorkflowBuilder, si applica un decoratore a una funzione async con @workflow e si utilizza il flusso di controllo nativo di Python — if/else, cicli for, asyncio.gather — per esprimere la logica.
Per un confronto affiancato con l'API graph, vedere API del flusso di lavoro nella panoramica dei flussi di lavoro.
@workflow decoratore
Applicare @workflow a una async funzione per convertirla in un FunctionalWorkflow oggetto :
from agent_framework import workflow
@workflow
async def text_pipeline(text: str) -> str:
upper = await to_upper_case(text)
return await reverse_text(upper)
L'elemento @workflow decorator supporta una forma parametrizzabile con argomenti facoltativi:
from agent_framework import InMemoryCheckpointStorage, workflow
storage = InMemoryCheckpointStorage()
@workflow(name="my_pipeline", description="Uppercase then reverse", checkpoint_storage=storage)
async def text_pipeline(text: str) -> str:
...
@workflow Parametri
| Parametro | Type | Description |
|---|---|---|
name |
str | None |
Nome visualizzato per il flusso di lavoro. Imposta il valore predefinito della funzione __name__. |
description |
str | None |
Descrizione leggibile facoltativa. |
checkpoint_storage |
CheckpointStorage | None |
Archiviazione predefinita per rendere persistenti i risultati dei passaggi tra le esecuzioni. Può essere sottoposto a override per ogni chiamata in run(). |
Firma della funzione flusso di lavoro
Il primo parametro della funzione del flusso di lavoro riceve l'input passato a .run(). Aggiungere un ctx: WorkflowRunContext parametro solo quando è necessario HITL, lo stato key/value o eventi personalizzati — altrimenti è facoltativo:
# No ctx needed — just a plain pipeline
@workflow
async def simple_pipeline(data: str) -> str:
result = await process(data)
return result
# ctx needed for HITL, state, or custom events
@workflow
async def hitl_pipeline(data: str, ctx: WorkflowRunContext) -> str:
feedback = await ctx.request_info({"draft": data}, response_type=str)
return feedback
WorkflowRunContext viene rilevato prima dall'annotazione di tipo, quindi dal nome del parametro ctx, quindi funzionano sia ctx: WorkflowRunContext che un parametro semplice ctx.
Esecuzione di un flusso di lavoro
Chiamare .run() sull'oggetto FunctionalWorkflow restituito da @workflow:
# Calling the decorated function directly returns the raw return value
raw = await text_pipeline("hello world") # str — the raw return value
# .run() wraps the result in a WorkflowRunResult with events and state
result = await text_pipeline.run("hello world")
print(result.text) # first output as a string
print(result.get_outputs()) # list of all outputs
print(result.get_final_state()) # WorkflowRunState.IDLE
run() Parametri
| Parametro | Type | Description |
|---|---|---|
message |
Any | None |
Input passato alla funzione del flusso di lavoro come primo argomento. |
stream |
bool |
Se True, fornisce un ResponseStream che restituisce oggetti WorkflowEvent. Il valore predefinito è False. |
responses |
dict[str, Any] | None |
Risposte HITL indicizzate da request_id. Utilizzato per riprendere un flusso di lavoro sospeso. |
checkpoint_id |
str | None |
Checkpoint da cui eseguire il ripristino.
checkpoint_storage deve essere impostato. |
checkpoint_storage |
CheckpointStorage | None |
Esegue l'override del set di archiviazione predefinito nell'elemento Decorator per questa esecuzione. |
include_status_events |
bool |
Includere eventi di modifica dello stato nel risultato non di streaming. |
È necessario specificare esattamente uno di message, responseso checkpoint_id per ogni chiamata.
WorkflowRunResult
run() (non in streaming) restituisce un oggetto WorkflowRunResult. Metodi chiave:
| Metodo/proprietà | Returns | Description |
|---|---|---|
.text |
str |
Primo output come stringa. Stringa vuota se non viene restituito alcun output di stringa. |
.get_outputs() |
list[Any] |
Tutti gli output generati dal flusso di lavoro. |
.get_final_state() |
WorkflowRunState |
Stato di esecuzione finale (IDLE, IDLE_WITH_PENDING_REQUESTS, FAILED, ...). |
.get_request_info_events() |
list[WorkflowEvent] |
Richieste HITL in sospeso quando lo stato è IDLE_WITH_PENDING_REQUESTS. |
Trasmissione in diretta
Passare stream=True per ricevere gli eventi man mano che vengono generati:
from agent_framework import workflow
@workflow
async def data_pipeline(url: str) -> str:
raw = await fetch_data(url)
return await transform_data(raw)
# stream=True returns a ResponseStream you iterate with async for
stream = data_pipeline.run("https://example.com/api/data", stream=True)
async for event in stream:
if event.type == "output":
print(f"Output: {event.data}")
# After iteration, get_final_response() returns the WorkflowRunResult
result = await stream.get_final_response()
print(f"Final state: {result.get_final_state()}")
Vedere python/samples/03-workflows/functional/basic_streaming_pipeline.py per un esempio completo.
@step decoratore
@step è un decoratore opzionale che aggiunge la memorizzazione nella cache dei risultati, l'emissione di eventi e la creazione di punti di controllo alle singole funzioni asincrone.
from agent_framework import step, workflow
@step
async def fetch_data(url: str) -> dict:
# expensive — hits a real API
return await http_get(url)
@workflow
async def pipeline(url: str) -> str:
raw = await fetch_data(url)
return process(raw)
Operazioni @step all'interno di un flusso di lavoro
-
Memorizza nella cache i risultati : il risultato viene archiviato da
(step_name, call_index). In caso di riprendimento o ripristino del checkpoint HITL, un passaggio completato restituisce immediatamente il risultato salvato anziché eseguire di nuovo. -
Genera eventi :
executor_invoked/executor_completed/executor_failedvengono generati per l'osservabilità. In caso di riscontri nella cache,executor_bypassedviene invece generato. -
Salva i checkpoint : se il flusso di lavoro ha
checkpoint_storage, viene salvato un checkpoint al termine di ogni passaggio. -
Inietta
WorkflowRunContext— se la funzione step dichiara unctx: WorkflowRunContextparametro, il contesto attivo viene inserito automaticamente.
All'esterno di un flusso di lavoro in esecuzione, @step è trasparente: la funzione si comporta in modo identico alla versione non decorata, rendendola completamente testabile in isolamento.
Quando usare @step
Usare @step sulle funzioni che sono costose da eseguire di nuovo: chiamate di agenti, richieste API esterne o qualsiasi operazione in cui la ripetizione dell'esecuzione al curriculum sarebbe costosa o ha effetti collaterali. Le funzioni semplici (senza @step) funzionano ancora all'interno @workflowdi ; vengono semplicemente eseguite nuovamente quando il flusso di lavoro riprende.
from agent_framework import InMemoryCheckpointStorage, step, workflow
storage = InMemoryCheckpointStorage()
@step # cached — won't re-run on resume
async def call_llm(prompt: str) -> str:
return (await agent.run(prompt)).text
# No @step — cheap, fine to re-run
async def validate(text: str) -> bool:
return len(text) > 0
@workflow(checkpoint_storage=storage)
async def pipeline(topic: str) -> str:
draft = await call_llm(f"Write about: {topic}")
ok = await validate(draft)
return draft if ok else ""
@step accetta anche un name parametro:
@step(name="transform")
async def transform_data(raw: dict) -> str:
...
Vedere python/samples/03-workflows/functional/steps_and_checkpointing.py per un esempio completo.
WorkflowRunContext
WorkflowRunContext (alias breve: RunContext) è il contesto di esecuzione iniettato nelle funzioni di flusso di lavoro e di passo. È necessario solo quando si usa HITL, stato chiave-valore o eventi personalizzati.
Importarlo da agent_framework:
from agent_framework import WorkflowRunContext, workflow
ctx.request_info() — Interazione umana nel processo
ctx.request_info() sospende il flusso di lavoro per attendere l'input esterno:
@workflow
async def review_pipeline(topic: str, ctx: WorkflowRunContext) -> str:
draft = await write_draft(topic)
feedback = await ctx.request_info(
{"draft": draft, "instructions": "Please review this draft"},
response_type=str,
request_id="review_request",
)
return await revise_draft(draft, feedback)
Parametri:
| Parametro | Type | Description |
|---|---|---|
request_data |
Any |
Payload che descrive l'input necessario (dict, modello Pydantic, stringa, ...). |
response_type |
type |
Tipo Python previsto della risposta. |
request_id |
str | None |
Identificatore stabile per questa richiesta. Se omesso, viene generato un UUID casuale. |
Semantica di riproduzione: Alla prima esecuzione request_info() genera un segnale interno (mai visibile al tuo codice) che sospende il workflow. Il chiamante riceve un WorkflowRunResult con get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS. Riprendere chiamando .run(responses={request_id: value}) : il flusso di lavoro viene eseguito nuovamente dall'inizio e request_info() restituisce immediatamente il valore specificato.
Le funzioni decorate con @step, che sono state eseguite prima della sospensione, restituiscono i loro risultati memorizzati nella cache al riavvio invece di essere rieseguite.
Gestione della risposta:
# Phase 1 — run until the workflow pauses
result1 = await review_pipeline.run("AI Safety")
assert result1.get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS
requests = result1.get_request_info_events()
print(requests[0].request_id) # "review_request"
# Phase 2 — resume with the human's answer
result2 = await review_pipeline.run(
responses={"review_request": "Add more details about alignment research"}
)
print(result2.text)
Vedere python/samples/03-workflows/functional/hitl_review.py per un esempio completo.
ctx.request_info() è supportato anche all'interno delle funzioni @step.
ctx.add_event() — Eventi personalizzati
Usare ctx.add_event() per generare eventi specifici dell'applicazione insieme agli eventi del ciclo di vita del framework. Per informazioni dettagliate ed esempi, vedere Creazione di eventi personalizzati.
ctx.get_state()
/
ctx.set_state() — Stato chiave/valore
Usare ctx.get_state() e ctx.set_state() per archiviare i valori che vengono mantenuti tra le interruzioni HITL e sono inclusi nei checkpoint. Per informazioni dettagliate, vedere Stato del flusso di lavoro.
I valori di stato devono essere serializzabili in JSON quando è configurata l'archiviazione del checkpoint.
ctx.is_streaming()
Restituisce True quando l'esecuzione corrente è stata avviata con stream=True. Funzioni di transizione utili che vogliono adattare il loro comportamento in base alla modalità di streaming.
get_run_context()
Recupera l'oggetto attivo WorkflowRunContext da qualsiasi posizione all'interno di un flusso di lavoro in esecuzione, utile nelle funzioni helper che non dichiarano un ctx parametro:
from agent_framework import get_run_context
async def helper():
ctx = get_run_context()
if ctx is not None:
ctx.set_state("helper_ran", True)
Restituisce None quando viene chiamato all'esterno di un flusso di lavoro in esecuzione.
Parallelismo con asyncio.gather
Usare la concorrenza standard di Python per operazioni di fan-out/fan-in, senza bisogno di primitive del framework.
import asyncio
from agent_framework import workflow
@workflow
async def research_pipeline(topic: str) -> str:
web, papers, news = await asyncio.gather(
research_web(topic),
research_papers(topic),
research_news(topic),
)
return await synthesize([web, papers, news])
asyncio.gather funziona anche quando le funzioni sono decorate con @step.
Vedere python/samples/03-workflows/functional/parallel_pipeline.py per un esempio completo.
Chiamata di agenti all'interno dei flussi di lavoro
Le chiamate di Agent funzionano come chiamate di funzione normali all'interno di @workflow.
from agent_framework import Agent, workflow
writer = Agent(name="WriterAgent", instructions="Write a short poem.", client=client)
reviewer = Agent(name="ReviewerAgent", instructions="Review the poem.", client=client)
@workflow
async def poem_workflow(topic: str) -> str:
poem = (await writer.run(f"Write a poem about: {topic}")).text
review = (await reviewer.run(f"Review this poem: {poem}")).text
return f"Poem:\n{poem}\n\nReview: {review}"
Aggiungere @step alle funzioni di chiamata tramite agente quando si desidera che i risultati siano memorizzati nella cache tra riprese HITL o ripristini dei checkpoint:
from agent_framework import step
@step
async def write_poem(topic: str) -> str:
return (await writer.run(f"Write a poem about: {topic}")).text
Vedere python/samples/03-workflows/functional/agent_integration.py per un esempio completo.
.as_agent() — Uso di un flusso di lavoro come agente
Avvolgere un oggetto FunctionalWorkflow come oggetto compatibile con l'agente utilizzando .as_agent():
from agent_framework import workflow
@workflow
async def poem_workflow(topic: str) -> str:
...
# Wrap as an agent
agent = poem_workflow.as_agent(name="PoemAgent")
# Use with the standard agent interface
response = await agent.run("Write a poem about the ocean")
print(response.text)
# Or use in a larger workflow or orchestration
.as_agent() restituisce un FunctionalWorkflowAgent oggetto che espone la stessa run() interfaccia di altri oggetti agente, rendendo componibili flussi di lavoro funzionali con qualsiasi sistema che accetta gli agenti.
| Parametro | Type | Description |
|---|---|---|
name |
str | None |
Mostra il nome dell'agente. Il nome predefinito è quello del flusso di lavoro. |
Vedere python/samples/03-workflows/functional/agent_integration.py per un esempio.
Samples
Gli esempi eseguibili sono disponibili nelle cartelle di esempio seguenti:
-
python/samples/01-get-started/— esempi introduttivi@workflow -
python/samples/03-workflows/functional/— esempi di flussi di lavoro funzionali con funzionalità complete
Passaggi successivi
Argomenti correlati:
- Executors — unità di elaborazione nell'API basata su grafo
- Human-in-the-loop - HITL nei flussi di lavoro basati su grafo
- Checkpoint : archiviazione e ripresa dei checkpoint
- Eventi : tipi di eventi del flusso di lavoro
- Uso di flussi di lavoro come agenti
L'API del flusso di lavoro funzionale non è attualmente disponibile per C#.