次の方法で共有


非同期要求-応答パターン

バックエンド処理を非同期的に実行する必要があるが、フロントエンドが明確な応答を必要とする場合は、フロントエンド ホストからバックエンド処理を切り離します。

コンテキストと問題

最新のアプリケーション開発では、クライアント アプリケーションは多くの場合、ビジネス ロジックと作成機能を提供するためにリモート API に依存しています。 多くのアプリケーションは Web ブラウザーでコードを実行し、他の環境でもクライアント コードをホストしています。 API は、アプリケーションに直接関連している場合や、外部サービスから共有サービスとして動作する場合があります。 ほとんどの API 呼び出しでは、HTTP または HTTPS を使用し、REST セマンティクスに従います。

ほとんどの場合、クライアント アプリケーションの API は約 100 ミリ秒 (ミリ秒) 以下で応答します。 多くの要因が応答の待機時間に影響する可能性があります。

  • アプリケーションのホスティング スタック
  • セキュリティ コンポーネント
  • 呼び出し元とバックエンドの相対的な地理的位置
  • ネットワーク インフラストラクチャ
  • 現在の負荷
  • 要求ペイロードのサイズ
  • 処理キューの長さ
  • バックエンドが要求を処理する時間

これらの要因により、応答に待機時間が追加される可能性があります。 バックエンドをスケールアウトすることで、いくつかの要因を軽減できます。 ネットワーク インフラストラクチャなどのその他の要因は、アプリケーション開発者の制御外です。 ほとんどの API は、応答が同じ接続経由で返されるのに十分な速さで応答します。 アプリケーション コードは、非同期処理の外観を提供するために、非ブロッキングの方法で同期 API 呼び出しを行うことができます。 入出力 (I/O) バインド操作には、この方法をお勧めします。

一部のシナリオでは、バックエンドは実行時間が長く、数秒かかる作業を行います。 その他のシナリオでは、バックエンドは長時間実行されるバックグラウンド作業を数分または長期間実行します。 このような場合は、応答を送信する前に作業が完了するまで待つ必要はありません。 このような状況では、同期要求/応答パターンに問題が発生する可能性があります。 バックエンド処理の設計に関するガイダンスについては、「 バックグラウンド ジョブ」を参照してください。

一部のアーキテクチャでは、メッセージ ブローカーを使用して要求ステージと応答ステージを分離することによって、この問題を解決しています。 多くのシステムでは、 Queue-Based 負荷平準化パターンによってこの分離が実現されます。 この分離により、クライアント プロセスとバックエンド API を個別にスケーリングできます。 また、クライアントが成功通知を必要とする場合は、そのステップも非同期にする必要があるため、複雑さが増します。

クライアント アプリケーションに適用される同じ考慮事項の多くは、マイクロサービス アーキテクチャなど、分散システムのサーバー間 REST API 呼び出しにも適用されます。

ソリューション

この問題の 1 つの解決策は、HTTP ポーリングを使用することです。 コールバック エンドポイントが使用できない場合、または実行時間の長い接続が複雑すぎる場合は、クライアント側のコードにポーリングが適しています。 コールバックが可能な場合でも、必要な追加のライブラリとサービスによって複雑さが増す可能性があります。

次の手順では、ソリューションについて説明します。

  • クライアント アプリケーションは API を同期的に呼び出して、バックエンドで実行時間の長い操作をトリガーします。

  • API は、可能な限りすばやく同期的に応答します。 HTTP 202 (Accepted) 状態コードを返して、処理要求を受信したことを確認します。

    API は、実行時間の長いプロセスを開始する前に、要求と実行されるアクションを検証する必要があります。 要求が有効でない場合は、HTTP 400 (無効な要求) などのエラー コードですぐに応答します。

  • 応答には、クライアントがポーリングして実行時間の長い操作の結果を確認できるエンドポイントを指す場所参照が含まれます。

  • API は、メッセージ キューなどの別のコンポーネントに処理をオフロードします。

  • 状態エンドポイントへの呼び出しが成功するたびに、エンドポイントは HTTP 200 (OK) を返します。 作業の進行中、状態エンドポイントは、その状態を示すリソースを返します。 状態の応答本文には、クライアントが操作の現在の状態を理解するのに十分な情報が含まれている必要があります。

    作業が完了すると、状態エンドポイントは完了を示すリソースを返すか、別のリソース URL にリダイレクトします。 たとえば、非同期操作で新しいリソースが作成された場合、状態エンドポイントはそのリソースの URL にリダイレクトされます。

