Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Interfaz usada para cargar un dataframe de streaming desde sistemas de almacenamiento externos (por ejemplo, sistemas de archivos y almacenes de clave-valor). Use spark.readStream para acceder a esto.
Sintaxis
# Access through SparkSession
spark.readStream
Methods
| Método | Descripción |
|---|---|
format(source) |
Especifica el formato del origen de datos de entrada. |
schema(schema) |
Especifica el esquema del dataframe de streaming. |
option(key, value) |
Agrega una opción de entrada para el origen de datos subyacente. |
options(**options) |
Agrega varias opciones de entrada para el origen de datos subyacente. |
load(path) |
Carga el dataframe de streaming desde la ruta de acceso especificada y lo devuelve. |
json(path) |
Carga una secuencia de archivos JSON y devuelve un dataframe. |
orc(path) |
Carga una secuencia de archivos ORC y devuelve un dataframe. |
parquet(path) |
Carga una secuencia de archivos Parquet y devuelve un DataFrame. |
text(path) |
Carga una secuencia de archivos de texto y devuelve un dataframe. |
csv(path) |
Carga una secuencia de archivos CSV y devuelve un dataframe. |
xml(path) |
Carga una secuencia de archivos XML y devuelve un dataframe. |
table(tableName) |
Carga una tabla Delta de streaming y devuelve un dataframe. |
name(source_name) |
Asigna un nombre al origen de streaming para la evolución del punto de control. |
changes(tableName) |
Devuelve los cambios de nivel de fila (Captura de datos modificados) de la tabla especificada como dataframe de streaming. |
Ejemplos
spark.readStream
# <...streaming.readwriter.DataStreamReader object ...>
Cargue un flujo de velocidad, aplique una transformación, escriba en la consola y detenga después de 3 segundos.
import time
df = spark.readStream.format("rate").load()
df = df.selectExpr("value % 3 as v")
q = df.writeStream.format("console").start()
time.sleep(3)
q.stop()