Kom igång med COPY INTO för att läsa in data

Du kan använda SQL-kommandot COPY INTO för att läsa in data från en filplats till en Delta-tabell. COPY INTO är återförsökbar och idempotent – filer på källplatsen som redan har lästs in hoppas över vid senare körningar.

COPY INTO erbjuder följande funktioner:

  • Konfigurera enkelt fil- eller mappfilter från molnlagring, inklusive volymer för S3, ADLS, ABFS, GCS och Unity Catalog.
  • Stöd för flera källfilformat: CSV, JSON, XML, Avro, ORC, Parquet, text och binära filer.
  • Exakt en gång (idempotent) filbearbetning som standard.
  • Måltabellschemainferens, mappning, sammanslagning och utveckling.

Anteckning

För en mer skalbar och robust filinmatning rekommenderar Databricks att SQL-användare använder strömmande tabeller. Mer information finns i Strömmande tabeller.

Varning

COPY INTO respekterar arbetsyteinställningen för borttagningsvektorer. Om det är aktiverat kommer borttagningsvektorer att aktiveras i måltabellen när COPY INTO körs på ett SQL-lager eller beräkning som kör Databricks Runtime 14.0 eller senare. När borttagningsvektorer har aktiverats blockerar de frågor mot en tabell i Databricks Runtime 11.3 LTS och nedan. Se Borttagningsvektorer i Databricks och Automatisk aktivering av borttagningsvektorer.

Innan du börjar

En kontoadministratör måste följa stegen i Konfigurera dataåtkomst för inmatning för att konfigurera åtkomst till data i molnobjektlagring innan användarna kan läsa in data med .COPY INTO

Ladda data i en schemalös Delta Lake-tabell

I Databricks Runtime 11.3 LTS och senare kan du skapa tomma platshållar-deltatabeller så att schemat kan härledas under ett COPY INTO-kommando genom att ange mergeSchema till true i COPY_OPTIONS. I följande exempel används Wanderbricks-datauppsättningen . Ersätt <catalog>, <schema>och <volume> med en katalog, ett schema och en volym där du har CREATE TABLE behörigheter.

SQL

CREATE TABLE IF NOT EXISTS <catalog>.<schema>.booking_updates_schemaless;

COPY INTO <catalog>.<schema>.booking_updates_schemaless
FROM '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');

Python

table_name = '<catalog>.<schema>.booking_updates_schemaless'
source_data = '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
source_format = 'JSON'

spark.sql("CREATE TABLE IF NOT EXISTS " + table_name)

spark.sql("COPY INTO " + table_name + \
  " FROM '" + source_data + "'" + \
  " FILEFORMAT = " + source_format + \
  " FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')" + \
  " COPY_OPTIONS ('mergeSchema' = 'true')"
)

R

library(SparkR)
sparkR.session()

table_name = "<catalog>.<schema>.booking_updates_schemaless"
source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
source_format = "JSON"

sql(paste("CREATE TABLE IF NOT EXISTS ", table_name, sep = ""))

sql(paste("COPY INTO ", table_name,
  " FROM '", source_data, "'",
  " FILEFORMAT = ", source_format,
  " FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')",
  " COPY_OPTIONS ('mergeSchema' = 'true')",
  sep = ""
))

Scala

val table_name = "<catalog>.<schema>.booking_updates_schemaless"
val source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
val source_format = "JSON"

spark.sql("CREATE TABLE IF NOT EXISTS " + table_name)

spark.sql("COPY INTO " + table_name +
  " FROM '" + source_data + "'" +
  " FILEFORMAT = " + source_format +
  " FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')" +
  " COPY_OPTIONS ('mergeSchema' = 'true')"
)

Den här SQL-instruktionen är idempotent. Det innebär att du kan schemalägga att den ska köras upprepade gånger och att den bara läser in nya data i deltatabellen.

Anteckning

