Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Cette page décrit les prérequis et la configuration nécessaires pour exécuter des requêtes en mode temps réel dans Structured Streaming. Pour obtenir un didacticiel pas à pas, consultez Tutoriel : Exécuter une charge de travail de streaming en temps réel. Pour plus d’informations conceptuelles sur le mode en temps réel, consultez le mode temps réel dans Structured Streaming.
Conditions préalables
Pour utiliser le mode en temps réel, vous devez configurer votre calcul pour répondre aux exigences suivantes :
- Utilisez le calcul classique. Les modes d’accès dédiés et standard sont pris en charge. Le mode d’accès standard est pris en charge uniquement pour Python. Les pipelines déclaratifs Spark Lakeflow et les clusters serverless ne sont pas pris en charge.
- Utilisez Databricks Runtime 16.4 LTS et versions ultérieures.
- Désactivez la mise à l’échelle automatique.
- Désactivez Photon.
- Affectez la valeur
spark.databricks.streaming.realTimeMode.enabledàtrue. - Désactivez les instances spot pour éviter les interruptions.
Pour les charges de travail sensibles à la latence avec des fonctions définies par l’utilisateur, Databricks recommande d’utiliser le mode d'accès exclusif. Consultez les fonctions de la table.
Pour obtenir des instructions sur la création et la configuration du calcul classique, consultez la référence de configuration de calcul.
Configuration des requêtes
Pour exécuter une requête en mode temps réel, vous devez activer le déclencheur en temps réel. Les déclencheurs en temps réel sont pris en charge uniquement en mode mise à jour.
Python
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.outputMode("update")
# In PySpark, the realTime trigger requires specifying the interval.
.trigger(realTime="5 minutes")
.start()
)
Scala
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val readStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic).load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.outputMode("update")
.trigger(RealTimeTrigger.apply())
// RealTimeTrigger can also accept an argument specifying the checkpoint interval.
// For example, this code indicates a checkpoint interval of 5 minutes:
// .trigger(RealTimeTrigger.apply("5 minutes"))
.start()
Dimensionnement des ressources de calcul
Vous pouvez exécuter un travail en temps réel par ressource de calcul si le calcul a suffisamment d’emplacements de tâches.
Pour s’exécuter en mode faible latence, le nombre total d’emplacements de tâches disponibles doit être supérieur ou égal au nombre de tâches dans toutes les phases de requête.
Exemples de calcul d'emplacement
| Type de pipeline | Paramétrage | Emplacements requis |
|---|---|---|
| Étape unique sans état (source Kafka + puits) |
maxPartitions = 8 |
8 emplacements |
| Avec état à deux phases (source Kafka + shuffle) |
maxPartitions = 8, partitions aléatoires = 20 |
28 emplacements (8 + 20) |
| Trois étapes (source Kafka + mélange + répartition) |
maxPartitions = 8, deux phases aléatoires de 20 chacune |
48 emplacements (8 + 20 + 20) |
Si vous ne définissez maxPartitionspas, utilisez le nombre de partitions dans la rubrique Kafka.