このページでは、コンピューティングのチューニング、エンドツーエンドの待機時間を短縮する手法、およびリアルタイム モードでクエリのパフォーマンスを測定する方法について説明します。
計算の最適化
コンピューティングを構成するときは、次の点を考慮してください。
- マイクロバッチ モードとは異なり、リアルタイム タスクはデータの待機中にアイドル状態を維持できるため、リソースの無駄を避けるためには適切なサイズ設定が不可欠です。
- 次の調整により、ターゲット クラスターの使用率レベル (50%など) を目指します。
-
maxPartitions(Kafka の場合) -
spark.sql.shuffle.partitions(シャッフル ステージの場合)
-
- Databricks では、オーバーヘッドを削減するために、各タスクが複数の Kafka パーティションを処理するように、
maxPartitionsを設定することをお勧めします。 - 単純な 1 段階のジョブのワークロードに合わせて、ワーカーごとのタスク スロットを調整します。
- シャッフルが多いジョブの場合、バックログを回避するためのシャッフルパーティションの最小数を実験で見つけ、その結果に基づいて調整してください。 十分なスロットがない場合、コンピューターはジョブをスケジュールしません。
注
Databricks Runtime 16.4 LTS 以降では、すべてのリアルタイム パイプラインでチェックポイント v2 を使用して、リアルタイムモードとマイクロバッチ モードをシームレスに切り替えることができます。
待機時間の最適化
構造化ストリーミング リアルタイム モードには、エンドツーエンドの待機時間を短縮するための省略可能な手法があります。 どちらも既定では有効になっていません。 個別に有効にする必要があります。
- 非同期進行状況の追跡: オフセットへの書き込みとコミット ログを非同期スレッドに移動し、ステートレス クエリのバッチ間時間を短縮します。
- 非同期状態チェックポイント処理: 状態のチェックポイント処理を待たずに、計算が完了するとすぐに次のマイクロバッチの処理を開始し、ステートフル クエリの待機時間を短縮します。
監視と可観測性
リアルタイム モードでは、従来のバッチ期間メトリックには、実際のエンドツーエンドの待機時間は反映されません。 待機時間を正確に測定し、クエリのボトルネックを特定するには、次の方法を使用します。
エンドツーエンドの待機時間はワークロード固有であり、ビジネス ロジックでのみ正確に測定できる場合があります。 たとえば、ソース タイムスタンプが Kafka で出力される場合、Kafka の出力タイムスタンプとソース タイムスタンプの差として待機時間を計算できます。
組み込みのメトリック StreamingQueryProgress
StreamingQueryProgress イベントは、ドライバー ログに自動的に記録され、StreamingQueryListenerのonQueryProgress()コールバック関数を介してアクセスできます。 これにより、たとえば外部監視システムにメトリックを発行する場合など、進行状況イベントにプログラムで対応できます。 これらのリアルタイムモードメトリックをQueryProgressEvent.json()またはtoString()に含めます。
-
待機時間の処理 (
processingLatencyMs)。 リアルタイム モードのクエリがレコードを読み取ったときと、クエリが次のステージまたはダウンストリームにレコードを書き込むまでの経過時間。 シングルステージ クエリの場合、これはエンドツーエンドの待機時間と同じ期間を測定します。 システムは、タスクごとにこのメトリックを報告します。 -
ソース キューの待機時間 (
sourceQueuingLatencyMs)。 システムがメッセージ バスにレコードを書き込むまでの経過時間 (Kafka のログの追加時間など) と、リアルタイム モードクエリが最初にレコードを読み取るまでの時間。 システムは、タスクごとにこのメトリックを報告します。 -
エンドツーエンドの待機時間 (
e2eLatencyMs)。 システムがメッセージ バスにレコードを書き込み、リアルタイム モードクエリがレコードをダウンストリームに書き込むまでの時間。 システムは、すべてのタスクによって処理されたすべてのレコードにわたって、バッチごとにこのメトリックを集計します。
例えば次が挙げられます。
"rtmMetrics" : {
"processingLatencyMs" : {
"P0" : 0,
"P50" : 0,
"P90" : 0,
"P95" : 0,
"P99" : 0
},
"sourceQueuingLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 3
},
"e2eLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 4
}
}
Observe API を使用したカスタム待機時間の測定
Observe API を使用すると、別のジョブを起動せずに、インラインで待機時間を測定できます。 ソース データの到着時間に近いソース タイムスタンプがある場合は、シンクの前にタイムスタンプを記録し、その差を計算することで、バッチごとの待機時間を見積もることができます。 結果は進行状況レポートに表示され、リスナーが利用できるようになっています。
Python
from datetime import datetime
from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType
@udf(returnType=TimestampType())
def current_timestamp():
return datetime.now()
# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
"latency",
unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
"observedLatency",
avg(col("latency")).alias("avg"),
max(col("latency")).alias("max"),
percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.
Scala
import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}
val currentTimestampUDF = udf(() => System.currentTimeMillis())
// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
"latency",
col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
name = "observedLatency",
avg(col("latency")).as("avg"),
max(col("latency")).as("max"),
percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.
サンプル出力:
"observedMetrics" : {
"observedLatency" : {
"avg" : 63.8369765176552,
"max" : 219,
"p99" : 154,
"p50" : 49
}
}