データ フローのデータ バッファリングとディスク永続化を構成する

データ フローが、Azure Event Hubs、Microsoft Fabric Real-Time Intelligence、Kafka、または別のクラウド サービスなどの送信先エンドポイントにメッセージを送信すると、送信先またはネットワークが使用できなくなる可能性があります。 Azure IoT Operationsは、ローカル MQTT ブローカー サブスクリプション モデルを使用してメッセージをバッファーし、配信を再試行します。

この記事では、データ フローが宛先の停止中にメッセージを保護する方法と、より強力な保護のためにブローカーバッファリングとディスク永続化を構成する方法について説明します。

宛先の停止中にデータ フローでメッセージをバッファーする方法

ローカル MQTT ブローカーをデータ フローのソース エンドポイントとして使用する場合、データ フローに発行する資産を直接または間接的に使用すると、データ フローは MQTT ブローカーからサブスクライバーとしてメッセージを受信します。 データ フローは、メッセージが正常に処理されて宛先に配信された後、またはフィルター処理、スキーマ検証、またはメッセージの有効期限が切れたためにメッセージが意図的に削除された後に、各ソース メッセージを確認します。

宛先エンドポイントが使用できない場合、配信を完了できません。 この場合、データ フローはソース メッセージを確認しません。 MQTT ブローカーはメッセージをサブスクライバー キューに保持し、データ フローは配信を再試行します。 接続が復元されると、データ フローはキューに登録されたメッセージを送信先に送信し、正常に配信された後に確認します。

メッセージ パスは、パブリッシャーまたは資産から MQTT ブローカー、MQTT ブローカーからデータ フロー、および宛先エンドポイントへのデータ フローです。

  1. MQTT ブローカーは、データ フローにメッセージを配信します。
  2. データ フローは、メッセージを宛先エンドポイントに送信します。
  3. 送信が成功すると、データ フローによってソース メッセージが確認され、ブローカーによってサブスクライバー キューから削除されます。
  4. 送信が失敗した場合、データ フローはソース メッセージを確認しません。 ブローカーはキューに入った状態を維持します。
  5. データ フローは、成功するか、メッセージの有効期限が切れるか、構成された制限が適用されるまで配信を再試行します。

Important

データ フロー バッファリングは境界付けされます。 キューに登録されたメッセージは、MQTT ブローカーのメモリ プロファイル、サブスクライバー キューの制限、ディスクに基づくメッセージ バッファー サイズ、永続化の構成、メッセージまたはセッションの有効期限の影響を受けます。 これらの設定は、許容する必要がある最大停止時間とスループットに対して構成します。

データ保護構成レイヤー

次の構成レイヤーを使用して、宛先の停止時にメッセージをバッファーおよび保護する方法を制御します。

レイヤー 対抗するものに対する保護 既定の動作 顧客のアクション
データ フローの再試行と確認応答の保留 一時的な宛先またはネットワークの停止 ローカル MQTT ブローカーと資産に基づくソース用に組み込まれています 構成は不要
MQTT ブローカー サブスクライバー キュー データ フロー サブスクリプションによって受信されたが、まだ確認されていないメッセージ メモリに格納されている メモリ プロファイルとサブスクライバー キューの制限を構成する
ディスクベース メッセージ バッファー 使用可能なメモリを超える大規模な一時バックログ Disabled ディスクに基づくメッセージ バッファーを使用してデプロイ時にブローカーを構成する
MQTT ブローカーの永続化 メッセージがキューに入っている間にブローカーまたはポッドを再起動する 既定では無効になっています ブローカーの永続化とサブスクライバー キューの永続化を有効にする
データ フロー requestDiskPersistence データフローごとの永続的なサブスクライバー キュー ストレージの要求 Disabled データ フローまたはデータ フロー グラフで requestDiskPersistence を有効にし、ブローカーで動的サブスクライバー キューの永続化を有効にする
メッセージとセッションの有効期限 境界付きストレージと再生の動作 カスタマイズ可能 損失の許容範囲と停止期間に基づいて有効期限と制限を設定する

