foreach (DataStreamWriter)

Définit la sortie de la requête de diffusion en continu à traiter à l’aide de l’enregistreur fourni. La logique de traitement peut être spécifiée en tant que fonction qui prend une ligne en tant qu’entrée, ou en tant qu’objet avec process(row) et facultatif open(partition_id, epoch_id) et close(error) méthodes.

Syntaxe

foreach(f)

Paramètres

Paramètre Type Description
f pouvant être appelé ou objet Fonction qui prend une ligne comme entrée ou un objet avec une process(row) méthode et des méthodes facultativesopen.close

Retours

DataStreamWriter

Remarques

L’objet fourni doit être sérialisable. Toute initialisation pour l’écriture de données (par exemple, l’ouverture d’une connexion) doit être effectuée à l’intérieur open(), et non au moment de la construction.

Exemples

import time
df = spark.readStream.format("rate").load()

Traitez chaque ligne à l’aide d’une fonction :

def print_row(row):
    print(row)

q = df.writeStream.foreach(print_row).start()
time.sleep(3)
q.stop()

Traitez chaque ligne à l’aide d’un objet avec open, processet close des méthodes :

class RowPrinter:
    def open(self, partition_id, epoch_id):
        print("Opened %d, %d" % (partition_id, epoch_id))
        return True

    def process(self, row):
        print(row)

    def close(self, error):
        print("Closed with error: %s" % str(error))

q = df.writeStream.foreach(RowPrinter()).start()
time.sleep(3)
q.stop()