Condividi tramite


Modalità in tempo reale in Structured Streaming

Important

Questa funzionalità è in Anteprima Pubblica.

La modalità in tempo reale è un tipo di trigger per Structured Streaming che consente l'elaborazione dei dati a bassa latenza con latenza end-to-end inferiore a cinque millisecondi. Usare la modalità in tempo reale per i carichi di lavoro operativi che richiedono una risposta immediata ai dati di streaming, ad esempio il rilevamento delle frodi, la personalizzazione in tempo reale e i sistemi decisionali istantanei.

La modalità in tempo reale è disponibile in Databricks Runtime 16.4 LTS e versioni successive. Per istruzioni dettagliate sulla configurazione, vedere Introduzione alla modalità in tempo reale. Per esempi di codice, vedere Esempi di modalità in tempo reale.

Che cos'è la modalità in tempo reale?

Carichi di lavoro operativi e analitici

I carichi di lavoro di streaming possono essere suddivisi in generale in carichi di lavoro analitici e carichi di lavoro operativi:

  • I carichi di lavoro analitici usano l'inserimento e la trasformazione dei dati, in genere seguendo l'architettura medallion (ad esempio, l'inserimento di dati nelle tabelle bronze, silver e gold).
  • I carichi di lavoro operativi usano dati in tempo reale, applicano la logica di business e attivano azioni o decisioni downstream.

Ecco alcuni esempi di carichi di lavoro operativi:

  • Blocco o contrassegno di una transazione con carta di credito in tempo reale se un punteggio di frode supera una soglia, in base a fattori come posizione insolita, dimensioni elevate delle transazioni o modelli di spesa rapidi.
  • Il recapito di un messaggio promozionale quando i dati clickstream mostrano che un utente ha esplorato i jeans per cinque minuti, offrendo uno sconto di 25% se acquista nei prossimi 15 minuti.

In generale, i carichi di lavoro operativi sono caratterizzati dalla necessità di una latenza end-to-end sotto il secondo. Questa operazione può essere ottenuta con la modalità in tempo reale in Apache Spark Structured Streaming.

Come la modalità in tempo reale raggiunge una bassa latenza

La modalità in tempo reale migliora l'architettura di esecuzione tramite:

  • Esecuzione di batch a esecuzione prolungata (il valore predefinito è cinque minuti), in cui il sistema elabora i dati man mano che diventa disponibile nell'origine.
  • Pianificazione simultanea di tutte le fasi della query. Ciò richiede che il numero di slot di attività disponibili sia uguale o maggiore del numero di attività di tutte le fasi di un batch.
  • Trasferimento di dati tra le fasi non appena vengono prodotti, mediante uno shuffle in streaming.

Alla fine dell'elaborazione di un batch e prima dell'avvio del successivo, Structured Streaming effettua checkpoint del progresso e pubblica le metriche. La durata del batch influisce sulla frequenza di checkpoint:

  • Batch più lunghi: controlli meno frequenti, ovvero riproduzioni più lunghe in caso di guasto e disponibilità ritardata delle metriche.
  • Batch più brevi: checkpoint più frequenti, che possono influire sulla latenza.

Databricks consiglia di eseguire il benchmarking della modalità in tempo reale rispetto al carico di lavoro di destinazione per trovare l'intervallo di trigger appropriato.

Quando usare la modalità in tempo reale

Scegliere la modalità in tempo reale quando il caso d'uso richiede:

  • Latenza secondaria: applicazioni che devono rispondere ai dati entro millisecondi, ad esempio sistemi di rilevamento delle frodi che devono bloccare le transazioni in tempo reale.
  • Processo decisionale operativo: sistemi che attivano azioni immediate in base ai dati in ingresso, ad esempio offerte in tempo reale, avvisi o notifiche.
  • Elaborazione continua: carichi di lavoro in cui i dati devono essere elaborati non appena arrivano, anziché in batch periodici.

Usare la modalità micro-batch (il trigger predefinito di Streaming Strutturato) quando:

  • Elaborazione analitica: pipeline ETL, trasformazioni dei dati e implementazioni dell'architettura medallion in cui i requisiti di latenza vengono misurati in secondi o minuti.
  • Ottimizzazione dei costi: i carichi di lavoro in cui non è necessaria la latenza al secondo secondario, perché la modalità in tempo reale richiede risorse di calcolo dedicate.
  • La frequenza del checkpoint è importante: applicazioni che traggono vantaggio da un checkpointing più frequente per un recupero più rapido.

Requisiti e configurazione

La modalità in tempo reale prevede requisiti specifici per la configurazione di calcolo e la configurazione delle query. Questa sezione descrive i prerequisiti e i passaggi di configurazione necessari per usare la modalità in tempo reale.

Prerequisiti

Per usare la modalità in tempo reale, è necessario soddisfare i requisiti seguenti:

  • Databricks Runtime 16.4 LTS o versione successiva: la modalità in tempo reale è disponibile solo in DBR 16.4 LTS e versioni successive.
  • Calcolo dedicato: è necessario usare un calcolo dedicato (in precedenza utente singolo). Non sono supportati i cluster standard (in precedenza condivisi), le pipeline dichiarative di Lakeflow Spark e i cluster serverless.
  • Nessuna scalabilità automatica: la scalabilità automatica deve essere disabilitata.
  • Nessun fotono: l'accelerazione photon non è supportata con la modalità in tempo reale.
  • Configurazione di Spark: è necessario impostare il parametro spark.databricks.streaming.realTimeMode.enabled su true.

Configurazione di calcolo

Configurare il calcolo con le impostazioni seguenti:

  • Impostare spark.databricks.streaming.realTimeMode.enabled su true nella configurazione di Spark.
  • Disabilitare la scalabilità automatica.
  • Disabilitare l'accelerazione Photon.
  • Verificare che la computazione sia configurata come un cluster dedicato (non standard, Lakeflow Spark Declarative Pipelines, o serverless).

Per istruzioni dettagliate sulla creazione e la configurazione del calcolo per la modalità in tempo reale, vedere Introduzione alla modalità in tempo reale.

Configurazione delle query

Per eseguire una query in modalità in tempo reale, è necessario abilitare il trigger in tempo reale. I trigger in tempo reale sono supportati solo in modalità di aggiornamento.

Python

query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .outputMode("update")
        # In PySpark, the realTime trigger requires specifying the interval.
        .trigger(realTime="5 minutes")
        .start()
)

