foreach (DataStreamWriter)

Anger utdata för den strömmande fråga som ska bearbetas med hjälp av den angivna skrivaren. Bearbetningslogik kan anges som en funktion som tar en rad som indata, eller som ett objekt med process(row) och valfria open(partition_id, epoch_id) metoder och close(error) metoder.

Syntax

foreach(f)

Parameters

Parameter Type Beskrivning
f anropsbart eller objekt En funktion som tar en rad som indata eller ett objekt med en process(row) metod och valfria open metoder.close

Retur

DataStreamWriter

Notes

Det angivna objektet måste vara serialiserbart. Alla initieringar för att skriva data (till exempel att öppna en anslutning) bör göras i open(), inte vid byggtid.

Exempel

import time
df = spark.readStream.format("rate").load()

Bearbeta varje rad med hjälp av en funktion:

def print_row(row):
    print(row)

q = df.writeStream.foreach(print_row).start()
time.sleep(3)
q.stop()

Bearbeta varje rad med hjälp av ett objekt med openmetoderna , processoch close :

class RowPrinter:
    def open(self, partition_id, epoch_id):
        print("Opened %d, %d" % (partition_id, epoch_id))
        return True

    def process(self, row):
        print(row)

    def close(self, error):
        print("Closed with error: %s" % str(error))

q = df.writeStream.foreach(RowPrinter()).start()
time.sleep(3)
q.stop()