Konfigurera realtidsläge

Den här sidan beskriver de förutsättningar och konfigurationer som krävs för att köra frågor i realtidsläge i Strukturerad direktuppspelning. För en steg-för-steg-självstudie, se Självstudie: Kör en arbetsbelastning för realtidsströmning. Konceptuell information om realtidsläge finns i Realtidsläge i Strukturerad direktuppspelning.

Förutsättningar

Om du vill använda realtidsläge måste du konfigurera beräkningen så att den uppfyller följande krav:

  • Använd klassisk beräkning. Dedikerade och standardåtkomstlägen stöds. Standardåtkomstläge stöds endast för Python. Lakeflow Spark Deklarativa pipelines och serverlösa kluster stöds inte.
  • Använd Databricks Runtime 16.4 LTS och senare.
  • Inaktivera autoskalning.
  • Stäng av Foton.
  • Ställ in spark.databricks.streaming.realTimeMode.enabledtrue.
  • Inaktivera spotinstanser för att undvika avbrott.

För svarstidskänsliga arbetsbelastningar med UDF:er rekommenderar Databricks att du använder dedikerat åtkomstläge. Se Tabellfunktioner.

Anvisningar om hur du skapar och konfigurerar klassisk beräkning finns i Referens för beräkningskonfiguration.

Frågekonfiguration

Om du vill köra en fråga i realtidsläge måste du aktivera realtidsutlösaren. Realtidsutlösare stöds endast i uppdateringsläge.

Python

query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .outputMode("update")
        # In PySpark, the realTime trigger requires specifying the interval.
        .trigger(realTime="5 minutes")
        .start()
)

Scala

import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val readStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic).load()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .outputMode("update")
      .trigger(RealTimeTrigger.apply())
      // RealTimeTrigger can also accept an argument specifying the checkpoint interval.
      // For example, this code indicates a checkpoint interval of 5 minutes:
      // .trigger(RealTimeTrigger.apply("5 minutes"))
      .start()

Beräkningsstorlek

Du kan köra ett realtidsjobb per beräkningsresurs om resursen har tillräckligt med uppgiftsplatser.

Om du vill köra i läge med låg latens måste det totala antalet tillgängliga aktivitetsfack vara större än eller lika med antalet aktiviteter i alla frågesteg.

Exempel på platsberäkningar

Typ av pipeline Konfiguration Nödvändiga fack
Tillståndslös i en fas (Kafka-källa + mottagare) maxPartitions = 8 8 platser
Tillståndskänsligt i två steg (Kafka-källa + shuffle) maxPartitions = 8, shuffle partitioner = 20 28 platser (8 + 20)
Tresteg (Kafka-källa + shuffle + ompartition) maxPartitions = 8, två blandningssteg på 20 vardera 48 platser (8 + 20 + 20)

Om du inte anger maxPartitions, använd antalet partitioner i Kafka-ämnet.

Ytterligare resurser