Partilhar via


Tutorial: COPY INTO com o Spark SQL

O Databricks recomenda que você use o comando COPY INTO para carregamento de dados incrementais e em massa para fontes de dados que contenham milhares de arquivos.

Neste tutorial, usa o comando COPY INTO para carregar dados JSON de um volume do Unity Catalog numa tabela Delta no seu espaço de trabalho Azure Databricks. Usas o conjunto de dados de exemplo do Wanderbricks como fonte de dados. Para casos de utilização mais avançados de ingestão, veja O que é o Auto Loader?.

Requerimentos

Etapa 1: Configurar seu ambiente

O código neste tutorial utiliza um volume do Unity Catalog para armazenar ficheiros fonte JSON. Substitui <catalog> por um catálogo onde tens CREATE SCHEMA e CREATE VOLUME as permissões. Se não conseguir executar o código, contacte o administrador do seu espaço de trabalho.

Crie um caderno e anexe-o a um recurso de computação. Depois, execute o código seguinte para configurar um esquema e um volume para este tutorial.

Python

# Set parameters and reset demo environment

catalog = "<catalog>"

username = spark.sql("SELECT regexp_replace(session_user(), '[^a-zA-Z0-9]', '_')").first()[0]
schema = f"copyinto_{username}_db"
volume = "copy_into_source"
source = f"/Volumes/{catalog}/{schema}/{volume}"

spark.sql(f"SET c.catalog={catalog}")
spark.sql(f"SET c.schema={schema}")
spark.sql(f"SET c.volume={volume}")

spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")
spark.sql(f"CREATE SCHEMA {catalog}.{schema}")
spark.sql(f"CREATE VOLUME {catalog}.{schema}.{volume}")

SQL

-- Reset demo environment

DROP SCHEMA IF EXISTS <catalog>.copy_into_tutorial CASCADE;
CREATE SCHEMA <catalog>.copy_into_tutorial;
CREATE VOLUME <catalog>.copy_into_tutorial.copy_into_source;

Passo 2: Escrever dados de exemplo no volume como JSON

O COPY INTO comando carrega dados de fontes baseadas em ficheiros. Leia da tabela de amostra do Wanderbricksbookings e grave um lote de registos como ficheiros JSON no seu volume, simulando a chegada de dados de um sistema externo.

Python

# Write a batch of Wanderbricks bookings data as JSON to the volume

bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json(f"{source}/bookings")

SQL

Escrever ficheiros num volume requer Python. Num fluxo de trabalho do mundo real, esses dados viriam de um sistema externo.

%python
# Write a batch of Wanderbricks bookings data as JSON to the volume

bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json("/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings")

Etapa 3: Usar COPY INTO para carregar dados JSON idempotentes

Crie uma tabela Delta alvo antes de usar COPY INTO. Não precisa de fornecer nada além do nome da tabela na sua CREATE TABLE declaração. Como esta ação é idempotente, o Databricks carrega os dados apenas uma vez, mesmo que execute o código várias vezes.

Python

# Create target table and load data

spark.sql(f"CREATE TABLE IF NOT EXISTS {catalog}.{schema}.bookings_target")

spark.sql(f"""
  COPY INTO {catalog}.{schema}.bookings_target
  FROM '/Volumes/{catalog}/{schema}/{volume}/bookings'
  FILEFORMAT = JSON
  FORMAT_OPTIONS ('mergeSchema' = 'true')
  COPY_OPTIONS ('mergeSchema' = 'true')
""")

SQL

-- Create target table and load data

CREATE TABLE IF NOT EXISTS <catalog>.copy_into_tutorial.bookings_target;

COPY INTO <catalog>.copy_into_tutorial.bookings_target
FROM '/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')

Passo 4: Pré-visualizar o conteúdo da tabela

Verifique se a tabela contém 20 linhas do primeiro lote de dados de reservas Wanderbricks e se o esquema foi corretamente inferido a partir dos ficheiros de origem JSON.

Python

# Review loaded data

display(spark.sql(f"SELECT * FROM {catalog}.{schema}.bookings_target"))

SQL

-- Review loaded data

SELECT * FROM <catalog>.copy_into_tutorial.bookings_target

Etapa 5: carregar mais dados e visualizar resultados

Pode simular dados adicionais a chegar de um sistema externo escrevendo outro lote de registos e executando COPY INTO novamente. Execute o código seguinte para escrever um segundo lote de dados.

Python

# Write another batch of Wanderbricks bookings data as JSON

bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json(f"{source}/bookings")

SQL

Escrever ficheiros num volume requer Python. Num fluxo de trabalho do mundo real, esses dados viriam de um sistema externo.

%python
# Write another batch of Wanderbricks bookings data as JSON

bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json("/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings")

Depois executa novamente o COPY INTO comando do Passo 3 e visualiza a tabela para confirmar os novos registos. Só os ficheiros novos são carregados.

Python

# Confirm new data was loaded

display(spark.sql(f"SELECT COUNT(*) AS total_rows FROM {catalog}.{schema}.bookings_target"))

SQL

-- Confirm new data was loaded

SELECT COUNT(*) AS total_rows FROM <catalog>.copy_into_tutorial.bookings_target

Passo 6: Tutorial de limpeza

Quando terminar este tutorial, você poderá limpar os recursos associados se não quiser mais mantê-los. Elimina o esquema, as tabelas e o volume, e remove todos os dados.

Python

# Drop schema and all associated objects

spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")

SQL

-- Drop schema and all associated objects

DROP SCHEMA IF EXISTS <catalog>.copy_into_tutorial CASCADE;

Recursos adicionais