Freigeben über


Echtzeitmodus im strukturierten Streaming

Important

Dieses Feature befindet sich in der Public Preview.

Der Echtzeitmodus ist ein Triggertyp für strukturiertes Streaming, der eine extrem niedrige Datenverarbeitung mit End-to-End-Latenz mit bis zu fünf Millisekunden ermöglicht. Verwenden Sie den Echtzeitmodus für betriebsbereite Workloads, die sofortige Reaktion auf Streamingdaten erfordern, z. B. Betrugserkennung, Echtzeitpersonalisierung und sofortige Entscheidungssysteme.

Der Echtzeitmodus ist in Databricks Runtime 16.4 LTS und höher verfügbar. Schrittweise Anleitungen zum Einrichten finden Sie unter "Erste Schritte mit dem Echtzeitmodus". Codebeispiele finden Sie in Den Beispielen für den Echtzeitmodus.

Was ist der Echtzeitmodus?

Operative und analytische Arbeitslasten

Streaming-Workloads können allgemein in analytische Workloads und betriebsbereite Workloads unterteilt werden:

  • Analytische Workloads verwenden Datenaufnahme und Transformation, in der Regel nach der Medallion-Architektur (z. B. Das Aufnehmen von Daten in die Bronze-, Silber- und Goldtabellen).
  • Betriebsarbeitslasten verbrauchen Echtzeitdaten, wenden Geschäftslogik an und lösen nachgeschaltete Aktionen oder Entscheidungen aus.

Einige Beispiele für betriebliche Arbeitslasten sind:

  • Blockieren oder Kennzeichnen einer Kreditkartentransaktion in Echtzeit, wenn eine Betrugsbewertung einen Schwellenwert überschreitet, basierend auf Faktoren wie ungewöhnlichem Standort, großer Transaktionsgröße oder schnellen Ausgabenmustern.
  • Die Übermittlung einer Werbenachricht erfolgt, wenn Clickstream-Daten zeigen, dass ein Benutzer seit fünf Minuten nach Jeans durchsucht, und es wird ein Rabatt von 25 % angeboten, wenn der Benutzer innerhalb der nächsten 15 Minuten einen Kauf tätigt.

Im Allgemeinen sind operative Arbeitslasten durch die Notwendigkeit gekennzeichnet, eine End-to-End-Latenz unterhalb einer Sekunde zu erreichen. Dies kann mit dem Echtzeitmodus in Apache Spark Structured Streaming erreicht werden.

So erzielt der Echtzeitmodus eine niedrige Latenz

Der Echtzeitmodus verbessert die Ausführungsarchitektur durch:

  • Das Ausführen langer Laufender Batches (der Standardwert ist fünf Minuten), in dem das System Daten verarbeitet, sobald sie in der Quelle verfügbar sind.
  • Planen Sie alle Abfrageschritte gleichzeitig. Dies erfordert, dass die Anzahl der verfügbaren Vorgangsplätze gleich oder größer als die Anzahl der Vorgänge aller Phasen in einem Batch ist.
  • Übertragung von Daten zwischen den Phasen, sobald sie mittels Streaming-Shuffle übermittelt werden.

Am Ende der Verarbeitung eines Batches und vor dem Beginn des nächsten Batches erfasst Structured Streaming den Fortschritt und protokolliert Metriken. Die Batch-Dauer wirkt sich auf die Prüfpunkthäufigkeit aus.

  • Längere Batches: Weniger häufiges Checkpointing, was zu längeren Wiederholungen bei Ausfällen und verzögerter Verfügbarkeit von Metriken führt.
  • Kürzere Batches: Häufigere Prüfpunkte, die sich auf die Latenz auswirken können.

Databricks empfiehlt, den Echtzeitmodus mit Ihrer Zielarbeitsauslastung zu vergleichen, um das entsprechende Triggerintervall zu finden.

Gründe für die Verwendung des Echtzeitmodus

