Zelfstudie: COPY INTO met Spark SQL

Databricks raadt u aan de COPY INTO opdracht te gebruiken voor incrementeel en bulksgewijs laden van gegevensbronnen die duizenden bestanden bevatten.

In deze zelfstudie gebruikt u de opdracht COPY INTO om JSON-gegevens van een Unity Catalog-volume te laden in een Delta-tabel in uw Azure Databricks-werkruimte. U gebruikt de Wanderbricks-voorbeeldgegevensset als gegevensbron. Zie Wat is Auto Loader? voor meer geavanceerde gebruiksvoorbeelden voor inslikken.

Eisen

Stap 1: Uw omgeving configureren

De code in deze zelfstudie maakt gebruik van een Unity Catalog-volume voor het opslaan van JSON-bronbestanden. Vervang <catalog> met een catalogus waar je CREATE SCHEMA en CREATE VOLUME machtigingen hebt. Als u de code niet kunt uitvoeren, neemt u contact op met de beheerder van de werkruimte.

Maak een notebook en koppel deze aan een rekenresource. Voer vervolgens de volgende code uit om een schema en volume voor deze tutorial in te stellen.

Python

# Set parameters and reset demo environment

catalog = "<catalog>"

username = spark.sql("SELECT regexp_replace(session_user(), '[^a-zA-Z0-9]', '_')").first()[0]
schema = f"copyinto_{username}_db"
volume = "copy_into_source"
source = f"/Volumes/{catalog}/{schema}/{volume}"

spark.sql(f"SET c.catalog={catalog}")
spark.sql(f"SET c.schema={schema}")
spark.sql(f"SET c.volume={volume}")

spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")
spark.sql(f"CREATE SCHEMA {catalog}.{schema}")
spark.sql(f"CREATE VOLUME {catalog}.{schema}.{volume}")

SQL

-- Reset demo environment

DROP SCHEMA IF EXISTS <catalog>.copy_into_tutorial CASCADE;
CREATE SCHEMA <catalog>.copy_into_tutorial;
CREATE VOLUME <catalog>.copy_into_tutorial.copy_into_source;

Stap 2: Voorbeeldgegevens naar het volume schrijven als JSON

Met de COPY INTO opdracht worden gegevens uit bestandsbronnen geladen. Lees uit de Wanderbricksbookings voorbeeldtabel en schrijf een batch gegevens als JSON-bestanden naar uw volume om gegevens van een extern systeem te simuleren.

Python

# Write a batch of Wanderbricks bookings data as JSON to the volume

bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json(f"{source}/bookings")

SQL

Het schrijven van bestanden naar een volume vereist Python. In een echte werkstroom zouden deze gegevens afkomstig zijn van een extern systeem.

%python
# Write a batch of Wanderbricks bookings data as JSON to the volume

bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json("/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings")

Stap 3: Gebruik COPY INTO om JSON-gegevens idempotent te laden

Maak een Delta-doeltabel voordat u COPY INTO gebruikt. U hoeft niets anders op te geven dan een tabelnaam in uw CREATE TABLE instructie. Omdat deze actie idempotent is, laadt Databricks de gegevens slechts één keer, zelfs als u de code meerdere keren uitvoert.

Python

# Create target table and load data

spark.sql(f"CREATE TABLE IF NOT EXISTS {catalog}.{schema}.bookings_target")

spark.sql(f"""
  COPY INTO {catalog}.{schema}.bookings_target
  FROM '/Volumes/{catalog}/{schema}/{volume}/bookings'
  FILEFORMAT = JSON
  FORMAT_OPTIONS ('mergeSchema' = 'true')
  COPY_OPTIONS ('mergeSchema' = 'true')
""")

SQL

-- Create target table and load data

CREATE TABLE IF NOT EXISTS <catalog>.copy_into_tutorial.bookings_target;

COPY INTO <catalog>.copy_into_tutorial.bookings_target
FROM '/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')

Stap 4: Een voorbeeld van de inhoud van de tabel bekijken

Controleer of de tabel 20 rijen bevat uit de eerste batch met Wanderbricks-boekingsgegevens en of het schema juist is afgeleid van de JSON-bronbestanden.

Python

# Review loaded data

display(spark.sql(f"SELECT * FROM {catalog}.{schema}.bookings_target"))

SQL

-- Review loaded data

SELECT * FROM <catalog>.copy_into_tutorial.bookings_target

Stap 5: Meer gegevens laden en voorbeeldresultaten bekijken

U kunt extra gegevens simuleren die afkomstig zijn van een extern systeem door een andere batch records te schrijven en opnieuw uit te voeren COPY INTO . Voer de volgende code uit om een tweede batch met gegevens te schrijven.

Python

# Write another batch of Wanderbricks bookings data as JSON

bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json(f"{source}/bookings")

SQL

Het schrijven van bestanden naar een volume vereist Python. In een echte werkstroom zouden deze gegevens afkomstig zijn van een extern systeem.

%python
# Write another batch of Wanderbricks bookings data as JSON

bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json("/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings")

Voer vervolgens de COPY INTO opdracht opnieuw uit vanuit stap 3 en bekijk een voorbeeld van de tabel om de nieuwe records te bevestigen. Alleen de nieuwe bestanden worden geladen.

Python

# Confirm new data was loaded

display(spark.sql(f"SELECT COUNT(*) AS total_rows FROM {catalog}.{schema}.bookings_target"))

SQL

-- Confirm new data was loaded

SELECT COUNT(*) AS total_rows FROM <catalog>.copy_into_tutorial.bookings_target

Stap 6: Opschonen van de handleiding

Wanneer u klaar bent met deze zelfstudie, kunt u de bijbehorende resources opschonen als u ze niet meer wilt behouden. Verwijder het schema, de tabellen en het volume en verwijder alle gegevens.

Python

# Drop schema and all associated objects

spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")

SQL

-- Drop schema and all associated objects

DROP SCHEMA IF EXISTS <catalog>.copy_into_tutorial CASCADE;

Aanvullende informatiebronnen