Freigeben über


Auswählen eines Ausgabemodus für strukturiertes Streaming

Dieser Artikel enthält Informationen zur Wahl eines Ausgabemodus für zustandsbehaftetes Streaming. Ein Ausgabemodus muss nur für zustandsbehaftete Streams mit Aggregationen konfiguriert werden.

Verknüpfungen unterstützen nur den Modus für angefügte Ausgaben, und der Ausgabemodus beeinflusst die Deduplizierung nicht. Die willkürlichen zustandsbehafteten Operatoren mapGroupsWithState und flatMapGroupsWithState geben Datensätze mithilfe ihrer eigenen benutzerdefinierten Logik aus, sodass sich der Ausgabemodus des Datenstroms nicht auf ihr Verhalten auswirkt.

Beim zustandslosen Streaming verhalten sich alle Ausgabemodi gleich.

Um den Ausgabemodus korrekt konfigurieren zu können, müssen Sie mit zustandsbehaftetem Streaming sowie mit Wasserzeichen und Triggern vertraut sein. Weitere Informationen finden Sie in folgenden Artikeln:

Was ist der Ausgabemodus?

Der Ausgabemodus einer Strukturierten Streaming-Abfrage bestimmt, welche Datensätze die Abfrageoperatoren während des jeweiligen Triggers ausgeben. Drei Arten von Datensätzen können ausgegeben werden:

  • Datensätze, die bei der weiteren Verarbeitung nicht geändert werden.
  • Datensätze, die seit dem letzten Trigger geändert wurden.
  • Alle Datensätze in der Status-Tabelle.

Für zustandsbehaftete Operatoren ist es wichtig zu wissen, welche Arten von Datensätzen ausgegeben werden sollen, da sich eine bestimmte Zeile, die von einem zustandsbehafteten Operator erzeugt wird, von Trigger zu Trigger ändern kann. Wenn beispielsweise ein Streamingaggregationsoperator mehr Zeilen für ein bestimmtes Fenster empfängt, können sich die Aggregationswerte dieses Fensters über Trigger hinweg ändern.

Bei zustandslosen Operatoren wirken sich unterschiedliche Datensatztypen nicht auf das Verhalten des Operators aus. Bei den Datensätzen, die ein zustandsloser Operator im Rahmen eines Triggers ausgibt, handelt es sich immer um die Quelldatensätze, die im Rahmen dieses Triggers verarbeitet werden.

Verfügbare Ausgabemodi

Es gibt drei Ausgabemodi, die einem Operator mitteilen, welche Datensätze im Rahmen eines bestimmten Triggers ausgegeben werden sollen:

Ausgabemodus Beschreibung
Anfügemodus (Standardeinstellung) Standardmäßig werden Streamingabfragen im Anfügemodus ausgeführt. In diesem Modus geben Operatoren nur Zeilen aus, die sich in zukünftigen Triggern nicht ändern. Zustandsbehaftete Operatoren verwenden das Wasserzeichen, um zu bestimmen, wann dies der Fall ist.
Aktualisierungsmodus Im Aktualisierungsmodus geben Operatoren alle Zeilen aus, die sich im Rahmen des Triggers geändert haben, auch wenn sich der ausgegebene Datensatz womöglich bei einem späteren Trigger ändert.
Vollständiger Modus Der vollständige Modus funktioniert nur mit Streamingaggregationen. Im vollständigen Modus werden alle vom Operator jemals erzeugten Zeilen ausgegeben.

Produktionsüberlegungen

Bei vielen zustandsbehafteten Streamingvorgängen müssen Sie zwischen dem Anfügemodus und dem Aktualisierungsmodus wählen. Die folgenden Abschnitte enthalten Überlegungen, an denen Sie sich bei Ihrer Entscheidung orientieren können.

Hinweis

Der vollständige Modus ist für bestimmte Fälle geeignet, schneidet aber unter Umständen nicht so gut ab, wenn die Daten skaliert werden. Databricks empfiehlt die Verwendung materialisierter Sichten, um semantische Garantien für den vollständigen Modus mit inkrementeller Verarbeitung für viele zustandsbehaftete Vorgänge zu erhalten. Siehe Materialisierte Ansichten.