Wählen Sie den Echtzeitmodus aus, wenn Für Ihren Anwendungsfall Folgendes erforderlich ist:

  • Sekundäre Latenz: Anwendungen, die innerhalb von Millisekunden auf Daten reagieren müssen, z. B. Betrugserkennungssysteme, die Transaktionen in Echtzeit blockieren müssen.
  • Operative Entscheidungsfindung: Systeme, die sofortige Aktionen basierend auf eingehenden Daten auslösen, z. B. Echtzeitangebote, Warnungen oder Benachrichtigungen.
  • Kontinuierliche Verarbeitung: Workloads, bei denen Daten verarbeitet werden müssen, sobald sie eintreffen, und nicht in regelmäßigen Batches.

Verwenden Sie den Mikrobatchmodus (der standardmäßige strukturierte Streamingtrigger), wenn:

  • Analytische Verarbeitung: ETL-Pipelines, Datentransformationen und Medallion-Architekturimplementierungen, bei denen Latenzanforderungen in Sekunden oder Minuten gemessen werden.
  • Kostenoptimierung: Workloads, bei denen die Latenz von Unter-Sekunden nicht erforderlich ist, da im Echtzeitmodus dedizierte Computeressourcen erforderlich sind.
  • Die Prüfpunkthäufigkeit ist wichtig: Anwendungen, die von häufigeren Prüfpunkten für eine schnellere Wiederherstellung profitieren.

Anforderungen und Konfiguration

Der Echtzeitmodus hat spezifische Anforderungen für die Berechnungseinrichtung und Abfragekonfiguration. In diesem Abschnitt werden die Voraussetzungen und Konfigurationsschritte beschrieben, die für die Verwendung des Echtzeitmodus erforderlich sind.

Voraussetzungen

Um den Echtzeitmodus zu verwenden, müssen Sie die folgenden Anforderungen erfüllen:

  • Databricks Runtime 16.4 LTS oder höher: Echtzeitmodus ist nur in DBR 16.4 LTS und höheren Versionen verfügbar.
  • Dedizierter Computer: Sie müssen einen dedizierten (früher einzelner Benutzer) Computer verwenden. Standard (früher 'Shared'), "Lakeflow Spark Declarative Pipelines" und Serverless-Cluster werden nicht unterstützt.
  • Keine automatische Skalierung: Die automatische Skalierung muss deaktiviert werden.
  • Kein Photon: Die Photonbeschleunigung wird im Echtzeitmodus nicht unterstützt.
  • Spark-Konfiguration: Sie müssen spark.databricks.streaming.realTimeMode.enabled auf true einstellen.

Rechnerkonfiguration

Konfigurieren Sie Ihre Berechnung mit den folgenden Einstellungen:

  • Setze spark.databricks.streaming.realTimeMode.enabled auf true in der Spark-Konfiguration.
  • Deaktivieren Sie die automatische Skalierung.
  • Deaktivieren Sie die Photonbeschleunigung.
  • Stellen Sie sicher, dass die Berechnung als dedizierter Cluster konfiguriert ist (keine Standard-, Lakeflow Spark Declarative Pipelines oder Serverless).

Schrittweise Anleitungen zum Erstellen und Konfigurieren von Compute für den Echtzeitmodus finden Sie unter "Erste Schritte mit dem Echtzeitmodus".

Abfragekonfiguration

Um eine Abfrage im Echtzeitmodus auszuführen, müssen Sie den Echtzeittrigger aktivieren. Echtzeittrigger werden nur im Updatemodus unterstützt.

Python

query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .outputMode("update")
        # In PySpark, the realTime trigger requires specifying the interval.
        .trigger(realTime="5 minutes")
        .start()
)

Scala

import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val readStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic).load()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .outputMode("update")
      .trigger(RealTimeTrigger.apply())
      // RealTimeTrigger can also accept an argument specifying the checkpoint interval.
      // For example, this code indicates a checkpoint interval of 5 minutes:
      // .trigger(RealTimeTrigger.apply("5 minutes"))
      .start()

Skalierung von Berechnungen

