foreach (DataStreamWriter)

Establece la salida de la consulta de streaming que se va a procesar mediante el sistema de escritura proporcionado. La lógica de procesamiento se puede especificar como una función que toma una fila como entrada, o como un objeto con process(row) y métodos y open(partition_id, epoch_id) opcionalesclose(error).

Sintaxis

foreach(f)

Parámetros

Parámetro Tipo Descripción
f callable o object Función que toma una fila como entrada o un objeto con un process(row) método y métodos opcionales y openclose .

Devoluciones

DataStreamWriter

Notas

El objeto proporcionado debe ser serializable. Cualquier inicialización para escribir datos (por ejemplo, abrir una conexión) debe realizarse dentro open()de , no en tiempo de construcción.

Ejemplos

import time
df = spark.readStream.format("rate").load()

Procese cada fila mediante una función:

def print_row(row):
    print(row)

q = df.writeStream.foreach(print_row).start()
time.sleep(3)
q.stop()

Procese cada fila mediante un objeto con openlos métodos , processy close :

class RowPrinter:
    def open(self, partition_id, epoch_id):
        print("Opened %d, %d" % (partition_id, epoch_id))
        return True

    def process(self, row):
        print(row)

    def close(self, error):
        print("Closed with error: %s" % str(error))

q = df.writeStream.foreach(RowPrinter()).start()
time.sleep(3)
q.stop()