Compartilhar via


DataSourceWriter

Uma classe base para gravadores de fonte de dados.

Os gravadores de fonte de dados são responsáveis por salvar dados em uma fonte de dados. Implemente essa classe e retorne uma instância DataSource.writer() para tornar uma fonte de dados gravável.

Adicionado no Databricks Runtime 14.3 LTS

Sintaxe

from pyspark.sql.datasource import DataSourceWriter

class MyDataSourceWriter(DataSourceWriter):
    def write(self, iterator):
        ...

Methods

Método Descrição
write(iterator) Grava dados na fonte de dados. Chamado uma vez em cada executor. Aceita um iterador de Row objetos e retorna um WriterCommitMessage, ou None se não há nenhuma mensagem de confirmação. Esse método é abstrato e deve ser implementado.
commit(messages) Confirma o trabalho de gravação usando uma lista de mensagens de confirmação coletadas de todos os executores. Invocado no driver quando todas as tarefas são executadas com êxito.
abort(messages) Anula o trabalho de gravação usando uma lista de mensagens de confirmação coletadas de todos os executores. Invocado no driver quando uma ou mais tarefas falharam.

Observações

  • O driver coleta mensagens de confirmação de todos os executores e as passa para commit() se todas as tarefas tiverem êxito ou se abort() alguma tarefa falhar.
  • Se uma tarefa de gravação falhar, sua mensagem de confirmação estará None na lista passada para commit() ou abort().

Exemplos

Implemente um gravador básico que salva linhas em um arquivo:

from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceWriter, WriterCommitMessage

@dataclass
class MyCommitMessage(WriterCommitMessage):
    num_rows: int

class MyDataSourceWriter(DataSourceWriter):
    def __init__(self, options):
        self.path = options.get("path")

    def write(self, iterator):
        rows = list(iterator)
        with open(self.path, "w") as f:
            for row in rows:
                f.write(str(row) + "\n")
        return MyCommitMessage(num_rows=len(rows))

    def commit(self, messages):
        total = sum(m.num_rows for m in messages if m is not None)
        print(f"Committed {total} rows")

    def abort(self, messages):
        print("Write job failed, performing cleanup")