Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Définit un filigrane d’heure d’événement pour ce DataFrame. Un filigrane suit un point dans le temps avant lequel nous partons du principe qu’il n’y aura plus de données tardives.
Syntaxe
withWatermark(eventTime: str, delayThreshold: str)
Paramètres
| Paramètre | Type | Description |
|---|---|---|
eventTime |
str | nom de la colonne qui contient l’heure de l’événement de la ligne. |
delayThreshold |
str | délai minimal pour attendre que les données arrivent en retard, par rapport à l’enregistrement le plus récent qui a été traité sous la forme d’un intervalle (par exemple, « 1 minute » ou « 5 heures »). |
Retours
DataFrame: DataFrame en filigrane.
Remarques
Il s’agit d’une fonctionnalité uniquement pour Structured Streaming.
Spark utilise ce filigrane à plusieurs fins :
- Pour savoir quand une agrégation de fenêtre de temps donnée peut être finalisée et peut donc être émise lors de l’utilisation des modes de sortie qui n’autorisent pas les mises à jour.
- Pour réduire la quantité d’état que nous devons conserver pour les agrégations en cours.
Le filigrane actuel est calculé en examinant la MAX(eventTime) vue sur toutes les partitions de la requête moins un utilisateur spécifié delayThreshold. En raison du coût de coordination de cette valeur entre les partitions, le filigrane réel utilisé n’est garanti qu’au moins delayThreshold derrière l’heure de l’événement réel.
Exemples
from pyspark.sql import Row
from pyspark.sql.functions import timestamp_seconds
df = spark.readStream.format("rate").load().selectExpr(
"value % 5 AS value", "timestamp")
df.select("value", df.timestamp.alias("time")).withWatermark("time", '10 minutes')
# DataFrame[value: bigint, time: timestamp]