Funktionale Workflow-API

Warnung

Die funktionale Workflow-API ist experimentell und kann in zukünftigen Versionen ohne vorherige Ankündigung geändert oder entfernt werden.

Mit der funktionalen Workflow-API können Sie Workflows als einfache Python asynchrone Funktionen schreiben. Anstatt Ausführungsklassen zu definieren, Kanten zu verknüpfen und mit WorkflowBuilder zu dekorieren, versehen Sie eine async-Funktion mit @workflow und nutzen den nativem Python-Steuerfluss — if/else, for-Schleifen, asyncio.gather — um Ihre Logik auszudrücken.

Einen parallelen Vergleich mit der Graph-API finden Sie unter Workflow-APIs in der Übersicht über Workflows.

@workflow Dekorateur

Wenden Sie @workflow auf eine async Funktion an, um sie in ein FunctionalWorkflow Objekt zu konvertieren:

from agent_framework import workflow

@workflow
async def text_pipeline(text: str) -> str:
    upper = await to_upper_case(text)
    return await reverse_text(upper)

Der @workflow Dekorateur unterstützt ein parametrisiertes Formular mit optionalen Argumenten:

from agent_framework import InMemoryCheckpointStorage, workflow

storage = InMemoryCheckpointStorage()

@workflow(name="my_pipeline", description="Uppercase then reverse", checkpoint_storage=storage)
async def text_pipeline(text: str) -> str:
    ...

@workflow Parameter

Parameter Type Beschreibung
name str | None Anzeigename für den Workflow. Standardmäßig wird der Funktion __name__ verwendet.
description str | None Optionale menschlich lesbare Beschreibung.
checkpoint_storage CheckpointStorage | None Standardspeicher für beibehaltene Schrittergebnisse zwischen Denläufen. Kann pro Aufruf außer Kraft gesetzt werden.run()

Signatur der Workflowfunktion

Der erste Parameter der Workflowfunktion empfängt die Eingabe, die an .run() übergeben wurde. Fügen Sie einen ctx: WorkflowRunContext Parameter nur hinzu, wenn Sie HITL, Key/Value-Status oder benutzerdefinierte Ereignisse benötigen . Andernfalls ist er optional:

# No ctx needed — just a plain pipeline
@workflow
async def simple_pipeline(data: str) -> str:
    result = await process(data)
    return result

# ctx needed for HITL, state, or custom events
@workflow
async def hitl_pipeline(data: str, ctx: WorkflowRunContext) -> str:
    feedback = await ctx.request_info({"draft": data}, response_type=str)
    return feedback

WorkflowRunContext wird zuerst durch Typannotation, dann durch den Parameternamen ctx erkannt, sodass sowohl ctx: WorkflowRunContext als auch ein einfaches ctx Parameter funktionieren.

Ausführen eines Workflows

Rufen Sie .run() auf dem von FunctionalWorkflow zurückgegebenen @workflow-Objekt auf.

# Calling the decorated function directly returns the raw return value
raw = await text_pipeline("hello world")   # str — the raw return value

# .run() wraps the result in a WorkflowRunResult with events and state
result = await text_pipeline.run("hello world")
print(result.text)                # first output as a string
print(result.get_outputs())       # list of all outputs
print(result.get_final_state())   # WorkflowRunState.IDLE

run() Parameter

Parameter Type Beschreibung
message Any | None Eingabe, die als erstes Argument an die Workflowfunktion übergeben wird.
stream bool Wenn True, wird ein ResponseStream zurückgegeben, das WorkflowEvent Objekte liefert. Wird standardmäßig auf False festgelegt.
responses dict[str, Any] | None HITL-Antworten, die durch request_id zugeordnet sind. Wird verwendet, um einen angehaltenen Workflow fortzusetzen.
checkpoint_id str | None Prüfpunkt, der wiederhergestellt werden soll. Erfordert, dass checkpoint_storage festgelegt wird.
checkpoint_storage CheckpointStorage | None Überschreibt den Standardspeicher, der für diesen Lauf auf dem Decorator festgelegt ist.
include_status_events bool Schließen Sie Statusänderungsereignisse in das Nicht-Streaming-Ergebnis ein.

