Partilhar via


Otimizar e monitorizar o desempenho das consultas em modo em tempo real

Esta página cobre ajustes computacionais, técnicas para reduzir a latência de ponta a ponta e abordagens para medir o desempenho das consultas em modo em tempo real.

Ajuste computacional

Ao configurar o seu cálculo, considere o seguinte:

  • Ao contrário do modo de microlote, as tarefas em tempo real podem ficar ociosas enquanto aguardam dados, portanto, o dimensionamento correto é essencial para evitar o desperdício de recursos.
  • Procure atingir um nível alvo de utilização do cluster, como 50%, ajustando:
    • maxPartitions (para Kafka)
    • spark.sql.shuffle.partitions (para fases aleatórias)
  • O Databricks recomenda definir maxPartitions para que cada tarefa consiga gerir múltiplas partições Kafka para reduzir a sobrecarga.
  • Ajuste os slots de tarefas por trabalhador para corresponder à carga de trabalho para trabalhos simples de um estágio.
  • Para trabalhos com muita reorganização, experimente encontrar o número mínimo de partições de reorganização que evitem acumulações e ajuste a partir daí. O sistema de computação não vai agendar a tarefa se não tiver espaços suficientes.

Observação

A partir das versões Databricks Runtime 16.4 LTS e superiores, todos os pipelines em tempo real utilizam o checkpoint v2 para permitir transições suaves entre os modos em tempo real e micro-batch.

Otimização de latência

O modo de Streaming Estruturado em tempo real tem técnicas opcionais para reduzir a latência de ponta a ponta. Nenhum deles está ativado por defeito. Tem de os ativar separadamente.

  • Acompanhamento de progresso assíncrono: Transfere as gravações para os logs de offset e de commit para um thread assíncrono, reduzindo o tempo entre processos de lote para consultas sem estado.
  • Checkpoint de estado assíncrono: Começa a processar o próximo micro-lote assim que o cálculo termina, sem esperar pelo checkpoint de estado, reduzindo a latência para consultas com estado.

Monitorização e observabilidade

No modo em tempo real, as métricas tradicionais de duração em lote não refletem a latência real de ponta a ponta. Use as abordagens abaixo para medir a latência com precisão e identificar gargalos nas suas consultas.

A latência de ponta a ponta é específica da carga de trabalho e, às vezes, só pode ser medida com precisão com a lógica de negócios. Por exemplo, se o carimbo temporal da fonte for emitido em Kafka, pode calcular a latência como a diferença entre o carimbo temporal de saída do Kafka e o da fonte.

Métricas incorporadas com StreamingQueryProgress

O evento StreamingQueryProgress é automaticamente registado nos registos do controlador e é acessível através da função de callback do StreamingQueryListeneronQueryProgress(). Isto permite-lhe reagir programáticamente a eventos de progresso, por exemplo, se quiser publicar métricas num sistema de monitorização externo. QueryProgressEvent.json() ou toString() incluem métricas do modo em tempo real:

  1. Latência de processamento (processingLatencyMs). O tempo decorrido entre o momento em que a consulta em modo em tempo real lê um registo e o momento em que a consulta o escreve para a etapa seguinte ou a jusante. Para consultas de estágio único, isto mede a mesma duração que a latência de ponta a ponta. O sistema reporta esta métrica por tarefa.
  2. Latência da fila de origem (sourceQueuingLatencyMs). O intervalo de tempo decorrido entre o momento em que o sistema escreve um registo num barramento de mensagens — por exemplo, o tempo de anexação do registo em Kafka — e o momento em que a consulta em modo em tempo real lê o registo pela primeira vez. O sistema reporta esta métrica por tarefa.
  3. Latência de ponta a ponta (e2eLatencyMs). O tempo entre o momento em que o sistema escreve o registo num barramento de mensagens e o momento em que a consulta em modo de tempo real escreve o registo a jusante. O sistema agrega esta métrica por lote em todos os registos processados por todas as tarefas.

Por exemplo:

"rtmMetrics" : {
    "processingLatencyMs" : {
      "P0" : 0,
      "P50" : 0,
      "P90" : 0,
      "P95" : 0,
      "P99" : 0
    },
    "sourceQueuingLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 3
    },
    "e2eLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 4
    }
}

Medição personalizada de latência com a API Observe

A API Observe permite-lhe medir a latência inline sem iniciar um trabalho separado. Se tiver um carimbo temporal de origem que aproxima o tempo de chegada dos dados de origem, pode estimar a latência por lote registando um carimbo de tempo antes do sumidouro e calculando a diferença. Os resultados aparecem nos relatórios de progresso e estão disponíveis para os ouvintes.

Python

from datetime import datetime

from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType

@udf(returnType=TimestampType())
def current_timestamp():
  return datetime.now()

# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
  "latency",
  unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  "observedLatency",
  avg(col("latency")).alias("avg"),
  max(col("latency")).alias("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.

Scala

import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}

val currentTimestampUDF = udf(() => System.currentTimeMillis())

// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
  "latency",
  col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  name = "observedLatency",
  avg(col("latency")).as("avg"),
  max(col("latency")).as("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.

Saída de exemplo:

"observedMetrics" : {
  "observedLatency" : {
    "avg" : 63.8369765176552,
    "max" : 219,
    "p99" : 154,
    "p50" : 49
  }
}