ローカル MQTT ブローカー サブスクライバー キューは、既定でメモリに格納されます。 MQTT ブローカーは、次の 2 つの異なる方法でディスクを使用するように構成できます。

  • ディスクに基づくメッセージ バッファー: キューが使用可能なメモリを超えて拡張されるときに、ディスクをスピルオーバー バッファーとして使用します。 この設定は、より大きな一時的なバックログに役立ちますが、ブローカーの再起動間の永続性と同じではありません。
  • MQTT ブローカーの永続性: 選択したブローカー データ (サブスクライバー キューを含む) をディスクに永続化し、キューに入ったメッセージが再起動または電力損失に耐えられるようにします。

ブローカー構成の詳細については、以下を参照してください。

バッファリング構成を選択する

ワークロードの停止期間と持続性の要件に基づいて構成を選択します。

  • 一時的なクラウドまたはネットワークの停止が短い場合は、既定のメモリ内サブスクライバー キューで十分な場合があります。
  • スループットを向上させたり、一時的な停止を長くしたりするには、ディスクでバックアップされたメッセージ バッファーを構成します。
  • 再起動または電源損失保護の場合は、MQTT ブローカーの永続化とサブスクライバー キューの永続化を有効にしてから、データ フローまたはデータ フロー グラフで requestDiskPersistence を有効にします。
  • 境界付きストレージ環境の場合は、サブスクライバー キューの制限、メッセージの有効期限、監視を構成して、ブローカーがキューの制限を適用し、ポリシーに従ってメッセージを削除または拒否します。

例: デスティネーションの障害

ソース エンドポイントとして既定のローカル MQTT ブローカーを使用し、宛先エンドポイントとしてAzure Event Hubsして、データ フローを作成するとします。 データ フローとAzure Event Hubsの間の接続が失われた場合、データ フローは送信を再試行し、ソース メッセージを確認しません。 MQTT ブローカーは、未確認のメッセージをキューに入れます。 既定の設定では、キューはメモリに格納されます。 ディスクに基づくメッセージ バッファーを使用すると、キューがディスクにスピルする可能性があります。 ブローカーの永続化とデータ フローの requestDiskPersistenceでは、キューに置かれたメッセージは、構成された永続化、有効期限、およびストレージの制限に従って、ブローカーの再起動後も存続できます。

データ フローのディスク永続化を有効にする

ディスクの永続化により、データ フローとデータ フロー グラフは再起動後も処理状態を維持できます。 この機能を有効にすると、MQTT ブローカーは、サブスクライバー キュー内のメッセージなどのデータをディスクに保持します。 この方法は、停電やブローカーの再起動中に、データ フローのデータ ソースがキューに格納されたデータを失わないよう支援します。 永続化はデータ フローごとに構成されるため、ブローカーは最適なパフォーマンスを維持するため、この機能を使用する必要があるデータ フローのみが使用されます。

データ フローは、MQTTv5 ユーザー プロパティを使用して、サブスクリプション中に永続化を要求します。 この機能は、次の場合にのみ機能します。

  • データ フローでは、MQTT ブローカーまたは資産がソースとして使用されます。
  • MQTT ブローカーでは、サブスクライバー キューなどのデータ型に対して動的永続化モードが Enabled に設定された永続化が有効になっています。

MQTT ブローカーの永続化の構成の詳細については、「 MQTT ブローカーの永続化の構成」を参照してください。

この設定は、 Enabled または Disabledを受け入れます。 Disabled はデフォルト値です。

データ フローの構成

データ フローを作成または編集するときに、[編集] を選択し、[データの永続化を要求する] の横にある [はい] を選択します。

データ フロー グラフ用に構成する

データ フロー グラフを作成または編集するときに、[編集] を選択し、[データ永続化の要求] の横にある [はい] を選択します。