Genau einer von message, responsesoder checkpoint_id muss pro Anruf bereitgestellt werden.

WorkflowRunResult

run() (Nicht-Streaming) gibt ein WorkflowRunResult. Schlüsselmethoden:

Methode / Eigenschaft Returns Beschreibung
.text str Erste Ausgabe als Zeichenfolge. Leere Zeichenfolge, wenn keine Zeichenfolge ausgegeben wird.
.get_outputs() list[Any] Alle Ausgaben, die vom Workflow ausgegeben werden.
.get_final_state() WorkflowRunState Endausführungszustand (IDLE, IDLE_WITH_PENDING_REQUESTS, FAILED, ...).
.get_request_info_events() list[WorkflowEvent] Ausstehende HITL-Anforderungen, wenn der Zustand ist IDLE_WITH_PENDING_REQUESTS.

Streamen

Übergeben Sie stream=True, um Ereignisse zu empfangen, sobald sie erstellt werden.

from agent_framework import workflow

@workflow
async def data_pipeline(url: str) -> str:
    raw = await fetch_data(url)
    return await transform_data(raw)

# stream=True returns a ResponseStream you iterate with async for
stream = data_pipeline.run("https://example.com/api/data", stream=True)
async for event in stream:
    if event.type == "output":
        print(f"Output: {event.data}")

# After iteration, get_final_response() returns the WorkflowRunResult
result = await stream.get_final_response()
print(f"Final state: {result.get_final_state()}")

Ein vollständiges Beispiel finden Sie unter python/samples/03-workflows/functional/basic_streaming_pipeline.py.

@step Dekorateur

@step ist ein Opt-In-Dekoror, der den einzelnen asynchronen Funktionen Ergebniszwischenspeicherung, Ereignisemissionen und Prüfpunkte pro Schritt hinzufügt:

from agent_framework import step, workflow

@step
async def fetch_data(url: str) -> dict:
    # expensive — hits a real API
    return await http_get(url)

@workflow
async def pipeline(url: str) -> str:
    raw = await fetch_data(url)
    return process(raw)

Was @step innerhalb eines Workflows macht

  • Caches Ergebnisse – das Ergebnis wird von (step_name, call_index) gespeichert. Bei der Fortsetzung von HITL oder der Wiederherstellung eines Prüfpunkts gibt ein abgeschlossener Schritt sein gespeichertes Ergebnis sofort zurück, anstatt ihn erneut auszuführen.
  • Gibt Ereignisse ausexecutor_invoked / executor_completed / executor_failed werden zur Observierbarkeit ausgegeben. Stattdessen wird bei einem Cachetreffer executor_bypassed ausgegeben.
  • Speichert Prüfpunkte – wenn der Workflow über checkpoint_storage verfügt, wird ein Prüfpunkt nach Abschluss jedes Schritts gespeichert.
  • Injiziert WorkflowRunContext — Wenn die Schrittfunktion einen ctx: WorkflowRunContext Parameter deklariert, wird der aktive Kontext automatisch eingefügt.

Außerhalb eines laufenden Workflows ist @step transparent – die Funktion verhält sich identisch wie in ihrer unveränderten Version, wodurch sie vollständig isoliert getestet werden kann.

Empfohlene Verwendung von @step

Verwenden Sie @step bei Funktionen, die für die erneute Ausführung teuer sind: Agentaufrufe, externe API-Anforderungen oder alle Vorgänge, bei denen die erneute Ausführung nach der Wiederaufnahme kostspielig wäre oder Nebenwirkungen haben würde. Einfache Funktionen (ohne @step) funktionieren weiterhin innerhalb @workflow; sie werden einfach erneut ausgeführt, wenn der Workflow fortgesetzt wird.