Sie können einen Echtzeitauftrag pro Rechenressource ausführen, wenn die Rechenressource über genügend Task-Slots verfügt.

Um im Modus mit geringer Latenz ausgeführt zu werden, muss die Gesamtanzahl der verfügbaren Aufgabenplätze größer oder gleich der Anzahl der Aufgaben in allen Abfragephasen sein.

Slot-Berechnungsbeispiele

Pipelinetyp Konfiguration Erforderliche Steckplätze
Einzelstufenzustandslos (Kafka Source + Sink) maxPartitions = 8 8 Steckplätze
Zweistufiger Zustand (Kafka-Quelle + Shuffle) maxPartitions = 8, shuffle partitions = 20 28 Steckplätze (8 + 20)
Dreistufige (Kafka-Quelle + *shuffle* + *repartition*) maxPartitions = 8, zwei Shuffle-Phasen mit jeweils 20 48 Steckplätze (8 + 20 + 20)

Wenn Sie maxPartitions nicht festlegen, verwenden Sie die Anzahl der Partitionen im Kafka-Thema.

Wichtige Überlegungen

Berücksichtigen Sie beim Konfigurieren der Berechnung Folgendes:

  • Im Gegensatz zum Mikrobatchmodus können Echtzeitaufgaben im Leerlauf bleiben, während sie auf Daten warten. Daher ist die richtige Größenanpassung unerlässlich, um verschwendete Ressourcen zu vermeiden.
  • Ziel einer Zielauslastungsstufe (z. B. 50%) durch Optimierung:
    • maxPartitions (für Kafka)
    • spark.sql.shuffle.partitions (für Shuffle-Phasen)
  • Databricks empfiehlt die Einstellung maxPartitions , sodass jede Aufgabe mehrere Kafka-Partitionen verarbeitet, um den Aufwand zu reduzieren.
  • Passen Sie die Aufgabenslots pro Worker für einfache einstufige Aufträge an die Workload an.
  • Experimentieren Sie bei Shuffle-intensiven Aufträgen, um die minimale Anzahl an Shufflepartitionen zu ermitteln, mit der Rückstände vermieden werden können. Nehmen Sie entsprechende Anpassungen vor. Das Computersystem plant den Auftrag nicht, wenn es nicht über genügend Kapazitäten verfügt.

Note

Von Databricks Runtime 16.4 LTS und höher verwenden alle Echtzeit-Pipelines Checkpoint v2, was einen nahtlosen Wechsel zwischen Echtzeit- und Mikrobatch-Modi ermöglicht.

Optimierungstechniken

Technique Standardmäßig aktiviert
Asynchrone Fortschrittsverfolgung: Verschiebt das Schreiben in das Offsetlog und Commit-Log in einen asynchronen Thread und reduziert damit die Zeit zwischen zwei Mikrobatches. Dies kann dazu beitragen, die Latenz von zustandslosen Streamingabfragen zu verringern. No
Asynchrone Zustandsprüfpunkterstellung: Hilft, die Latenz von zustandsbehafteten Streamingabfragen zu reduzieren, indem sie mit der Verarbeitung des nächsten Mikro-Batchs beginnt, sobald die Berechnung des vorherigen Mikro-Batchs abgeschlossen ist, ohne auf Zustandsprüfpunkte zu warten. No

Überwachung und Beobachtbarkeit

Die Messung der Abfrageleistung ist für Echtzeitworkloads unerlässlich. Im Echtzeitmodus spiegeln herkömmliche Batchdauermetriken keine tatsächliche Latenz wider, daher benötigen Sie alternative Ansätze.

End-to-End-Latenz ist arbeitslastspezifisch und kann manchmal nur mit Geschäftslogik genau gemessen werden. Wenn z. B. der Quellzeitstempel in Kafka ausgegeben wird, können Sie die Latenz als Differenz zwischen dem Ausgabezeitstempel von Kafka und dem Quellzeitstempel berechnen.

Sie können die End-to-End-Latenz auch mithilfe der unten beschriebenen integrierten Metriken und APIs schätzen.

