DataSourceWriter

Classe de base pour les enregistreurs de sources de données.

Les enregistreurs de sources de données sont responsables de l’enregistrement des données dans une source de données. Implémentez cette classe et retournez une instance pour rendre une source de DataSource.writer() données accessible en écriture.

Ajouté dans Databricks Runtime 14.3 LTS

Syntaxe

from pyspark.sql.datasource import DataSourceWriter

class MyDataSourceWriter(DataSourceWriter):
    def write(self, iterator):
        ...

Méthodes

Méthode Description
write(iterator) Écrit des données dans la source de données. Appelé une fois sur chaque exécuteur. Accepte un itérateur d’objets Row et retourne un WriterCommitMessagemessage de validation ou None s’il n’existe aucun message de validation. Cette méthode est abstraite et doit être implémentée.
commit(messages) Valide le travail d’écriture à l’aide d’une liste de messages de validation collectés à partir de tous les exécuteurs. Appelé sur le pilote lorsque toutes les tâches s’exécutent correctement.
abort(messages) Abandonne le travail d’écriture à l’aide d’une liste de messages de validation collectés à partir de tous les exécuteurs. Appelé sur le pilote lorsqu’une ou plusieurs tâches ont échoué.

Remarques

  • Le pilote collecte les messages de validation de tous les exécuteurs et les transmet si commit() toutes les tâches réussissent ou si abort() une tâche échoue.
  • En cas d’échec d’une tâche d’écriture, son message de validation se trouvera None dans la liste transmise ou commit()abort().

Exemples

Implémentez un enregistreur de base qui enregistre des lignes dans un fichier :

from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceWriter, WriterCommitMessage

@dataclass
class MyCommitMessage(WriterCommitMessage):
    num_rows: int

class MyDataSourceWriter(DataSourceWriter):
    def __init__(self, options):
        self.path = options.get("path")

    def write(self, iterator):
        rows = list(iterator)
        with open(self.path, "w") as f:
            for row in rows:
                f.write(str(row) + "\n")
        return MyCommitMessage(num_rows=len(rows))

    def commit(self, messages):
        total = sum(m.num_rows for m in messages if m is not None)
        print(f"Committed {total} rows")

    def abort(self, messages):
        print("Write job failed, performing cleanup")