from agent_framework import InMemoryCheckpointStorage, step, workflow

storage = InMemoryCheckpointStorage()

@step  # cached — won't re-run on resume
async def call_llm(prompt: str) -> str:
    return (await agent.run(prompt)).text

# No @step — cheap, fine to re-run
async def validate(text: str) -> bool:
    return len(text) > 0

@workflow(checkpoint_storage=storage)
async def pipeline(topic: str) -> str:
    draft = await call_llm(f"Write about: {topic}")
    ok = await validate(draft)
    return draft if ok else ""

@step akzeptiert auch einen name Parameter:

@step(name="transform")
async def transform_data(raw: dict) -> str:
    ...

Ein vollständiges Beispiel finden Sie unter python/samples/03-workflows/functional/steps_and_checkpointing.py.

WorkflowRunContext

WorkflowRunContext (kurzer Alias: RunContext) ist der Ausführungskontext, der in Workflow- und Schrittfunktionen eingefügt wird. Sie benötigen es nur, wenn Sie HITL, Key/Value-Status oder benutzerdefinierte Ereignisse verwenden.

Importieren sie aus agent_framework:

from agent_framework import WorkflowRunContext, workflow

ctx.request_info() — Mensch in der Steuerungsschleife

ctx.request_info() hält den Workflow an, um auf externe Eingaben zu warten:

@workflow
async def review_pipeline(topic: str, ctx: WorkflowRunContext) -> str:
    draft = await write_draft(topic)
    feedback = await ctx.request_info(
        {"draft": draft, "instructions": "Please review this draft"},
        response_type=str,
        request_id="review_request",
    )
    return await revise_draft(draft, feedback)

Parameter:

Parameter Type Beschreibung
request_data Any Nutzlast, die beschreibt, welche Eingabe erforderlich ist (Diktieren, Pydantisches Modell, Zeichenfolge, ...).
response_type type Erwarteter Python-Typ der Antwort.
request_id str | None Stabiler Bezeichner für diese Anfrage. Eine zufällige UUID wird generiert, wenn sie weggelassen wird.

Wiedergabesemantik: Bei der ersten Ausführung wird ein internes Signal ausgelöst (nie sichtbar für Ihren Code), das den Workflow anhält. Der Anrufer empfängt einen WorkflowRunResult mit get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS. Fortsetzen durch Aufrufen .run(responses={request_id: value}) – der Workflow wird von oben erneut ausgeführt und request_info() gibt den bereitgestellten Wert sofort zurück.

@step-versehene Funktionen, die vor der Aussetzung ausgeführt wurden, geben ihre zwischengespeicherten Ergebnisse beim Fortsetzen zurück, anstatt erneut auszuführen.

Behandeln der Antwort:

# Phase 1 — run until the workflow pauses
result1 = await review_pipeline.run("AI Safety")
assert result1.get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS

requests = result1.get_request_info_events()
print(requests[0].request_id)  # "review_request"

# Phase 2 — resume with the human's answer
result2 = await review_pipeline.run(
    responses={"review_request": "Add more details about alignment research"}
)
print(result2.text)

Ein vollständiges Beispiel finden Sie unter python/samples/03-workflows/functional/hitl_review.py.

ctx.request_info() wird auch innerhalb von @step Funktionen unterstützt.

ctx.add_event() — Benutzerdefinierte Ereignisse

Verwenden Sie ctx.add_event(), um anwendungsspezifische Ereignisse zusammen mit Framework-Lebenszyklusereignissen auszulösen. Ausführliche Informationen und Beispiele finden Sie unter "Mittieren von benutzerdefinierten Ereignissen".

ctx.get_state() / ctx.set_state() — Schlüssel-/Wertstatus