Integrierte Metriken mit StreamingQueryProgress

Die folgenden Metriken sind im StreamingQueryProgress Ereignis enthalten, das automatisch in den Treiberprotokollen protokolliert wird. Sie können auch über die StreamingQueryListener-onQueryProgress()-Rückruffunktion darauf zugreifen. QueryProgressEvent.json() oder toString() zusätzliche Echtzeitmodusmetriken enthalten.

  1. Verarbeitungslatenz (processingLatencyMs). Die Zeit, die im Echtzeitmodus zwischen dem Lesen eines Datensatzes durch die Abfrage und dem Schreiben der Abfrage in die nächste Phase oder weiter unten im Prozess vergeht. Bei Einzelstufenabfragen misst dies die gleiche Dauer wie die E2E-Latenz. Das System meldet diese Metrik pro Vorgang.
  2. Quellwarteschlangenlatenz (sourceQueuingLatencyMs). Der Zeitraum, der zwischen dem Schreiben eines Datensatzes durch das System in einen Nachrichtenbus, z. B. die Anfügezeit des Protokolls in Kafka, und dem ersten Lesen des Datensatzes durch die Echtzeitmodusabfrage verstrichen ist. Das System meldet diese Metrik pro Vorgang.
  3. E2E-Latenz (e2eLatencyMs). Die Zeit zwischen dem Schreiben des Datensatzes in einen Nachrichtenbus und dem Schreiben des Datensatzes durch die Echtzeitanfrage weiter unten. Das System aggregiert diese Metrik pro Batch für alle Datensätze, die von allen Vorgängen verarbeitet werden.

Beispiel:

"rtmMetrics" : {
    "processingLatencyMs" : {
      "P0" : 0,
      "P50" : 0,
      "P90" : 0,
      "P95" : 0,
      "P99" : 0
    },
    "sourceQueuingLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 3
    },
    "e2eLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 4
    },

Benutzerdefinierte Latenzmessung mit der Observe-API

Die Observe-API hilft beim Messen der Latenz, ohne einen anderen Auftrag zu starten. Wenn Sie über einen Quellzeitstempel verfügen, der die Ankunftszeit der Quelldaten annähert, können Sie die Latenz jedes Batches mithilfe der Observe-API schätzen. Übergeben Sie den Zeitstempel, bevor Sie die Spüle erreichen:

Python

from datetime import datetime

from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType

@udf(returnType=TimestampType())
def current_timestamp():
  return datetime.now()

# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
  "latency",
  unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  "observedLatency",
  avg(col("latency")).alias("avg"),
  max(col("latency")).alias("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.

Scala

import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}

val currentTimestampUDF = udf(() => System.currentTimeMillis())

// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
  "latency",
  col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  name = "observedLatency",
  avg(col("latency")).as("avg"),
  max(col("latency")).as("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.

In diesem Beispiel wird ein aktueller Zeitstempel vor der Ausgabe des Eintrags aufgezeichnet, und die Latenz wird geschätzt, indem die Differenz zwischen diesem Zeitstempel und dem Quellzeitstempel des Datensatzes berechnet wird. Die Ergebnisse werden in Fortschrittsberichten aufgenommen und den Zuhörern zur Verfügung gestellt. Hier ist eine Beispielausgabe:

"observedMetrics" : {
  "observedLatency" : {
    "avg" : 63.8369765176552,
    "max" : 219,
    "p99" : 154,
    "p50" : 49
  }
}

Featureunterstützung und -einschränkungen

In diesem Abschnitt werden die unterstützten Features und aktuellen Einschränkungen des Echtzeitmodus beschrieben, einschließlich kompatibler Umgebungen, Sprachen, Quellen, Senken, Operatoren und besonderer Überlegungen zu bestimmten Features.

Unterstützte Umgebungen, Sprachen und Modi

Computetyp Supported
Dedizierter (früher: einzelner Benutzer) Yes
Standard (früher: geteilt) No
Lakeflow Spark Deklarative Pipelines Klassisch No
Lakeflow Spark Deklarative Pipelines Serverlos No
Serverless No

Unterstützte Sprachen:

Language Supported
Scala Yes
Java Yes
Python Yes

Unterstützte Ausführungsmodi:

Ausführungsmodus Supported
Aktualisierungsmodus Yes
Append mode No
Vollständiger Modus No

Unterstützte Quellen und Empfänger

Quellen:

Sources Supported
Apache Kafka Yes
AWS MSK Yes
Event Hubs (mit Kafka Connector) Yes
Kinesis Ja (nur EFO-Modus)
Google Pub/Sub (Nachrichtendienst) No
Apache Pulsar No

Waschbecken:

Sinks Supported
Apache Kafka Yes
Event Hubs (mit Kafka Connector) Yes
Kinesis No
Google Pub/Sub (Nachrichtendienst) No
Apache Pulsar No
Beliebige Senken (mit forEachWriter) Yes

Unterstützte Operatoren

Operators Supported
Zustandslose Operationen
Selection Yes
Projection Yes
UDFs
Skala UDF Ja (mit einigen Einschränkungen)
Python-Benutzerdefinierte Funktion (UDF) Ja (mit einigen Einschränkungen)
Zusammenfassung
sum Yes
count Yes
max Yes
min Yes
avg Yes
Aggregationsfunktionen Yes
Windowing
Tumbling Yes
Sliding Yes
Session No
Deduplizierung
dropDuplicates Ja (der Zustand ist ungebunden)
dropDuplicatesWithinWatermark No
Stream–Tabellenverknüpfung
Übertragungstabelle (sollte klein sein) Yes
Stream – Stream-Join No
(flach)MapGroupsWithState No
transformWithState Ja (mit einigen Unterschieden)
union Ja (mit einigen Einschränkungen)
forEach Yes
forEachBatch No
mapPartitions Nein (siehe Einschränkung)

Besondere Überlegungen

Einige Operatoren und Features weisen spezifische Überlegungen oder Unterschiede auf, wenn sie im Echtzeitmodus verwendet werden.

transformWithState im Echtzeitmodus

Zum Erstellen von benutzerdefinierten zustandsbehafteten Anwendungen unterstützt Databricks transformWithState, eine API in Apache Spark Structured Streaming. Weitere Informationen zu API- und Codeausschnitten finden Sie unter Erstellen einer benutzerdefinierten zustandsbehafteten Anwendung .

Es gibt jedoch einige Unterschiede zwischen dem Verhalten der API im Echtzeitmodus und herkömmlichen Streamingabfragen, die die Mikrobatcharchitektur nutzen.

  • Im Echtzeitmodus wird die handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues) Methode für jede Zeile aufgerufen.
    • Der inputRows Iterator gibt einen einzelnen Wert zurück. Der Mikrobatchmodus ruft ihn einmal für jeden Schlüssel auf, und der inputRows Iterator gibt alle Werte für einen Schlüssel im Mikrobatch zurück.
    • Dieser Unterschied muss beim Schreiben des Codes beachtet werden.
  • Ereigniszeitgeber werden im Echtzeitmodus nicht unterstützt.
  • Im Echtzeitmodus werden Timer je nach Ankunft der Daten verzögert ausgelöst.
    • Wenn ein Timer für 10:00:00 uhr geplant ist, aber keine Daten eingehen, wird der Timer nicht sofort ausgelöst.
    • Wenn Daten um 10:00:10 uhr eingehen, wird der Timer mit einer Verzögerung von 10 Sekunden ausgelöst.
    • Wenn keine Daten eingehen und der lang andauernde Batchprozess beendet wird, wird der Timer unmittelbar ausgelöst, bevor der Batchprozess beendet wird.

Python UDFs im Echtzeitmodus

Databricks unterstützt die meisten benutzerdefinierten Python-Funktionen (UDFs) im Echtzeitmodus:

