Automatische typebreiding met automatisch laadprogramma

Belangrijk

Deze functie bevindt zich in openbare preview in Databricks Runtime 16.4 en hoger.

Auto Loader verwerkt stapsgewijs en efficiënt nieuwe gegevensbestanden wanneer ze binnenkomen in de cloudopslag. Het vermindert ook het onderhoud van pijplijnen door automatisch complexe schemawijzigingen te verwerken. U kunt bijvoorbeeld Automatisch laden zo configureren dat het schema van geladen gegevens automatisch wordt gedetecteerd, zodat u tabellen kunt initialiseren zonder het gegevensschema expliciet te declareren. U kunt ook het tabelschema ontwikkelen naarmate er nieuwe kolommen worden geïntroduceerd, waardoor u geen schemawijzigingen handmatig hoeft bij te houden en in de loop van de tijd hoeft toe te passen. AutoLoader kan zelfs gegevens redden die onverwacht zijn (bijvoorbeeld vanwege verschillende gegevenstypen) in een opgeslagen gegevenskolom, zodat u gegevensverlies kunt voorkomen.

Voor de herstelde gegevenskolom moet u echter handmatig wijzigingen in het gegevenstype verwerken.

Als u bepaalde wijzigingen van dit gegevenstype automatisch wilt afhandelen, gebruikt u type widening in Auto Loader. Delta Lake biedt nu ondersteuning voor verschillende uitbreidingen van gegevenstypen zonder dat gegevens opnieuw hoeven te worden geschreven of tussenkomst van gebruikers nodig is. Zie voor meer informatie over Delta Lake Type widening. De nieuwe modus voor de ontwikkeling van schema's, addNewColumnsWithTypeWideningontwikkelt automatisch een schema op compatibele wijzigingen in het gegevenstype.

U kunt primitieve typen breder maken zoals int naar long, float naar double, en meer. Typebreiding is beschikbaar voor alle bestandsindelingen met ondersteuning voor schemaontwikkeling in Auto Loader. Dit omvat tekstindelingen (zoals JSON, CSV of XML) en binaire indelingen (zoals Avro of Parquet). Er is geen wijziging in het gedrag van schemaontwikkeling voor bestaande modi voor schemaontwikkeling (zoals addNewColumnsrescue, failOnNewColumns, of none).

Ondersteunde typewijzigingen

De volgende typewijzigingen worden ondersteund:

Type bron Ondersteunde uitgebreidere typen
byte short int, long, decimaldouble
short int,long,decimal,double
int long, decimal, double
long decimal
float double
decimal decimal met meer precisie en schaal
date timestampNTZ (alleen voor Parquet-bestanden)

Bij het verbreden van een numeriek type naar decimal, verbreedt Auto Loader naar decimal met een precisie die gelijk is aan of groter is dan de beginprecisie. Als u de schaal verhoogt, neemt de totale precisie toe met een corresponderend bedrag.

De beginprecisie van gehele getallen is het volgende:

Typ Startprecisie
byte 10
short 10
int 10
long 20

Als het huidige type van een kolom bijvoorbeeld int is en een bestand met het type van die kolom zoals decimal(5, 2) wordt gelezen, breidt Auto Loader het type van die kolom uit naar decimal(12, 2).

Vereiste voorwaarden

Als u typebreiding met automatisch laden wilt gebruiken, moet u aan de volgende vereisten voldoen:

  • Databricks Runtime 16.4 of hoger gebruiken.
  • Als de schrijfsink een Delta Lake-tabel is, schakelt u 'type widening' in voor de Delta Lake-tabel met behulp van een van de volgende methoden:
    • Als u een bestaande tabel gebruikt:

      ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.enableTypeWidening' = 'true')
      
    • Als u een nieuwe tabel maakt waarvoor het type widening is ingeschakeld:

      CREATE TABLE T(c1 INT) TBLPROPERTIES('delta.enableTypeWidening' = 'true')
      

Voor meer informatie over het verbreden van typen in Delta Lake-tabellen, zie Type widening.

Typebreiding met schemaontwikkeling inschakelen

Als u typebreiding wilt gebruiken met Auto Loader, specificeer addNewColumnsWithTypeWidening wanneer u schema-evolutie gebruikt. Auto Loader detecteert de toevoeging van nieuwe kolommen en typewijzigingen bij het verwerken van uw gegevens.

