Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Databricks raadt u aan de COPY INTO opdracht te gebruiken voor incrementeel en bulksgewijs laden van gegevensbronnen die duizenden bestanden bevatten.
In deze zelfstudie gebruikt u de opdracht COPY INTO om JSON-gegevens van een Unity Catalog-volume te laden in een Delta-tabel in uw Azure Databricks-werkruimte. U gebruikt de Wanderbricks-voorbeeldgegevensset als gegevensbron. Zie Wat is Auto Loader? voor meer geavanceerde gebruiksvoorbeelden voor inslikken.
Eisen
- Toegang tot een rekenresource. Zie Compute.
- Een werkruimte met Unity Catalog-functionaliteit met machtigingen voor het maken van schema's en volumes in een catalogus. Zie Verbinding maken met cloudobjectopslag met behulp van Unity Catalog.
Stap 1: Uw omgeving configureren
De code in deze zelfstudie maakt gebruik van een Unity Catalog-volume voor het opslaan van JSON-bronbestanden. Vervang <catalog> met een catalogus waar je CREATE SCHEMA en CREATE VOLUME machtigingen hebt. Als u de code niet kunt uitvoeren, neemt u contact op met de beheerder van de werkruimte.
Maak een notebook en koppel deze aan een rekenresource. Voer vervolgens de volgende code uit om een schema en volume voor deze tutorial in te stellen.
Python
# Set parameters and reset demo environment
catalog = "<catalog>"
username = spark.sql("SELECT regexp_replace(session_user(), '[^a-zA-Z0-9]', '_')").first()[0]
schema = f"copyinto_{username}_db"
volume = "copy_into_source"
source = f"/Volumes/{catalog}/{schema}/{volume}"
spark.sql(f"SET c.catalog={catalog}")
spark.sql(f"SET c.schema={schema}")
spark.sql(f"SET c.volume={volume}")
spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")
spark.sql(f"CREATE SCHEMA {catalog}.{schema}")
spark.sql(f"CREATE VOLUME {catalog}.{schema}.{volume}")
SQL
-- Reset demo environment
DROP SCHEMA IF EXISTS <catalog>.copy_into_tutorial CASCADE;
CREATE SCHEMA <catalog>.copy_into_tutorial;
CREATE VOLUME <catalog>.copy_into_tutorial.copy_into_source;
Stap 2: Voorbeeldgegevens naar het volume schrijven als JSON
Met de COPY INTO opdracht worden gegevens uit bestandsbronnen geladen. Lees uit de Wanderbricksbookings voorbeeldtabel en schrijf een batch gegevens als JSON-bestanden naar uw volume om gegevens van een extern systeem te simuleren.
Python
# Write a batch of Wanderbricks bookings data as JSON to the volume
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json(f"{source}/bookings")
SQL
Het schrijven van bestanden naar een volume vereist Python. In een echte werkstroom zouden deze gegevens afkomstig zijn van een extern systeem.
%python
# Write a batch of Wanderbricks bookings data as JSON to the volume
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json("/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings")
Stap 3: Gebruik COPY INTO om JSON-gegevens idempotent te laden
Maak een Delta-doeltabel voordat u COPY INTO gebruikt. U hoeft niets anders op te geven dan een tabelnaam in uw CREATE TABLE instructie. Omdat deze actie idempotent is, laadt Databricks de gegevens slechts één keer, zelfs als u de code meerdere keren uitvoert.
Python
# Create target table and load data
spark.sql(f"CREATE TABLE IF NOT EXISTS {catalog}.{schema}.bookings_target")
spark.sql(f"""
COPY INTO {catalog}.{schema}.bookings_target
FROM '/Volumes/{catalog}/{schema}/{volume}/bookings'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')
""")
SQL
-- Create target table and load data
CREATE TABLE IF NOT EXISTS <catalog>.copy_into_tutorial.bookings_target;
COPY INTO <catalog>.copy_into_tutorial.bookings_target
FROM '/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')
Stap 4: Een voorbeeld van de inhoud van de tabel bekijken
Controleer of de tabel 20 rijen bevat uit de eerste batch met Wanderbricks-boekingsgegevens en of het schema juist is afgeleid van de JSON-bronbestanden.
Python
# Review loaded data
display(spark.sql(f"SELECT * FROM {catalog}.{schema}.bookings_target"))
SQL
-- Review loaded data
SELECT * FROM <catalog>.copy_into_tutorial.bookings_target
Stap 5: Meer gegevens laden en voorbeeldresultaten bekijken
U kunt extra gegevens simuleren die afkomstig zijn van een extern systeem door een andere batch records te schrijven en opnieuw uit te voeren COPY INTO . Voer de volgende code uit om een tweede batch met gegevens te schrijven.
Python
# Write another batch of Wanderbricks bookings data as JSON
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json(f"{source}/bookings")
SQL
Het schrijven van bestanden naar een volume vereist Python. In een echte werkstroom zouden deze gegevens afkomstig zijn van een extern systeem.
%python
# Write another batch of Wanderbricks bookings data as JSON
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json("/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings")
Voer vervolgens de COPY INTO opdracht opnieuw uit vanuit stap 3 en bekijk een voorbeeld van de tabel om de nieuwe records te bevestigen. Alleen de nieuwe bestanden worden geladen.
Python
# Confirm new data was loaded
display(spark.sql(f"SELECT COUNT(*) AS total_rows FROM {catalog}.{schema}.bookings_target"))
SQL
-- Confirm new data was loaded
SELECT COUNT(*) AS total_rows FROM <catalog>.copy_into_tutorial.bookings_target
Stap 6: Opschonen van de handleiding
Wanneer u klaar bent met deze zelfstudie, kunt u de bijbehorende resources opschonen als u ze niet meer wilt behouden. Verwijder het schema, de tabellen en het volume en verwijder alle gegevens.
Python
# Drop schema and all associated objects
spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")
SQL
-- Drop schema and all associated objects
DROP SCHEMA IF EXISTS <catalog>.copy_into_tutorial CASCADE;