Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Belangrijk
Deze functie bevindt zich in openbare preview in Databricks Runtime 16.4 en hoger.
Auto Loader verwerkt stapsgewijs en efficiënt nieuwe gegevensbestanden wanneer ze binnenkomen in de cloudopslag. Het vermindert ook het onderhoud van pijplijnen door automatisch complexe schemawijzigingen te verwerken. U kunt bijvoorbeeld Automatisch laden zo configureren dat het schema van geladen gegevens automatisch wordt gedetecteerd, zodat u tabellen kunt initialiseren zonder het gegevensschema expliciet te declareren. U kunt ook het tabelschema ontwikkelen naarmate er nieuwe kolommen worden geïntroduceerd, waardoor u geen schemawijzigingen handmatig hoeft bij te houden en in de loop van de tijd hoeft toe te passen. AutoLoader kan zelfs gegevens redden die onverwacht zijn (bijvoorbeeld vanwege verschillende gegevenstypen) in een opgeslagen gegevenskolom, zodat u gegevensverlies kunt voorkomen.
Voor de herstelde gegevenskolom moet u echter handmatig wijzigingen in het gegevenstype verwerken.
Als u bepaalde wijzigingen van dit gegevenstype automatisch wilt afhandelen, gebruikt u type widening in Auto Loader. Delta Lake biedt nu ondersteuning voor verschillende uitbreidingen van gegevenstypen zonder dat gegevens opnieuw hoeven te worden geschreven of tussenkomst van gebruikers nodig is. Zie voor meer informatie over Delta Lake Type widening. De nieuwe modus voor de ontwikkeling van schema's, addNewColumnsWithTypeWideningontwikkelt automatisch een schema op compatibele wijzigingen in het gegevenstype.
U kunt primitieve typen breder maken zoals int naar long, float naar double, en meer. Typebreiding is beschikbaar voor alle bestandsindelingen met ondersteuning voor schemaontwikkeling in Auto Loader. Dit omvat tekstindelingen (zoals JSON, CSV of XML) en binaire indelingen (zoals Avro of Parquet). Er is geen wijziging in het gedrag van schemaontwikkeling voor bestaande modi voor schemaontwikkeling (zoals addNewColumnsrescue, failOnNewColumns, of none).
Ondersteunde typewijzigingen
De volgende typewijzigingen worden ondersteund:
| Type bron | Ondersteunde uitgebreidere typen |
|---|---|
byte |
short
int, long, decimaldouble |
short |
int,long,decimal,double |
int |
long, decimal, double |
long |
decimal |
float |
double |
decimal |
decimal met meer precisie en schaal |
date |
timestampNTZ (alleen voor Parquet-bestanden) |
Bij het verbreden van een numeriek type naar decimal, verbreedt Auto Loader naar decimal met een precisie die gelijk is aan of groter is dan de beginprecisie. Als u de schaal verhoogt, neemt de totale precisie toe met een corresponderend bedrag.
De beginprecisie van gehele getallen is het volgende:
| Typ | Startprecisie |
|---|---|
byte |
10 |
short |
10 |
int |
10 |
long |
20 |
Als het huidige type van een kolom bijvoorbeeld int is en een bestand met het type van die kolom zoals decimal(5, 2) wordt gelezen, breidt Auto Loader het type van die kolom uit naar decimal(12, 2).
Vereiste voorwaarden
Als u typebreiding met automatisch laden wilt gebruiken, moet u aan de volgende vereisten voldoen:
- Databricks Runtime 16.4 of hoger gebruiken.
- Als de schrijfsink een Delta Lake-tabel is, schakelt u 'type widening' in voor de Delta Lake-tabel met behulp van een van de volgende methoden:
Als u een bestaande tabel gebruikt:
ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.enableTypeWidening' = 'true')Als u een nieuwe tabel maakt waarvoor het type widening is ingeschakeld:
CREATE TABLE T(c1 INT) TBLPROPERTIES('delta.enableTypeWidening' = 'true')
Voor meer informatie over het verbreden van typen in Delta Lake-tabellen, zie Type widening.
Typebreiding met schemaontwikkeling inschakelen
Als u typebreiding wilt gebruiken met Auto Loader, specificeer addNewColumnsWithTypeWidening wanneer u schema-evolutie gebruikt. Auto Loader detecteert de toevoeging van nieuwe kolommen en typewijzigingen bij het verwerken van uw gegevens.
Python
query = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("cloudFiles.inferColumnTypes", True)
.option("cloudFiles.schemaLocation", <schemaPath>)
.option("cloudFiles.schemaEvolutionMode", "addNewColumnsWithTypeWidening")
.load(<inputPath>)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", <checkpointPath>)
.trigger(availableNow=True)
.toTable("table_name")
)
Scala
val query = spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("cloudFiles.inferColumnTypes", true)
.option("cloudFiles.schemaLocation", <schemaPath>)
.option("cloudFiles.schemaEvolutionMode", "addNewColumnsWithTypeWidening")
.load(<inputPath>)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", <checkpointPath>)
.trigger(Trigger.AvailableNow())
.toTable("table_name")
Wanneer Automatisch Laden een nieuwe kolom of een typewijziging detecteert die typebreiding ondersteunt, stopt de datastroom met een UnknownFieldException. Voordat uw stroom deze fout genereert, voert Auto Loader schema-afleiding uit op de meest recente microbatch met gegevens en werkt Auto Loader de schemalocatie bij met het nieuwste schema door de bestaande kolommen uit te breiden of nieuwe kolommen samen te voegen aan het schema.
Gedrag van schemaontwikkeling bij wijzigingen in gegevenstypen
Als u een CSV met de volgende inhoud wilt opnemen, wordt het schema automatisch afgeleid als STRUCT<id INT, name STRING, _rescued_data STRING>.
id, name
1, John
2, Mary
De doelstelling van de tabel ziet er als volgt uit:
| identiteitskaart | naam | _rescued_data |
|---|---|---|
| 1 | John | NUL |
| 2 | Maria | NUL |
Neem nu een ander CSV-bestand op waarbij de waarden in de id kolom breder zijn dan INT het type:
id, name, age
2147483648, Bob, 25
In de volgende tabel wordt het gedrag en de uitvoer uitgelegd met verschillende modi voor schemaontwikkeling in AutoLoader:
| Mode | Gedrag met ondersteunde wijziging van gegevenstype die kan worden uitgebreid |
|---|---|
addNewColumns (standaard) |
Het gegevenstype ontwikkelt zich niet en de stroom mislukt niet vanwege een wijziging van het gegevenstype. Kolommen met niet-overeenkomende waarden van het type worden ingesteld op NULLen de niet-overeenkomende waarden worden toegevoegd aan de kolom met geredde gegevens. Stream mislukt bij nieuwe kolommen. |
rescue |
Het schema ontwikkelt zich niet en streams mislukken niet vanwege schemawijzigingen. Kolommen met niet-overeenkomende waarden van het type worden ingesteld op NULLen de niet-overeenkomende waarden worden toegevoegd aan de kolom met geredde gegevens. |
failOnNewColumns |
Het gegevenstype ontwikkelt zich niet en de stroom mislukt niet vanwege een wijziging van het gegevenstype. Kolommen met niet-overeenkomende waarden van het type worden ingesteld op NULLen de niet-overeenkomende waarden worden toegevoegd aan de kolom met geredde gegevens. Stream mislukt op nieuwe kolommen zonder het schema te ontwikkelen. |
none |
Het schema wordt niet aangepast, nieuwe kolommen worden genegeerd en gegevens worden niet gered, tenzij de rescuedDataColumn optie is ingesteld. Stream mislukt niet vanwege schemawijzigingen. |
addNewColumnsWithTypeWidening |
Stream mislukt. Nieuwe kolommen worden toegevoegd aan het schema en ondersteunde wijzigingen in gegevenstypen worden uitgebreid. Wijzigingen in niet-ondersteund gegevenstype (bijvoorbeeld int aan string) worden toegevoegd aan de kolom met geredde gegevens. |
Voorbeeld van resultaten
In de volgende tabel ziet u het uitgestelde schema en de waarden voor elke modus voor de evolutie van het schema na het opnemen van het tweede CSV-bestand:
Beperkingen
- De optie
prefersDecimalkan niet worden ingesteld opfalsewanneer uaddNewColumnsWithTypeWideninggebruikt. WanneeraddNewColumnsWithTypeWideningis opgegeven, is de standaardwaarde vanprefersDecimaltrue. -
datenaartimestampNTZverbreding wordt alleen ondersteund voor Parquet-bestanden.