Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Important
Dieses Feature befindet sich in der Public Preview.
Der Echtzeitmodus ist ein Triggertyp für strukturiertes Streaming, der eine extrem niedrige Datenverarbeitung mit End-to-End-Latenz mit bis zu fünf Millisekunden ermöglicht. Verwenden Sie den Echtzeitmodus für betriebsbereite Workloads, die sofortige Reaktion auf Streamingdaten erfordern, z. B. Betrugserkennung, Echtzeitpersonalisierung und sofortige Entscheidungssysteme.
Der Echtzeitmodus ist in Databricks Runtime 16.4 LTS und höher verfügbar. Schrittweise Anleitungen zum Einrichten finden Sie unter "Erste Schritte mit dem Echtzeitmodus". Codebeispiele finden Sie in Den Beispielen für den Echtzeitmodus.
Was ist der Echtzeitmodus?
Operative und analytische Arbeitslasten
Streaming-Workloads können allgemein in analytische Workloads und betriebsbereite Workloads unterteilt werden:
- Analytische Workloads verwenden Datenaufnahme und Transformation, in der Regel nach der Medallion-Architektur (z. B. Das Aufnehmen von Daten in die Bronze-, Silber- und Goldtabellen).
- Betriebsarbeitslasten verbrauchen Echtzeitdaten, wenden Geschäftslogik an und lösen nachgeschaltete Aktionen oder Entscheidungen aus.
Einige Beispiele für betriebliche Arbeitslasten sind:
- Blockieren oder Kennzeichnen einer Kreditkartentransaktion in Echtzeit, wenn eine Betrugsbewertung einen Schwellenwert überschreitet, basierend auf Faktoren wie ungewöhnlichem Standort, großer Transaktionsgröße oder schnellen Ausgabenmustern.
- Die Übermittlung einer Werbenachricht erfolgt, wenn Clickstream-Daten zeigen, dass ein Benutzer seit fünf Minuten nach Jeans durchsucht, und es wird ein Rabatt von 25 % angeboten, wenn der Benutzer innerhalb der nächsten 15 Minuten einen Kauf tätigt.
Im Allgemeinen sind operative Arbeitslasten durch die Notwendigkeit gekennzeichnet, eine End-to-End-Latenz unterhalb einer Sekunde zu erreichen. Dies kann mit dem Echtzeitmodus in Apache Spark Structured Streaming erreicht werden.
So erzielt der Echtzeitmodus eine niedrige Latenz
Der Echtzeitmodus verbessert die Ausführungsarchitektur durch:
- Das Ausführen langer Laufender Batches (der Standardwert ist fünf Minuten), in dem das System Daten verarbeitet, sobald sie in der Quelle verfügbar sind.
- Planen Sie alle Abfrageschritte gleichzeitig. Dies erfordert, dass die Anzahl der verfügbaren Vorgangsplätze gleich oder größer als die Anzahl der Vorgänge aller Phasen in einem Batch ist.
- Übertragung von Daten zwischen den Phasen, sobald sie mittels Streaming-Shuffle übermittelt werden.
Am Ende der Verarbeitung eines Batches und vor dem Beginn des nächsten Batches erfasst Structured Streaming den Fortschritt und protokolliert Metriken. Die Batch-Dauer wirkt sich auf die Prüfpunkthäufigkeit aus.
- Längere Batches: Weniger häufiges Checkpointing, was zu längeren Wiederholungen bei Ausfällen und verzögerter Verfügbarkeit von Metriken führt.
- Kürzere Batches: Häufigere Prüfpunkte, die sich auf die Latenz auswirken können.
Databricks empfiehlt, den Echtzeitmodus mit Ihrer Zielarbeitsauslastung zu vergleichen, um das entsprechende Triggerintervall zu finden.
Gründe für die Verwendung des Echtzeitmodus
Wählen Sie den Echtzeitmodus aus, wenn Für Ihren Anwendungsfall Folgendes erforderlich ist:
- Sekundäre Latenz: Anwendungen, die innerhalb von Millisekunden auf Daten reagieren müssen, z. B. Betrugserkennungssysteme, die Transaktionen in Echtzeit blockieren müssen.
- Operative Entscheidungsfindung: Systeme, die sofortige Aktionen basierend auf eingehenden Daten auslösen, z. B. Echtzeitangebote, Warnungen oder Benachrichtigungen.
- Kontinuierliche Verarbeitung: Workloads, bei denen Daten verarbeitet werden müssen, sobald sie eintreffen, und nicht in regelmäßigen Batches.
Verwenden Sie den Mikrobatchmodus (der standardmäßige strukturierte Streamingtrigger), wenn:
- Analytische Verarbeitung: ETL-Pipelines, Datentransformationen und Medallion-Architekturimplementierungen, bei denen Latenzanforderungen in Sekunden oder Minuten gemessen werden.
- Kostenoptimierung: Workloads, bei denen die Latenz von Unter-Sekunden nicht erforderlich ist, da im Echtzeitmodus dedizierte Computeressourcen erforderlich sind.
- Die Prüfpunkthäufigkeit ist wichtig: Anwendungen, die von häufigeren Prüfpunkten für eine schnellere Wiederherstellung profitieren.
Anforderungen und Konfiguration
Der Echtzeitmodus hat spezifische Anforderungen für die Berechnungseinrichtung und Abfragekonfiguration. In diesem Abschnitt werden die Voraussetzungen und Konfigurationsschritte beschrieben, die für die Verwendung des Echtzeitmodus erforderlich sind.
Voraussetzungen
Um den Echtzeitmodus zu verwenden, müssen Sie die folgenden Anforderungen erfüllen:
- Databricks Runtime 16.4 LTS oder höher: Echtzeitmodus ist nur in DBR 16.4 LTS und höheren Versionen verfügbar.
- Dedizierter Computer: Sie müssen einen dedizierten (früher einzelner Benutzer) Computer verwenden. Standard (früher 'Shared'), "Lakeflow Spark Declarative Pipelines" und Serverless-Cluster werden nicht unterstützt.
- Keine automatische Skalierung: Die automatische Skalierung muss deaktiviert werden.
- Kein Photon: Die Photonbeschleunigung wird im Echtzeitmodus nicht unterstützt.
-
Spark-Konfiguration: Sie müssen
spark.databricks.streaming.realTimeMode.enabledauftrueeinstellen.
Rechnerkonfiguration
Konfigurieren Sie Ihre Berechnung mit den folgenden Einstellungen:
- Setze
spark.databricks.streaming.realTimeMode.enabledauftruein der Spark-Konfiguration. - Deaktivieren Sie die automatische Skalierung.
- Deaktivieren Sie die Photonbeschleunigung.
- Stellen Sie sicher, dass die Berechnung als dedizierter Cluster konfiguriert ist (keine Standard-, Lakeflow Spark Declarative Pipelines oder Serverless).
Schrittweise Anleitungen zum Erstellen und Konfigurieren von Compute für den Echtzeitmodus finden Sie unter "Erste Schritte mit dem Echtzeitmodus".
Abfragekonfiguration
Um eine Abfrage im Echtzeitmodus auszuführen, müssen Sie den Echtzeittrigger aktivieren. Echtzeittrigger werden nur im Updatemodus unterstützt.
Python
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.outputMode("update")
# In PySpark, the realTime trigger requires specifying the interval.
.trigger(realTime="5 minutes")
.start()
)
Scala
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val readStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic).load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.outputMode("update")
.trigger(RealTimeTrigger.apply())
// RealTimeTrigger can also accept an argument specifying the checkpoint interval.
// For example, this code indicates a checkpoint interval of 5 minutes:
// .trigger(RealTimeTrigger.apply("5 minutes"))
.start()
Skalierung von Berechnungen
Sie können einen Echtzeitauftrag pro Rechenressource ausführen, wenn die Rechenressource über genügend Task-Slots verfügt.
Um im Modus mit geringer Latenz ausgeführt zu werden, muss die Gesamtanzahl der verfügbaren Aufgabenplätze größer oder gleich der Anzahl der Aufgaben in allen Abfragephasen sein.
Slot-Berechnungsbeispiele
| Pipelinetyp | Konfiguration | Erforderliche Steckplätze |
|---|---|---|
| Einzelstufenzustandslos (Kafka Source + Sink) |
maxPartitions = 8 |
8 Steckplätze |
| Zweistufiger Zustand (Kafka-Quelle + Shuffle) |
maxPartitions = 8, shuffle partitions = 20 |
28 Steckplätze (8 + 20) |
| Dreistufige (Kafka-Quelle + *shuffle* + *repartition*) |
maxPartitions = 8, zwei Shuffle-Phasen mit jeweils 20 |
48 Steckplätze (8 + 20 + 20) |
Wenn Sie maxPartitions nicht festlegen, verwenden Sie die Anzahl der Partitionen im Kafka-Thema.
Wichtige Überlegungen
Berücksichtigen Sie beim Konfigurieren der Berechnung Folgendes:
- Im Gegensatz zum Mikrobatchmodus können Echtzeitaufgaben im Leerlauf bleiben, während sie auf Daten warten. Daher ist die richtige Größenanpassung unerlässlich, um verschwendete Ressourcen zu vermeiden.
- Ziel einer Zielauslastungsstufe (z. B. 50%) durch Optimierung:
-
maxPartitions(für Kafka) -
spark.sql.shuffle.partitions(für Shuffle-Phasen)
-
- Databricks empfiehlt die Einstellung
maxPartitions, sodass jede Aufgabe mehrere Kafka-Partitionen verarbeitet, um den Aufwand zu reduzieren. - Passen Sie die Aufgabenslots pro Worker für einfache einstufige Aufträge an die Workload an.
- Experimentieren Sie bei Shuffle-intensiven Aufträgen, um die minimale Anzahl an Shufflepartitionen zu ermitteln, mit der Rückstände vermieden werden können. Nehmen Sie entsprechende Anpassungen vor. Das Computersystem plant den Auftrag nicht, wenn es nicht über genügend Kapazitäten verfügt.
Note
Von Databricks Runtime 16.4 LTS und höher verwenden alle Echtzeit-Pipelines Checkpoint v2, was einen nahtlosen Wechsel zwischen Echtzeit- und Mikrobatch-Modi ermöglicht.
Optimierungstechniken
| Technique | Standardmäßig aktiviert |
|---|---|
| Asynchrone Fortschrittsverfolgung: Verschiebt das Schreiben in das Offsetlog und Commit-Log in einen asynchronen Thread und reduziert damit die Zeit zwischen zwei Mikrobatches. Dies kann dazu beitragen, die Latenz von zustandslosen Streamingabfragen zu verringern. | No |
| Asynchrone Zustandsprüfpunkterstellung: Hilft, die Latenz von zustandsbehafteten Streamingabfragen zu reduzieren, indem sie mit der Verarbeitung des nächsten Mikro-Batchs beginnt, sobald die Berechnung des vorherigen Mikro-Batchs abgeschlossen ist, ohne auf Zustandsprüfpunkte zu warten. | No |
Überwachung und Beobachtbarkeit
Die Messung der Abfrageleistung ist für Echtzeitworkloads unerlässlich. Im Echtzeitmodus spiegeln herkömmliche Batchdauermetriken keine tatsächliche Latenz wider, daher benötigen Sie alternative Ansätze.
End-to-End-Latenz ist arbeitslastspezifisch und kann manchmal nur mit Geschäftslogik genau gemessen werden. Wenn z. B. der Quellzeitstempel in Kafka ausgegeben wird, können Sie die Latenz als Differenz zwischen dem Ausgabezeitstempel von Kafka und dem Quellzeitstempel berechnen.
Sie können die End-to-End-Latenz auch mithilfe der unten beschriebenen integrierten Metriken und APIs schätzen.
Integrierte Metriken mit StreamingQueryProgress
Die folgenden Metriken sind im StreamingQueryProgress Ereignis enthalten, das automatisch in den Treiberprotokollen protokolliert wird. Sie können auch über die StreamingQueryListener-onQueryProgress()-Rückruffunktion darauf zugreifen.
QueryProgressEvent.json() oder toString() zusätzliche Echtzeitmodusmetriken enthalten.
- Verarbeitungslatenz (processingLatencyMs). Die Zeit, die im Echtzeitmodus zwischen dem Lesen eines Datensatzes durch die Abfrage und dem Schreiben der Abfrage in die nächste Phase oder weiter unten im Prozess vergeht. Bei Einzelstufenabfragen misst dies die gleiche Dauer wie die E2E-Latenz. Das System meldet diese Metrik pro Vorgang.
- Quellwarteschlangenlatenz (sourceQueuingLatencyMs). Der Zeitraum, der zwischen dem Schreiben eines Datensatzes durch das System in einen Nachrichtenbus, z. B. die Anfügezeit des Protokolls in Kafka, und dem ersten Lesen des Datensatzes durch die Echtzeitmodusabfrage verstrichen ist. Das System meldet diese Metrik pro Vorgang.
- E2E-Latenz (e2eLatencyMs). Die Zeit zwischen dem Schreiben des Datensatzes in einen Nachrichtenbus und dem Schreiben des Datensatzes durch die Echtzeitanfrage weiter unten. Das System aggregiert diese Metrik pro Batch für alle Datensätze, die von allen Vorgängen verarbeitet werden.
Beispiel:
"rtmMetrics" : {
"processingLatencyMs" : {
"P0" : 0,
"P50" : 0,
"P90" : 0,
"P95" : 0,
"P99" : 0
},
"sourceQueuingLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 3
},
"e2eLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 4
},
Benutzerdefinierte Latenzmessung mit der Observe-API
Die Observe-API hilft beim Messen der Latenz, ohne einen anderen Auftrag zu starten. Wenn Sie über einen Quellzeitstempel verfügen, der die Ankunftszeit der Quelldaten annähert, können Sie die Latenz jedes Batches mithilfe der Observe-API schätzen. Übergeben Sie den Zeitstempel, bevor Sie die Spüle erreichen:
Python
from datetime import datetime
from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType
@udf(returnType=TimestampType())
def current_timestamp():
return datetime.now()
# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
"latency",
unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
"observedLatency",
avg(col("latency")).alias("avg"),
max(col("latency")).alias("max"),
percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.
Scala
import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}
val currentTimestampUDF = udf(() => System.currentTimeMillis())
// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
"latency",
col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
name = "observedLatency",
avg(col("latency")).as("avg"),
max(col("latency")).as("max"),
percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.
In diesem Beispiel wird ein aktueller Zeitstempel vor der Ausgabe des Eintrags aufgezeichnet, und die Latenz wird geschätzt, indem die Differenz zwischen diesem Zeitstempel und dem Quellzeitstempel des Datensatzes berechnet wird. Die Ergebnisse werden in Fortschrittsberichten aufgenommen und den Zuhörern zur Verfügung gestellt. Hier ist eine Beispielausgabe:
"observedMetrics" : {
"observedLatency" : {
"avg" : 63.8369765176552,
"max" : 219,
"p99" : 154,
"p50" : 49
}
}
Featureunterstützung und -einschränkungen
In diesem Abschnitt werden die unterstützten Features und aktuellen Einschränkungen des Echtzeitmodus beschrieben, einschließlich kompatibler Umgebungen, Sprachen, Quellen, Senken, Operatoren und besonderer Überlegungen zu bestimmten Features.
Unterstützte Umgebungen, Sprachen und Modi
| Computetyp | Supported |
|---|---|
| Dedizierter (früher: einzelner Benutzer) | Yes |
| Standard (früher: geteilt) | No |
| Lakeflow Spark Deklarative Pipelines Klassisch | No |
| Lakeflow Spark Deklarative Pipelines Serverlos | No |
| Serverless | No |
Unterstützte Sprachen:
| Language | Supported |
|---|---|
| Scala | Yes |
| Java | Yes |
| Python | Yes |
Unterstützte Ausführungsmodi:
| Ausführungsmodus | Supported |
|---|---|
| Aktualisierungsmodus | Yes |
| Append mode | No |
| Vollständiger Modus | No |
Unterstützte Quellen und Empfänger
Quellen:
| Sources | Supported |
|---|---|
| Apache Kafka | Yes |
| AWS MSK | Yes |
| Event Hubs (mit Kafka Connector) | Yes |
| Kinesis | Ja (nur EFO-Modus) |
| Google Pub/Sub (Nachrichtendienst) | No |
| Apache Pulsar | No |
Waschbecken:
| Sinks | Supported |
|---|---|
| Apache Kafka | Yes |
| Event Hubs (mit Kafka Connector) | Yes |
| Kinesis | No |
| Google Pub/Sub (Nachrichtendienst) | No |
| Apache Pulsar | No |
| Beliebige Senken (mit forEachWriter) | Yes |
Unterstützte Operatoren
| Operators | Supported |
|---|---|
| Zustandslose Operationen | |
| Selection | Yes |
| Projection | Yes |
| UDFs | |
| Skala UDF | Ja (mit einigen Einschränkungen) |
| Python-Benutzerdefinierte Funktion (UDF) | Ja (mit einigen Einschränkungen) |
| Zusammenfassung | |
| sum | Yes |
| count | Yes |
| max | Yes |
| min | Yes |
| avg | Yes |
| Aggregationsfunktionen | Yes |
| Windowing | |
| Tumbling | Yes |
| Sliding | Yes |
| Session | No |
| Deduplizierung | |
| dropDuplicates | Ja (der Zustand ist ungebunden) |
| dropDuplicatesWithinWatermark | No |
| Stream–Tabellenverknüpfung | |
| Übertragungstabelle (sollte klein sein) | Yes |
| Stream – Stream-Join | No |
| (flach)MapGroupsWithState | No |
| transformWithState | Ja (mit einigen Unterschieden) |
| union | Ja (mit einigen Einschränkungen) |
| forEach | Yes |
| forEachBatch | No |
| mapPartitions | Nein (siehe Einschränkung) |
Besondere Überlegungen
Einige Operatoren und Features weisen spezifische Überlegungen oder Unterschiede auf, wenn sie im Echtzeitmodus verwendet werden.
transformWithState im Echtzeitmodus
Zum Erstellen von benutzerdefinierten zustandsbehafteten Anwendungen unterstützt Databricks transformWithState, eine API in Apache Spark Structured Streaming. Weitere Informationen zu API- und Codeausschnitten finden Sie unter Erstellen einer benutzerdefinierten zustandsbehafteten Anwendung .
Es gibt jedoch einige Unterschiede zwischen dem Verhalten der API im Echtzeitmodus und herkömmlichen Streamingabfragen, die die Mikrobatcharchitektur nutzen.
- Im Echtzeitmodus wird die
handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues)Methode für jede Zeile aufgerufen.- Der
inputRowsIterator gibt einen einzelnen Wert zurück. Der Mikrobatchmodus ruft ihn einmal für jeden Schlüssel auf, und derinputRowsIterator gibt alle Werte für einen Schlüssel im Mikrobatch zurück. - Dieser Unterschied muss beim Schreiben des Codes beachtet werden.
- Der
- Ereigniszeitgeber werden im Echtzeitmodus nicht unterstützt.
- Im Echtzeitmodus werden Timer je nach Ankunft der Daten verzögert ausgelöst.
- Wenn ein Timer für 10:00:00 uhr geplant ist, aber keine Daten eingehen, wird der Timer nicht sofort ausgelöst.
- Wenn Daten um 10:00:10 uhr eingehen, wird der Timer mit einer Verzögerung von 10 Sekunden ausgelöst.
- Wenn keine Daten eingehen und der lang andauernde Batchprozess beendet wird, wird der Timer unmittelbar ausgelöst, bevor der Batchprozess beendet wird.
Python UDFs im Echtzeitmodus
Databricks unterstützt die meisten benutzerdefinierten Python-Funktionen (UDFs) im Echtzeitmodus:
| UDF-Typ | Supported |
|---|---|
| Stateless UDF | |
| Python scalar UDF (link) | Yes |
| Pfeilskaer-UDF | Yes |
| Pandas scalar UDF (Link) | Yes |
Pfeilfunktion (mapInArrow) |
Yes |
| Pandas-Funktion (Link) | Yes |
| Stateful Grouping UDF (UDAF) | |
transformWithState (nur Row Schnittstelle) |
Yes |
| applyInPandasWithState | No |
| Nicht zustandsbehaftete Gruppierung UDF (UDAF) | |
| apply | No |
| applyInArrow | No |
| applyInPandas | No |
| Tabellenfunktion | |
| UDTF (Link) | No |
| UC UDF | No |
Es gibt mehrere Punkte, die Sie bei der Verwendung von Python UDFs im Echtzeitmodus berücksichtigen sollten:
- Um die Latenz zu minimieren, konfigurieren Sie die Pfeilbatchgröße (
spark.sql.execution.arrow.maxRecordsPerBatch) auf 1.- Kompromiss: Diese Konfiguration optimiert die Latenz auf Kosten des Durchsatzes. Für die meisten Workloads wird diese Einstellung empfohlen.
- Erhöhen Sie die Batchgröße nur, wenn ein höherer Durchsatz erforderlich ist, um das Eingabevolume aufzunehmen und die potenzielle Latenzsteigerung zu akzeptieren.
- Pandas UDFs und Funktionen arbeiten nicht gut mit einer Arrow-Batchgröße von 1.
- Wenn Sie Pandas UDFs oder Funktionen verwenden, legen Sie die Arrow-Batchgröße auf einen höheren Wert fest (z. B. 100 oder höher).
- Beachten Sie, dass dies eine höhere Latenz bedeutet. Databricks empfiehlt nach Möglichkeit die Verwendung von Pfeil-UDF oder -Funktionen.
- Aufgrund des Leistungsproblems mit Pandas wird transformWithState nur mit der
RowSchnittstelle unterstützt.
Limitations
Quellbeschränkungen
Für Kinesis unterstützt der Echtzeitmodus das Abfrageverfahren nicht. Darüber hinaus können sich häufige Neupartitionen negativ auf die Latenz auswirken.
Unionsbeschränkungen
Der Union-Betreiber hat einige Einschränkungen:
- Der Echtzeitmodus unterstützt keine Selbstvereinigung:
- Kafka: Sie können nicht dasselbe Quelldatenframe-Objekt verwenden und daraus abgeleitete Datenframes zusammenführen. Problemumgehung: Verwenden Sie unterschiedliche DataFrames, die aus derselben Quelle gelesen werden.
- Kinesis: Datenframes, die von derselben Kinesis-Quelle abgeleitet sind, können nicht mit derselben Konfiguration verbunden werden. Problemumgehung: Neben der Verwendung verschiedener DataFrames können Sie jedem DataFrame eine andere Option "consumerName" zuweisen.
- Der Echtzeitmodus unterstützt keine zustandsbehafteten Operatoren (z. B.
aggregate,deduplicate,transformWithState), die vor der Union definiert sind. - Der Echtzeitmodus unterstützt keine Union mit Batchquellen.
MapPartitions-Einschränkung
mapPartitions in Scala und ähnlichen Python-APIs (mapInPandas, mapInArrow) verarbeiten einen Iterator der gesamten Eingabepartition und generieren einen Iterator der gesamten Ausgabe mit beliebiger Zuordnung zwischen Eingabe und Ausgabe. Diese APIs können Leistungsprobleme im Streaming-Real-Time Modus verursachen, indem die gesamte Ausgabe blockiert wird, wodurch die Latenz erhöht wird. Die Semantik dieser APIs unterstützt die Wasserzeichenpropagierung nicht gut.
Verwenden Sie skalare UDFs in Kombination mit transformieren komplexer Datentypen oder filter, um ähnliche Funktionalität zu erzielen.
Nächste Schritte
Nachdem Sie nun wissen, was der Echtzeitmodus ist und wie Sie ihn konfigurieren können, erkunden Sie diese Ressourcen, um mit der Implementierung von Echtzeitstreaminganwendungen zu beginnen:
- Erste Schritte mit dem Echtzeitmodus – Befolgen Sie schrittweise Anleitungen zum Konfigurieren der Berechnung und Ausführung Ihrer ersten Echtzeitstreamingabfrage.
- Codebeispiele für den Echtzeitmodus – Erkunden Sie funktionierende Beispiele, einschließlich Kafka-Quellen und Senken, zustandsbehafteter Abfragen, Aggregationen und benutzerdefinierter Senken.
- Konzepte für strukturiertes Streaming – Lernen Sie die grundlegenden Konzepte des strukturierten Streamings auf Databricks kennen.