次の図は、一般的なフローを示しています。

非同期 HTTP 要求の要求と応答フローを示す図。

  1. クライアントは要求を送信し、HTTP 202 (Accepted) 応答を受信します。

  2. クライアントは、HTTP GET 要求を状態エンドポイントに送信します。 作業がまだ保留中のため、この呼び出しは HTTP 200 を返します。

  3. ある時点で作業が完了し、状態エンドポイントが HTTP 303 (その他を参照) を返してリソースにリダイレクトします。

  4. クライアントは、指定された URL にあるリソースをフェッチします。

問題と考慮事項

このパターンを実装する方法を決定するときは、次の点を考慮してください。

  • HTTP 経由でこのパターンを実装する方法は複数存在し、アップストリーム サービスでは常に同じセマンティクスが使用されるわけではありません。 たとえば、一部の実装では個別の状態エンドポイントが使用されません。 代わりに、クライアントはターゲット リソース URL を直接ポーリングし、リソースが作成されるまで HTTP 404 (Not Found) を受け取ります。 リソースがまだ存在しないため、この応答は理にかなっています。 ただし、無効な要求 ID に対しても 404 が返される場合、この方法はあいまいになる可能性があります。 このパターンで説明されているように、状態本文を持つ HTTP 200 を返す専用の状態エンドポイントは、そのあいまいさを回避します。

  • HTTP 202 応答は、クライアントがポーリングする場所と頻度を示します。 次のヘッダーを含める必要があります。

    ヘッダー 説明 メモ
    Location クライアントが応答状態をポーリングする URL この URL には、Shared Access Signature トークンを指定できます。 バレット キー パターンは、この場所でアクセス制御が必要な場合に適切に機能します。 このパターンは、応答ポーリングを別のバックエンドに移動する必要がある場合にも適用されます。
    Retry-After 処理が完了するタイミングの見積もり このヘッダーは、ポーリング クライアントがバックエンドに送信する要求が多くなりすぎないように設計されています。

    この応答を設計するときは、予想されるクライアントの動作を検討してください。 制御するクライアントは、これらの応答値に正確に従うことができます。 コードなしのツールやAzure Logic Appsなどのローコード ツールを使用して構築されたクライアントなど、他のユーザーが作成するクライアントは、HTTP 202 に対して独自の処理を適用できます。

  • 状態エンドポイントの応答に次のフィールドを含めてみてください。

    フィールド 説明 メモ
    status 操作の現在の状態として保留中、実行中成功失敗取り消しがあります。 一貫性のある、文書化されたターミナル値と非ターミナル値のセットを使用します。
    createdAt 操作が受け入れられた時刻。 クライアントが古い操作または破棄された操作を検出するのに役立ちます。
    lastUpdatedAt 状態が最後に更新された時刻。 クライアントが停止している操作と、アクティブに進行している操作を区別できるようにします。
    percentComplete オプションの進行状況インジケーター。 バックエンドが進行状況を意味のある方法で見積もることができる場合に便利です。
    error 状態が Failed の場合の構造化エラー オブジェクト。 一貫性のために RFC 9457 形式を使用することを検討してください。
  • 使用する基になるサービスによっては、処理プロキシを使用して応答ヘッダーまたはペイロードを調整することが必要になる場合があります。

  • 完了後に状態エンドポイントがリダイレクトされる場合は、 HTTP 303 (その他を参照) を使用します。 303 は、元の要求方法に関係なく、リダイレクト URL に GET 要求を発行するようにクライアントに指示します。 クライアントが元の操作を再送信せず、個別の結果リソースを取得しているため、このパターンの正しいセマンティックは、この動作です。 HTTP 302 (Found) では、メソッドの変更は保証されません。一部のクライアントはリダイレクト時に元のメソッドを再生します。これにより、POST 要求の重複など、意図しない副作用が発生する可能性があります。

  • サーバーが要求を正常に処理すると、 Location ヘッダーで指定されたリソースは、200、201 (作成)、204 (コンテンツなし) などの HTTP 状態コードを返します。

  • 処理中にエラーが発生した場合は、 Location ヘッダーが指定したリソース URL でエラーを保持し、エラーに一致する 4xx 状態コードをそのリソースから返します。 RFC 9457 (HTTP API の問題の詳細) などの構造化エラー形式を使用して、クライアントがプログラムでエラーを解析して処理できるようにします。

  • 状態リソースと格納された結果は、ストレージとコンピューティングを消費します。 適切な期間が経過するとデータを削除するように、データ保持ポリシーを定義し、状態応答のExpiresヘッダーを使用してクライアントに保持期間を通知することを検討します。

  • ソリューションはすべて同じ方法でこのパターンを実装するわけではありません。また、一部のサービスには追加ヘッダーまたは代替ヘッダーが含まれています。 たとえば、Azure Resource Managerはこのパターンの変更されたバリアントを使用します。 詳細については、「Resource Manager非同期操作を参照してください。

  • レガシ クライアントでは、このパターンをサポートしていない可能性があります。 その場合は、元のクライアントからの非同期処理を隠すために、非同期 API にファサードを配置する必要がある場合があります。 たとえば、Logic Apps ではこのパターンがネイティブにサポートされており、非同期 API と同期呼び出しを行うクライアントの間の統合レイヤーとして使用できます。 詳細については、「 Azure Logic Apps での非同期の要求/応答動作」を参照してください。

  • 一部のシナリオでは、クライアントが長時間実行される要求をキャンセルする方法を提供する必要がある場合があります。 その場合は、状態エンドポイント リソースに対して DELETE 操作を公開します。 この要求では、キャンセル命令をバックエンド処理コンポーネントに転送する必要があります。 バックエンドが取り消しを処理した後、取り消された状態を反映するように状態リソースを更新する必要があります。 このプロセスは、不完全な作業がリソースを無期限に消費するのを防ぐのに役立ちます。 操作が部分ロールバックをサポートしているか、補正トランザクションとして最適に扱われるかを検討します。

  • 最初の要求を送信するときに、クライアントに冪等性キー (たとえば、Idempotency-Key 要求ヘッダー内) の指定を求めることを検討してください。 バックエンドが重複するキーを受け取った場合は、2 番目の作業項目をエンキューするのではなく、既存の状態リソースを返す必要があります。 この方法は、サーバーが既に受け入れた POST をクライアントが再試行する原因となるネットワーク障害から保護します。 このパターンでは特に重要です。クライアントには、失われた応答と受信されなかった要求を区別する方法がないためです。

