Kommentar
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Du kan använda SQL-kommandot COPY INTO för att läsa in data från en filplats till en Delta-tabell.
COPY INTO är återförsökbar och idempotent – filer på källplatsen som redan har lästs in hoppas över vid senare körningar.
COPY INTO erbjuder följande funktioner:
- Konfigurera enkelt fil- eller mappfilter från molnlagring, inklusive volymer för S3, ADLS, ABFS, GCS och Unity Catalog.
- Stöd för flera källfilformat: CSV, JSON, XML, Avro, ORC, Parquet, text och binära filer.
- Exakt en gång (idempotent) filbearbetning som standard.
- Måltabellschemainferens, mappning, sammanslagning och utveckling.
Anteckning
För en mer skalbar och robust filinmatning rekommenderar Databricks att SQL-användare använder strömmande tabeller. Mer information finns i Strömmande tabeller.
Varning
COPY INTO respekterar arbetsyteinställningen för borttagningsvektorer. Om det är aktiverat kommer borttagningsvektorer att aktiveras i måltabellen när COPY INTO körs på ett SQL-lager eller beräkning som kör Databricks Runtime 14.0 eller senare. När borttagningsvektorer har aktiverats blockerar de frågor mot en tabell i Databricks Runtime 11.3 LTS och nedan. Se Borttagningsvektorer i Databricks och Automatisk aktivering av borttagningsvektorer.
Innan du börjar
En kontoadministratör måste följa stegen i Konfigurera dataåtkomst för inmatning för att konfigurera åtkomst till data i molnobjektlagring innan användarna kan läsa in data med .COPY INTO
Ladda data i en schemalös Delta Lake-tabell
I Databricks Runtime 11.3 LTS och senare kan du skapa tomma platshållar-deltatabeller så att schemat kan härledas under ett COPY INTO-kommando genom att ange mergeSchema till true i COPY_OPTIONS. I följande exempel används Wanderbricks-datauppsättningen . Ersätt <catalog>, <schema>och <volume> med en katalog, ett schema och en volym där du har CREATE TABLE behörigheter.
SQL
CREATE TABLE IF NOT EXISTS <catalog>.<schema>.booking_updates_schemaless;
COPY INTO <catalog>.<schema>.booking_updates_schemaless
FROM '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
Python
table_name = '<catalog>.<schema>.booking_updates_schemaless'
source_data = '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
source_format = 'JSON'
spark.sql("CREATE TABLE IF NOT EXISTS " + table_name)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format + \
" FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')" + \
" COPY_OPTIONS ('mergeSchema' = 'true')"
)
R
library(SparkR)
sparkR.session()
table_name = "<catalog>.<schema>.booking_updates_schemaless"
source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
source_format = "JSON"
sql(paste("CREATE TABLE IF NOT EXISTS ", table_name, sep = ""))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
" FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')",
" COPY_OPTIONS ('mergeSchema' = 'true')",
sep = ""
))
Scala
val table_name = "<catalog>.<schema>.booking_updates_schemaless"
val source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
val source_format = "JSON"
spark.sql("CREATE TABLE IF NOT EXISTS " + table_name)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format +
" FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')" +
" COPY_OPTIONS ('mergeSchema' = 'true')"
)
Den här SQL-instruktionen är idempotent. Det innebär att du kan schemalägga att den ska köras upprepade gånger och att den bara läser in nya data i deltatabellen.
Anteckning
Den tomma Delta-tabellen kan inte användas utanför COPY INTO.
INSERT INTO och MERGE INTO stöds inte för att skriva data till schemalösa Delta-tabeller. När data har infogats i tabellen med COPY INTOblir tabellen frågebar.
Se Skapa måltabeller för COPY INTO.
Ange schema och läs in data i en Delta Lake-tabell
I följande exempel skapas en Delta-tabell och SQL-kommandot används COPY INTO för att läsa in exempeldata från Wanderbricks-datauppsättningen till tabellen. Källfilerna är JSON-filer som lagras i en Unity Catalog-volym. Du kan köra exemplet Python, R, Scala eller SQL-kod från en notebook-fil som är kopplad till ett Azure Databricks kluster. Du kan också köra SQL-koden från en fråga som är associerad med ett SQL-lager i Databricks SQL. Ersätt <catalog>, <schema>och <volume> med en katalog, ett schema och en volym där du har CREATE TABLE behörigheter.
SQL
DROP TABLE IF EXISTS <catalog>.<schema>.booking_updates_upload;
CREATE TABLE <catalog>.<schema>.booking_updates_upload (
booking_id BIGINT,
user_id BIGINT,
status STRING,
total_amount DOUBLE
);
COPY INTO <catalog>.<schema>.booking_updates_upload
FROM '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
FILEFORMAT = JSON
FORMAT_OPTIONS ('multiLine' = 'true');
SELECT * FROM <catalog>.<schema>.booking_updates_upload;
Python
table_name = '<catalog>.<schema>.booking_updates_upload'
source_data = '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
source_format = 'JSON'
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" \
"booking_id BIGINT, " + \
"user_id BIGINT, " + \
"status STRING, " + \
"total_amount DOUBLE)"
)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format + \
" FORMAT_OPTIONS ('multiLine' = 'true')"
)
booking_updates_upload_data = spark.sql("SELECT * FROM " + table_name)
display(booking_updates_upload_data)
R
library(SparkR)
sparkR.session()
table_name = "<catalog>.<schema>.booking_updates_upload"
source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
source_format = "JSON"
sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))
sql(paste("CREATE TABLE ", table_name, " (",
"booking_id BIGINT, ",
"user_id BIGINT, ",
"status STRING, ",
"total_amount DOUBLE)",
sep = ""
))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
" FORMAT_OPTIONS ('multiLine' = 'true')",
sep = ""
))
booking_updates_upload_data = tableToDF(table_name)
display(booking_updates_upload_data)
Scala
val table_name = "<catalog>.<schema>.booking_updates_upload"
val source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
val source_format = "JSON"
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" +
"booking_id BIGINT, " +
"user_id BIGINT, " +
"status STRING, " +
"total_amount DOUBLE)"
)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format +
" FORMAT_OPTIONS ('multiLine' = 'true')"
)
val booking_updates_upload_data = spark.table(table_name)
display(booking_updates_upload_data)
Om du vill rensa kör du följande kod för att ta bort exempeltabellen.
SQL
DROP TABLE <catalog>.<schema>.booking_updates_upload
Python
spark.sql("DROP TABLE " + table_name)
R
sql(paste("DROP TABLE ", table_name, sep = ""))
Scala
spark.sql("DROP TABLE " + table_name)
Rensa metadatafiler
Du kan köra VACUUM för att städa upp orefererade metadatafiler som skapats av COPY INTO i Databricks Runtime 15.2 och senare.
Ytterligare resurser
- Läsa in data via COPY INTO med Unity Catalog-volymer eller externa platser
-
Vanliga datainläsningsmönster med hjälp av
COPY INTO.
- Databricks Runtime 7.x och senare:
COPY INTO