Tutoriel : COPY INTO avec Spark SQL

Databricks recommande d’utiliser la COPY INTO commande pour le chargement incrémentiel et en bloc de données pour les sources de données qui contiennent des milliers de fichiers.

Dans ce tutoriel, vous utilisez la commande COPY INTO pour charger des données JSON à partir d’un volume de catalogue Unity dans une table Delta dans votre espace de travail Azure Databricks. Vous utilisez l’exemple de jeu de données Wanderbricks comme source de données. Pour plus d’informations sur les scénarios d'ingestion plus avancés, consultez Qu’est-ce que Auto Loader ?.

Spécifications

Étape 1 : Configurer votre environnement

Le code de ce tutoriel utilise un volume de catalogue Unity pour stocker des fichiers sources JSON. Remplacez <catalog> par un catalogue où vous disposez des autorisations CREATE SCHEMA et CREATE VOLUME. Si vous ne pouvez pas exécuter le code, contactez l’administrateur de votre espace de travail.

Créez un notebook et attachez-le à une ressource de calcul. Exécutez ensuite le code suivant pour configurer un schéma et un volume pour ce didacticiel.

Python

# Set parameters and reset demo environment

catalog = "<catalog>"

username = spark.sql("SELECT regexp_replace(session_user(), '[^a-zA-Z0-9]', '_')").first()[0]
schema = f"copyinto_{username}_db"
volume = "copy_into_source"
source = f"/Volumes/{catalog}/{schema}/{volume}"

spark.sql(f"SET c.catalog={catalog}")
spark.sql(f"SET c.schema={schema}")
spark.sql(f"SET c.volume={volume}")

spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")
spark.sql(f"CREATE SCHEMA {catalog}.{schema}")
spark.sql(f"CREATE VOLUME {catalog}.{schema}.{volume}")

SQL

-- Reset demo environment

DROP SCHEMA IF EXISTS <catalog>.copy_into_tutorial CASCADE;
CREATE SCHEMA <catalog>.copy_into_tutorial;
CREATE VOLUME <catalog>.copy_into_tutorial.copy_into_source;

Étape 2 : Écrire des exemples de données dans le volume en tant que JSON

La COPY INTO commande charge des données à partir de sources basées sur des fichiers. Lisez l’exemple de table Wanderbricksbookings et écrivez un lot d’enregistrements en tant que fichiers JSON dans votre volume, en simulant les données provenant d’un système externe.

Python

# Write a batch of Wanderbricks bookings data as JSON to the volume

bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json(f"{source}/bookings")

SQL

L’écriture de fichiers dans un volume nécessite Python. Dans un flux de travail réel, ces données arrivent d’un système externe.

%python
# Write a batch of Wanderbricks bookings data as JSON to the volume

bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json("/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings")

Étape 3 : Utiliser COPY INTO pour charger des données JSON idempotentes

Créez une table Delta cible avant d’utiliser COPY INTO. Vous n’avez pas besoin de fournir autre chose qu’un nom de table dans votre CREATE TABLE instruction. Étant donné que cette action est idempotente, Databricks charge les données une seule fois, même si vous exécutez le code plusieurs fois.

Python

# Create target table and load data

spark.sql(f"CREATE TABLE IF NOT EXISTS {catalog}.{schema}.bookings_target")

spark.sql(f"""
  COPY INTO {catalog}.{schema}.bookings_target
  FROM '/Volumes/{catalog}/{schema}/{volume}/bookings'
  FILEFORMAT = JSON
  FORMAT_OPTIONS ('mergeSchema' = 'true')
  COPY_OPTIONS ('mergeSchema' = 'true')
""")

SQL

-- Create target table and load data

CREATE TABLE IF NOT EXISTS <catalog>.copy_into_tutorial.bookings_target;

COPY INTO <catalog>.copy_into_tutorial.bookings_target
FROM '/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')

Étape 4 : Afficher un aperçu du contenu de votre table

Vérifiez que la table contient 20 lignes du premier lot de données de réservation Wanderbricks et que le schéma a été déduit correctement des fichiers sources JSON.

Python

# Review loaded data

display(spark.sql(f"SELECT * FROM {catalog}.{schema}.bookings_target"))

SQL

-- Review loaded data

SELECT * FROM <catalog>.copy_into_tutorial.bookings_target

Étape 5 : Charger plus de données et afficher un aperçu des résultats

Vous pouvez simuler des données supplémentaires provenant d’un système externe en écrivant un autre lot d’enregistrements et en réécutant COPY INTO . Exécutez le code suivant pour écrire un deuxième lot de données.

Python

# Write another batch of Wanderbricks bookings data as JSON

bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json(f"{source}/bookings")

SQL

L’écriture de fichiers dans un volume nécessite Python. Dans un flux de travail réel, ces données arrivent d’un système externe.

%python
# Write another batch of Wanderbricks bookings data as JSON

bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json("/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings")

Exécutez ensuite la commande à partir de l’étape 3COPY INTO et affichez un aperçu de la table pour confirmer les nouveaux enregistrements. Seuls les nouveaux fichiers sont chargés.

Python

# Confirm new data was loaded

display(spark.sql(f"SELECT COUNT(*) AS total_rows FROM {catalog}.{schema}.bookings_target"))

SQL

-- Confirm new data was loaded

SELECT COUNT(*) AS total_rows FROM <catalog>.copy_into_tutorial.bookings_target

Étape 6 : Tutoriel de nettoyage

Lorsque vous avez terminé ce didacticiel, vous pouvez nettoyer les ressources associées si vous ne souhaitez plus les conserver. Supprimez le schéma, les tables et le volume, puis supprimez toutes les données.

Python

# Drop schema and all associated objects

spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")

SQL

-- Drop schema and all associated objects

DROP SCHEMA IF EXISTS <catalog>.copy_into_tutorial CASCADE;

Ressources supplémentaires