Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Establece la salida de la consulta de streaming que se va a procesar mediante el sistema de escritura proporcionado. La lógica de procesamiento se puede especificar como una función que toma una fila como entrada, o como un objeto con process(row) y métodos y open(partition_id, epoch_id) opcionalesclose(error).
Sintaxis
foreach(f)
Parámetros
| Parámetro | Tipo | Descripción |
|---|---|---|
f |
callable o object | Función que toma una fila como entrada o un objeto con un process(row) método y métodos opcionales y openclose . |
Devoluciones
DataStreamWriter
Notas
El objeto proporcionado debe ser serializable. Cualquier inicialización para escribir datos (por ejemplo, abrir una conexión) debe realizarse dentro open()de , no en tiempo de construcción.
Ejemplos
import time
df = spark.readStream.format("rate").load()
Procese cada fila mediante una función:
def print_row(row):
print(row)
q = df.writeStream.foreach(print_row).start()
time.sleep(3)
q.stop()
Procese cada fila mediante un objeto con openlos métodos , processy close :
class RowPrinter:
def open(self, partition_id, epoch_id):
print("Opened %d, %d" % (partition_id, epoch_id))
return True
def process(self, row):
print(row)
def close(self, error):
print("Closed with error: %s" % str(error))
q = df.writeStream.foreach(RowPrinter()).start()
time.sleep(3)
q.stop()