Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
O Databricks recomenda que você use o comando COPY INTO para carregamento de dados incremental e em massa para fontes de dados que contêm milhares de arquivos.
Neste tutorial, você usará o comando COPY INTO para carregar dados JSON de um volume do Catálogo do Unity em uma tabela Delta em seu workspace Azure Databricks. Use o conjunto de dados de exemplo do Wanderbricks como fonte de dados. Para casos de uso de ingestão mais avançados, consulte o que é o Carregador Automático?.
Requisitos
- Acesso a um recurso de computação. Consulte Compute.
- Um workspace habilitado para o Unity Catalog com permissões para criar esquemas e volumes dentro de um catálogo. Consulte Conectar-se ao armazenamento de objetos de nuvem usando o Catálogo do Unity.
Etapa 1: Configurar seu ambiente
O código neste tutorial usa um volume do Catálogo do Unity para armazenar arquivos de origem JSON. Substitua <catalog> por um catálogo em que você tenha CREATE SCHEMA e CREATE VOLUME permissões. Se você não puder executar o código, entre em contato com o administrador do workspace.
Crie um notebook e anexe-o a um recurso de computação. Em seguida, execute o código a seguir para configurar um esquema e um volume para este tutorial.
Python
# Set parameters and reset demo environment
catalog = "<catalog>"
username = spark.sql("SELECT regexp_replace(session_user(), '[^a-zA-Z0-9]', '_')").first()[0]
schema = f"copyinto_{username}_db"
volume = "copy_into_source"
source = f"/Volumes/{catalog}/{schema}/{volume}"
spark.sql(f"SET c.catalog={catalog}")
spark.sql(f"SET c.schema={schema}")
spark.sql(f"SET c.volume={volume}")
spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")
spark.sql(f"CREATE SCHEMA {catalog}.{schema}")
spark.sql(f"CREATE VOLUME {catalog}.{schema}.{volume}")
SQL
-- Reset demo environment
DROP SCHEMA IF EXISTS <catalog>.copy_into_tutorial CASCADE;
CREATE SCHEMA <catalog>.copy_into_tutorial;
CREATE VOLUME <catalog>.copy_into_tutorial.copy_into_source;
Etapa 2: Gravar dados de exemplo no volume como JSON
O COPY INTO comando carrega dados de fontes baseadas em arquivo. Leia a partir da tabela de exemplo do Wanderbricksbookings e escreva um lote de registros como arquivos JSON em seu volume, simulando dados que chegam de um sistema externo.
Python
# Write a batch of Wanderbricks bookings data as JSON to the volume
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json(f"{source}/bookings")
SQL
Gravar arquivos em um volume requer Python. Em um fluxo de trabalho do mundo real, esses dados chegariam de um sistema externo.
%python
# Write a batch of Wanderbricks bookings data as JSON to the volume
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json("/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings")
Etapa 3: usar COPY INTO para carregar dados JSON de modo idempotente
Crie uma tabela Delta de destino antes de usar COPY INTO. Você não precisa fornecer nada além de um nome de tabela na sua CREATE TABLE declaração. Como essa ação é idempotente, o Databricks carrega os dados apenas uma vez, mesmo se você executar o código várias vezes.
Python
# Create target table and load data
spark.sql(f"CREATE TABLE IF NOT EXISTS {catalog}.{schema}.bookings_target")
spark.sql(f"""
COPY INTO {catalog}.{schema}.bookings_target
FROM '/Volumes/{catalog}/{schema}/{volume}/bookings'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')
""")
SQL
-- Create target table and load data
CREATE TABLE IF NOT EXISTS <catalog>.copy_into_tutorial.bookings_target;
COPY INTO <catalog>.copy_into_tutorial.bookings_target
FROM '/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')
Etapa 4: Visualizar o conteúdo da tabela
Verifique se a tabela contém 20 linhas do primeiro lote de dados de reservas do Wanderbricks e se o esquema foi inferido corretamente dos arquivos de origem JSON.
Python
# Review loaded data
display(spark.sql(f"SELECT * FROM {catalog}.{schema}.bookings_target"))
SQL
-- Review loaded data
SELECT * FROM <catalog>.copy_into_tutorial.bookings_target
Etapa 5: Carregar mais dados e visualizar resultados
Você pode simular dados adicionais que chegam de um sistema externo gravando outro lote de registros e executando COPY INTO novamente. Execute o código a seguir para gravar um segundo lote de dados.
Python
# Write another batch of Wanderbricks bookings data as JSON
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json(f"{source}/bookings")
SQL
Gravar arquivos em um volume requer Python. Em um fluxo de trabalho do mundo real, esses dados chegariam de um sistema externo.
%python
# Write another batch of Wanderbricks bookings data as JSON
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json("/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings")
Em seguida, execute o COPY INTO comando da Etapa 3 novamente e visualize a tabela para confirmar os novos registros. Somente os novos arquivos são carregados.
Python
# Confirm new data was loaded
display(spark.sql(f"SELECT COUNT(*) AS total_rows FROM {catalog}.{schema}.bookings_target"))
SQL
-- Confirm new data was loaded
SELECT COUNT(*) AS total_rows FROM <catalog>.copy_into_tutorial.bookings_target
Etapa 6: Tutorial de limpeza
Quando terminar este tutorial, você poderá limpar os recursos associados se não quiser mais mantê-los. Exclua o esquema, as tabelas, o volume e remova todos os dados.
Python
# Drop schema and all associated objects
spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")
SQL
-- Drop schema and all associated objects
DROP SCHEMA IF EXISTS <catalog>.copy_into_tutorial CASCADE;