UDF-Typ Supported
Stateless UDF
Python scalar UDF (link) Yes
Pfeilskaer-UDF Yes
Pandas scalar UDF (Link) Yes
Pfeilfunktion (mapInArrow) Yes
Pandas-Funktion (Link) Yes
Stateful Grouping UDF (UDAF)
transformWithState (nur Row Schnittstelle) Yes
applyInPandasWithState No
Nicht zustandsbehaftete Gruppierung UDF (UDAF)
apply No
applyInArrow No
applyInPandas No
Tabellenfunktion
UDTF (Link) No
UC UDF No

Es gibt mehrere Punkte, die Sie bei der Verwendung von Python UDFs im Echtzeitmodus berücksichtigen sollten:

  • Um die Latenz zu minimieren, konfigurieren Sie die Pfeilbatchgröße (spark.sql.execution.arrow.maxRecordsPerBatch) auf 1.
    • Kompromiss: Diese Konfiguration optimiert die Latenz auf Kosten des Durchsatzes. Für die meisten Workloads wird diese Einstellung empfohlen.
    • Erhöhen Sie die Batchgröße nur, wenn ein höherer Durchsatz erforderlich ist, um das Eingabevolume aufzunehmen und die potenzielle Latenzsteigerung zu akzeptieren.
  • Pandas UDFs und Funktionen arbeiten nicht gut mit einer Arrow-Batchgröße von 1.
    • Wenn Sie Pandas UDFs oder Funktionen verwenden, legen Sie die Arrow-Batchgröße auf einen höheren Wert fest (z. B. 100 oder höher).
    • Beachten Sie, dass dies eine höhere Latenz bedeutet. Databricks empfiehlt nach Möglichkeit die Verwendung von Pfeil-UDF oder -Funktionen.
  • Aufgrund des Leistungsproblems mit Pandas wird transformWithState nur mit der Row Schnittstelle unterstützt.

Limitations

Quellbeschränkungen

Für Kinesis unterstützt der Echtzeitmodus das Abfrageverfahren nicht. Darüber hinaus können sich häufige Neupartitionen negativ auf die Latenz auswirken.

Unionsbeschränkungen

Der Union-Betreiber hat einige Einschränkungen:

  • Der Echtzeitmodus unterstützt keine Selbstvereinigung:
    • Kafka: Sie können nicht dasselbe Quelldatenframe-Objekt verwenden und daraus abgeleitete Datenframes zusammenführen. Problemumgehung: Verwenden Sie unterschiedliche DataFrames, die aus derselben Quelle gelesen werden.
    • Kinesis: Datenframes, die von derselben Kinesis-Quelle abgeleitet sind, können nicht mit derselben Konfiguration verbunden werden. Problemumgehung: Neben der Verwendung verschiedener DataFrames können Sie jedem DataFrame eine andere Option "consumerName" zuweisen.
  • Der Echtzeitmodus unterstützt keine zustandsbehafteten Operatoren (z. B. aggregate, deduplicate, transformWithState), die vor der Union definiert sind.
  • Der Echtzeitmodus unterstützt keine Union mit Batchquellen.

MapPartitions-Einschränkung

mapPartitions in Scala und ähnlichen Python-APIs (mapInPandas, mapInArrow) verarbeiten einen Iterator der gesamten Eingabepartition und generieren einen Iterator der gesamten Ausgabe mit beliebiger Zuordnung zwischen Eingabe und Ausgabe. Diese APIs können Leistungsprobleme im Streaming-Real-Time Modus verursachen, indem die gesamte Ausgabe blockiert wird, wodurch die Latenz erhöht wird. Die Semantik dieser APIs unterstützt die Wasserzeichenpropagierung nicht gut.

Verwenden Sie skalare UDFs in Kombination mit transformieren komplexer Datentypen oder filter, um ähnliche Funktionalität zu erzielen.

Nächste Schritte

Nachdem Sie nun wissen, was der Echtzeitmodus ist und wie Sie ihn konfigurieren können, erkunden Sie diese Ressourcen, um mit der Implementierung von Echtzeitstreaminganwendungen zu beginnen: