Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Important
Questa funzionalità è in Anteprima Pubblica.
La modalità in tempo reale è un tipo di trigger per Structured Streaming che consente l'elaborazione dei dati a bassa latenza con latenza end-to-end inferiore a cinque millisecondi. Usare la modalità in tempo reale per i carichi di lavoro operativi che richiedono una risposta immediata ai dati di streaming, ad esempio il rilevamento delle frodi, la personalizzazione in tempo reale e i sistemi decisionali istantanei.
La modalità in tempo reale è disponibile in Databricks Runtime 16.4 LTS e versioni successive. Per istruzioni dettagliate sulla configurazione, vedere Introduzione alla modalità in tempo reale. Per esempi di codice, vedere Esempi di modalità in tempo reale.
Che cos'è la modalità in tempo reale?
Carichi di lavoro operativi e analitici
I carichi di lavoro di streaming possono essere suddivisi in generale in carichi di lavoro analitici e carichi di lavoro operativi:
- I carichi di lavoro analitici usano l'inserimento e la trasformazione dei dati, in genere seguendo l'architettura medallion (ad esempio, l'inserimento di dati nelle tabelle bronze, silver e gold).
- I carichi di lavoro operativi usano dati in tempo reale, applicano la logica di business e attivano azioni o decisioni downstream.
Ecco alcuni esempi di carichi di lavoro operativi:
- Blocco o contrassegno di una transazione con carta di credito in tempo reale se un punteggio di frode supera una soglia, in base a fattori come posizione insolita, dimensioni elevate delle transazioni o modelli di spesa rapidi.
- Il recapito di un messaggio promozionale quando i dati clickstream mostrano che un utente ha esplorato i jeans per cinque minuti, offrendo uno sconto di 25% se acquista nei prossimi 15 minuti.
In generale, i carichi di lavoro operativi sono caratterizzati dalla necessità di una latenza end-to-end sotto il secondo. Questa operazione può essere ottenuta con la modalità in tempo reale in Apache Spark Structured Streaming.
Come la modalità in tempo reale raggiunge una bassa latenza
La modalità in tempo reale migliora l'architettura di esecuzione tramite:
- Esecuzione di batch a esecuzione prolungata (il valore predefinito è cinque minuti), in cui il sistema elabora i dati man mano che diventa disponibile nell'origine.
- Pianificazione simultanea di tutte le fasi della query. Ciò richiede che il numero di slot di attività disponibili sia uguale o maggiore del numero di attività di tutte le fasi di un batch.
- Trasferimento di dati tra le fasi non appena vengono prodotti, mediante uno shuffle in streaming.
Alla fine dell'elaborazione di un batch e prima dell'avvio del successivo, Structured Streaming effettua checkpoint del progresso e pubblica le metriche. La durata del batch influisce sulla frequenza di checkpoint:
- Batch più lunghi: controlli meno frequenti, ovvero riproduzioni più lunghe in caso di guasto e disponibilità ritardata delle metriche.
- Batch più brevi: checkpoint più frequenti, che possono influire sulla latenza.
Databricks consiglia di eseguire il benchmarking della modalità in tempo reale rispetto al carico di lavoro di destinazione per trovare l'intervallo di trigger appropriato.
Quando usare la modalità in tempo reale
Scegliere la modalità in tempo reale quando il caso d'uso richiede:
- Latenza secondaria: applicazioni che devono rispondere ai dati entro millisecondi, ad esempio sistemi di rilevamento delle frodi che devono bloccare le transazioni in tempo reale.
- Processo decisionale operativo: sistemi che attivano azioni immediate in base ai dati in ingresso, ad esempio offerte in tempo reale, avvisi o notifiche.
- Elaborazione continua: carichi di lavoro in cui i dati devono essere elaborati non appena arrivano, anziché in batch periodici.
Usare la modalità micro-batch (il trigger predefinito di Streaming Strutturato) quando:
- Elaborazione analitica: pipeline ETL, trasformazioni dei dati e implementazioni dell'architettura medallion in cui i requisiti di latenza vengono misurati in secondi o minuti.
- Ottimizzazione dei costi: i carichi di lavoro in cui non è necessaria la latenza al secondo secondario, perché la modalità in tempo reale richiede risorse di calcolo dedicate.
- La frequenza del checkpoint è importante: applicazioni che traggono vantaggio da un checkpointing più frequente per un recupero più rapido.
Requisiti e configurazione
La modalità in tempo reale prevede requisiti specifici per la configurazione di calcolo e la configurazione delle query. Questa sezione descrive i prerequisiti e i passaggi di configurazione necessari per usare la modalità in tempo reale.
Prerequisiti
Per usare la modalità in tempo reale, è necessario soddisfare i requisiti seguenti:
- Databricks Runtime 16.4 LTS o versione successiva: la modalità in tempo reale è disponibile solo in DBR 16.4 LTS e versioni successive.
- Calcolo dedicato: è necessario usare un calcolo dedicato (in precedenza utente singolo). Non sono supportati i cluster standard (in precedenza condivisi), le pipeline dichiarative di Lakeflow Spark e i cluster serverless.
- Nessuna scalabilità automatica: la scalabilità automatica deve essere disabilitata.
- Nessun fotono: l'accelerazione photon non è supportata con la modalità in tempo reale.
-
Configurazione di Spark: è necessario impostare il parametro
spark.databricks.streaming.realTimeMode.enabledsutrue.
Configurazione di calcolo
Configurare il calcolo con le impostazioni seguenti:
- Impostare
spark.databricks.streaming.realTimeMode.enabledsutruenella configurazione di Spark. - Disabilitare la scalabilità automatica.
- Disabilitare l'accelerazione Photon.
- Verificare che la computazione sia configurata come un cluster dedicato (non standard, Lakeflow Spark Declarative Pipelines, o serverless).
Per istruzioni dettagliate sulla creazione e la configurazione del calcolo per la modalità in tempo reale, vedere Introduzione alla modalità in tempo reale.
Configurazione delle query
Per eseguire una query in modalità in tempo reale, è necessario abilitare il trigger in tempo reale. I trigger in tempo reale sono supportati solo in modalità di aggiornamento.
Python
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.outputMode("update")
# In PySpark, the realTime trigger requires specifying the interval.
.trigger(realTime="5 minutes")
.start()
)
Scala
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val readStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic).load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.outputMode("update")
.trigger(RealTimeTrigger.apply())
// RealTimeTrigger can also accept an argument specifying the checkpoint interval.
// For example, this code indicates a checkpoint interval of 5 minutes:
// .trigger(RealTimeTrigger.apply("5 minutes"))
.start()
Dimensionamento delle risorse di calcolo
È possibile eseguire un processo in tempo reale per ogni risorsa di calcolo se il calcolo dispone di slot di attività sufficienti.
Per l'esecuzione in modalità a bassa latenza, il numero totale di slot di attività disponibili deve essere maggiore o uguale al numero di attività in tutte le fasi della query.
Esempi di calcolo degli slot
| Tipo di pipeline | Configurazione | Slot obbligatori |
|---|---|---|
| Senza stato a fase singola (origine Kafka + sink) |
maxPartitions = 8 |
8 slot |
| Con stato a due fasi (origine Kafka + shuffle) |
maxPartitions = 8, partizioni shuffle = 20 |
28 slot (8 + 20) |
| Tre fasi (di origine di Kafka + rimescolamento + ripartizione) |
maxPartitions = 8, due fasi di mescolamento di 20 ciascuno |
48 slot (8 + 20 + 20) |
Se non si imposta maxPartitions, usare il numero di partizioni nel topic Kafka.
Considerazioni chiave
Quando si configura il calcolo, tenere presente quanto segue:
- A differenza della modalità micro batch, le attività in tempo reale possono rimanere inattive durante l'attesa dei dati, quindi il ridimensionamento corretto è essenziale per evitare sprechi di risorse.
- Puntare a un livello di utilizzo di destinazione (ad esempio, 50%) ottimizzando:
-
maxPartitions(per Kafka) -
spark.sql.shuffle.partitions(per fasi di rimescolamento)
-
- Databricks consiglia di impostare
maxPartitionsin modo che ogni attività gestisca più partizioni Kafka per ridurre il sovraccarico. - Modificare le fasi delle attività per ogni lavoratore in modo che corrispondano al carico di lavoro per processi a una fase.
- Per i lavori con carichi intensivi di shuffle, provare a trovare il numero minimo di partizioni di shuffle che evitano i backlog e regolare di conseguenza. Il calcolo non pianifica il processo se non dispone di slot sufficienti.
Note
Da Databricks Runtime 16.4 LTS e versioni successive, tutte le pipeline in tempo reale usano checkpoint v2, che consente di passare senza problemi tra modalità batch in tempo reale e micro batch.
Tecniche di ottimizzazione
| Technique | Abilitata per impostazione predefinita |
|---|---|
| Tracciamento asincrono dell'avanzamento: sposta la scrittura nel log di offset e il log di commit in un thread asincrono, riducendo il tempo inter-batch tra due micro-batch. Ciò consente di ridurre la latenza delle query di streaming senza stato. | No |
| Checkpoint dello stato asincrono: consente di ridurre la latenza delle query di streaming con stato iniziando a elaborare il micro batch successivo non appena viene completato il calcolo del micro batch precedente, senza attendere il checkpoint dello stato. | No |
monitoraggio e osservabilità
La misurazione delle prestazioni delle query è essenziale per i carichi di lavoro in tempo reale. In modalità in tempo reale, le metriche della durata del batch tradizionale non riflettono la latenza effettiva, quindi sono necessari approcci alternativi.
La latenza end-to-end è specifica del carico di lavoro e talvolta può essere misurata con precisione solo con la logica di business. Ad esempio, se il timestamp di origine viene restituito in Kafka, è possibile calcolare la latenza come differenza tra il timestamp di output di Kafka e il timestamp di origine.
È anche possibile stimare la latenza end-to-end usando le metriche e le API predefinite descritte di seguito.
Metriche integrate con StreamingQueryProgress
Nell'evento StreamingQueryProgress sono incluse le metriche seguenti, che vengono registrate automaticamente nei log del driver. È anche possibile accedervi tramite la StreamingQueryListenerfunzione di callback.onQueryProgress()
QueryProgressEvent.json() o toString() includono metriche aggiuntive in modalità in tempo reale.
- Latenza di elaborazione (processingLatencyMs). Tempo trascorso tra quando la query in modalità in tempo reale legge un record e quando la query lo scrive nella fase successiva o downstream. Per le query a fase singola, questa misura la stessa durata della latenza E2E. Il sistema segnala questa metrica per ogni attività.
- Latenza di accodamento di origine (sourceQueuingLatencyMs). Intervallo di tempo trascorso tra quando il sistema scrive un record in un bus di messaggi, ad esempio il tempo di accodamento del log in Kafka e quando la query in modalità in tempo reale legge il record per la prima volta. Il sistema segnala questa metrica per ogni attività.
- Latenza E2E (e2eLatencyMs). Tempo compreso tra il momento in cui il sistema scrive il record in un bus di messaggi e quando la query in modalità in tempo reale scrive il record downstream. Il sistema aggrega questa metrica per batch in tutti i record elaborati da tutte le attività.
Per esempio:
"rtmMetrics" : {
"processingLatencyMs" : {
"P0" : 0,
"P50" : 0,
"P90" : 0,
"P95" : 0,
"P99" : 0
},
"sourceQueuingLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 3
},
"e2eLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 4
},
Misurazione della latenza personalizzata con l'API Observe
L'API Osserva consente di misurare la latenza senza avviare un altro processo. Se si dispone di un timestamp di origine che approssima l'ora di arrivo dei dati di origine, è possibile stimare la latenza di ogni batch usando l'API Osserva. Passare il timestamp prima di raggiungere il sink:
Python
from datetime import datetime
from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType
@udf(returnType=TimestampType())
def current_timestamp():
return datetime.now()
# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
"latency",
unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
"observedLatency",
avg(col("latency")).alias("avg"),
max(col("latency")).alias("max"),
percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.
Scala
import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}
val currentTimestampUDF = udf(() => System.currentTimeMillis())
// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
"latency",
col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
name = "observedLatency",
avg(col("latency")).as("avg"),
max(col("latency")).as("max"),
percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.
In questo esempio viene registrato un timestamp corrente prima di restituire la voce e la latenza viene stimata calcolando la differenza tra questo timestamp e il timestamp di origine del record. I risultati sono inclusi nei report in corso e resi disponibili ai listener. Ecco un output di esempio:
"observedMetrics" : {
"observedLatency" : {
"avg" : 63.8369765176552,
"max" : 219,
"p99" : 154,
"p50" : 49
}
}
Supporto e limitazioni delle funzionalità
Questa sezione descrive le funzionalità supportate e le limitazioni correnti della modalità in tempo reale, inclusi ambienti compatibili, lingue, origini, sink, operatori e considerazioni speciali per funzionalità specifiche.
Ambienti, lingue e modalità supportati
| Tipo di ambiente di calcolo | Supported |
|---|---|
| Dedicato (in precedenza: utente singolo) | Yes |
| Standard (in precedenza: condiviso) | No |
| Pipeline dichiarative di Lakeflow Spark classiche | No |
| Pipeline dichiarative di Lakeflow Spark serverless | No |
| Serverless | No |
Lingue supportate:
| Language | Supported |
|---|---|
| Scala | Yes |
| Java | Yes |
| Python | Yes |
Modalità di esecuzione supportate:
| Modalità di esecuzione | Supported |
|---|---|
| Modalità di aggiornamento | Yes |
| Append mode | No |
| Modalità completa | No |
Origini e destinazioni supportate
Fonti:
| Sources | Supported |
|---|---|
| Apache Kafka | Yes |
| AWS MSK | Yes |
| Hub eventi (con il connettore Kafka) | Yes |
| Kinesis | Sì (solo modalità EFO) |
| Google Pub/Sub | No |
| Apache Pulsar | No |
Lavandini:
| Sinks | Supported |
|---|---|
| Apache Kafka | Yes |
| Hub eventi (con il connettore Kafka) | Yes |
| Kinesis | No |
| Google Pub/Sub | No |
| Apache Pulsar | No |
| Destinazioni arbitrarie (con forEachWriter) | Yes |
Operatori supportati
| Operators | Supported |
|---|---|
| Operazioni senza stato | |
| Selection | Yes |
| Projection | Yes |
| funzioni definite dall'utente | |
| Scala UDF (Funzioni definite dall'utente in Scala) | Sì (con alcune limitazioni) |
| Python UDF | Sì (con alcune limitazioni) |
| Aggregazione | |
| sum | Yes |
| count | Yes |
| max | Yes |
| min | Yes |
| avg | Yes |
| Funzioni di aggregazione | Yes |
| Windowing | |
| Tumbling | Yes |
| Sliding | Yes |
| Session | No |
| Deduplicazione | |
| dropDuplicates | Sì (lo stato è illimitato) |
| dropDuplicatesWithinWatermark (elimina duplicati all'interno del watermark) | No |
| Stream - Unione di tabelle | |
| Tabella broadcast (deve essere piccola) | Yes |
| Stream - Unione di Stream | No |
| (flat)MapGroupsWithState | No |
| transformWithState | Sì (con alcune differenze) |
| union | Sì (con alcune limitazioni) |
| forEach | Yes |
| forEachBatch | No |
| mapPartitions | No (vedere la limitazione) |
Considerazioni speciali
Alcuni operatori e funzionalità hanno considerazioni o differenze specifiche quando vengono usati in modalità in tempo reale.
transformWithState in modalità tempo reale
Per la compilazione di applicazioni con stato personalizzate, Databricks supporta transformWithState, un'API in Apache Spark Structured Streaming. Per altre informazioni sull'API e sui frammenti di codice, vedere Creare un'applicazione con stato personalizzata .
Esistono tuttavia alcune differenze tra il comportamento dell'API in modalità in tempo reale e le query di streaming tradizionali che sfruttano l'architettura micro-batch.
- La modalità in tempo reale chiama il
handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues)metodo per ogni riga.- L'iteratore
inputRowsrestituisce un singolo valore. La modalità micro batch lo chiama una volta per ogni chiave e l'iteratoreinputRowsrestituisce tutti i valori per una chiave nel micro batch. - È necessario tenere presente questa differenza durante la scrittura del codice.
- L'iteratore
- I timer dell'ora dell'evento non sono supportati in modalità in tempo reale.
- In modalità in tempo reale, i timer vengono ritardati durante l'attivazione a seconda dell'arrivo dei dati:
- Se un timer è pianificato per le 10:00:00, ma non arrivano dati, il timer non viene attivato immediatamente.
- Se i dati arrivano alle 10:00:10, il timer viene attivato con un ritardo di 10 secondi.
- Se non arrivano dati e il batch a esecuzione lunga sta per terminare, il timer si attiva prima della conclusione del batch.
Funzioni definite dall'utente Python in modalità tempo reale
Databricks supporta la maggior parte delle funzioni definite dall'utente Python in modalità in tempo reale:
| Tipo di funzione definita dall'utente | Supported |
|---|---|
| Funzione definita dall'utente senza stato | |
| Funzione scalare definita dall'utente Python (collegamento) | Yes |
| Funzione definita dall'utente scalare Arrow | Yes |
| Funzione scalare definita dall'utente Pandas (collegamento) | Yes |
Funzione Arrow (mapInArrow) |
Yes |
| Funzione di Pandas (collegamento) | Yes |
| UDF con raggruppamento con stato (UDAF) | |
transformWithState (solo Row interfaccia) |
Yes |
| applicaInPandasConStato | No |
| Funzione definita dall'utente con raggruppamento non con stato (UDAF) | |
| apply | No |
| applyInArrow | No |
| applyInPandas | No |
| Funzione tabella | |
| UDTF (collegamento) | No |
| UC UDF | No |
Esistono diversi aspetti da considerare quando si usano UDF Python in modalità tempo reale.
- Per ridurre al minimo la latenza, configurare le dimensioni del batch di Arrow (
spark.sql.execution.arrow.maxRecordsPerBatch) su 1.- Compromesso: questa configurazione ottimizza la latenza a scapito della capacità di trasmissione. Per la maggior parte dei carichi di lavoro, questa impostazione è consigliata.
- Aumentare le dimensioni del batch solo se è necessaria una velocità effettiva maggiore per supportare il volume di input, accettando il potenziale aumento della latenza.
- Le funzioni definite dall'utente e le funzioni di pandas non funzionano bene con un batch Arrow di dimensione 1.
- Se si usano funzioni definite dall'utente o funzioni pandas, impostare le dimensioni del batch Arrow su un valore superiore, ad esempio 100 o superiore.
- Si noti che ciò implica una latenza più elevata. Databricks consiglia di usare la funzione o la funzione UDF Arrow, se possibile.
- A causa del problema di prestazioni con pandas, transformWithState è supportato solo con l'interfaccia
Row.
Limitations
Limitazioni dell'origine
Per Kinesis, la modalità in tempo reale non supporta la modalità di polling. Inoltre, le ripartizioni frequenti potrebbero influire negativamente sulla latenza.
Limitazioni dell'unione
L'operatore Union presenta alcune limitazioni:
- La modalità in tempo reale non supporta l'unione automatica:
- Kafka: non è possibile usare lo stesso oggetto frame di dati di origine e unire frame di dati derivati da esso. Soluzione alternativa: usare diversi dataframe letti dalla stessa origine.
- Kinesis: non è possibile fare l'unione dei data frame derivati dalla stessa origine Kinesis con la stessa configurazione. Soluzione alternativa: oltre a usare diversi dataframe, è possibile assegnare un'opzione "consumerName" diversa a ogni dataframe.
- La modalità in tempo reale non supporta gli operatori con stato (ad esempio ,
aggregatededuplicate, ,transformWithState) definiti prima dell'unione. - La modalità in tempo reale non supporta l'unione con le origini batch.
Limitazione di MapPartitions
mapPartitions in Scala e api Python simili (mapInPandas, mapInArrow) accettano un iteratore dell'intera partizione di input e producono un iteratore dell'intero output con mapping arbitrario tra input e output. Queste API possono causare problemi di prestazioni nella modalità streaming Real-Time bloccando l'intero output, aumentando la latenza. La semantica di queste API non supporta adeguatamente la propagazione della filigrana.
Utilizzare funzioni definite dall'utente scalari combinate con Trasforma tipi di dati complessi o filter per ottenere funzionalità simili.
Passaggi successivi
Ora che si comprende la modalità in tempo reale e come configurarla, esplorare queste risorse per iniziare a implementare applicazioni di streaming in tempo reale:
- Introduzione alla modalità in tempo reale : seguire istruzioni dettagliate per configurare il calcolo ed eseguire la prima query di streaming in tempo reale.
- Esempi di codice in modalità in tempo reale; Esplora esempi funzionanti, tra cui Kafka sources e sink, query stateful, aggregazioni e sink personalizzati.
- Concetti di Structured Streaming : informazioni sui concetti fondamentali di Structured Streaming in Databricks.