Den tomma Delta-tabellen kan inte användas utanför COPY INTO. INSERT INTO och MERGE INTO stöds inte för att skriva data till schemalösa Delta-tabeller. När data har infogats i tabellen med COPY INTOblir tabellen frågebar.

Se Skapa måltabeller för COPY INTO.

Ange schema och läs in data i en Delta Lake-tabell

I följande exempel skapas en Delta-tabell och SQL-kommandot används COPY INTO för att läsa in exempeldata från Wanderbricks-datauppsättningen till tabellen. Källfilerna är JSON-filer som lagras i en Unity Catalog-volym. Du kan köra exemplet Python, R, Scala eller SQL-kod från en notebook-fil som är kopplad till ett Azure Databricks kluster. Du kan också köra SQL-koden från en fråga som är associerad med ett SQL-lager i Databricks SQL. Ersätt <catalog>, <schema>och <volume> med en katalog, ett schema och en volym där du har CREATE TABLE behörigheter.

SQL

DROP TABLE IF EXISTS <catalog>.<schema>.booking_updates_upload;

CREATE TABLE <catalog>.<schema>.booking_updates_upload (
  booking_id BIGINT,
  user_id BIGINT,
  status STRING,
  total_amount DOUBLE
);

COPY INTO <catalog>.<schema>.booking_updates_upload
FROM '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
FILEFORMAT = JSON
FORMAT_OPTIONS ('multiLine' = 'true');

SELECT * FROM <catalog>.<schema>.booking_updates_upload;

Python

table_name = '<catalog>.<schema>.booking_updates_upload'
source_data = '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
source_format = 'JSON'

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" \
  "booking_id BIGINT, " + \
  "user_id BIGINT, " + \
  "status STRING, " + \
  "total_amount DOUBLE)"
)

spark.sql("COPY INTO " + table_name + \
  " FROM '" + source_data + "'" + \
  " FILEFORMAT = " + source_format + \
  " FORMAT_OPTIONS ('multiLine' = 'true')"
)

booking_updates_upload_data = spark.sql("SELECT * FROM " + table_name)

display(booking_updates_upload_data)

R

library(SparkR)
sparkR.session()

table_name = "<catalog>.<schema>.booking_updates_upload"
source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
source_format = "JSON"

sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))

sql(paste("CREATE TABLE ", table_name, " (",
  "booking_id BIGINT, ",
  "user_id BIGINT, ",
  "status STRING, ",
  "total_amount DOUBLE)",
  sep = ""
))

sql(paste("COPY INTO ", table_name,
  " FROM '", source_data, "'",
  " FILEFORMAT = ", source_format,
  " FORMAT_OPTIONS ('multiLine' = 'true')",
  sep = ""
))

booking_updates_upload_data = tableToDF(table_name)

display(booking_updates_upload_data)

Scala

val table_name = "<catalog>.<schema>.booking_updates_upload"
val source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
val source_format = "JSON"

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" +
  "booking_id BIGINT, " +
  "user_id BIGINT, " +
  "status STRING, " +
  "total_amount DOUBLE)"
)

spark.sql("COPY INTO " + table_name +
  " FROM '" + source_data + "'" +
  " FILEFORMAT = " + source_format +
  " FORMAT_OPTIONS ('multiLine' = 'true')"
)

val booking_updates_upload_data = spark.table(table_name)

display(booking_updates_upload_data)

Om du vill rensa kör du följande kod för att ta bort exempeltabellen.

SQL

DROP TABLE <catalog>.<schema>.booking_updates_upload

Python

spark.sql("DROP TABLE " + table_name)

R

sql(paste("DROP TABLE ", table_name, sep = ""))

Scala

spark.sql("DROP TABLE " + table_name)

Rensa metadatafiler

Du kan köra VACUUM för att städa upp orefererade metadatafiler som skapats av COPY INTO i Databricks Runtime 15.2 och senare.

Ytterligare resurser

  • Databricks Runtime 7.x och senare: COPY INTO