Compartilhar via


Tutorial: COPY INTO com o SPARK SQL

O Databricks recomenda que você use o comando COPY INTO para carregamento de dados incremental e em massa para fontes de dados que contêm milhares de arquivos.

Neste tutorial, você usará o comando COPY INTO para carregar dados JSON de um volume do Catálogo do Unity em uma tabela Delta em seu workspace Azure Databricks. Use o conjunto de dados de exemplo do Wanderbricks como fonte de dados. Para casos de uso de ingestão mais avançados, consulte o que é o Carregador Automático?.

Requisitos

Etapa 1: Configurar seu ambiente

O código neste tutorial usa um volume do Catálogo do Unity para armazenar arquivos de origem JSON. Substitua <catalog> por um catálogo em que você tenha CREATE SCHEMA e CREATE VOLUME permissões. Se você não puder executar o código, entre em contato com o administrador do workspace.

Crie um notebook e anexe-o a um recurso de computação. Em seguida, execute o código a seguir para configurar um esquema e um volume para este tutorial.

Python

# Set parameters and reset demo environment

catalog = "<catalog>"

username = spark.sql("SELECT regexp_replace(session_user(), '[^a-zA-Z0-9]', '_')").first()[0]
schema = f"copyinto_{username}_db"
volume = "copy_into_source"
source = f"/Volumes/{catalog}/{schema}/{volume}"

spark.sql(f"SET c.catalog={catalog}")
spark.sql(f"SET c.schema={schema}")
spark.sql(f"SET c.volume={volume}")

spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")
spark.sql(f"CREATE SCHEMA {catalog}.{schema}")
spark.sql(f"CREATE VOLUME {catalog}.{schema}.{volume}")

SQL

-- Reset demo environment

DROP SCHEMA IF EXISTS <catalog>.copy_into_tutorial CASCADE;
CREATE SCHEMA <catalog>.copy_into_tutorial;
CREATE VOLUME <catalog>.copy_into_tutorial.copy_into_source;

Etapa 2: Gravar dados de exemplo no volume como JSON

O COPY INTO comando carrega dados de fontes baseadas em arquivo. Leia a partir da tabela de exemplo do Wanderbricksbookings e escreva um lote de registros como arquivos JSON em seu volume, simulando dados que chegam de um sistema externo.

Python

# Write a batch of Wanderbricks bookings data as JSON to the volume

bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json(f"{source}/bookings")

SQL

Gravar arquivos em um volume requer Python. Em um fluxo de trabalho do mundo real, esses dados chegariam de um sistema externo.

%python
# Write a batch of Wanderbricks bookings data as JSON to the volume

bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json("/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings")

Etapa 3: usar COPY INTO para carregar dados JSON de modo idempotente

Crie uma tabela Delta de destino antes de usar COPY INTO. Você não precisa fornecer nada além de um nome de tabela na sua CREATE TABLE declaração. Como essa ação é idempotente, o Databricks carrega os dados apenas uma vez, mesmo se você executar o código várias vezes.

Python

# Create target table and load data

spark.sql(f"CREATE TABLE IF NOT EXISTS {catalog}.{schema}.bookings_target")

spark.sql(f"""
  COPY INTO {catalog}.{schema}.bookings_target
  FROM '/Volumes/{catalog}/{schema}/{volume}/bookings'
  FILEFORMAT = JSON
  FORMAT_OPTIONS ('mergeSchema' = 'true')
  COPY_OPTIONS ('mergeSchema' = 'true')
""")

SQL

-- Create target table and load data

CREATE TABLE IF NOT EXISTS <catalog>.copy_into_tutorial.bookings_target;

COPY INTO <catalog>.copy_into_tutorial.bookings_target
FROM '/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')

Etapa 4: Visualizar o conteúdo da tabela

Verifique se a tabela contém 20 linhas do primeiro lote de dados de reservas do Wanderbricks e se o esquema foi inferido corretamente dos arquivos de origem JSON.

Python

# Review loaded data

display(spark.sql(f"SELECT * FROM {catalog}.{schema}.bookings_target"))

SQL

-- Review loaded data

SELECT * FROM <catalog>.copy_into_tutorial.bookings_target

Etapa 5: Carregar mais dados e visualizar resultados

Você pode simular dados adicionais que chegam de um sistema externo gravando outro lote de registros e executando COPY INTO novamente. Execute o código a seguir para gravar um segundo lote de dados.

Python

# Write another batch of Wanderbricks bookings data as JSON

bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json(f"{source}/bookings")

SQL

Gravar arquivos em um volume requer Python. Em um fluxo de trabalho do mundo real, esses dados chegariam de um sistema externo.

%python
# Write another batch of Wanderbricks bookings data as JSON

bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json("/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings")

Em seguida, execute o COPY INTO comando da Etapa 3 novamente e visualize a tabela para confirmar os novos registros. Somente os novos arquivos são carregados.

Python

# Confirm new data was loaded

display(spark.sql(f"SELECT COUNT(*) AS total_rows FROM {catalog}.{schema}.bookings_target"))

SQL

-- Confirm new data was loaded

SELECT COUNT(*) AS total_rows FROM <catalog>.copy_into_tutorial.bookings_target

Etapa 6: Tutorial de limpeza

Quando terminar este tutorial, você poderá limpar os recursos associados se não quiser mais mantê-los. Exclua o esquema, as tabelas, o volume e remova todos os dados.

Python

# Drop schema and all associated objects

spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")

SQL

-- Drop schema and all associated objects

DROP SCHEMA IF EXISTS <catalog>.copy_into_tutorial CASCADE;

Recursos adicionais