Verwenden Sie ctx.get_state() und ctx.set_state() zum Speichern von Werten, die über HITL-Unterbrechungen hinweg bestehen und in Prüfpunkten enthalten sind. Ausführliche Informationen finden Sie im Workflowstatus.

Zustandswerte müssen JSON-serialisierbar sein, wenn der Prüfpunktspeicher konfiguriert ist.

ctx.is_streaming()

Gibt True zurück, wenn die aktuelle Ausführung mit stream=True gestartet wurde. Nützlich für Schrittfunktionen, die ihr Verhalten auf Basis des Streamingmodus anpassen möchten.

get_run_context()

Ruft das aktive WorkflowRunContext von überall innerhalb eines ausgeführten Workflows ab – nützlich in Hilfsfunktionen, die keinen ctx-Parameter deklarieren.

from agent_framework import get_run_context

async def helper():
    ctx = get_run_context()
    if ctx is not None:
        ctx.set_state("helper_ran", True)

Gibt None zurück, wenn außerhalb eines laufenden Workflows aufgerufen.

Parallelität mit asyncio.gather

Verwenden Sie standardmäßige Python-Nebenläufigkeit für Fan-out/Fan-In — keine Framework-Primitives erforderlich.

import asyncio
from agent_framework import workflow

@workflow
async def research_pipeline(topic: str) -> str:
    web, papers, news = await asyncio.gather(
        research_web(topic),
        research_papers(topic),
        research_news(topic),
    )
    return await synthesize([web, papers, news])

asyncio.gather funktioniert auch, wenn die Funktionen mit @stepversehen sind.

Ein vollständiges Beispiel finden Sie unter python/samples/03-workflows/functional/parallel_pipeline.py.

Aufrufen von Agents innerhalb von Workflows

Agent-Aufrufe funktionieren als einfache Funktionsaufrufe in @workflow:

from agent_framework import Agent, workflow

writer = Agent(name="WriterAgent", instructions="Write a short poem.", client=client)
reviewer = Agent(name="ReviewerAgent", instructions="Review the poem.", client=client)

@workflow
async def poem_workflow(topic: str) -> str:
    poem = (await writer.run(f"Write a poem about: {topic}")).text
    review = (await reviewer.run(f"Review this poem: {poem}")).text
    return f"Poem:\n{poem}\n\nReview: {review}"

Fügen Sie @step zu Agentanruffunktionen hinzu, wenn ihre Ergebnisse zwischen HITL-Resume oder Prüfpunktwiederherstellungen zwischengespeichert werden sollen.

from agent_framework import step

@step
async def write_poem(topic: str) -> str:
    return (await writer.run(f"Write a poem about: {topic}")).text

Ein vollständiges Beispiel finden Sie unter python/samples/03-workflows/functional/agent_integration.py.

.as_agent() — Verwenden eines Workflows als Agent

FunctionalWorkflow als agentenkompatibles Objekt mit .as_agent() umschließen:

from agent_framework import workflow

@workflow
async def poem_workflow(topic: str) -> str:
    ...

# Wrap as an agent
agent = poem_workflow.as_agent(name="PoemAgent")

# Use with the standard agent interface
response = await agent.run("Write a poem about the ocean")
print(response.text)

# Or use in a larger workflow or orchestration

.as_agent() gibt eine FunctionalWorkflowAgent Schnittstelle zurück, die die gleiche run() Schnittstelle wie andere Agentobjekte verfügbar macht, wodurch funktionale Workflows mit jedem System komponiert werden können, das Agents akzeptiert.

Parameter Type Beschreibung
name str | None Anzeigename für den Agenten. Standardmäßig wird der Workflowname verwendet.

Ein Beispiel finden Sie unter python/samples/03-workflows/functional/agent_integration.py.

Beispiele

Ausführbare Beispiele befinden sich in den folgenden Beispielordnern:

Nächste Schritte

Verwandte Themen:

Die funktionale Workflow-API ist zurzeit nicht für C# verfügbar.