このパターンでは、クライアントが定期的に新しい要求を発行して状態を確認する HTTP ポーリングについて説明します。 ロングポーリングは、関連しているが、明確に異なる技法です。クライアントはリクエストを送り、新しいデータが利用可能になるか、タイムアウトが発生するまで、サーバーは接続を維持したままにします。 長いポーリングでは、定期的なポーリングと比較して応答の待機時間が短縮されますが、接続の管理とタイムアウトに関する複雑さが生じます。

このパターンを使用する場合

このパターンは次の状況で使用します。

  • ブラウザー アプリケーションなどのクライアント側コードを操作すると、これらの制約によってコールバック エンドポイントの提供が困難になる、または実行時間の長い接続が複雑になりすぎる。

  • HTTP プロトコルのみを使用するサービスを呼び出すと、クライアント側のファイアウォール制限のため、リターン サービスはコールバックを送信できません。

  • WebSocket や webhook などの最新のコールバック メカニズムをサポートしていないワークロードと統合します。

このパターンは、次の場合に適さない場合があります。

  • 代わりに、Azure Event Gridなどの非同期通知用に構築されたサービスを使用できます。

  • 応答は、クライアントにリアルタイムでストリーミングする必要があります。 Server-Sent イベント (SSE) を検討します。これは、クライアントがポーリングを必要とせずに、サーバーからクライアントへの軽量で HTTP ネイティブの一方向のプッシュ チャネルを提供します。

  • クライアントは多くの結果を収集する必要があり、それらの結果の待機時間が重要です。 代わりにメッセージ ブローカーを検討してください。

  • WebSocket や SignalR などのサーバー側の永続的なネットワーク接続を使用できます。 これらの接続を使用して、呼び出し元に結果を通知できます。

  • ネットワーク設計では、非同期コールバックまたは Webhook を受信するためのオープン ポートがサポートされています。

ワークロード設計

アーキテクトは、ワークロードの設計で非同期 Request-Reply パターンを使用して、Azure Well-Architected Framework の柱で説明されている目標と原則に対処する方法を評価する必要があります。

