Commencez à utiliser COPY INTO pour charger des données

Vous pouvez utiliser la COPY INTO commande SQL pour charger des données à partir d’un emplacement de fichier dans une table Delta. COPY INTO est retriable et idempotent : les fichiers dans l’emplacement source qui ont déjà été chargés sont ignorés lors des exécutions suivantes.

COPY INTO offre ces fonctionnalités :

  • Filtres facilement configurables pour les fichiers ou dossiers à partir du stockage cloud, notamment S3, ADLS, ABFS, GCS et les volumes du Unity Catalog.
  • Prise en charge de plusieurs formats de fichiers sources : CSV, JSON, XML, Avro, ORC, Parquet, texte et fichiers binaires.
  • Traitement de fichiers exactement une fois (idempotent) par défaut.
  • Inférence de schéma de table cible, mappage, fusion et évolution.

Remarque

Pour une expérience d’ingestion de fichiers plus évolutive et plus robuste, Databricks recommande aux utilisateurs SQL d’utiliser des tables de diffusion en continu. Pour plus d’informations, consultez les tables de diffusion en continu.

Avertissement

COPY INTO respecte le paramètre d’espace de travail pour les vecteurs de suppression. Si cette option est activée, les vecteurs de suppression sont activés sur la table cible lorsque COPY INTO s’exécute sur un entrepôt SQL ou que du calcul s’exécute sur Databricks Runtime 14.0 ou une version ultérieure. Une fois les vecteurs de suppression activés, ils bloquent les requêtes sur une table dans Databricks Runtime 11.3 LTS et ci-dessous. Consultez les vecteurs de suppression dans Databricks et les vecteurs de suppression à activation automatique.

Avant de commencer

Un administrateur de compte doit suivre les étapes de configuration de l’accès aux données pour l’ingestion afin de configurer l’accès aux données dans le stockage d’objets cloud avant que les utilisateurs puissent charger des données à l’aide COPY INTOde .

Charger des données dans une table Delta Lake sans schéma

Dans Databricks Runtime 11.3 LTS et versions ultérieures, vous pouvez créer des tables Delta de type placeholder vides afin que le schéma soit inféré lors d'une COPY INTO commande en définissant mergeSchema à true dans COPY_OPTIONS. L’exemple suivant utilise le jeu de données Wanderbricks . Remplacez <catalog>, <schema> et <volume> par un catalogue, un schéma et un volume où vous disposez des autorisations CREATE TABLE.

SQL

CREATE TABLE IF NOT EXISTS <catalog>.<schema>.booking_updates_schemaless;

COPY INTO <catalog>.<schema>.booking_updates_schemaless
FROM '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');

Python

table_name = '<catalog>.<schema>.booking_updates_schemaless'
source_data = '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
source_format = 'JSON'

spark.sql("CREATE TABLE IF NOT EXISTS " + table_name)

spark.sql("COPY INTO " + table_name + \
  " FROM '" + source_data + "'" + \
  " FILEFORMAT = " + source_format + \
  " FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')" + \
  " COPY_OPTIONS ('mergeSchema' = 'true')"
)

R

library(SparkR)
sparkR.session()

table_name = "<catalog>.<schema>.booking_updates_schemaless"
source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
source_format = "JSON"

sql(paste("CREATE TABLE IF NOT EXISTS ", table_name, sep = ""))

sql(paste("COPY INTO ", table_name,
  " FROM '", source_data, "'",
  " FILEFORMAT = ", source_format,
  " FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')",
  " COPY_OPTIONS ('mergeSchema' = 'true')",
  sep = ""
))

Langage de programmation Scala

val table_name = "<catalog>.<schema>.booking_updates_schemaless"
val source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
val source_format = "JSON"

spark.sql("CREATE TABLE IF NOT EXISTS " + table_name)

spark.sql("COPY INTO " + table_name +
  " FROM '" + source_data + "'" +
  " FILEFORMAT = " + source_format +
  " FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')" +
  " COPY_OPTIONS ('mergeSchema' = 'true')"
)

Cette instruction SQL est idempotente. Cela signifie que vous pouvez planifier son exécution à plusieurs reprises, et qu’il charge uniquement de nouvelles données dans votre table Delta.

