Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Il caricatore automatico semplifica una serie di attività comuni di inserimento dati. Questo riferimento rapido fornisce esempi per diversi modelli comuni.
Inserire dati dall'archiviazione di oggetti cloud come variante
Auto Loader può caricare tutti i dati dalle origini file supportate come singola VARIANT colonna in una tabella di destinazione. Poiché VARIANT è flessibile per la modifica dello schema e del tipo e mantiene la distinzione tra maiuscole e minuscole e NULL i valori presenti nell'origine dati, questo modello è affidabile per la maggior parte degli scenari di inserimento. Per informazioni dettagliate, vedere Inserire dati dall'archiviazione di oggetti cloud come variante.
Filtro di directory o file usando modelli GLOB
I modelli Glob possono essere usati per filtrare directory e file quando specificati nel percorso.
| Modello | Descrizione |
|---|---|
? |
Corrisponde a qualsiasi carattere singolo |
* |
Corrisponde a zero o più caratteri |
[abc] |
Trova la corrispondenza di un singolo carattere del set di caratteri {a,b,c}. |
[a-z] |
Trova la corrispondenza di un singolo carattere dall'intervallo di caratteri {a... z}. |
[^a] |
Trova la corrispondenza di un singolo carattere non incluso nel set di caratteri o nell'intervallo {a}. Si noti che il ^ carattere deve essere immediatamente a destra della parentesi aperta. |
{ab,cd} |
Corrisponde a una stringa dell'insieme di stringhe {ab, cd}. |
{ab,c{de, fh}} |
Corrisponde a una stringa dal set di stringhe {ab, cde, cfh}. |
Usare il path per fornire modelli di prefisso, ad esempio:
Pitone
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", <format>) \
.schema(schema) \
.load("<base-path>/*/files")
Linguaggio di programmazione Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", <format>)
.schema(schema)
.load("<base-path>/*/files")
È necessario usare l'opzione pathGlobFilter per fornire in modo esplicito modelli di suffisso. Fornisce solo un filtro prefisso path. Ad esempio, se si desidera analizzare solo png i file in una directory contenente file con suffissi diversi, è possibile eseguire le operazioni seguenti:
Pitone
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "binaryFile") \
.option("pathGlobfilter", "*.png") \
.load(<base-path>)
Linguaggio di programmazione Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "binaryFile")
.option("pathGlobfilter", "*.png")
.load(<base-path>)
Nota
Il comportamento predefinito di globbing di Auto Loader è diverso dal comportamento predefinito di altre origini file Spark. Aggiungi .option("cloudFiles.useStrictGlobber", "true") alla lettura per usare il globbing che corrisponde al comportamento predefinito di Spark con le sorgenti file. Per altre informazioni sul globbing, vedere la tabella seguente:
| Modello | Percorso file | Globber predefinito | Globber rigoroso |
|---|---|---|---|
| /a/b | /a/b/c/file.txt | Sì | Sì |
| /a/b | /a/b_dir/c/file.txt | No | No |
| /a/b | /a/b.txt | No | No |
| /a/b/ | /a/b.txt | No | No |
| /a/*/c/ | /a/b/c/file.txt | Sì | Sì |
| /a/*/c/ | /a/b/c/d/file.txt | Sì | Sì |
| /a/*/c/ | /a/b/x/y/c/file.txt | Sì | No |
| /a/*/c | /a/b/c_file.txt | Sì | No |
| /a/*/c/ | /a/b/c_file.txt | Sì | No |
| /a/*/c/ | /a/*/cookie/file.txt | Sì | No |
| /a/b* | /a/b.txt | Sì | Sì |
| /a/b* | /a/b/file.txt | Sì | Sì |
| /a/{0.txt,1.txt} | /a/0.txt | Sì | Sì |
| /a/*/{0.txt,1.txt} | /a/0.txt | No | No |
| /a/b/[cde-h]/i/ | /a/b/c/i/file.txt | Sì | Sì |
Abilitare easy ETL
Un modo semplice per ottenere i dati in Delta Lake senza perdere dati consiste nell'usare il modello seguente e abilitare l'inferenza dello schema con il caricatore automatico. Databricks consiglia di eseguire il codice seguente in un processo di Azure Databricks per riavviare automaticamente il flusso quando lo schema dei dati di origine cambia. Per impostazione predefinita, lo schema viene considerato come tipi stringa, tutti gli errori di parsing (non dovrebbero verificarsi se tutto rimane come stringa) andranno a _rescued_data, e qualsiasi nuova colonna interromperà il flusso, provocando un'evoluzione dello schema.
Pitone
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
.option("cloudFiles.schemaLocation", "<path-to-schema-location>") \
.load("<path-to-source-data>") \
.writeStream \
.option("mergeSchema", "true") \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Linguaggio di programmazione Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "<path-to-schema-location>")
.load("<path-to-source-data>")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Evitare la perdita di dati in dati ben strutturati
Quando si conosce lo schema ma si vogliono acquisire dati imprevisti, Databricks consiglia di usare .rescuedDataColumn
Pitone
spark.readStream.format("cloudFiles") \
.schema(expected_schema) \
.option("cloudFiles.format", "json") \
# will collect all new fields as well as data type mismatches in _rescued_data
.option("cloudFiles.schemaEvolutionMode", "rescue") \
.load("<path-to-source-data>") \
.writeStream \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Linguaggio di programmazione Scala
spark.readStream.format("cloudFiles")
.schema(expected_schema)
.option("cloudFiles.format", "json")
// will collect all new fields as well as data type mismatches in _rescued_data
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Se si vuole che il flusso interrompa l'elaborazione se viene introdotto un nuovo campo che non corrisponde allo schema, è possibile aggiungere:
.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")
Abilitare pipeline di dati semistrutturate flessibili
Quando si ricevono dati da un fornitore che introduce nuove colonne alle informazioni fornite, è possibile che non si sappia esattamente quando lo fanno o che non si dispone della larghezza di banda per aggiornare la pipeline di dati. È ora possibile sfruttare l'evoluzione dello schema per riavviare il flusso e consentire al caricatore automatico di aggiornare automaticamente lo schema dedotto. È anche possibile sfruttare schemaHints per alcuni dei campi "senza schema" che il fornitore potrebbe fornire.
Pitone
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# will ensure that the headers column gets processed as a map
.option("cloudFiles.schemaHints",
"headers map<string,string>, statusCode SHORT") \
.load("/api/requests") \
.writeStream \
.option("mergeSchema", "true") \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Linguaggio di programmazione Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// will ensure that the headers column gets processed as a map
.option("cloudFiles.schemaHints",
"headers map<string,string>, statusCode SHORT")
.load("/api/requests")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Trasformare i dati JSON annidati
Poiché il caricatore automatico deduce le colonne JSON di primo livello come stringhe, è possibile rimanere con oggetti JSON annidati che richiedono ulteriori trasformazioni. È possibile usare le API di accesso ai dati semistrutturate per trasformare ulteriormente il contenuto JSON complesso.
Pitone
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
.load("<source-data-with-nested-json>") \
.selectExpr(
"*",
"tags:page.name", # extracts {"tags":{"page":{"name":...}}}
"tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
"tags:eventType" # extracts {"tags":{"eventType":...}}
)
Linguaggio di programmazione Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<source-data-with-nested-json>")
.selectExpr(
"*",
"tags:page.name", // extracts {"tags":{"page":{"name":...}}}
"tags:page.id::int", // extracts {"tags":{"page":{"id":...}}} and casts to int
"tags:eventType" // extracts {"tags":{"eventType":...}}
)
Dedurre i dati JSON annidati
Quando si dispone di dati annidati, è possibile usare l'opzione cloudFiles.inferColumnTypes per dedurre la struttura annidata dei dati e di altri tipi di colonna.
Pitone
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
.option("cloudFiles.inferColumnTypes", "true") \
.load("<source-data-with-nested-json>")
Linguaggio di programmazione Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.option("cloudFiles.inferColumnTypes", "true")
.load("<source-data-with-nested-json>")
Caricare file CSV senza intestazioni
L'esempio seguente illustra come caricare file CSV senza intestazioni usando il caricatore automatico. Usare rescuedDataColumn per acquisire tutti i dati che non corrispondono allo schema fornito.
Pitone
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("rescuedDataColumn", "_rescued_data") \ # ensure that you don't lose data
.schema(<schema>) \ # provide a schema here for the files
.load(<path>)
Linguaggio di programmazione Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
.schema(<schema>) // provide a schema here for the files
.load(<path>)
Applicare uno schema ai file CSV con intestazioni
Nell'esempio seguente viene illustrato come applicare uno schema ai file CSV che includono intestazioni. Usare rescuedDataColumn per acquisire tutti i dati che non corrispondono allo schema fornito.
Pitone
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("header", "true") \
.option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
.schema(<schema>) \ # provide a schema here for the files
.load(<path>)
Linguaggio di programmazione Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", "true")
.option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
.schema(<schema>) // provide a schema here for the files
.load(<path>)
Inserire dati di immagine o dati binari in Delta Lake per il machine learning
Dopo aver archiviato i dati in Delta Lake, è possibile eseguire l'inferenza distribuita sui dati. Vedi Eseguire l'inferenza distribuita utilizzando le UDF di pandas.
Pitone
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "binaryFile") \
.load("<path-to-source-data>") \
.writeStream \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Linguaggio di programmazione Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "binaryFile")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Sintassi del caricatore automatico per le pipeline dichiarative spark di Lakeflow
Le pipeline dichiarative di Lakeflow Spark offrono una sintassi Python leggermente modificata per il caricatore automatico e aggiunge il supporto SQL per il caricatore automatico. Gli esempi seguenti usano Auto Loader per creare set di dati da file JSON usando il set di dati di prenotazione viaggi di esempio Wanderbricks :
Pitone
@dp.table
def booking_updates():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("multiLine", "true")
.load("/Volumes/my_catalog/my_schema/my_volume/wanderbricks/booking_updates")
)
@dp.table
def reviews():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("multiLine", "true")
.load("/Volumes/my_catalog/my_schema/my_volume/wanderbricks/reviews")
)
SQL
CREATE OR REFRESH STREAMING TABLE booking_updates
AS SELECT * FROM STREAM read_files(
"/Volumes/my_catalog/my_schema/my_volume/wanderbricks/booking_updates",
format => "json",
multiLine => true
)
CREATE OR REFRESH STREAMING TABLE reviews
AS SELECT * FROM STREAM read_files(
"/Volumes/my_catalog/my_schema/my_volume/wanderbricks/reviews",
format => "json",
multiLine => true
)
È possibile usare le opzioni di formato supportate per il caricatore automatico. Le opzioni per read_files sono coppie di chiavi e valori. Per informazioni dettagliate sui formati e le opzioni supportati, vedere opzioni .
CREATE OR REFRESH STREAMING TABLE my_table
AS SELECT *
FROM STREAM read_files(
"/Volumes/my_volume/path/to/files/*",
option-key => option-value,
...
)
L'esempio seguente legge i file JSON su più righe con inferenza del tipo di colonna abilitata:
CREATE OR REFRESH STREAMING TABLE booking_updates
AS SELECT * FROM STREAM read_files(
"/Volumes/my_catalog/my_schema/my_volume/wanderbricks/booking_updates",
format => "json",
multiLine => true,
inferColumnTypes => true
)
È possibile usare il schema per specificare il formato manualmente; è necessario specificare il schema per i formati che non supportano l'inferenza dello schema :
Pitone
@dp.table
def booking_updates_raw():
return (
spark.readStream.format("cloudFiles")
.schema("booking_id LONG, booking_update_id LONG, user_id LONG, property_id LONG, status STRING, guests_count INT, total_amount DOUBLE, check_in DATE, check_out DATE, created_at TIMESTAMP, updated_at TIMESTAMP")
.option("cloudFiles.format", "json")
.option("multiLine", "true")
.load("/Volumes/my_catalog/my_schema/my_volume/wanderbricks/booking_updates")
)
SQL
CREATE OR REFRESH STREAMING TABLE booking_updates_raw
AS SELECT *
FROM STREAM read_files(
"/Volumes/my_catalog/my_schema/my_volume/wanderbricks/booking_updates",
format => "json",
multiLine => true,
schema => "booking_id LONG, booking_update_id LONG, user_id LONG, property_id LONG, status STRING, guests_count INT, total_amount DOUBLE, check_in DATE, check_out DATE, created_at TIMESTAMP, updated_at TIMESTAMP"
)
Nota
Le pipeline dichiarative di Lakeflow Spark configurano e gestiscono automaticamente le directory dello schema e del checkpoint quando si usa il caricatore automatico per leggere i file. Tuttavia, se si configura manualmente una di queste directory, l'esecuzione di un aggiornamento completo non influisce sul contenuto delle directory configurate. Databricks consiglia di usare le directory configurate automaticamente per evitare effetti collaterali imprevisti durante l'elaborazione.