Anwendungssemantik

Die Anwendungssemantik beschreibt, wie nachgelagerte Anwendungen die Streamingdaten verwenden.

Wenn nachgelagerte Dienste für jeden nachgelagerten Schreibvorgang eine einzelne Aktion ausführen müssen, empfiehlt sich in den meisten Fällen die Verwendung des Anfügemodus. Wenn Sie also beispielsweise über einen nachgelagerten Benachrichtigungsdienst verfügen, der Benachrichtigungen für jeden neuen Datensatz sendet, der in die Senke geschrieben wird, sorgt der Anfügemodus dafür, dass jeder Datensatz nur einmal geschrieben wird. Der Aktualisierungsmodus schreibt den Datensatz jedes Mal, wenn sich die Statusinformationen ändern, was zu zahlreichen Aktualisierungen führen würde.

Wenn nachgelagerte Dienste aktuelle Ergebnisse benötigen, sorgt der Aktualisierungsmodus dafür, dass Ihre Datenbank weiterhin so aktuell wie möglich bleibt. Beispiele wären etwa ein Machine Learning-Modell, das Features in Echtzeit liest, oder ein Analysedashboard, das Echtzeitaggregate nachverfolgt.

Operator- und Senkenkompatibilität

Strukturiertes Streaming unterstützt nicht alle Vorgänge, die in Apache Spark verfügbar sind, und einige Streamingvorgänge werden nicht in allen Ausgabemodi unterstützt. Weitere Informationen zu Operatorbeschränkungen finden Sie in der OSS-Streamingdokumentation.

Nicht alle Senken unterstützen alle Ausgabemodi. Kafka unterstützt alle Ausgabemodi. Delta Lake, der alle verwalteten Tabellen des Unity-Katalogs unterstützt, unterstützt Anfüge- und vollständige Modi, aber nicht den Aktualisierungsmodus. Verhalten, das dem Aktualisierungsmodus mit Delta Lake-Senken ähnlich ist, finden Sie unter Zusammenführen im Streaming.

Weitere Informationen zur Sink-Kompatibilität finden Sie in der OSS-Streaming-Dokumentation.

Wartezeit und Kosten

Der Ausgabemodus wirkt sich darauf aus, wie viel Zeit bis zum Schreiben eines Datensatzes vergehen muss, und die Frequenz und Menge der geschriebenen Daten kann sich auf die Kosten im Zusammenhang mit Streamingpipelines auswirken.

Der Anfügemodus erzwingt, dass zustandsbehaftete Operatoren Ergebnisse erst ausgeben, wenn zustandsbehaftete Ergebnisse fertig gestellt wurden, was mindestens so lange dauert wie Ihre Wasserzeichenverzögerung. Bei einer Wasserzeichenverzögerung von 1 hour im Anfügemodus der Ausgabe bedeutet es, dass Ihre Datensätze mindestens eine Verzögerung von einer Stunde haben, bevor sie nachgelagert weitergeleitet werden.

Der Aktualisierungsmodus resultiert in einem einzelnen Schreibvorgang pro Trigger und Aggregatwert. Falls bei Ihrer Senke Gebühren pro Schreibvorgang und Datensatz anfallen, ist es unter Umständen teuer, wenn Datensätze mehrmals aktualisiert werden, bis das Ende der Wasserzeichenverzögerung erreicht ist.

Konfigurationsbeispiele

Die folgenden Codebeispiele zeigen das Konfigurieren des Ausgabemodus für das Streaming von Aktualisierungen an Unity Catalog-Tabellen:

Python

# Append output mode (default)
(df.writeStream
  .toTable("target_table")
)

# Append output mode (same as default behavior)
(df.writeStream
  .outputMode("append")
  .toTable("target_table")
)

# Update output mode
(df.writeStream
  .outputMode("update")
  .toTable("target_table")
)

# Complete output mode
(df.writeStream
  .outputMode("complete")
  .toTable("target_table")
)

Scala

// Append output mode (default)
df.writeStream
  .toTable("target_table")

