Configurer le mode en temps réel

Cette page décrit les prérequis et la configuration nécessaires pour exécuter des requêtes en mode temps réel dans Structured Streaming. Pour obtenir un didacticiel pas à pas, consultez Tutoriel : Exécuter une charge de travail de streaming en temps réel. Pour plus d’informations conceptuelles sur le mode en temps réel, consultez le mode temps réel dans Structured Streaming.

Conditions préalables

Pour utiliser le mode en temps réel, vous devez configurer votre calcul pour répondre aux exigences suivantes :

  • Utilisez le calcul classique. Les modes d’accès dédiés et standard sont pris en charge. Le mode d’accès standard est pris en charge uniquement pour Python. Les pipelines déclaratifs Spark Lakeflow et les clusters serverless ne sont pas pris en charge.
  • Utilisez Databricks Runtime 16.4 LTS et versions ultérieures.
  • Désactivez la mise à l’échelle automatique.
  • Désactivez Photon.
  • Affectez la valeur spark.databricks.streaming.realTimeMode.enabled à true.
  • Désactivez les instances spot pour éviter les interruptions.

Pour les charges de travail sensibles à la latence avec des fonctions définies par l’utilisateur, Databricks recommande d’utiliser le mode d'accès exclusif. Consultez les fonctions de la table.

Pour obtenir des instructions sur la création et la configuration du calcul classique, consultez la référence de configuration de calcul.

Configuration des requêtes

Pour exécuter une requête en mode temps réel, vous devez activer le déclencheur en temps réel. Les déclencheurs en temps réel sont pris en charge uniquement en mode mise à jour.

Python

query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .outputMode("update")
        # In PySpark, the realTime trigger requires specifying the interval.
        .trigger(realTime="5 minutes")
        .start()
)

Scala

import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val readStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic).load()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .outputMode("update")
      .trigger(RealTimeTrigger.apply())
      // RealTimeTrigger can also accept an argument specifying the checkpoint interval.
      // For example, this code indicates a checkpoint interval of 5 minutes:
      // .trigger(RealTimeTrigger.apply("5 minutes"))
      .start()

Dimensionnement des ressources de calcul

Vous pouvez exécuter un travail en temps réel par ressource de calcul si le calcul a suffisamment d’emplacements de tâches.

Pour s’exécuter en mode faible latence, le nombre total d’emplacements de tâches disponibles doit être supérieur ou égal au nombre de tâches dans toutes les phases de requête.

Exemples de calcul d'emplacement

Type de pipeline Paramétrage Emplacements requis
Étape unique sans état (source Kafka + puits) maxPartitions = 8 8 emplacements
Avec état à deux phases (source Kafka + shuffle) maxPartitions = 8, partitions aléatoires = 20 28 emplacements (8 + 20)
Trois étapes (source Kafka + mélange + répartition) maxPartitions = 8, deux phases aléatoires de 20 chacune 48 emplacements (8 + 20 + 20)

Si vous ne définissez maxPartitionspas, utilisez le nombre de partitions dans la rubrique Kafka.

Ressources additionnelles