DataSourceWriter

En basklass för datakällans författare.

Datakällans författare ansvarar för att spara data till en datakälla. Implementera den här klassen och returnera en instans från DataSource.writer() för att göra en datakälla skrivbar.

Har lagts till i Databricks Runtime 14.3 LTS

Syntax

from pyspark.sql.datasource import DataSourceWriter

class MyDataSourceWriter(DataSourceWriter):
    def write(self, iterator):
        ...

Methods

Metod Beskrivning
write(iterator) Skriver data till datakällan. Anropas en gång på varje köre. Accepterar en iterator av Row objekt och returnerar ett WriterCommitMessage, eller None om det inte finns något incheckningsmeddelande. Den här metoden är abstrakt och måste implementeras.
commit(messages) Genomför skrivjobbet med hjälp av en lista över incheckningsmeddelanden som samlats in från alla utförare. Anropas på drivrutinen när alla aktiviteter körs.
abort(messages) Avbryter skrivjobbet med hjälp av en lista över incheckningsmeddelanden som samlats in från alla utförare. Anropas på drivrutinen när en eller flera uppgifter misslyckades.

Notes

  • Drivrutinen samlar in incheckningsmeddelanden från alla utförare och skickar dem till commit() om alla uppgifter lyckas, eller till abort() om någon uppgift misslyckas.
  • Om en skrivuppgift misslyckas finns dess incheckningsmeddelande i listan som skickas None till commit() eller abort().

Exempel

Implementera en grundläggande skrivare som sparar rader i en fil:

from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceWriter, WriterCommitMessage

@dataclass
class MyCommitMessage(WriterCommitMessage):
    num_rows: int

class MyDataSourceWriter(DataSourceWriter):
    def __init__(self, options):
        self.path = options.get("path")

    def write(self, iterator):
        rows = list(iterator)
        with open(self.path, "w") as f:
            for row in rows:
                f.write(str(row) + "\n")
        return MyCommitMessage(num_rows=len(rows))

    def commit(self, messages):
        total = sum(m.num_rows for m in messages if m is not None)
        print(f"Committed {total} rows")

    def abort(self, messages):
        print("Write job failed, performing cleanup")