// Append output mode (same as default behavior)
df.writeStream
  .outputMode("append")
  .toTable("target_table")

// Update output mode
df.writeStream
  .outputMode("update")
  .toTable("target_table")

// Complete output mode
df.writeStream
  .outputMode("complete")
  .toTable("target_table")

Weitere Informationen finden Sie in der OSS-Dokumentation unter PySpark DataStreamWriter.outputMode oder Scala DataStreamWriter.outputMode.

Beispiel für zustandsbehaftetes Streaming und Ausgabemodi

Das folgende Beispiel soll Ihnen helfen, nachzuvollziehen, wie der Ausgabemodus und die Wasserzeichen beim zustandsbehafteten Streaming interagieren.

Stellen Sie sich eine Streamingaggregation vor, die den Gesamtumsatz berechnet, der pro Stunde in einem Geschäft erzielt wurde, und dabei eine Wasserzeichenverzögerung von 15 Minuten verwendet. Im Rahmen des ersten Mikrobatch werden folgende Datensätze verarbeitet:

  • 15 USD um 14:40 Uhr
  • 10 USD um 14:30 Uhr
  • 30 USD um 15:10 Uhr

Zu diesem Zeitpunkt beträgt das Wasserzeichen des Motors 2:55 Uhr, da es 15 Minuten (die Verzögerung) von der maximalen Zeit (15:10 Uhr) subtrahiert. Der Zustand des Streaming-Aggregationsoperators enthält Folgendes:

  • [2pm, 3pm]: 25 USD
  • [3pm, 4pm]: 30 USD

Die folgende Tabelle zeigt, was im jeweiligen Ausgabemodus passieren würde:

Ausgabemodus Ergebnis und Grund
Anfügen Der Streamingaggregationsoperator gibt nichts an nachgelagerte Komponenten aus. Das liegt daran, dass sich beide Fenster noch ändern können, wenn neue Werte mit einem nachfolgenden Trigger hinzukommen: Das Wasserzeichen von 14:55 Uhr bedeutet, dass möglicherweise noch Datensätze nach 14:55 Uhr eingehen, und diese Datensätze können entweder in das [2pm, 3pm]-Fenster oder in das [3pm, 4pm]-Fenster fallen.
Aktualisieren Der Operator gibt beide Datensätze aus, da beide Datensätze Aktualisierungen erhalten haben.
Abschließen Der Operator gibt alle Datensätze aus.

Angenommen, der Datenstrom empfängt noch einen weiteren Datensatz:

  • 20 USD um 15:20 Uhr

Das Wasserzeichen wird auf 15:05 Uhr aktualisiert, weil die Software 15 Minuten von 15:20 Uhr abzieht. Der Streaming-Aggregationsoperator hat zu diesem Zeitpunkt Folgendes in seinem Zustand:

  • [2pm, 3pm]: 25 USD
  • [3pm, 4pm]: 50 USD

Die folgende Tabelle zeigt, was im jeweiligen Ausgabemodus passieren würde:

Ausgabemodus Ergebnis und Grund
Anfügen Der Streamingaggregationsoperator erkennt, dass das Wasserzeichen von 15:05 Uhr größer als das Ende des [2pm, 3pm]-Fensters ist. Aufgrund der Definition des Wasserzeichens kann sich dieses Fenster nicht mehr ändern. Daher wird das [2pm, 3pm]-Fenster ausgegeben.
Aktualisieren Der Streamingaggregationsoperator gibt das [3pm, 4pm]-Fenster aus, da sich der Zustandswert von 30 USD in 50 USD geändert hat.
Abschließen Der Operator gibt alle Datensätze aus.

Die folgende Zusammenfassung veranschaulicht, wie sich zustandsbehaftete Operatoren im jeweiligen Anfügemodus verhalten:

  • Nach der Wasserzeichenverzögerung werden im Anfügemodus Datensätze einmal geschrieben.
  • Im Aktualisierungsmodus werden Datensätze geschrieben, die sich seit dem letzten Auslöser geändert haben.
  • Im vollständigen Modus werden alle Datensätze geschrieben, die vom zustandsbehafteten Operator erstellt wurden.