Scala

import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val readStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic).load()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .outputMode("update")
      .trigger(RealTimeTrigger.apply())
      // RealTimeTrigger can also accept an argument specifying the checkpoint interval.
      // For example, this code indicates a checkpoint interval of 5 minutes:
      // .trigger(RealTimeTrigger.apply("5 minutes"))
      .start()

Dimensionamento delle risorse di calcolo

È possibile eseguire un processo in tempo reale per ogni risorsa di calcolo se il calcolo dispone di slot di attività sufficienti.

Per l'esecuzione in modalità a bassa latenza, il numero totale di slot di attività disponibili deve essere maggiore o uguale al numero di attività in tutte le fasi della query.

Esempi di calcolo degli slot

Tipo di pipeline Configurazione Slot obbligatori
Senza stato a fase singola (origine Kafka + sink) maxPartitions = 8 8 slot
Con stato a due fasi (origine Kafka + shuffle) maxPartitions = 8, partizioni shuffle = 20 28 slot (8 + 20)
Tre fasi (di origine di Kafka + rimescolamento + ripartizione) maxPartitions = 8, due fasi di mescolamento di 20 ciascuno 48 slot (8 + 20 + 20)

Se non si imposta maxPartitions, usare il numero di partizioni nel topic Kafka.

Considerazioni chiave

