Aan de slag met COPY INTO om gegevens te laden

U kunt de COPY INTO SQL-opdracht gebruiken om gegevens van een bestandslocatie in een Delta-tabel te laden. COPY INTO is herhaalbaar en idempotent — bestanden op de bronlocatie die al zijn geladen, worden overgeslagen bij volgende uitvoeringen.

COPY INTO biedt deze mogelijkheden:

  • Eenvoudig configureerbare bestands- of mapfilters uit cloudopslag, waaronder S3-, ADLS-, ABFS-, GCS- en Unity Catalog-volumes.
  • Ondersteuning voor meerdere bronbestandsindelingen: CSV-, JSON-, XML-, Avro-, ORC-, Parquet-, tekst- en binaire bestanden.
  • Exactly-once (idempotente) bestandsverwerking als standaardinstelling.
  • Schemadeductie van doeltabellen, toewijzing, samenvoegen en evolutie.

Notitie

Voor een meer schaalbare en robuuste ervaring voor bestandsopname raadt Databricks aan dat SQL-gebruikers streamingtabellen gebruiken. Zie Streaming-tabellen voor meer informatie.

Waarschuwing

COPY INTO respecteert de werkruimte-instelling voor verwijderingsvectoren. Indien ingeschakeld, worden verwijderingsvectoren ingeschakeld in de doeltabel wanneer COPY INTO wordt uitgevoerd op een SQL-warehouse of rekenkracht waarop Databricks Runtime 14.0 of hoger wordt uitgevoerd. Nadat verwijderingsvectoren zijn ingeschakeld, blokkeren ze query's voor een tabel in Databricks Runtime 11.3 LTS en lager. Zie verwijderingsvectoren in Databricks en verwijdervectoren automatisch inschakelen.

Voordat u begint

Een accountbeheerder moet de stappen volgen in Gegevenstoegang configureren voor opname om de toegang tot gegevens in de opslag van cloudobjecten te configureren voordat gebruikers gegevens kunnen laden met behulp van COPY INTO.

Gegevens laden in een schemaloze Delta Lake-tabel

In Databricks Runtime 11.3 LTS en hoger kunt u lege placeholder Delta-tabellen maken, zodat het schema tijdens een COPY INTO-opdracht wordt afgeleid door mergeSchema in true op COPY_OPTIONS te zetten. In het volgende voorbeeld wordt de Wanderbricks-gegevensset gebruikt. Vervang <catalog>, <schema>, en <volume> door een catalogus, schema en volume waarvoor u machtigingen hebt CREATE TABLE.

SQL

CREATE TABLE IF NOT EXISTS <catalog>.<schema>.booking_updates_schemaless;

COPY INTO <catalog>.<schema>.booking_updates_schemaless
FROM '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');

Python

table_name = '<catalog>.<schema>.booking_updates_schemaless'
source_data = '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
source_format = 'JSON'

spark.sql("CREATE TABLE IF NOT EXISTS " + table_name)

spark.sql("COPY INTO " + table_name + \
  " FROM '" + source_data + "'" + \
  " FILEFORMAT = " + source_format + \
  " FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')" + \
  " COPY_OPTIONS ('mergeSchema' = 'true')"
)

R

library(SparkR)
sparkR.session()

table_name = "<catalog>.<schema>.booking_updates_schemaless"
source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
source_format = "JSON"

sql(paste("CREATE TABLE IF NOT EXISTS ", table_name, sep = ""))

sql(paste("COPY INTO ", table_name,
  " FROM '", source_data, "'",
  " FILEFORMAT = ", source_format,
  " FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')",
  " COPY_OPTIONS ('mergeSchema' = 'true')",
  sep = ""
))

Scala

val table_name = "<catalog>.<schema>.booking_updates_schemaless"
val source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
val source_format = "JSON"

spark.sql("CREATE TABLE IF NOT EXISTS " + table_name)

spark.sql("COPY INTO " + table_name +
  " FROM '" + source_data + "'" +
  " FILEFORMAT = " + source_format +
  " FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')" +
  " COPY_OPTIONS ('mergeSchema' = 'true')"
)

Deze SQL-instructie is idempotent. Dit betekent dat u deze herhaaldelijk kunt plannen en dat er alleen nieuwe gegevens in uw Delta-tabel worden geladen.

