DataStreamReader

Interface utilisée pour charger un DataFrame de diffusion en continu à partir de systèmes de stockage externes (par exemple, des systèmes de fichiers et des magasins clé-valeur). Permet spark.readStream d’y accéder.

Syntaxe

# Access through SparkSession
spark.readStream

Méthodes

Méthode Description
format(source) Spécifie le format de source de données d’entrée.
schema(schema) Spécifie le schéma du DataFrame de streaming.
option(key, value) Ajoute une option d’entrée pour la source de données sous-jacente.
options(**options) Ajoute plusieurs options d’entrée pour la source de données sous-jacente.
load(path) Charge le DataFrame de streaming à partir du chemin donné et le retourne.
json(path) Charge un flux de fichiers JSON et retourne un DataFrame.
orc(path) Charge un flux de fichiers ORC et retourne un DataFrame.
parquet(path) Charge un flux de fichiers Parquet et retourne un DataFrame.
text(path) Charge un flux de fichiers texte et retourne un DataFrame.
csv(path) Charge un flux de fichiers CSV et retourne un DataFrame.
xml(path) Charge un flux de fichiers XML et retourne un DataFrame.
table(tableName) Charge une table Delta de streaming et retourne un DataFrame.
name(source_name) Attribue un nom à la source de diffusion en continu pour l’évolution des points de contrôle.
changes(tableName) Retourne les modifications au niveau des lignes (Capture de données modifiées) de la table spécifiée en tant que DataFrame de diffusion en continu.

Exemples

spark.readStream
# <...streaming.readwriter.DataStreamReader object ...>

Chargez un flux de débit, appliquez une transformation, écrivez dans la console et arrêtez après 3 secondes.

import time
df = spark.readStream.format("rate").load()
df = df.selectExpr("value % 3 as v")
q = df.writeStream.format("console").start()
time.sleep(3)
q.stop()