支柱 このパターンが柱の目標をサポートする方法
パフォーマンス効率 は、スケーリング、データ、およびコードの最適化を通じて、ワークロード の需要を効率的に満たすのに役立ちます。 即時応答を必要としないプロセスの要求フェーズと応答フェーズを切り離すことで、応答性とスケーラビリティを向上させます。 非同期アプローチを使用すると、コンカレンシーが向上し、容量が使用可能になったときにサーバーのスケジュールが機能します。

- PE:05 スケーリングとパーティショニング
- PE:07 コードとインフラストラクチャ

設計上の決定と同様に、このパターンが導入する可能性がある他の柱の目標に対するトレードオフを検討してください。

次のコードは、Azure Functionsを使用してこのパターンを実装するアプリケーションからの抜粋を示しています。 このソリューションには、次の 3 つの機能があります。

  • 非同期 API エンドポイント
  • 状態エンドポイント
  • キューに置かれた作業項目を受け取って実行するバックエンド関数

Functions の非同期要求応答パターンの構造の図。

GitHub logo. このサンプルは、GitHub で入手できます。

この実装では、マネージド ID を使用して Azure Service Bus と Azure Blob Storage で認証を行います。そのため、接続文字列やアカウント キーの格納が回避されます。 依存関係は、Program.csを使用してDefaultAzureCredentialに登録され、プライマリ コンストラクターを介して挿入されます。

AsyncProcessingWorkAcceptor 関数

AsyncProcessingWorkAcceptor関数は、クライアント アプリケーションからの作業を受け入れ、処理のためにエンキューするエンドポイントを実装します。

  • 関数は、要求 ID を生成し、それをメタデータとしてキュー メッセージに追加します。

  • HTTP 応答には、状態エンドポイントを指す Location ヘッダーと、ポーリング間隔を示す Retry-After ヘッダーが含まれます。 要求 ID が URL パスに表示されます。

public class AsyncProcessingWorkAcceptor(ServiceBusClient _serviceBusClient)
{
    [Function("AsyncProcessingWorkAcceptor")]
    public async Task<IActionResult> Run(
        [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = null)] HttpRequest req,
        [FromBody] CustomerPOCO customer)
    {
        if (string.IsNullOrEmpty(customer.id) || string.IsNullOrEmpty(customer.customername))
        {
            return new BadRequestResult();
        }

        string requestId = Guid.NewGuid().ToString();

        string statusUrl = $"https://{Environment.GetEnvironmentVariable("WEBSITE_HOSTNAME")}/api/RequestStatus/{requestId}";

        var messagePayload = JsonConvert.SerializeObject(customer);
        var message = new ServiceBusMessage(messagePayload);
        message.ApplicationProperties.Add("RequestGUID", requestId);
        message.ApplicationProperties.Add("RequestSubmittedAt", DateTime.UtcNow);
        message.ApplicationProperties.Add("RequestStatusURL", statusUrl);
        var sender = _serviceBusClient.CreateSender("outqueue");

        await sender.SendMessageAsync(message);

        req.HttpContext.Response.Headers["Retry-After"] = "5";

        return new AcceptedResult(statusUrl, null);
    }
}

AsyncProcessingBackgroundWorker 関数

AsyncProcessingBackgroundWorker関数は、キューから操作を読み取り、メッセージ ペイロードに基づいて処理し、結果をストレージ アカウントに書き込みます。

public class AsyncProcessingBackgroundWorker(BlobContainerClient _blobContainerClient)
{
    [Function("AsyncProcessingBackgroundWorker")]
    public async Task Run(
        [ServiceBusTrigger("outqueue", Connection = "ServiceBusConnection")] ServiceBusReceivedMessage message)
    {
        // Perform an actual action against the blob data source for the async readers to be able to check against.
        // This is where your actual service worker processing will be performed

        var requestGuid = message.ApplicationProperties["RequestGUID"].ToString();
        string blobName = $"{requestGuid}.blobdata";

        var blobClient = _blobContainerClient.GetBlobClient(blobName);
        using (MemoryStream memoryStream = new MemoryStream())
        using (StreamWriter writer = new StreamWriter(memoryStream))
        {
            writer.Write(message.Body.ToString());
            writer.Flush();
            memoryStream.Position = 0;

            await blobClient.UploadAsync(memoryStream, overwrite: true);
        }
    }
}

AsyncOperationStatusChecker 関数