Notitie

De lege Delta-tabel is niet bruikbaar buiten COPY INTO. INSERT INTO en MERGE INTO worden niet ondersteund om gegevens naar schemaloze Delta-tabellen te schrijven. Nadat gegevens met COPY INTOin de tabel zijn ingevoegd, kan de tabel query's uitvoeren.

Zie Doeltabellen maken voor COPY INTO.

Schema instellen en gegevens laden in een Delta Lake-tabel

In het volgende voorbeeld wordt een Delta-tabel gemaakt en wordt de SQL-opdracht COPY INTO gebruikt om voorbeeldgegevens uit de Wanderbricks-gegevensset in de tabel te laden. De bronbestanden zijn JSON-bestanden die zijn opgeslagen in een Unity Catalog-volume. U kunt het voorbeeld uitvoeren Python, R, Scala of SQL-code vanuit een notebook dat is gekoppeld aan een Azure Databricks-cluster. U kunt de SQL-code ook uitvoeren vanuit een query die is gekoppeld aan een SQL-warehouse in Databricks SQL. Vervang <catalog>, <schema>, en <volume> door een catalogus, schema en volume waarvoor u machtigingen hebt CREATE TABLE.

SQL

DROP TABLE IF EXISTS <catalog>.<schema>.booking_updates_upload;

CREATE TABLE <catalog>.<schema>.booking_updates_upload (
  booking_id BIGINT,
  user_id BIGINT,
  status STRING,
  total_amount DOUBLE
);

COPY INTO <catalog>.<schema>.booking_updates_upload
FROM '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
FILEFORMAT = JSON
FORMAT_OPTIONS ('multiLine' = 'true');

SELECT * FROM <catalog>.<schema>.booking_updates_upload;

Python

table_name = '<catalog>.<schema>.booking_updates_upload'
source_data = '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
source_format = 'JSON'

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" \
  "booking_id BIGINT, " + \
  "user_id BIGINT, " + \
  "status STRING, " + \
  "total_amount DOUBLE)"
)

spark.sql("COPY INTO " + table_name + \
  " FROM '" + source_data + "'" + \
  " FILEFORMAT = " + source_format + \
  " FORMAT_OPTIONS ('multiLine' = 'true')"
)

booking_updates_upload_data = spark.sql("SELECT * FROM " + table_name)

display(booking_updates_upload_data)

R

library(SparkR)
sparkR.session()

table_name = "<catalog>.<schema>.booking_updates_upload"
source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
source_format = "JSON"

sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))

sql(paste("CREATE TABLE ", table_name, " (",
  "booking_id BIGINT, ",
  "user_id BIGINT, ",
  "status STRING, ",
  "total_amount DOUBLE)",
  sep = ""
))

sql(paste("COPY INTO ", table_name,
  " FROM '", source_data, "'",
  " FILEFORMAT = ", source_format,
  " FORMAT_OPTIONS ('multiLine' = 'true')",
  sep = ""
))

booking_updates_upload_data = tableToDF(table_name)

display(booking_updates_upload_data)

Scala

val table_name = "<catalog>.<schema>.booking_updates_upload"
val source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
val source_format = "JSON"

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" +
  "booking_id BIGINT, " +
  "user_id BIGINT, " +
  "status STRING, " +
  "total_amount DOUBLE)"
)

spark.sql("COPY INTO " + table_name +
  " FROM '" + source_data + "'" +
  " FILEFORMAT = " + source_format +
  " FORMAT_OPTIONS ('multiLine' = 'true')"
)

val booking_updates_upload_data = spark.table(table_name)

display(booking_updates_upload_data)

Voer de volgende code uit om de voorbeeldtabel te verwijderen om op te schonen.

SQL

DROP TABLE <catalog>.<schema>.booking_updates_upload

Python

spark.sql("DROP TABLE " + table_name)

R

sql(paste("DROP TABLE ", table_name, sep = ""))

Scala

spark.sql("DROP TABLE " + table_name)

Metagegevensbestanden opschonen

U kunt VACUUM gebruiken om niet-verwezen metagegevensbestanden die zijn aangemaakt door COPY INTO in Databricks Runtime 15.2 en hoger, op te schonen.

Aanvullende bronnen