Python

query = (spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("cloudFiles.inferColumnTypes", True)
  .option("cloudFiles.schemaLocation", <schemaPath>)
  .option("cloudFiles.schemaEvolutionMode", "addNewColumnsWithTypeWidening")
  .load(<inputPath>)
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", <checkpointPath>)
  .trigger(availableNow=True)
  .toTable("table_name")
)

Scala

val query = spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("cloudFiles.inferColumnTypes", true)
  .option("cloudFiles.schemaLocation", <schemaPath>)
  .option("cloudFiles.schemaEvolutionMode", "addNewColumnsWithTypeWidening")
  .load(<inputPath>)
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", <checkpointPath>)
  .trigger(Trigger.AvailableNow())
  .toTable("table_name")

Wanneer Automatisch Laden een nieuwe kolom of een typewijziging detecteert die typebreiding ondersteunt, stopt de datastroom met een UnknownFieldException. Voordat uw stroom deze fout genereert, voert Auto Loader schema-afleiding uit op de meest recente microbatch met gegevens en werkt Auto Loader de schemalocatie bij met het nieuwste schema door de bestaande kolommen uit te breiden of nieuwe kolommen samen te voegen aan het schema.

Gedrag van schemaontwikkeling bij wijzigingen in gegevenstypen

Als u een CSV met de volgende inhoud wilt opnemen, wordt het schema automatisch afgeleid als STRUCT<id INT, name STRING, _rescued_data STRING>.

id, name
1, John
2, Mary

De doelstelling van de tabel ziet er als volgt uit:

identiteitskaart naam _rescued_data
1 John NUL
2 Maria NUL

Neem nu een ander CSV-bestand op waarbij de waarden in de id kolom breder zijn dan INT het type:

id, name, age
2147483648, Bob, 25

In de volgende tabel wordt het gedrag en de uitvoer uitgelegd met verschillende modi voor schemaontwikkeling in AutoLoader:

Mode Gedrag met ondersteunde wijziging van gegevenstype die kan worden uitgebreid
addNewColumns (standaard) Het gegevenstype ontwikkelt zich niet en de stroom mislukt niet vanwege een wijziging van het gegevenstype. Kolommen met niet-overeenkomende waarden van het type worden ingesteld op NULLen de niet-overeenkomende waarden worden toegevoegd aan de kolom met geredde gegevens. Stream mislukt bij nieuwe kolommen.
rescue Het schema ontwikkelt zich niet en streams mislukken niet vanwege schemawijzigingen. Kolommen met niet-overeenkomende waarden van het type worden ingesteld op NULLen de niet-overeenkomende waarden worden toegevoegd aan de kolom met geredde gegevens.
failOnNewColumns Het gegevenstype ontwikkelt zich niet en de stroom mislukt niet vanwege een wijziging van het gegevenstype. Kolommen met niet-overeenkomende waarden van het type worden ingesteld op NULLen de niet-overeenkomende waarden worden toegevoegd aan de kolom met geredde gegevens. Stream mislukt op nieuwe kolommen zonder het schema te ontwikkelen.
none Het schema wordt niet aangepast, nieuwe kolommen worden genegeerd en gegevens worden niet gered, tenzij de rescuedDataColumn optie is ingesteld. Stream mislukt niet vanwege schemawijzigingen.
addNewColumnsWithTypeWidening Stream mislukt. Nieuwe kolommen worden toegevoegd aan het schema en ondersteunde wijzigingen in gegevenstypen worden uitgebreid. Wijzigingen in niet-ondersteund gegevenstype (bijvoorbeeld int aan string) worden toegevoegd aan de kolom met geredde gegevens.

Voorbeeld van resultaten

In de volgende tabel ziet u het uitgestelde schema en de waarden voor elke modus voor de evolutie van het schema na het opnemen van het tweede CSV-bestand:

Beperkingen

  • De optie prefersDecimal kan niet worden ingesteld op false wanneer u addNewColumnsWithTypeWidening gebruikt. Wanneer addNewColumnsWithTypeWidening is opgegeven, is de standaardwaarde van prefersDecimaltrue .
  • date naar timestampNTZ verbreding wordt alleen ondersteund voor Parquet-bestanden.