Asynchrone Statusverfolgung

Die asynchrone Statusnachverfolgung reduziert die Latenz für strukturierte Streaming-Pipelines, indem Abfragen den Fortschritt des Prüfpunkts asynchron aktualisieren und Daten in jedem Mikrobatch verarbeiten können.

Während der Abfrageverarbeitung werden bei Structured Streaming Offsets beibehalten und verwaltet, um den Abfragefortschritt in offsetLog und commitLog in jedem Mikrobatch zu messen. Ohne asynchrone Statusnachverfolgung wirken sich Offsetverwaltungsvorgänge direkt auf die Verarbeitungslatenz aus, da die Datenverarbeitung erst fortgesetzt werden kann, wenn sie abgeschlossen sind.

Asynchrone Statusverfolgung

Hinweis

Die asynchrone Statusnachverfolgung ist nicht mit Trigger.once oder Trigger.availableNow Triggern kompatibel. Wenn diese Option aktiviert ist, schlagen strukturierte Streaming-Abfragen mit Trigger.once oder Trigger.availableNow fehl.

Konfigurationsoptionen

Option Standard Description
asyncProgressTrackingEnabled false Gibt an, ob die asynchrone Statusnachverfolgung aktiviert werden soll.
asyncProgressTrackingCheckpointIntervalMs 1000 Das Intervall in Millisekunden zwischen Schreibvorgängen für Offsets und Abschluss-Commits.

Asynchrone Statusverfolgung aktivieren

Um die asynchrone Statusnachverfolgung zu aktivieren, legen Sie asyncProgressTrackingEnabled auf folgendes fest:true

Python

stream = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("subscribe", "in")
    .load()
)

query = (stream.writeStream
    .format("kafka")
    .option("topic", "out")
    .option("checkpointLocation", "/tmp/checkpoint")
    .option("asyncProgressTrackingEnabled", "true")
    .start()
)

Scala

val stream = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("subscribe", "in")
    .load()

val query = stream.writeStream
    .format("kafka")
    .option("topic", "out")
    .option("checkpointLocation", "/tmp/checkpoint")
    .option("asyncProgressTrackingEnabled", "true")
    .start()

Verbesserung des Durchsatzes durch Prüfpunkthäufigkeit

Die Standardprüfpunkthäufigkeit von 1000 Millisekunden weist für die meisten Abfragen einen guten Durchsatz auf. Wenn Offsetverwaltungsvorgänge schneller auftreten als die asynchrone Statusnachverfolgung sie verarbeiten kann, wird ein Backlog von Offsetverwaltungsvorgängen erstellt. Um zu verhindern, dass der Backlog weiter wächst, kann die asynchrone Fortschrittsnachverfolgung die Datenverarbeitung blockieren oder verlangsamen und die erwarteten Latenzvorteile möglicherweise erodieren.

In diesem Szenario empfiehlt Databricks, das Prüfpunktintervall zu erhöhen:

Python

query = (stream.writeStream
    .format("kafka")
    .option("topic", "out")
    .option("checkpointLocation", "/tmp/checkpoint")
    .option("asyncProgressTrackingEnabled", "true")
    .option("asyncProgressTrackingCheckpointIntervalMs", "5000")
    .start()
)

Scala

val query = stream.writeStream
    .format("kafka")
    .option("topic", "out")
    .option("checkpointLocation", "/tmp/checkpoint")
    .option("asyncProgressTrackingEnabled", "true")
    .option("asyncProgressTrackingCheckpointIntervalMs", "5000")
    .start()

Hinweis

Die Fehlerwiederherstellungszeit erhöht sich mit der Zeit des Prüfpunktintervalls. Im Falle eines Ausfalls muss eine Pipeline alle Daten seit dem vorherigen erfolgreichen Prüfpunkt erneut verarbeiten. Bevor Sie diese Änderung in der Produktion vornehmen, sollten Sie den Kompromiss zwischen geringerer Latenz während der normalen Verarbeitung im Vergleich zur Wiederherstellungszeit im Falle eines Ausfalls berücksichtigen.

Deaktivieren der asynchronen Statusnachverfolgung

Wenn die asynchrone Statusnachverfolgung aktiviert ist, garantiert der Datenstrom nicht den Prüfpunktstatus für jeden Batch. Sie müssen den Fortschritt sichern, bevor Sie diese Funktion deaktivieren können.

Führen Sie zum Deaktivieren die folgenden Schritte aus:

  1. Verarbeiten Sie mindestens zwei Mikrobatches, wobei asyncProgressTrackingEnabled auf true gesetzt und asyncProgressTrackingCheckpointIntervalMs auf 0 gesetzt ist.

    Python

    query = (stream.writeStream
        .format("kafka")
        .option("topic", "out")
        .option("checkpointLocation", "/tmp/checkpoint")
        .option("asyncProgressTrackingEnabled", "true")
        .option("asyncProgressTrackingCheckpointIntervalMs", "0")
        .start()
    )
    

    Scala

    val query = stream.writeStream
        .format("kafka")
        .option("topic", "out")
        .option("checkpointLocation", "/tmp/checkpoint")
        .option("asyncProgressTrackingEnabled", "true")
        .option("asyncProgressTrackingCheckpointIntervalMs", "0")
        .start()
    
  2. Beenden Sie die Abfrage:

    Python

    query.stop()
    

    Scala

    query.stop()
    
  3. Deaktivieren Sie die asynchrone Statusnachverfolgung, und starten Sie die Abfrage neu:

    Python

    query = (stream.writeStream
        .format("kafka")
        .option("topic", "out")
        .option("checkpointLocation", "/tmp/checkpoint")
        .option("asyncProgressTrackingEnabled", "false")
        .start()
    )
    

    Scala

    val query = stream.writeStream
        .format("kafka")
        .option("topic", "out")
        .option("checkpointLocation", "/tmp/checkpoint")
        .option("asyncProgressTrackingEnabled", "false")
        .start()
    

Wenn Sie die asynchrone Statusnachverfolgung deaktivieren, ohne die vorstehenden Schritte auszuführen, tritt möglicherweise der folgende Fehler auf:

java.lang.IllegalStateException: batch x doesn't exist

In den Treiberprotokollen wird möglicherweise der folgende Fehler angezeigt:

The offset log for batch x doesn't exist, which is required to restart the query from the latest batch x from the offset log. Please ensure there are two subsequent offset logs available for the latest batch via manually deleting the offset file(s). Please also ensure the latest batch for commit log is equal or one batch earlier than the latest batch for offset log.

Einschränkungen

  • Bei Kafka-Senken unterstützt die asynchrone Fortschrittsverfolgung nur zustandslose Pipelines.
  • Die asynchrone Statusnachverfolgung garantiert nicht eine genau einmalige End-to-End-Verarbeitung, da Offsetbereiche einer Charge bei Fehlern geändert werden können. Einige Senken, wie z. B. Kafka, bieten nie genau einmal Garantien.