Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Interface utilisée pour charger un DataFrame de diffusion en continu à partir de systèmes de stockage externes (par exemple, des systèmes de fichiers et des magasins clé-valeur). Permet spark.readStream d’y accéder.
Syntaxe
# Access through SparkSession
spark.readStream
Méthodes
| Méthode | Description |
|---|---|
format(source) |
Spécifie le format de source de données d’entrée. |
schema(schema) |
Spécifie le schéma du DataFrame de streaming. |
option(key, value) |
Ajoute une option d’entrée pour la source de données sous-jacente. |
options(**options) |
Ajoute plusieurs options d’entrée pour la source de données sous-jacente. |
load(path) |
Charge le DataFrame de streaming à partir du chemin donné et le retourne. |
json(path) |
Charge un flux de fichiers JSON et retourne un DataFrame. |
orc(path) |
Charge un flux de fichiers ORC et retourne un DataFrame. |
parquet(path) |
Charge un flux de fichiers Parquet et retourne un DataFrame. |
text(path) |
Charge un flux de fichiers texte et retourne un DataFrame. |
csv(path) |
Charge un flux de fichiers CSV et retourne un DataFrame. |
xml(path) |
Charge un flux de fichiers XML et retourne un DataFrame. |
table(tableName) |
Charge une table Delta de streaming et retourne un DataFrame. |
name(source_name) |
Attribue un nom à la source de diffusion en continu pour l’évolution des points de contrôle. |
changes(tableName) |
Retourne les modifications au niveau des lignes (Capture de données modifiées) de la table spécifiée en tant que DataFrame de diffusion en continu. |
Exemples
spark.readStream
# <...streaming.readwriter.DataStreamReader object ...>
Chargez un flux de débit, appliquez une transformation, écrivez dans la console et arrêtez après 3 secondes.
import time
df = spark.readStream.format("rate").load()
df = df.selectExpr("value % 3 as v")
q = df.writeStream.format("console").start()
time.sleep(3)
q.stop()