AsyncOperationStatusChecker 関数は、状態エンドポイントを実装します。 この関数は、要求の状態を確認します。

  • 要求が完了すると、関数は HTTP 303 (その他を参照) を返し、結果の バレット キー URL にクライアントをリダイレクトします。

  • 要求が保留中の場合、関数は現在の状態を 含む HTTP 200 コードを返します。

public class AsyncOperationStatusChecker(ILogger<AsyncOperationStatusChecker> _logger)
{
    [Function("AsyncOperationStatusChecker")]
    public async Task<IActionResult> Run(
        [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = "RequestStatus/{requestId}")] HttpRequest req,
        [BlobInput("data/{requestId}.blobdata", Connection = "DataStorage")] BlockBlobClient inputBlob, string requestId)
    {
        OnCompleteEnum OnComplete = Enum.Parse<OnCompleteEnum>(req.Query["OnComplete"].FirstOrDefault() ?? "Redirect");
        OnPendingEnum OnPending = Enum.Parse<OnPendingEnum>(req.Query["OnPending"].FirstOrDefault() ?? "OK");

        _logger.LogInformation("Received status request for {RequestId} - OnComplete {OnComplete} - OnPending {OnPending}",
            requestId, OnComplete, OnPending);

        // Check whether the blob exists.
        if (await inputBlob.ExistsAsync())
        {
            // If the blob exists, the function uses the OnComplete parameter to determine the next action.
            return await OnCompleted(OnComplete, inputBlob, requestId, req);
        }
        else
        {
            // If the blob doesn't exist, the function uses the OnPending parameter to determine the next action.
            switch (OnPending)
            {
                case OnPendingEnum.OK:
                    {
                        // Return an HTTP 200 status code.
                        return new OkObjectResult(new { status = "In progress", Location = rqs });
                    }

                case OnPendingEnum.Synchronous:
                    {
                        // Long polling example: hold the connection open and check for completion
                        // using exponential backoff. Time out after approximately one minute.
                        int backoff = 250;

                        while (!await inputBlob.ExistsAsync() && backoff < 64000)
                        {
                            _logger.LogInformation("Synchronous mode {RequestId} - retrying in {Backoff} ms", requestId, backoff);
                            backoff = backoff * 2;
                            await Task.Delay(backoff);
                        }

                        if (await inputBlob.ExistsAsync())
                        {
                            _logger.LogInformation("Synchronous mode {RequestId} - completed after {Backoff} ms", requestId, backoff);
                            return await OnCompleted(OnComplete, inputBlob, requestId, req);
                        }
                        else
                        {
                            _logger.LogInformation("Synchronous mode {RequestId} - NOT FOUND after timeout {Backoff} ms", requestId, backoff);
                            return new NotFoundResult();
                        }
                    }

                default:
                    {
                        throw new InvalidOperationException($"Unexpected value: {OnPending}");
                    }
            }
        }
    }

    private async Task<IActionResult> OnCompleted(OnCompleteEnum OnComplete, BlockBlobClient inputBlob, string requestId, HttpRequest req)
    {
        switch (OnComplete)
        {
            case OnCompleteEnum.Redirect:
                {
                    // Generate a user delegation SAS URI using managed identity credentials.
                    BlobServiceClient blobServiceClient = inputBlob.GetParentBlobContainerClient().GetParentBlobServiceClient();
                    var userDelegationKey = await blobServiceClient.GetUserDelegationKeyAsync(DateTimeOffset.UtcNow, DateTimeOffset.UtcNow.AddDays(7));

                    // Return 303 See Other to redirect the client to the result resource.
                    // GenerateUserDelegationSasUri is a custom helper; see the full implementation on GitHub.
                    req.HttpContext.Response.Headers.Location = GenerateUserDelegationSasUri(inputBlob, userDelegationKey);;
                    return new StatusCodeResult(StatusCodes.Status303SeeOther);
                }

            case OnCompleteEnum.Stream:
                {
                    // Download the file and return it directly to the caller.
                    // For larger files, use a stream to minimize RAM usage.
                    return new OkObjectResult(await inputBlob.DownloadContentAsync());
                }

            default:
                {
                    throw new InvalidOperationException($"Unexpected value: {OnComplete}");
                }
        }
    }
}

public enum OnCompleteEnum
{
    Redirect,
    Stream
}

public enum OnPendingEnum
{
    OK,
    Synchronous
}

次のステップ