DataSourceWriter

Clase base para escritores de orígenes de datos.

Los escritores de orígenes de datos son responsables de guardar datos en un origen de datos. Implemente esta clase y devuelva una instancia de DataSource.writer() para que se pueda escribir un origen de datos.

Agregado en Databricks Runtime 14.3 LTS

Sintaxis

from pyspark.sql.datasource import DataSourceWriter

class MyDataSourceWriter(DataSourceWriter):
    def write(self, iterator):
        ...

Methods

Método Descripción
write(iterator) Escribe datos en el origen de datos. Se llama una vez en cada ejecutor. Acepta un iterador de Row objetos y devuelve un WriterCommitMessage, o None si no hay ningún mensaje de confirmación. Este método es abstracto y debe implementarse.
commit(messages) Confirma el trabajo de escritura mediante una lista de mensajes de confirmación recopilados de todos los ejecutores. Se invoca en el controlador cuando todas las tareas se ejecutan correctamente.
abort(messages) Anula el trabajo de escritura mediante una lista de mensajes de confirmación recopilados de todos los ejecutores. Se invoca en el controlador cuando se produce un error en una o varias tareas.

Notas

  • El controlador recopila mensajes de confirmación de todos los ejecutores y los pasa a commit() si todas las tareas se realizan correctamente o a abort() si se produce un error en alguna tarea.
  • Si se produce un error en una tarea de escritura, su mensaje de confirmación estará None en la lista pasada a commit() o abort().

Ejemplos

Implemente un escritor básico que guarde filas en un archivo:

from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceWriter, WriterCommitMessage

@dataclass
class MyCommitMessage(WriterCommitMessage):
    num_rows: int

class MyDataSourceWriter(DataSourceWriter):
    def __init__(self, options):
        self.path = options.get("path")

    def write(self, iterator):
        rows = list(iterator)
        with open(self.path, "w") as f:
            for row in rows:
                f.write(str(row) + "\n")
        return MyCommitMessage(num_rows=len(rows))

    def commit(self, messages):
        total = sum(m.num_rows for m in messages if m is not None)
        print(f"Committed {total} rows")

    def abort(self, messages):
        print("Write job failed, performing cleanup")