Quando si configura il calcolo, tenere presente quanto segue:

  • A differenza della modalità micro batch, le attività in tempo reale possono rimanere inattive durante l'attesa dei dati, quindi il ridimensionamento corretto è essenziale per evitare sprechi di risorse.
  • Puntare a un livello di utilizzo di destinazione (ad esempio, 50%) ottimizzando:
    • maxPartitions (per Kafka)
    • spark.sql.shuffle.partitions (per fasi di rimescolamento)
  • Databricks consiglia di impostare maxPartitions in modo che ogni attività gestisca più partizioni Kafka per ridurre il sovraccarico.
  • Modificare le fasi delle attività per ogni lavoratore in modo che corrispondano al carico di lavoro per processi a una fase.
  • Per i lavori con carichi intensivi di shuffle, provare a trovare il numero minimo di partizioni di shuffle che evitano i backlog e regolare di conseguenza. Il calcolo non pianifica il processo se non dispone di slot sufficienti.

Note

Da Databricks Runtime 16.4 LTS e versioni successive, tutte le pipeline in tempo reale usano checkpoint v2, che consente di passare senza problemi tra modalità batch in tempo reale e micro batch.

Tecniche di ottimizzazione

Technique Abilitata per impostazione predefinita
Tracciamento asincrono dell'avanzamento: sposta la scrittura nel log di offset e il log di commit in un thread asincrono, riducendo il tempo inter-batch tra due micro-batch. Ciò consente di ridurre la latenza delle query di streaming senza stato. No
Checkpoint dello stato asincrono: consente di ridurre la latenza delle query di streaming con stato iniziando a elaborare il micro batch successivo non appena viene completato il calcolo del micro batch precedente, senza attendere il checkpoint dello stato. No

monitoraggio e osservabilità

La misurazione delle prestazioni delle query è essenziale per i carichi di lavoro in tempo reale. In modalità in tempo reale, le metriche della durata del batch tradizionale non riflettono la latenza effettiva, quindi sono necessari approcci alternativi.

La latenza end-to-end è specifica del carico di lavoro e talvolta può essere misurata con precisione solo con la logica di business. Ad esempio, se il timestamp di origine viene restituito in Kafka, è possibile calcolare la latenza come differenza tra il timestamp di output di Kafka e il timestamp di origine.

È anche possibile stimare la latenza end-to-end usando le metriche e le API predefinite descritte di seguito.

Metriche integrate con StreamingQueryProgress

Nell'evento StreamingQueryProgress sono incluse le metriche seguenti, che vengono registrate automaticamente nei log del driver. È anche possibile accedervi tramite la StreamingQueryListenerfunzione di callback.onQueryProgress() QueryProgressEvent.json() o toString() includono metriche aggiuntive in modalità in tempo reale.

  1. Latenza di elaborazione (processingLatencyMs). Tempo trascorso tra quando la query in modalità in tempo reale legge un record e quando la query lo scrive nella fase successiva o downstream. Per le query a fase singola, questa misura la stessa durata della latenza E2E. Il sistema segnala questa metrica per ogni attività.
  2. Latenza di accodamento di origine (sourceQueuingLatencyMs). Intervallo di tempo trascorso tra quando il sistema scrive un record in un bus di messaggi, ad esempio il tempo di accodamento del log in Kafka e quando la query in modalità in tempo reale legge il record per la prima volta. Il sistema segnala questa metrica per ogni attività.
  3. Latenza E2E (e2eLatencyMs). Tempo compreso tra il momento in cui il sistema scrive il record in un bus di messaggi e quando la query in modalità in tempo reale scrive il record downstream. Il sistema aggrega questa metrica per batch in tutti i record elaborati da tutte le attività.

Per esempio:

"rtmMetrics" : {
    "processingLatencyMs" : {
      "P0" : 0,
      "P50" : 0,
      "P90" : 0,
      "P95" : 0,
      "P99" : 0
    },
    "sourceQueuingLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 3
    },
    "e2eLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 4
    },

Misurazione della latenza personalizzata con l'API Observe

L'API Osserva consente di misurare la latenza senza avviare un altro processo. Se si dispone di un timestamp di origine che approssima l'ora di arrivo dei dati di origine, è possibile stimare la latenza di ogni batch usando l'API Osserva. Passare il timestamp prima di raggiungere il sink:

Python

from datetime import datetime

from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType

@udf(returnType=TimestampType())
def current_timestamp():
  return datetime.now()

# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
  "latency",
  unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  "observedLatency",
  avg(col("latency")).alias("avg"),
  max(col("latency")).alias("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.

Scala

import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}

