Kommentar
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Anger utdata för den strömmande fråga som ska bearbetas med hjälp av den angivna skrivaren. Bearbetningslogik kan anges som en funktion som tar en rad som indata, eller som ett objekt med process(row) och valfria open(partition_id, epoch_id) metoder och close(error) metoder.
Syntax
foreach(f)
Parameters
| Parameter | Type | Beskrivning |
|---|---|---|
f |
anropsbart eller objekt | En funktion som tar en rad som indata eller ett objekt med en process(row) metod och valfria open metoder.close |
Retur
DataStreamWriter
Notes
Det angivna objektet måste vara serialiserbart. Alla initieringar för att skriva data (till exempel att öppna en anslutning) bör göras i open(), inte vid byggtid.
Exempel
import time
df = spark.readStream.format("rate").load()
Bearbeta varje rad med hjälp av en funktion:
def print_row(row):
print(row)
q = df.writeStream.foreach(print_row).start()
time.sleep(3)
q.stop()
Bearbeta varje rad med hjälp av ett objekt med openmetoderna , processoch close :
class RowPrinter:
def open(self, partition_id, epoch_id):
print("Opened %d, %d" % (partition_id, epoch_id))
return True
def process(self, row):
print(row)
def close(self, error):
print("Closed with error: %s" % str(error))
q = df.writeStream.foreach(RowPrinter()).start()
time.sleep(3)
q.stop()