Automatisk typförstoring med Auto Loader

Viktigt!

Den här funktionen finns i offentlig förhandsversion i Databricks Runtime 16.4 och senare.

Auto Loader bearbetar inkrementellt och effektivt nya datafiler när de anländer till molnlagringen. Det minskar också pipelineunderhållet genom att automatiskt hantera komplexa schemaändringar. Du kan till exempel konfigurera automatisk inläsning för att automatiskt identifiera schemat för inlästa data, så att du kan initiera tabeller utan att uttryckligen deklarera dataschemat. Du kan också utveckla tabellschemat när nya kolumner introduceras, vilket eliminerar behovet av att manuellt spåra och tillämpa schemaändringar över tid. Auto loader kan till och med rädda data som är oväntade (till exempel på grund av olika datatyper) i en räddad datakolumn, vilket hjälper dig att undvika dataförlust.

Den räddade datakolumnen kräver dock att du manuellt hanterar ändringar av datatyper.

Om du vill hantera vissa av dessa ändringar av datatypen automatiskt använder du typbreddning i Automatisk inläsning. Delta Lake stöder nu olika ändringar av datatypsbreddning utan att behöva skriva om data eller användaringrepp. Se Delta Lake Type widening. Det nya läget för schemautveckling, addNewColumnsWithTypeWidening, utvecklar automatiskt schemat för kompatibla datatypändringar.

Du kan bredda primitiva typer som int till long, float till doubleoch med. Typbreddning är tillgängligt för alla filformat med stöd för schemautveckling i Auto Loader. Detta inkluderar textformat (till exempel JSON, CSV eller XML) och binära format (till exempel Avro eller Parquet). Det finns ingen ändring i schemautvecklingsbeteendet för befintliga schemautvecklingslägen (till exempel addNewColumns, rescue, failOnNewColumnseller none).

Typändringar som stöds

Följande typändringar stöds:

Typ av källa Bredare typer som stöds
byte short, int, long, decimal, double
short int, long, , decimaldouble
int long, decimal, double
long decimal
float double
decimal decimal med större precision och skalning
date timestampNTZ (endast för Parquet-filer)

När du breddar en numerisk typ till decimalutvidgas autoinläsningen till decimal med precision som är lika med eller större än startprecisionen. Om du ökar skalan ökar den totala precisionen med motsvarande mängd.

Startprecisionen för heltalstyper är följande:

Type Startprecision
byte 10
short 10
int 10
long 20

Om den aktuella typen av en kolumn till exempel är intoch en fil med den typen av kolumn som decimal(5, 2) läses, breddar Auto Loader typen av kolumnen till decimal(12, 2).

Förutsättningar

Om du vill använda typbreddning med Auto Loader måste du uppfylla följande krav:

  • Använd Databricks Runtime 16.4 eller senare.
  • Om skrivmottagaren är en Delta Lake-tabell aktiverar du typbreddning för Delta Lake-tabellen med någon av följande metoder:
    • Om du använder en befintlig tabell:

      ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.enableTypeWidening' = 'true')
      
    • Om du skapar en ny tabell med typbreddning aktiverat:

      CREATE TABLE T(c1 INT) TBLPROPERTIES('delta.enableTypeWidening' = 'true')
      

Mer information om typbreddning i Delta Lake-tabeller finns i Typbreddning.

Aktivera typbreddning med schemautveckling

Om du vill använda typbreddning med Auto Loader anger du addNewColumnsWithTypeWidening när du använder schemautveckling. Auto Loader identifierar tillägg av nya kolumner och ändringar i datatyper när din data bearbetas.

Python

query = (spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("cloudFiles.inferColumnTypes", True)
  .option("cloudFiles.schemaLocation", <schemaPath>)
  .option("cloudFiles.schemaEvolutionMode", "addNewColumnsWithTypeWidening")
  .load(<inputPath>)
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", <checkpointPath>)
  .trigger(availableNow=True)
  .toTable("table_name")
)

Scala

val query = spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("cloudFiles.inferColumnTypes", true)
  .option("cloudFiles.schemaLocation", <schemaPath>)
  .option("cloudFiles.schemaEvolutionMode", "addNewColumnsWithTypeWidening")
  .load(<inputPath>)
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", <checkpointPath>)
  .trigger(Trigger.AvailableNow())
  .toTable("table_name")

När Auto Loader identifierar en ny kolumn eller en typändring som stöds av typbreddning stoppas strömmen med en UnknownFieldException. Innan strömmen genererar det här felet utför Auto Loader schemainferens på den senaste mikrobatchen med data och uppdaterar schemaplatsen med det senaste schemat genom att bredda befintliga kolumner eller slå samman nya kolumner till slutet av schemat.

Schemautvecklingsbeteende för datatypsändringar

Om du skulle mata in en CSV med följande innehåll härleder Auto Loader schemat som STRUCT<id INT, name STRING, _rescued_data STRING>.

id, name
1, John
2, Mary

Måltabellen ser ut så här:

Id-nummer Namn _återställda_data
1 John NOLL
2 Maria NOLL

Mata nu in en annan CSV-fil där värdena i id kolumnen är bredare än INT typ:

id, name, age
2147483648, Bob, 25

I följande tabell förklaras beteendet och utdata med olika schemautvecklingslägen i Auto Loader:

Läge Beteende vid förändring av stöd för utvidgbar datatyp
addNewColumns (standardinställning) Datatypen utvecklas inte och dataströmmen misslyckas inte på grund av ändring av datatyp. Kolumner med typfelvärden ställs in på NULL, och de felmatchade värdena läggs till i den räddade datakolumnen. Flödet misslyckas på nya kolumner.
rescue Schemat utvecklas inte och strömmar misslyckas inte på grund av schemaändringar. Kolumner med typfelvärden ställs in på NULL, och de felmatchade värdena läggs till i den räddade datakolumnen.
failOnNewColumns Datatypen utvecklas inte och dataströmmen misslyckas inte på grund av ändring av datatyp. Kolumner med typfelvärden ställs in på NULL, och de felmatchade värdena läggs till i den räddade datakolumnen. Stream misslyckas med nya kolumner utan att utveckla schemat.
none Utvecklar inte schemat, nya kolumner ignoreras och data räddas inte om inte rescuedDataColumn alternativet har angetts. Stream misslyckas inte på grund av schemaändringar.
addNewColumnsWithTypeWidening Streamen har avbrutits. Nya kolumner läggs till i schemat och ändringar av datatyper som stöds utökas. Ändringar av datatypen som inte stöds (till exempel int till string) läggs till i den räddade datakolumnen.

Exempelresultat

Följande tabell visar det här schemat och värdena för varje schemautvecklingsläge efter inmatning av den andra CSV-filen:

Begränsningar

  • Alternativet prefersDecimal kan inte anges till false när du använder addNewColumnsWithTypeWidening. När addNewColumnsWithTypeWidening specificeras är standardvärdet för prefersDecimaltrue.
  • date till timestampNTZ utökning stöds endast för Parquet-filer.