次の方法で共有


構造化ストリーミングのリアルタイム モード

このページでは、構造化ストリーミングでリアルタイム モードを使用する方法について説明します。これには、それが何であるか、そのしくみも含まれます。

詳細なセットアップ手順については、「 リアルタイム モードの使用を開始する」を参照してください。 コード例については、 リアルタイム モードの例を参照してください。 サポートされているソース、シンク、演算子、および制限については、 リアルタイム モードのリファレンスを参照してください

リアルタイム モードとは

リアルタイム モードは、構造化ストリーミングのトリガーの種類であり、エンドツーエンドの待機時間が 5 ミリ秒という非常に短い待機時間のデータ処理を可能にします。 不正行為の検出、リアルタイムのパーソナル化、インスタントな意思決定システムなど、ストリーミング データへの即時対応を必要とする運用ワークロードには、リアルタイム モードを使用します。

運用ワークロードと分析ワークロード

ストリーミング ワークロードは、運用ワークロードと分析ワークロードに大きく分けることができます。

  • 運用ワークロードは、リアルタイム データを使用し、ビジネス ロジックを適用し、ダウンストリームのアクションまたは決定をトリガーします。
  • 分析ワークロードでは、データインジェストと変換が使用されます。通常は medallion アーキテクチャに従います (たとえば、ブロンズ、シルバー、ゴールドの各テーブルにデータを取り込みます)。

運用ワークロードの例を次に示します。

  • 異常な場所、大きなトランザクション サイズ、迅速な支出パターンなどの要因に基づいて、不正行為スコアがしきい値を超えた場合に、クレジット カード トランザクションをリアルタイムでブロックまたはフラグ設定します。
  • クリックストリーム データで、ユーザーがジーンズを 5 分間閲覧していることを示すプロモーション メッセージを配信し、今後 15 分間に購入した場合は 25% 割引を提供します。

一般に、運用ワークロードは、2 秒未満のエンド ツー エンド待機時間の必要性によって特徴付けられます。 これは、Apache Spark 構造化ストリーミングのリアルタイム モードで実現できます。

リアルタイム モードで低待機時間を実現する方法

リアルタイム モードでは、次の方法で実行アーキテクチャが向上します。

  • 長時間バッチを実行する (既定値は 5 分) では、システムはソースでデータが利用可能になり次第、そのデータを処理します。
  • クエリのすべてのステージを同時にスケジュールする。 これには、使用可能なタスク スロットの数が、バッチ内のすべてのステージのタスクの数以上である必要があります。
  • データが生成されるとすぐに、ストリーミングシャッフルを使用してステージ間で渡します。

バッチの処理が終了し、次のバッチが開始される前に、Structured Streaming チェックポイントが進行し、メトリックが発行されます。 バッチ期間は、チェックポイント処理の頻度に影響します。

  • 長いバッチ: チェックポイント処理の頻度が低くなります。つまり、障害発生時の再生時間が長くなり、メトリックの可用性が遅れることを意味します。
  • 短いバッチ: チェックポイント処理の頻度が高くなり、待機時間に影響する可能性があります。

Databricks では、ターゲット ワークロードに対してリアルタイム モードをベンチマークして、適切なトリガー間隔を見つけることをお勧めします。

リアルタイム モードを使用する場合

ユース ケースで必要な場合は、リアルタイム モードを選択します。

  • 秒未満の待機時間: リアルタイムでトランザクションをブロックする必要がある不正検出システムなど、ミリ秒以内にデータに応答する必要があるアプリケーション。
  • 運用上の意思決定: リアルタイムのオファー、アラート、通知など、受信データに基づいて即時アクションをトリガーするシステム。
  • 継続的処理: 定期的なバッチではなく、到着するとすぐにデータを処理する必要があるワークロード。

次の場合は、マイクロバッチ モード (既定の構造化ストリーミング トリガー) を使用します。

  • 分析処理: ETL パイプライン、データ変換、およびメダリオン アーキテクチャの実装。待機時間の要件は秒単位または分単位で測定されます。
  • コストの最適化: リアルタイム モードでは専用のコンピューティング リソースが必要であるため、サブ秒の待機時間が不要なワークロード。
  • チェックポイントの頻度は重要です。より高速な回復のために、より頻繁なチェックポイント処理の恩恵を受けるアプリケーション。

要件と構成

リアルタイム モードには、コンピューティングのセットアップとクエリの構成に固有の要件があります。 このセクションでは、リアルタイム モードを使用するために必要な前提条件と構成手順について説明します。

前提 条件

リアルタイム モードを使用するには、次の要件を満たすようにコンピューティングを構成する必要があります。

  • Databricks Runtime 16.4 LTS 以降: リアルタイム モードは、DBR 16.4 LTS 以降のバージョンでのみ使用できます。
  • 専用コンピューティング: 専用 (以前のシングル ユーザー) コンピューティングを使用する必要があります。 Standard (以前の共有)、Lakeflow Spark 宣言パイプライン、およびサーバーレス クラスターはサポートされていません。
  • 自動スケールをオフにする: 自動スケールはサポートされていません。
  • Photon をオフにする: Photon アクセラレーションはサポートされていません。
  • Spark の構成: spark.databricks.streaming.realTimeMode.enabledtrue に設定する必要があります。

リアルタイム モード用のコンピューティングの作成と構成の詳細な手順については、「リアルタイム モード の使用を開始する」を参照してください。

クエリの構成

リアルタイム モードでクエリを実行するには、リアルタイム トリガーを有効にする必要があります。 リアルタイム トリガーは、更新モードでのみサポートされます。

Python

query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .outputMode("update")
        # In PySpark, the realTime trigger requires specifying the interval.
        .trigger(realTime="5 minutes")
        .start()
)

Scala

import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val readStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic).load()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .outputMode("update")
      .trigger(RealTimeTrigger.apply())
      // RealTimeTrigger can also accept an argument specifying the checkpoint interval.
      // For example, this code indicates a checkpoint interval of 5 minutes:
      // .trigger(RealTimeTrigger.apply("5 minutes"))
      .start()

コンピューティングのサイズ設定

コンピューティングに十分なタスク スロットがある場合は、コンピューティング リソースごとに 1 つのリアルタイム ジョブを実行できます。

低待機時間モードで実行するには、使用可能なタスク スロットの合計数が、すべてのクエリ ステージのタスク数以上である必要があります。

スロット計算の例

パイプラインの種類 コンフィギュレーション 必要なスロット
単一ステージステートレス (Kafka ソース + シンク) maxPartitions = 8 8 スロット
2段階のステートフル (Kafkaソース + シャッフル) maxPartitions = 8、シャッフル パーティション = 20 28 スロット (8 + 20)
3段階 (Kafka ソース + シャッフル + リパーティション) maxPartitions = 8、各 20 の 2 つのシャッフル ステージ 48 スロット (8 + 20 + 20)

maxPartitions設定しない場合は、Kafka トピックのパーティションの数を使用します。

パフォーマンス

コンピューティング チューニングのガイダンス、待機時間の最適化手法、クエリの監視については、 リアルタイム モードのクエリ パフォーマンスの最適化と監視に関するトピックを参照してください。

機能のサポートと制限事項

サポートされている環境、言語、コンピューティングの種類、ソース、シンク、演算子、既知の制限事項の完全な一覧については、 リアルタイム モードリファレンスを参照してください

次のステップ

リアルタイム モードの内容と構成方法を理解したら、次のリソースを調べて、リアルタイム ストリーミング アプリケーションの実装を開始します。