Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Een basisklasse voor schrijvers van gegevensbronnen.
Schrijvers van gegevensbronnen zijn verantwoordelijk voor het opslaan van gegevens in een gegevensbron. Implementeer deze klasse en retourneer een exemplaar van DataSource.writer() waaruit een gegevensbron schrijfbaar is.
Toegevoegd in Databricks Runtime 14.3 LTS
Syntaxis
from pyspark.sql.datasource import DataSourceWriter
class MyDataSourceWriter(DataSourceWriter):
def write(self, iterator):
...
Methods
| Methode | Beschrijving |
|---|---|
write(iterator) |
Hiermee schrijft u gegevens naar de gegevensbron. Wordt eenmaal aangeroepen op elke uitvoerder. Accepteert een iterator van Row objecten en retourneert een WriterCommitMessage, of None als er geen doorvoerbericht is. Deze methode is abstract en moet worden geïmplementeerd. |
commit(messages) |
Voert de schrijftaak door met behulp van een lijst met doorvoerberichten die zijn verzameld van alle uitvoerders. Aangeroepen op het stuurprogramma wanneer alle taken zijn uitgevoerd. |
abort(messages) |
Hiermee wordt de schrijftaak afgebroken met behulp van een lijst met doorvoerberichten die zijn verzameld van alle uitvoerders. Aangeroepen op het stuurprogramma wanneer een of meer taken zijn mislukt. |
Aantekeningen
- Het stuurprogramma verzamelt doorvoerberichten van alle uitvoerders en geeft deze door aan
commit()als alle taken slagen of aanabort()of een taak mislukt. - Als een schrijftaak mislukt, wordt het doorvoerbericht weergegeven in de lijst die wordt
Nonedoorgegeven aancommit()ofabort().
Examples
Implementeer een eenvoudige schrijver waarmee rijen worden opgeslagen in een bestand:
from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceWriter, WriterCommitMessage
@dataclass
class MyCommitMessage(WriterCommitMessage):
num_rows: int
class MyDataSourceWriter(DataSourceWriter):
def __init__(self, options):
self.path = options.get("path")
def write(self, iterator):
rows = list(iterator)
with open(self.path, "w") as f:
for row in rows:
f.write(str(row) + "\n")
return MyCommitMessage(num_rows=len(rows))
def commit(self, messages):
total = sum(m.num_rows for m in messages if m is not None)
print(f"Committed {total} rows")
def abort(self, messages):
print("Write job failed, performing cleanup")