Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
O Databricks recomenda que você use o comando COPY INTO para carregamento de dados incrementais e em massa para fontes de dados que contenham milhares de arquivos.
Neste tutorial, usa o comando COPY INTO para carregar dados JSON de um volume do Unity Catalog numa tabela Delta no seu espaço de trabalho Azure Databricks. Usas o conjunto de dados de exemplo do Wanderbricks como fonte de dados. Para casos de utilização mais avançados de ingestão, veja O que é o Auto Loader?.
Requerimentos
- Acesso a um recurso de computação. Consulte Compute.
- Um espaço de trabalho ativado pelo Unity Catalog com permissões para criar esquemas e volumes num catálogo. Consulte Conectar-se ao armazenamento de objetos na nuvem usando o Unity Catalog.
Etapa 1: Configurar seu ambiente
O código neste tutorial utiliza um volume do Unity Catalog para armazenar ficheiros fonte JSON. Substitui <catalog> por um catálogo onde tens CREATE SCHEMA e CREATE VOLUME as permissões. Se não conseguir executar o código, contacte o administrador do seu espaço de trabalho.
Crie um caderno e anexe-o a um recurso de computação. Depois, execute o código seguinte para configurar um esquema e um volume para este tutorial.
Python
# Set parameters and reset demo environment
catalog = "<catalog>"
username = spark.sql("SELECT regexp_replace(session_user(), '[^a-zA-Z0-9]', '_')").first()[0]
schema = f"copyinto_{username}_db"
volume = "copy_into_source"
source = f"/Volumes/{catalog}/{schema}/{volume}"
spark.sql(f"SET c.catalog={catalog}")
spark.sql(f"SET c.schema={schema}")
spark.sql(f"SET c.volume={volume}")
spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")
spark.sql(f"CREATE SCHEMA {catalog}.{schema}")
spark.sql(f"CREATE VOLUME {catalog}.{schema}.{volume}")
SQL
-- Reset demo environment
DROP SCHEMA IF EXISTS <catalog>.copy_into_tutorial CASCADE;
CREATE SCHEMA <catalog>.copy_into_tutorial;
CREATE VOLUME <catalog>.copy_into_tutorial.copy_into_source;
Passo 2: Escrever dados de exemplo no volume como JSON
O COPY INTO comando carrega dados de fontes baseadas em ficheiros. Leia da tabela de amostra do Wanderbricksbookings e grave um lote de registos como ficheiros JSON no seu volume, simulando a chegada de dados de um sistema externo.
Python
# Write a batch of Wanderbricks bookings data as JSON to the volume
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json(f"{source}/bookings")
SQL
Escrever ficheiros num volume requer Python. Num fluxo de trabalho do mundo real, esses dados viriam de um sistema externo.
%python
# Write a batch of Wanderbricks bookings data as JSON to the volume
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json("/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings")
Etapa 3: Usar COPY INTO para carregar dados JSON idempotentes
Crie uma tabela Delta alvo antes de usar COPY INTO. Não precisa de fornecer nada além do nome da tabela na sua CREATE TABLE declaração. Como esta ação é idempotente, o Databricks carrega os dados apenas uma vez, mesmo que execute o código várias vezes.
Python
# Create target table and load data
spark.sql(f"CREATE TABLE IF NOT EXISTS {catalog}.{schema}.bookings_target")
spark.sql(f"""
COPY INTO {catalog}.{schema}.bookings_target
FROM '/Volumes/{catalog}/{schema}/{volume}/bookings'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')
""")
SQL
-- Create target table and load data
CREATE TABLE IF NOT EXISTS <catalog>.copy_into_tutorial.bookings_target;
COPY INTO <catalog>.copy_into_tutorial.bookings_target
FROM '/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')
Passo 4: Pré-visualizar o conteúdo da tabela
Verifique se a tabela contém 20 linhas do primeiro lote de dados de reservas Wanderbricks e se o esquema foi corretamente inferido a partir dos ficheiros de origem JSON.
Python
# Review loaded data
display(spark.sql(f"SELECT * FROM {catalog}.{schema}.bookings_target"))
SQL
-- Review loaded data
SELECT * FROM <catalog>.copy_into_tutorial.bookings_target
Etapa 5: carregar mais dados e visualizar resultados
Pode simular dados adicionais a chegar de um sistema externo escrevendo outro lote de registos e executando COPY INTO novamente. Execute o código seguinte para escrever um segundo lote de dados.
Python
# Write another batch of Wanderbricks bookings data as JSON
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json(f"{source}/bookings")
SQL
Escrever ficheiros num volume requer Python. Num fluxo de trabalho do mundo real, esses dados viriam de um sistema externo.
%python
# Write another batch of Wanderbricks bookings data as JSON
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json("/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings")
Depois executa novamente o COPY INTO comando do Passo 3 e visualiza a tabela para confirmar os novos registos. Só os ficheiros novos são carregados.
Python
# Confirm new data was loaded
display(spark.sql(f"SELECT COUNT(*) AS total_rows FROM {catalog}.{schema}.bookings_target"))
SQL
-- Confirm new data was loaded
SELECT COUNT(*) AS total_rows FROM <catalog>.copy_into_tutorial.bookings_target
Passo 6: Tutorial de limpeza
Quando terminar este tutorial, você poderá limpar os recursos associados se não quiser mais mantê-los. Elimina o esquema, as tabelas e o volume, e remove todos os dados.
Python
# Drop schema and all associated objects
spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")
SQL
-- Drop schema and all associated objects
DROP SCHEMA IF EXISTS <catalog>.copy_into_tutorial CASCADE;