DataSourceWriter

Een basisklasse voor schrijvers van gegevensbronnen.

Schrijvers van gegevensbronnen zijn verantwoordelijk voor het opslaan van gegevens in een gegevensbron. Implementeer deze klasse en retourneer een exemplaar van DataSource.writer() waaruit een gegevensbron schrijfbaar is.

Toegevoegd in Databricks Runtime 14.3 LTS

Syntaxis

from pyspark.sql.datasource import DataSourceWriter

class MyDataSourceWriter(DataSourceWriter):
    def write(self, iterator):
        ...

Methods

Methode Beschrijving
write(iterator) Hiermee schrijft u gegevens naar de gegevensbron. Wordt eenmaal aangeroepen op elke uitvoerder. Accepteert een iterator van Row objecten en retourneert een WriterCommitMessage, of None als er geen doorvoerbericht is. Deze methode is abstract en moet worden geïmplementeerd.
commit(messages) Voert de schrijftaak door met behulp van een lijst met doorvoerberichten die zijn verzameld van alle uitvoerders. Aangeroepen op het stuurprogramma wanneer alle taken zijn uitgevoerd.
abort(messages) Hiermee wordt de schrijftaak afgebroken met behulp van een lijst met doorvoerberichten die zijn verzameld van alle uitvoerders. Aangeroepen op het stuurprogramma wanneer een of meer taken zijn mislukt.

Aantekeningen

  • Het stuurprogramma verzamelt doorvoerberichten van alle uitvoerders en geeft deze door aan commit() als alle taken slagen of aan abort() of een taak mislukt.
  • Als een schrijftaak mislukt, wordt het doorvoerbericht weergegeven in de lijst die wordt None doorgegeven aan commit() of abort().

Examples

Implementeer een eenvoudige schrijver waarmee rijen worden opgeslagen in een bestand:

from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceWriter, WriterCommitMessage

@dataclass
class MyCommitMessage(WriterCommitMessage):
    num_rows: int

class MyDataSourceWriter(DataSourceWriter):
    def __init__(self, options):
        self.path = options.get("path")

    def write(self, iterator):
        rows = list(iterator)
        with open(self.path, "w") as f:
            for row in rows:
                f.write(str(row) + "\n")
        return MyCommitMessage(num_rows=len(rows))

    def commit(self, messages):
        total = sum(m.num_rows for m in messages if m is not None)
        print(f"Committed {total} rows")

    def abort(self, messages):
        print("Write job failed, performing cleanup")