Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Uma classe base para gravadores de fonte de dados.
Os gravadores de fonte de dados são responsáveis por salvar dados em uma fonte de dados. Implemente essa classe e retorne uma instância DataSource.writer() para tornar uma fonte de dados gravável.
Adicionado no Databricks Runtime 14.3 LTS
Sintaxe
from pyspark.sql.datasource import DataSourceWriter
class MyDataSourceWriter(DataSourceWriter):
def write(self, iterator):
...
Methods
| Método | Descrição |
|---|---|
write(iterator) |
Grava dados na fonte de dados. Chamado uma vez em cada executor. Aceita um iterador de Row objetos e retorna um WriterCommitMessage, ou None se não há nenhuma mensagem de confirmação. Esse método é abstrato e deve ser implementado. |
commit(messages) |
Confirma o trabalho de gravação usando uma lista de mensagens de confirmação coletadas de todos os executores. Invocado no driver quando todas as tarefas são executadas com êxito. |
abort(messages) |
Anula o trabalho de gravação usando uma lista de mensagens de confirmação coletadas de todos os executores. Invocado no driver quando uma ou mais tarefas falharam. |
Observações
- O driver coleta mensagens de confirmação de todos os executores e as passa para
commit()se todas as tarefas tiverem êxito ou seabort()alguma tarefa falhar. - Se uma tarefa de gravação falhar, sua mensagem de confirmação estará
Nonena lista passada paracommit()ouabort().
Exemplos
Implemente um gravador básico que salva linhas em um arquivo:
from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceWriter, WriterCommitMessage
@dataclass
class MyCommitMessage(WriterCommitMessage):
num_rows: int
class MyDataSourceWriter(DataSourceWriter):
def __init__(self, options):
self.path = options.get("path")
def write(self, iterator):
rows = list(iterator)
with open(self.path, "w") as f:
for row in rows:
f.write(str(row) + "\n")
return MyCommitMessage(num_rows=len(rows))
def commit(self, messages):
total = sum(m.num_rows for m in messages if m is not None)
print(f"Committed {total} rows")
def abort(self, messages):
print("Write job failed, performing cleanup")