Asynchrone voortgangstracering

Asynchrone voortgangstracering vermindert de latentie voor Structured Streaming-pijplijnen door query's in staat te stellen de voortgang van controlepunten asynchroon bij te werken en gegevens in elke microbatch te verwerken.

Tijdens het verwerken van queries houdt Structured Streaming offsets vast en beheert deze om de voortgang van queries in offsetLog en commitLog in elke microbatch te meten. Zonder asynchrone voortgangstracering hebben offsetbeheerbewerkingen rechtstreeks invloed op de verwerkingslatentie, omdat gegevensverwerking pas kan worden voortgezet als ze zijn voltooid.

Asynchrone voortgangsregistratie

Notitie

Asynchrone voortgangstracering is niet compatibel met Trigger.once of Trigger.availableNow triggers. Indien ingeschakeld mislukken gestructureerde streaming-query's met Trigger.once of Trigger.availableNow.

Configuratieopties

Optie Verstek Beschrijving
asyncProgressTrackingEnabled false Of u asynchrone voortgangstracering wilt inschakelen.
asyncProgressTrackingCheckpointIntervalMs 1000 Het interval in milliseconden tussen schrijfbewerkingen voor offsets en voltooiingscommits.

Asynchrone voortgangstracering inschakelen

Als u asynchrone voortgangstracering wilt inschakelen, stelt u asyncProgressTrackingEnabled in op true.

Python

stream = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("subscribe", "in")
    .load()
)

query = (stream.writeStream
    .format("kafka")
    .option("topic", "out")
    .option("checkpointLocation", "/tmp/checkpoint")
    .option("asyncProgressTrackingEnabled", "true")
    .start()
)

Scala

val stream = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("subscribe", "in")
    .load()

val query = stream.writeStream
    .format("kafka")
    .option("topic", "out")
    .option("checkpointLocation", "/tmp/checkpoint")
    .option("asyncProgressTrackingEnabled", "true")
    .start()

Doorvoer verbeteren met controlepuntfrequentie

De standaardfrequentie van het controlepunt is 1000 milliseconden en biedt een goede doorvoer voor de meeste query's. Wanneer offsetbeheerbewerkingen sneller plaatsvinden dan asynchrone voortgangstracering deze kan verwerken, ontstaat er een achterstand van offsetbeheerbewerkingen. Om te voorkomen dat de achterstand verder groeit, kan het bijhouden van asynchrone voortgang de verwerking van gegevens blokkeren of vertragen, waardoor de verwachte voordelen van lage latentie mogelijk worden aangetast.

In dit scenario raadt Databricks u aan om het controlepuntinterval te verhogen:

Python

query = (stream.writeStream
    .format("kafka")
    .option("topic", "out")
    .option("checkpointLocation", "/tmp/checkpoint")
    .option("asyncProgressTrackingEnabled", "true")
    .option("asyncProgressTrackingCheckpointIntervalMs", "5000")
    .start()
)

Scala

val query = stream.writeStream
    .format("kafka")
    .option("topic", "out")
    .option("checkpointLocation", "/tmp/checkpoint")
    .option("asyncProgressTrackingEnabled", "true")
    .option("asyncProgressTrackingCheckpointIntervalMs", "5000")
    .start()

Notitie

De hersteltijd voor fouten neemt toe met de intervaltijd van het controlepunt. In geval van een fout moet een pijplijn alle gegevens opnieuw verwerken sinds het vorige geslaagde controlepunt. Voordat u deze wijziging in productie aanbrengt, moet u rekening houden met de afweging tussen een lagere latentie tijdens de reguliere verwerking in vergelijking met de hersteltijd in het geval van een storing.

Asynchrone voortgangstracering uitschakelen

Wanneer asynchrone voortgangstracering is ingeschakeld, garandeert de stream geen controlepuntvoortgang voor elke batch. U moet de voortgang bij het controlepunt vastleggen voordat u deze functie kunt uitschakelen.

Voer de volgende stappen uit om uit te schakelen:

  1. Ten minste twee microbatches verwerken met asyncProgressTrackingEnabled ingesteld op true en asyncProgressTrackingCheckpointIntervalMs ingesteld op 0:

    Python

    query = (stream.writeStream
        .format("kafka")
        .option("topic", "out")
        .option("checkpointLocation", "/tmp/checkpoint")
        .option("asyncProgressTrackingEnabled", "true")
        .option("asyncProgressTrackingCheckpointIntervalMs", "0")
        .start()
    )
    

    Scala

    val query = stream.writeStream
        .format("kafka")
        .option("topic", "out")
        .option("checkpointLocation", "/tmp/checkpoint")
        .option("asyncProgressTrackingEnabled", "true")
        .option("asyncProgressTrackingCheckpointIntervalMs", "0")
        .start()
    
  2. Stop de query:

    Python

    query.stop()
    

    Scala

    query.stop()
    
  3. Schakel het bijhouden van asynchrone voortgang uit en start de query opnieuw op:

    Python

    query = (stream.writeStream
        .format("kafka")
        .option("topic", "out")
        .option("checkpointLocation", "/tmp/checkpoint")
        .option("asyncProgressTrackingEnabled", "false")
        .start()
    )
    

    Scala

    val query = stream.writeStream
        .format("kafka")
        .option("topic", "out")
        .option("checkpointLocation", "/tmp/checkpoint")
        .option("asyncProgressTrackingEnabled", "false")
        .start()
    

Als u asynchrone voortgangstracking uitschakelt zonder de bovenstaande stappen te volgen, kan de volgende fout optreden:

java.lang.IllegalStateException: batch x doesn't exist

In de stuurprogrammalogboeken ziet u mogelijk de volgende fout:

The offset log for batch x doesn't exist, which is required to restart the query from the latest batch x from the offset log. Please ensure there are two subsequent offset logs available for the latest batch via manually deleting the offset file(s). Please also ensure the latest batch for commit log is equal or one batch earlier than the latest batch for offset log.

Limitations

  • Voor Kafka-sinks ondersteunt asynchrone voortgangsbewaking alleen staatloze pijplijnen.
  • Asynchrone voortgangstracering garandeert geen exacte eenmalige end-to-end verwerking, omdat offsetbereiken voor een batch kunnen veranderen bij een fout. Sommige sinks, zoals Kafka, bieden nooit precies één keer garanties.