Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Définit la sortie de la requête de diffusion en continu à traiter à l’aide de l’enregistreur fourni. La logique de traitement peut être spécifiée en tant que fonction qui prend une ligne en tant qu’entrée, ou en tant qu’objet avec process(row) et facultatif open(partition_id, epoch_id) et close(error) méthodes.
Syntaxe
foreach(f)
Paramètres
| Paramètre | Type | Description |
|---|---|---|
f |
pouvant être appelé ou objet | Fonction qui prend une ligne comme entrée ou un objet avec une process(row) méthode et des méthodes facultativesopen.close |
Retours
DataStreamWriter
Remarques
L’objet fourni doit être sérialisable. Toute initialisation pour l’écriture de données (par exemple, l’ouverture d’une connexion) doit être effectuée à l’intérieur open(), et non au moment de la construction.
Exemples
import time
df = spark.readStream.format("rate").load()
Traitez chaque ligne à l’aide d’une fonction :
def print_row(row):
print(row)
q = df.writeStream.foreach(print_row).start()
time.sleep(3)
q.stop()
Traitez chaque ligne à l’aide d’un objet avec open, processet close des méthodes :
class RowPrinter:
def open(self, partition_id, epoch_id):
print("Opened %d, %d" % (partition_id, epoch_id))
return True
def process(self, row):
print(row)
def close(self, error):
print("Closed with error: %s" % str(error))
q = df.writeStream.foreach(RowPrinter()).start()
time.sleep(3)
q.stop()