次の方法で共有


リアルタイム モードのクエリ パフォーマンスを最適化および監視する

このページでは、コンピューティングのチューニング、エンドツーエンドの待機時間を短縮する手法、およびリアルタイム モードでクエリのパフォーマンスを測定する方法について説明します。

計算の最適化

コンピューティングを構成するときは、次の点を考慮してください。

  • マイクロバッチ モードとは異なり、リアルタイム タスクはデータの待機中にアイドル状態を維持できるため、リソースの無駄を避けるためには適切なサイズ設定が不可欠です。
  • 次の調整により、ターゲット クラスターの使用率レベル (50%など) を目指します。
    • maxPartitions (Kafka の場合)
    • spark.sql.shuffle.partitions (シャッフル ステージの場合)
  • Databricks では、オーバーヘッドを削減するために、各タスクが複数の Kafka パーティションを処理するように、 maxPartitions を設定することをお勧めします。
  • 単純な 1 段階のジョブのワークロードに合わせて、ワーカーごとのタスク スロットを調整します。
  • シャッフルが多いジョブの場合、バックログを回避するためのシャッフルパーティションの最小数を実験で見つけ、その結果に基づいて調整してください。 十分なスロットがない場合、コンピューターはジョブをスケジュールしません。

Databricks Runtime 16.4 LTS 以降では、すべてのリアルタイム パイプラインでチェックポイント v2 を使用して、リアルタイムモードとマイクロバッチ モードをシームレスに切り替えることができます。

待機時間の最適化

構造化ストリーミング リアルタイム モードには、エンドツーエンドの待機時間を短縮するための省略可能な手法があります。 どちらも既定では有効になっていません。 個別に有効にする必要があります。

  • 非同期進行状況の追跡: オフセットへの書き込みとコミット ログを非同期スレッドに移動し、ステートレス クエリのバッチ間時間を短縮します。
  • 非同期状態チェックポイント処理: 状態のチェックポイント処理を待たずに、計算が完了するとすぐに次のマイクロバッチの処理を開始し、ステートフル クエリの待機時間を短縮します。

監視と可観測性

リアルタイム モードでは、従来のバッチ期間メトリックには、実際のエンドツーエンドの待機時間は反映されません。 待機時間を正確に測定し、クエリのボトルネックを特定するには、次の方法を使用します。

エンドツーエンドの待機時間はワークロード固有であり、ビジネス ロジックでのみ正確に測定できる場合があります。 たとえば、ソース タイムスタンプが Kafka で出力される場合、Kafka の出力タイムスタンプとソース タイムスタンプの差として待機時間を計算できます。

組み込みのメトリック StreamingQueryProgress

StreamingQueryProgress イベントは、ドライバー ログに自動的に記録され、StreamingQueryListeneronQueryProgress()コールバック関数を介してアクセスできます。 これにより、たとえば外部監視システムにメトリックを発行する場合など、進行状況イベントにプログラムで対応できます。 これらのリアルタイムモードメトリックをQueryProgressEvent.json()またはtoString()に含めます。

  1. 待機時間の処理 (processingLatencyMs)。 リアルタイム モードのクエリがレコードを読み取ったときと、クエリが次のステージまたはダウンストリームにレコードを書き込むまでの経過時間。 シングルステージ クエリの場合、これはエンドツーエンドの待機時間と同じ期間を測定します。 システムは、タスクごとにこのメトリックを報告します。
  2. ソース キューの待機時間 (sourceQueuingLatencyMs)。 システムがメッセージ バスにレコードを書き込むまでの経過時間 (Kafka のログの追加時間など) と、リアルタイム モードクエリが最初にレコードを読み取るまでの時間。 システムは、タスクごとにこのメトリックを報告します。
  3. エンドツーエンドの待機時間 (e2eLatencyMs)。 システムがメッセージ バスにレコードを書き込み、リアルタイム モードクエリがレコードをダウンストリームに書き込むまでの時間。 システムは、すべてのタスクによって処理されたすべてのレコードにわたって、バッチごとにこのメトリックを集計します。

例えば次が挙げられます。

"rtmMetrics" : {
    "processingLatencyMs" : {
      "P0" : 0,
      "P50" : 0,
      "P90" : 0,
      "P95" : 0,
      "P99" : 0
    },
    "sourceQueuingLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 3
    },
    "e2eLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 4
    }
}

Observe API を使用したカスタム待機時間の測定

Observe API を使用すると、別のジョブを起動せずに、インラインで待機時間を測定できます。 ソース データの到着時間に近いソース タイムスタンプがある場合は、シンクの前にタイムスタンプを記録し、その差を計算することで、バッチごとの待機時間を見積もることができます。 結果は進行状況レポートに表示され、リスナーが利用できるようになっています。

Python

from datetime import datetime

from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType

@udf(returnType=TimestampType())
def current_timestamp():
  return datetime.now()

# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
  "latency",
  unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  "observedLatency",
  avg(col("latency")).alias("avg"),
  max(col("latency")).alias("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.

Scala

import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}

val currentTimestampUDF = udf(() => System.currentTimeMillis())

// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
  "latency",
  col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  name = "observedLatency",
  avg(col("latency")).as("avg"),
  max(col("latency")).as("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.

サンプル出力:

"observedMetrics" : {
  "observedLatency" : {
    "avg" : 63.8369765176552,
    "max" : 219,
    "p99" : 154,
    "p50" : 49
  }
}