Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Databricks recommande d’utiliser la COPY INTO commande pour le chargement incrémentiel et en bloc de données pour les sources de données qui contiennent des milliers de fichiers.
Dans ce tutoriel, vous utilisez la commande COPY INTO pour charger des données JSON à partir d’un volume de catalogue Unity dans une table Delta dans votre espace de travail Azure Databricks. Vous utilisez l’exemple de jeu de données Wanderbricks comme source de données. Pour plus d’informations sur les scénarios d'ingestion plus avancés, consultez Qu’est-ce que Auto Loader ?.
Spécifications
- Accès à une ressource de calcul. Voir Calculer.
- Un espace de travail avec catalogue Unity avec des autorisations pour créer des schémas et des volumes dans un catalogue. Consultez Se connecter au stockage d’objets cloud à l’aide du catalogue Unity.
Étape 1 : Configurer votre environnement
Le code de ce tutoriel utilise un volume de catalogue Unity pour stocker des fichiers sources JSON. Remplacez <catalog> par un catalogue où vous disposez des autorisations CREATE SCHEMA et CREATE VOLUME. Si vous ne pouvez pas exécuter le code, contactez l’administrateur de votre espace de travail.
Créez un notebook et attachez-le à une ressource de calcul. Exécutez ensuite le code suivant pour configurer un schéma et un volume pour ce didacticiel.
Python
# Set parameters and reset demo environment
catalog = "<catalog>"
username = spark.sql("SELECT regexp_replace(session_user(), '[^a-zA-Z0-9]', '_')").first()[0]
schema = f"copyinto_{username}_db"
volume = "copy_into_source"
source = f"/Volumes/{catalog}/{schema}/{volume}"
spark.sql(f"SET c.catalog={catalog}")
spark.sql(f"SET c.schema={schema}")
spark.sql(f"SET c.volume={volume}")
spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")
spark.sql(f"CREATE SCHEMA {catalog}.{schema}")
spark.sql(f"CREATE VOLUME {catalog}.{schema}.{volume}")
SQL
-- Reset demo environment
DROP SCHEMA IF EXISTS <catalog>.copy_into_tutorial CASCADE;
CREATE SCHEMA <catalog>.copy_into_tutorial;
CREATE VOLUME <catalog>.copy_into_tutorial.copy_into_source;
Étape 2 : Écrire des exemples de données dans le volume en tant que JSON
La COPY INTO commande charge des données à partir de sources basées sur des fichiers. Lisez l’exemple de table Wanderbricksbookings et écrivez un lot d’enregistrements en tant que fichiers JSON dans votre volume, en simulant les données provenant d’un système externe.
Python
# Write a batch of Wanderbricks bookings data as JSON to the volume
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json(f"{source}/bookings")
SQL
L’écriture de fichiers dans un volume nécessite Python. Dans un flux de travail réel, ces données arrivent d’un système externe.
%python
# Write a batch of Wanderbricks bookings data as JSON to the volume
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json("/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings")
Étape 3 : Utiliser COPY INTO pour charger des données JSON idempotentes
Créez une table Delta cible avant d’utiliser COPY INTO. Vous n’avez pas besoin de fournir autre chose qu’un nom de table dans votre CREATE TABLE instruction. Étant donné que cette action est idempotente, Databricks charge les données une seule fois, même si vous exécutez le code plusieurs fois.
Python
# Create target table and load data
spark.sql(f"CREATE TABLE IF NOT EXISTS {catalog}.{schema}.bookings_target")
spark.sql(f"""
COPY INTO {catalog}.{schema}.bookings_target
FROM '/Volumes/{catalog}/{schema}/{volume}/bookings'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')
""")
SQL
-- Create target table and load data
CREATE TABLE IF NOT EXISTS <catalog>.copy_into_tutorial.bookings_target;
COPY INTO <catalog>.copy_into_tutorial.bookings_target
FROM '/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')
Étape 4 : Afficher un aperçu du contenu de votre table
Vérifiez que la table contient 20 lignes du premier lot de données de réservation Wanderbricks et que le schéma a été déduit correctement des fichiers sources JSON.
Python
# Review loaded data
display(spark.sql(f"SELECT * FROM {catalog}.{schema}.bookings_target"))
SQL
-- Review loaded data
SELECT * FROM <catalog>.copy_into_tutorial.bookings_target
Étape 5 : Charger plus de données et afficher un aperçu des résultats
Vous pouvez simuler des données supplémentaires provenant d’un système externe en écrivant un autre lot d’enregistrements et en réécutant COPY INTO . Exécutez le code suivant pour écrire un deuxième lot de données.
Python
# Write another batch of Wanderbricks bookings data as JSON
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json(f"{source}/bookings")
SQL
L’écriture de fichiers dans un volume nécessite Python. Dans un flux de travail réel, ces données arrivent d’un système externe.
%python
# Write another batch of Wanderbricks bookings data as JSON
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json("/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings")
Exécutez ensuite la commande à partir de l’étape 3COPY INTO et affichez un aperçu de la table pour confirmer les nouveaux enregistrements. Seuls les nouveaux fichiers sont chargés.
Python
# Confirm new data was loaded
display(spark.sql(f"SELECT COUNT(*) AS total_rows FROM {catalog}.{schema}.bookings_target"))
SQL
-- Confirm new data was loaded
SELECT COUNT(*) AS total_rows FROM <catalog>.copy_into_tutorial.bookings_target
Étape 6 : Tutoriel de nettoyage
Lorsque vous avez terminé ce didacticiel, vous pouvez nettoyer les ressources associées si vous ne souhaitez plus les conserver. Supprimez le schéma, les tables et le volume, puis supprimez toutes les données.
Python
# Drop schema and all associated objects
spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")
SQL
-- Drop schema and all associated objects
DROP SCHEMA IF EXISTS <catalog>.copy_into_tutorial CASCADE;