Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
Esta página cobre ajustes computacionais, técnicas para reduzir a latência de ponta a ponta e abordagens para medir o desempenho das consultas em modo em tempo real.
Ajuste computacional
Ao configurar o seu cálculo, considere o seguinte:
- Ao contrário do modo de microlote, as tarefas em tempo real podem ficar ociosas enquanto aguardam dados, portanto, o dimensionamento correto é essencial para evitar o desperdício de recursos.
- Procure atingir um nível alvo de utilização do cluster, como 50%, ajustando:
-
maxPartitions(para Kafka) -
spark.sql.shuffle.partitions(para fases aleatórias)
-
- O Databricks recomenda definir
maxPartitionspara que cada tarefa consiga gerir múltiplas partições Kafka para reduzir a sobrecarga. - Ajuste os slots de tarefas por trabalhador para corresponder à carga de trabalho para trabalhos simples de um estágio.
- Para trabalhos com muita reorganização, experimente encontrar o número mínimo de partições de reorganização que evitem acumulações e ajuste a partir daí. O sistema de computação não vai agendar a tarefa se não tiver espaços suficientes.
Observação
A partir das versões Databricks Runtime 16.4 LTS e superiores, todos os pipelines em tempo real utilizam o checkpoint v2 para permitir transições suaves entre os modos em tempo real e micro-batch.
Otimização de latência
O modo de Streaming Estruturado em tempo real tem técnicas opcionais para reduzir a latência de ponta a ponta. Nenhum deles está ativado por defeito. Tem de os ativar separadamente.
- Acompanhamento de progresso assíncrono: Transfere as gravações para os logs de offset e de commit para um thread assíncrono, reduzindo o tempo entre processos de lote para consultas sem estado.
- Checkpoint de estado assíncrono: Começa a processar o próximo micro-lote assim que o cálculo termina, sem esperar pelo checkpoint de estado, reduzindo a latência para consultas com estado.
Monitorização e observabilidade
No modo em tempo real, as métricas tradicionais de duração em lote não refletem a latência real de ponta a ponta. Use as abordagens abaixo para medir a latência com precisão e identificar gargalos nas suas consultas.
A latência de ponta a ponta é específica da carga de trabalho e, às vezes, só pode ser medida com precisão com a lógica de negócios. Por exemplo, se o carimbo temporal da fonte for emitido em Kafka, pode calcular a latência como a diferença entre o carimbo temporal de saída do Kafka e o da fonte.
Métricas incorporadas com StreamingQueryProgress
O evento StreamingQueryProgress é automaticamente registado nos registos do controlador e é acessível através da função de callback do StreamingQueryListeneronQueryProgress(). Isto permite-lhe reagir programáticamente a eventos de progresso, por exemplo, se quiser publicar métricas num sistema de monitorização externo.
QueryProgressEvent.json() ou toString() incluem métricas do modo em tempo real:
-
Latência de processamento (
processingLatencyMs). O tempo decorrido entre o momento em que a consulta em modo em tempo real lê um registo e o momento em que a consulta o escreve para a etapa seguinte ou a jusante. Para consultas de estágio único, isto mede a mesma duração que a latência de ponta a ponta. O sistema reporta esta métrica por tarefa. -
Latência da fila de origem (
sourceQueuingLatencyMs). O intervalo de tempo decorrido entre o momento em que o sistema escreve um registo num barramento de mensagens — por exemplo, o tempo de anexação do registo em Kafka — e o momento em que a consulta em modo em tempo real lê o registo pela primeira vez. O sistema reporta esta métrica por tarefa. -
Latência de ponta a ponta (
e2eLatencyMs). O tempo entre o momento em que o sistema escreve o registo num barramento de mensagens e o momento em que a consulta em modo de tempo real escreve o registo a jusante. O sistema agrega esta métrica por lote em todos os registos processados por todas as tarefas.
Por exemplo:
"rtmMetrics" : {
"processingLatencyMs" : {
"P0" : 0,
"P50" : 0,
"P90" : 0,
"P95" : 0,
"P99" : 0
},
"sourceQueuingLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 3
},
"e2eLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 4
}
}
Medição personalizada de latência com a API Observe
A API Observe permite-lhe medir a latência inline sem iniciar um trabalho separado. Se tiver um carimbo temporal de origem que aproxima o tempo de chegada dos dados de origem, pode estimar a latência por lote registando um carimbo de tempo antes do sumidouro e calculando a diferença. Os resultados aparecem nos relatórios de progresso e estão disponíveis para os ouvintes.
Python
from datetime import datetime
from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType
@udf(returnType=TimestampType())
def current_timestamp():
return datetime.now()
# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
"latency",
unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
"observedLatency",
avg(col("latency")).alias("avg"),
max(col("latency")).alias("max"),
percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.
Scala
import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}
val currentTimestampUDF = udf(() => System.currentTimeMillis())
// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
"latency",
col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
name = "observedLatency",
avg(col("latency")).as("avg"),
max(col("latency")).as("max"),
percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.
Saída de exemplo:
"observedMetrics" : {
"observedLatency" : {
"avg" : 63.8369765176552,
"max" : 219,
"p99" : 154,
"p50" : 49
}
}