Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Hiermee stelt u de uitvoer van de streamingquery in die moet worden verwerkt met behulp van de opgegeven schrijver. De verwerkingslogica kan worden opgegeven als een functie die een rij als invoer gebruikt, of als een object met process(row) en optioneel open(partition_id, epoch_id) en close(error) methoden.
Syntaxis
foreach(f)
Parameterwaarden
| Kenmerk | Typ | Beschrijving |
|---|---|---|
f |
aanroepbaar of object | Een functie die een rij als invoer gebruikt of een object met een process(row) methode en optioneel open en close methoden. |
Retouren
DataStreamWriter
Aantekeningen
Het opgegeven object moet serialiseerbaar zijn. Elke initialisatie voor het schrijven van gegevens (bijvoorbeeld het openen van een verbinding) moet binnen open()worden uitgevoerd, niet op bouwtijd.
Examples
import time
df = spark.readStream.format("rate").load()
Elke rij verwerken met behulp van een functie:
def print_row(row):
print(row)
q = df.writeStream.foreach(print_row).start()
time.sleep(3)
q.stop()
Elke rij verwerken met behulp van een object met open, processen close methoden:
class RowPrinter:
def open(self, partition_id, epoch_id):
print("Opened %d, %d" % (partition_id, epoch_id))
return True
def process(self, row):
print(row)
def close(self, error):
print("Closed with error: %s" % str(error))
q = df.writeStream.foreach(RowPrinter()).start()
time.sleep(3)
q.stop()