Remarque

La table Delta vide n’est pas utilisable en dehors COPY INTO. INSERT INTO et MERGE INTO ne sont pas pris en charge pour écrire des données dans des tables Delta sans schéma. Une fois les données insérées dans la table avec COPY INTO, la table devient interrogeable.

Consultez la section Créer des tables cibles pour COPY INTO.

Définir le schéma et charger des données dans une table Delta Lake

L’exemple suivant crée une table Delta et utilise la COPY INTO commande SQL pour charger des exemples de données à partir du jeu de données Wanderbricks dans la table. Les fichiers sources sont des fichiers JSON stockés dans un volume de catalogue Unity. Vous pouvez exécuter l’exemple de code Python, R, Scala ou SQL à partir d’un notebook attaché à un cluster Azure Databricks. Vous pouvez également exécuter le code SQL à partir d’une requête associée à un entrepôt SQL dans Databricks SQL. Remplacez <catalog>, <schema> et <volume> par un catalogue, un schéma et un volume où vous disposez des autorisations CREATE TABLE.

SQL

DROP TABLE IF EXISTS <catalog>.<schema>.booking_updates_upload;

CREATE TABLE <catalog>.<schema>.booking_updates_upload (
  booking_id BIGINT,
  user_id BIGINT,
  status STRING,
  total_amount DOUBLE
);

COPY INTO <catalog>.<schema>.booking_updates_upload
FROM '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
FILEFORMAT = JSON
FORMAT_OPTIONS ('multiLine' = 'true');

SELECT * FROM <catalog>.<schema>.booking_updates_upload;

Python

table_name = '<catalog>.<schema>.booking_updates_upload'
source_data = '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
source_format = 'JSON'

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" \
  "booking_id BIGINT, " + \
  "user_id BIGINT, " + \
  "status STRING, " + \
  "total_amount DOUBLE)"
)

spark.sql("COPY INTO " + table_name + \
  " FROM '" + source_data + "'" + \
  " FILEFORMAT = " + source_format + \
  " FORMAT_OPTIONS ('multiLine' = 'true')"
)

booking_updates_upload_data = spark.sql("SELECT * FROM " + table_name)

display(booking_updates_upload_data)

R

library(SparkR)
sparkR.session()

table_name = "<catalog>.<schema>.booking_updates_upload"
source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
source_format = "JSON"

sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))

sql(paste("CREATE TABLE ", table_name, " (",
  "booking_id BIGINT, ",
  "user_id BIGINT, ",
  "status STRING, ",
  "total_amount DOUBLE)",
  sep = ""
))

sql(paste("COPY INTO ", table_name,
  " FROM '", source_data, "'",
  " FILEFORMAT = ", source_format,
  " FORMAT_OPTIONS ('multiLine' = 'true')",
  sep = ""
))

booking_updates_upload_data = tableToDF(table_name)

display(booking_updates_upload_data)

Langage de programmation Scala

val table_name = "<catalog>.<schema>.booking_updates_upload"
val source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
val source_format = "JSON"

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" +
  "booking_id BIGINT, " +
  "user_id BIGINT, " +
  "status STRING, " +
  "total_amount DOUBLE)"
)

spark.sql("COPY INTO " + table_name +
  " FROM '" + source_data + "'" +
  " FILEFORMAT = " + source_format +
  " FORMAT_OPTIONS ('multiLine' = 'true')"
)

val booking_updates_upload_data = spark.table(table_name)

display(booking_updates_upload_data)

Pour nettoyer, exécutez le code suivant pour supprimer l’exemple de table.

SQL

DROP TABLE <catalog>.<schema>.booking_updates_upload

Python

spark.sql("DROP TABLE " + table_name)

R

sql(paste("DROP TABLE ", table_name, sep = ""))

Langage de programmation Scala

spark.sql("DROP TABLE " + table_name)

Nettoyer les fichiers de métadonnées

Vous pouvez exécuter VACUUM pour nettoyer les fichiers de métadonnées non référencés créés COPY INTO dans Databricks Runtime 15.2 et versions ultérieures.

Ressources additionnelles

  • Databricks Runtime 7.x et ultérieur : COPY INTO