Einrichten des Echtzeitmodus

Auf dieser Seite werden die voraussetzungen und die Konfiguration beschrieben, die zum Ausführen von Echtzeitmodusabfragen im strukturierten Streaming erforderlich sind. Ein schrittweises Lernprogramm finden Sie im Lernprogramm: Ausführen einer Echtzeit-Streaming-Workload. Konzeptionelle Informationen zum Echtzeitmodus finden Sie im Echtzeitmodus im strukturierten Streaming.

Voraussetzungen

Um den Echtzeitmodus zu verwenden, müssen Sie Ihre Berechnung so konfigurieren, dass sie die folgenden Anforderungen erfüllt:

  • Verwenden Sie klassische Rechenkapazität. Dedizierte und Standardzugriffsmodi werden unterstützt. Der Standardzugriffsmodus wird nur für Python unterstützt. Lakeflow Spark Declarative Pipelines und serverlose Cluster werden nicht unterstützt.
  • Verwenden Sie Databricks Runtime 16.4 LTS und höher.
  • Deaktivieren Sie die automatische Skalierung.
  • Photon deaktivieren.
  • Setzen Sie spark.databricks.streaming.realTimeMode.enabled auf true.
  • Schalten Sie Spotinstanzen aus, um Unterbrechungen zu vermeiden.

Bei latenzempfindlichen Workloads mit UDFs empfiehlt Databricks die Verwendung des dedizierten Zugriffsmodus. Siehe Tabellenfunktionen.

Anweisungen zum Erstellen und Konfigurieren einer klassischen Compute-Instanz finden Sie in der Referenz zur Computekonfiguration.

Abfragekonfiguration

Um eine Abfrage im Echtzeitmodus auszuführen, müssen Sie den Echtzeittrigger aktivieren. Echtzeittrigger werden nur im Updatemodus unterstützt.

Python

query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .outputMode("update")
        # In PySpark, the realTime trigger requires specifying the interval.
        .trigger(realTime="5 minutes")
        .start()
)

Scala

import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val readStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic).load()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .outputMode("update")
      .trigger(RealTimeTrigger.apply())
      // RealTimeTrigger can also accept an argument specifying the checkpoint interval.
      // For example, this code indicates a checkpoint interval of 5 minutes:
      // .trigger(RealTimeTrigger.apply("5 minutes"))
      .start()

Skalierung von Berechnungen

Sie können einen Echtzeitauftrag pro Rechenressource ausführen, wenn die Rechenressource über genügend Task-Slots verfügt.

Um im Modus mit geringer Latenz ausgeführt zu werden, muss die Gesamtanzahl der verfügbaren Aufgabenplätze größer oder gleich der Anzahl der Aufgaben in allen Abfragephasen sein.

Slot-Berechnungsbeispiele

Pipelinetyp Konfiguration Erforderliche Steckplätze
Einzelstufenzustandslos (Kafka Source + Sink) maxPartitions = 8 8 Steckplätze
Zweistufiger Zustand (Kafka-Quelle + Shuffle) maxPartitions = 8, Shuffle-Partitionen = 20 28 Steckplätze (8 + 20)
Dreistufige (Kafka-Quelle + *shuffle* + *repartition*) maxPartitions = 8, zwei Shuffle-Phasen mit jeweils 20 48 Steckplätze (8 + 20 + 20)

Wenn Sie maxPartitions nicht festlegen, verwenden Sie die Anzahl der Partitionen im Kafka-Thema.

Weitere Ressourcen