クイック スタート: Python Durable Functions アプリを作成する

Azure Functions の機能である Durable Functions を使用して、Pythonでステートフル なサーバーレス ワークフローを記述します。 このクイック スタートでは、2 つの一般的なオーケストレーション パターンを示すサンプル アプリを複製して実行します。

  • 関数チェーン: アクティビティを順番に呼び出します (東京→シアトル→ロンドン)。
  • ファンアウト/ファンイン: 5 つの都市間でアクティビティを並列で呼び出し、結果を集計します。

最終的には、 Durable Task Scheduler エミュレーターを使用して両方のオーケストレーションがローカルで実行され、ダッシュボードでその状態を表示できるようになります。

  • Hello Cities サンプル プロジェクトを複製して準備します。
  • ローカル開発用に Durable Task Scheduler エミュレーターと Azurite を設定します。
  • 関数アプリを実行し、両方のオーケストレーションをトリガーします。
  • Durable Task Scheduler ダッシュボードでオーケストレーションの状態と出力を確認します。

Prerequisites

Durable Task Scheduler エミュレーターを設定する

Durable Task Scheduler エミュレーターは、Azure サブスクリプションなしでオーケストレーションをテストできるように、ローカル開発環境を提供します。 Functions ホストには、ローカル ストレージ用の Azurite も必要です。

両方のコンテナーを起動します。

docker run -d --name dtsemulator -p 8080:8080 -p 8082:8082 \
  mcr.microsoft.com/dts/dts-emulator:latest

docker run -d --name azurite -p 10000:10000 -p 10001:10001 -p 10002:10002 \
  mcr.microsoft.com/azure-storage/azurite

Tip

エミュレーターが実行されたら、 http://localhost:8082 の Durable Task Scheduler ダッシュボードにアクセスしてオーケストレーションを監視できます。

クイック スタート サンプルを実行する

  1. Hello Cities サンプル ディレクトリに移動します。

    cd samples/durable-functions/python/hello-cities
    
  2. 仮想環境を作成し、依存関係をインストールします。

    python -m venv .venv
    .venv\Scripts\activate
    pip install -r requirements.txt
    
  3. local.settings.json ファイルに次の構成が含まれていることを確認します。

    {
      "IsEncrypted": false,
      "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "python",
        "DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None"
      }
    }
    
  4. 関数アプリを起動します。

    func start
    
  5. 別のターミナルで、関数チェーンのオーケストレーションを起動します:

    $response = Invoke-RestMethod -Method POST -Uri http://localhost:7071/api/StartChaining
    $response
    

    応答には、オーケストレーション インスタンスの状態 URL が含まれています。 statusQueryGetUri値をコピーして実行し、結果を確認します。

    Invoke-RestMethod -Uri $response.statusQueryGetUri
    
  6. ファンアウト/ファンイン オーケストレーションをトリガーします:

    $response = Invoke-RestMethod -Method POST -Uri http://localhost:7071/api/StartFanOutFanIn
    Invoke-RestMethod -Uri $response.statusQueryGetUri
    

想定される出力

POST 要求は、状態 URL を含む JSON 応答を返します。 例えば次が挙げられます。

{
  "id": "<instanceId>",
  "statusQueryGetUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/<instanceId>?code=...",
  "sendEventPostUri": "...",
  "terminatePostUri": "...",
  "purgeHistoryDeleteUri": "..."
}

statusQueryGetUriクエリを実行し、オーケストレーションのruntimeStatusCompletedされると、output フィールドにあいさつの結果が表示されます。 連鎖オーケストレーションは次を返します:

{
  "name": "chaining_orchestration",
  "runtimeStatus": "Completed",
  "output": ["Hello Tokyo!", "Hello Seattle!", "Hello London!"]
}

ファンアウト/ファンイン オーケストレーションは次の結果を返します:

{
  "name": "fan_out_fan_in_orchestration",
  "runtimeStatus": "Completed",
  "output": ["Hello Tokyo!", "Hello Seattle!", "Hello London!", "Hello Paris!", "Hello Berlin!"]
}

Tip

runtimeStatus RunningまたはPendingが表示される場合は、しばらく待ってから、statusQueryGetUriをもう一度クエリします。

http://localhost:8082で Durable Task Scheduler ダッシュボードを開き、オーケストレーションの状態と実行履歴を表示します。

コードを理解する

このサンプルでは、Python v2 プログラミング モデルとデコレーターを使用します。このモデルでは、すべての関数が 1 つのファイル (function_app.py) で定義されています。

アクティビティ関数

say_hello アクティビティは市区町村名を受け取り、あいさつ文を返します。

@app.activity_trigger(input_name="city")
def say_hello(city: str) -> str:
    """Activity function that returns a greeting for a city."""
    logging.info(f"Saying hello to {city}.")
    return f"Hello {city}!"

オーケストレーター関数

チェイニング オーケストレーターは、3 つの都市に対して say_hello を順番に呼び出します。

@app.orchestration_trigger(context_name="context")
def chaining_orchestration(context: df.DurableOrchestrationContext):
    """Function chaining orchestration: calls activities sequentially."""
    result1 = yield context.call_activity("say_hello", "Tokyo")
    result2 = yield context.call_activity("say_hello", "Seattle")
    result3 = yield context.call_activity("say_hello", "London")
    return [result1, result2, result3]

ファンアウト/ファンイン オーケストレーターは、アクティビティを並列にスケジュールします。

@app.orchestration_trigger(context_name="context")
def fan_out_fan_in_orchestration(context: df.DurableOrchestrationContext):
    """Fan-out/Fan-in orchestration: calls activities in parallel."""
    cities = ["Tokyo", "Seattle", "London", "Paris", "Berlin"]

    # Fan-out: schedule all activities in parallel
    parallel_tasks = []
    for city in cities:
        task = context.call_activity("say_hello", city)
        parallel_tasks.append(task)

    # Fan-in: wait for all to complete
    results = yield context.task_all(parallel_tasks)
    return results

クライアント関数

HTTP によってトリガーされるクライアント関数は、各オーケストレーションを開始します。 たとえば、チェーン スターターは次のようになります。

@app.route(route="StartChaining", methods=["POST"])
@app.durable_client_input(client_name="client")
async def start_chaining(req: func.HttpRequest, client) -> func.HttpResponse:
    """HTTP trigger to start the function chaining orchestration."""
    instance_id = await client.start_new("chaining_orchestration")
    logging.info(f"Started chaining orchestration with ID = '{instance_id}'.")
    return client.create_check_status_response(req, instance_id)

コンフィギュレーション

このサンプルでは、ストレージ バックエンドとして Durable Task Scheduler エミュレーターを使用します。 これは、 host.jsonで構成されます。

{
  "version": "2.0",
  "logging": {
    "logLevel": {
      "DurableTask.Core": "Warning"
    }
  },
  "extensions": {
    "durableTask": {
      "hubName": "default",
      "storageProvider": {
        "type": "azureManaged",
        "connectionStringName": "DURABLE_TASK_SCHEDULER_CONNECTION_STRING"
      }
    }
  },
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[4.*, 5.0.0)"
  }
}

リソースをクリーンアップする

完了したら、エミュレーター コンテナーを停止します。

docker stop dtsemulator azurite && docker rm dtsemulator azurite

Python仮想環境を非アクティブ化するには:

deactivate

次のステップ