val currentTimestampUDF = udf(() => System.currentTimeMillis())

// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
  "latency",
  col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  name = "observedLatency",
  avg(col("latency")).as("avg"),
  max(col("latency")).as("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.

In questo esempio viene registrato un timestamp corrente prima di restituire la voce e la latenza viene stimata calcolando la differenza tra questo timestamp e il timestamp di origine del record. I risultati sono inclusi nei report in corso e resi disponibili ai listener. Ecco un output di esempio:

"observedMetrics" : {
  "observedLatency" : {
    "avg" : 63.8369765176552,
    "max" : 219,
    "p99" : 154,
    "p50" : 49
  }
}

Supporto e limitazioni delle funzionalità

Questa sezione descrive le funzionalità supportate e le limitazioni correnti della modalità in tempo reale, inclusi ambienti compatibili, lingue, origini, sink, operatori e considerazioni speciali per funzionalità specifiche.

Ambienti, lingue e modalità supportati

Tipo di ambiente di calcolo Supported
Dedicato (in precedenza: utente singolo) Yes
Standard (in precedenza: condiviso) No
Pipeline dichiarative di Lakeflow Spark classiche No
Pipeline dichiarative di Lakeflow Spark serverless No
Serverless No

Lingue supportate:

Language Supported
Scala Yes
Java Yes
Python Yes

Modalità di esecuzione supportate:

Modalità di esecuzione Supported
Modalità di aggiornamento Yes
Append mode No
Modalità completa No

Origini e destinazioni supportate

Fonti:

Sources Supported
Apache Kafka Yes
AWS MSK Yes
Hub eventi (con il connettore Kafka) Yes
Kinesis Sì (solo modalità EFO)
Google Pub/Sub No
Apache Pulsar No

Lavandini:

Sinks Supported
Apache Kafka Yes
Hub eventi (con il connettore Kafka) Yes
Kinesis No
Google Pub/Sub No
Apache Pulsar No
Destinazioni arbitrarie (con forEachWriter) Yes

Operatori supportati

Operators Supported
Operazioni senza stato
Selection Yes
Projection Yes
funzioni definite dall'utente
Scala UDF (Funzioni definite dall'utente in Scala) Sì (con alcune limitazioni)
Python UDF Sì (con alcune limitazioni)
Aggregazione
sum Yes
count Yes
max Yes
min Yes
avg Yes
Funzioni di aggregazione Yes
Windowing
Tumbling Yes
Sliding Yes
Session No
Deduplicazione
dropDuplicates Sì (lo stato è illimitato)
dropDuplicatesWithinWatermark (elimina duplicati all'interno del watermark) No
Stream - Unione di tabelle
Tabella broadcast (deve essere piccola) Yes
Stream - Unione di Stream No
(flat)MapGroupsWithState No
transformWithState Sì (con alcune differenze)
union Sì (con alcune limitazioni)
forEach Yes
forEachBatch No
mapPartitions No (vedere la limitazione)

Considerazioni speciali

Alcuni operatori e funzionalità hanno considerazioni o differenze specifiche quando vengono usati in modalità in tempo reale.

transformWithState in modalità tempo reale

Per la compilazione di applicazioni con stato personalizzate, Databricks supporta transformWithState, un'API in Apache Spark Structured Streaming. Per altre informazioni sull'API e sui frammenti di codice, vedere Creare un'applicazione con stato personalizzata .

Esistono tuttavia alcune differenze tra il comportamento dell'API in modalità in tempo reale e le query di streaming tradizionali che sfruttano l'architettura micro-batch.

  • La modalità in tempo reale chiama il handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues) metodo per ogni riga.
    • L'iteratore inputRows restituisce un singolo valore. La modalità micro batch lo chiama una volta per ogni chiave e l'iteratore inputRows restituisce tutti i valori per una chiave nel micro batch.
    • È necessario tenere presente questa differenza durante la scrittura del codice.
  • I timer dell'ora dell'evento non sono supportati in modalità in tempo reale.
  • In modalità in tempo reale, i timer vengono ritardati durante l'attivazione a seconda dell'arrivo dei dati:
    • Se un timer è pianificato per le 10:00:00, ma non arrivano dati, il timer non viene attivato immediatamente.
    • Se i dati arrivano alle 10:00:10, il timer viene attivato con un ritardo di 10 secondi.
    • Se non arrivano dati e il batch a esecuzione lunga sta per terminare, il timer si attiva prima della conclusione del batch.

Funzioni definite dall'utente Python in modalità tempo reale

Databricks supporta la maggior parte delle funzioni definite dall'utente Python in modalità in tempo reale:

Tipo di funzione definita dall'utente Supported
Funzione definita dall'utente senza stato
Funzione scalare definita dall'utente Python (collegamento) Yes
Funzione definita dall'utente scalare Arrow Yes
Funzione scalare definita dall'utente Pandas (collegamento) Yes
Funzione Arrow (mapInArrow) Yes
Funzione di Pandas (collegamento) Yes
UDF con raggruppamento con stato (UDAF)
transformWithState (solo Row interfaccia) Yes
applicaInPandasConStato No
Funzione definita dall'utente con raggruppamento non con stato (UDAF)
apply No
applyInArrow No
applyInPandas No
Funzione tabella
UDTF (collegamento) No
UC UDF No

Esistono diversi aspetti da considerare quando si usano UDF Python in modalità tempo reale.

  • Per ridurre al minimo la latenza, configurare le dimensioni del batch di Arrow (spark.sql.execution.arrow.maxRecordsPerBatch) su 1.
    • Compromesso: questa configurazione ottimizza la latenza a scapito della capacità di trasmissione. Per la maggior parte dei carichi di lavoro, questa impostazione è consigliata.
    • Aumentare le dimensioni del batch solo se è necessaria una velocità effettiva maggiore per supportare il volume di input, accettando il potenziale aumento della latenza.
  • Le funzioni definite dall'utente e le funzioni di pandas non funzionano bene con un batch Arrow di dimensione 1.
    • Se si usano funzioni definite dall'utente o funzioni pandas, impostare le dimensioni del batch Arrow su un valore superiore, ad esempio 100 o superiore.
    • Si noti che ciò implica una latenza più elevata. Databricks consiglia di usare la funzione o la funzione UDF Arrow, se possibile.
  • A causa del problema di prestazioni con pandas, transformWithState è supportato solo con l'interfaccia Row .

Limitations

Limitazioni dell'origine

Per Kinesis, la modalità in tempo reale non supporta la modalità di polling. Inoltre, le ripartizioni frequenti potrebbero influire negativamente sulla latenza.

Limitazioni dell'unione

L'operatore Union presenta alcune limitazioni:

  • La modalità in tempo reale non supporta l'unione automatica:
    • Kafka: non è possibile usare lo stesso oggetto frame di dati di origine e unire frame di dati derivati da esso. Soluzione alternativa: usare diversi dataframe letti dalla stessa origine.
    • Kinesis: non è possibile fare l'unione dei data frame derivati dalla stessa origine Kinesis con la stessa configurazione. Soluzione alternativa: oltre a usare diversi dataframe, è possibile assegnare un'opzione "consumerName" diversa a ogni dataframe.
  • La modalità in tempo reale non supporta gli operatori con stato (ad esempio , aggregatededuplicate, , transformWithState) definiti prima dell'unione.
  • La modalità in tempo reale non supporta l'unione con le origini batch.

Limitazione di MapPartitions

mapPartitions in Scala e api Python simili (mapInPandas, mapInArrow) accettano un iteratore dell'intera partizione di input e producono un iteratore dell'intero output con mapping arbitrario tra input e output. Queste API possono causare problemi di prestazioni nella modalità streaming Real-Time bloccando l'intero output, aumentando la latenza. La semantica di queste API non supporta adeguatamente la propagazione della filigrana.

Utilizzare funzioni definite dall'utente scalari combinate con Trasforma tipi di dati complessi o filter per ottenere funzionalità simili.

Passaggi successivi

Ora che si comprende la modalità in tempo reale e come configurarla, esplorare queste risorse per iniziare a implementare applicazioni di streaming in tempo reale: