foreach (DataStreamWriter)

Hiermee stelt u de uitvoer van de streamingquery in die moet worden verwerkt met behulp van de opgegeven schrijver. De verwerkingslogica kan worden opgegeven als een functie die een rij als invoer gebruikt, of als een object met process(row) en optioneel open(partition_id, epoch_id) en close(error) methoden.

Syntaxis

foreach(f)

Parameterwaarden

Kenmerk Typ Beschrijving
f aanroepbaar of object Een functie die een rij als invoer gebruikt of een object met een process(row) methode en optioneel open en close methoden.

Retouren

DataStreamWriter

Aantekeningen

Het opgegeven object moet serialiseerbaar zijn. Elke initialisatie voor het schrijven van gegevens (bijvoorbeeld het openen van een verbinding) moet binnen open()worden uitgevoerd, niet op bouwtijd.

Examples

import time
df = spark.readStream.format("rate").load()

Elke rij verwerken met behulp van een functie:

def print_row(row):
    print(row)

q = df.writeStream.foreach(print_row).start()
time.sleep(3)
q.stop()

Elke rij verwerken met behulp van een object met open, processen close methoden:

class RowPrinter:
    def open(self, partition_id, epoch_id):
        print("Opened %d, %d" % (partition_id, epoch_id))
        return True

    def process(self, row):
        print(row)

    def close(self, error):
        print("Closed with error: %s" % str(error))

q = df.writeStream.foreach(RowPrinter()).start()
time.sleep(3)
q.stop()