Compartilhar via


Comece a usar COPY INTO para carregar dados

Você pode usar o COPY INTO comando SQL para carregar dados de um local de arquivo em uma tabela Delta. COPY INTO é retriável e idempotente — os arquivos no local de origem que já foram carregados são ignorados em execuções subsequentes.

COPY INTO oferece estes recursos:

  • Filtros facilmente configuráveis de arquivos ou pastas a partir do armazenamento em nuvem, incluindo volumes S3, ADLS, ABFS, GCS e Unity Catalog.
  • Suporte para vários formatos de arquivo de origem: CSV, JSON, XML, Avro, ORC, Parquet, texto e arquivos binários.
  • Processamento de arquivo exatamente uma vez (idempotente) por padrão.
  • Inferência de esquema de tabela de destino, mapeamento, mesclagem e evolução.

Observação

Para uma experiência de ingestão de arquivos mais escalonável e robusta, o Databricks recomenda que os usuários do SQL usem tabelas de streaming. Para obter mais informações, consulte As tabelas de streaming.

Aviso

COPY INTO respeita a configuração do workspace para vetores de exclusão. Se habilitado, os vetores de exclusão serão ativados na tabela de destino quando COPY INTO for executado em um warehouse SQL ou em uma computação que esteja rodando o Databricks Runtime 14.0 ou uma versão superior. Depois que os vetores de exclusão são habilitados, eles bloqueiam consultas em uma tabela no Databricks Runtime 11.3 LTS e abaixo. Consulte vetores de exclusão no Databricks e habilite automaticamente vetores de exclusão.

Antes de começar

Um administrador de conta deve seguir as etapas em Configurar o acesso a dados para ingestão para configurar o acesso aos dados no armazenamento de objetos de nuvem antes que os usuários possam carregar dados usando COPY INTO.

Carregar dados em uma tabela Delta Lake sem esquema

No Databricks Runtime 11.3 LTS e posteriores, você pode criar tabelas Delta de marcador vazias para que o esquema seja inferido durante um comando COPY INTO definindo mergeSchema para true em COPY_OPTIONS. O exemplo a seguir usa o conjunto de dados do Wanderbricks . Substitua <catalog>, <schema>e <volume> por um catálogo, esquema e volume em que você tenha CREATE TABLE permissões.

SQL

CREATE TABLE IF NOT EXISTS <catalog>.<schema>.booking_updates_schemaless;

COPY INTO <catalog>.<schema>.booking_updates_schemaless
FROM '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');

Python

table_name = '<catalog>.<schema>.booking_updates_schemaless'
source_data = '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
source_format = 'JSON'

spark.sql("CREATE TABLE IF NOT EXISTS " + table_name)

spark.sql("COPY INTO " + table_name + \
  " FROM '" + source_data + "'" + \
  " FILEFORMAT = " + source_format + \
  " FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')" + \
  " COPY_OPTIONS ('mergeSchema' = 'true')"
)

R

library(SparkR)
sparkR.session()

table_name = "<catalog>.<schema>.booking_updates_schemaless"
source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
source_format = "JSON"

sql(paste("CREATE TABLE IF NOT EXISTS ", table_name, sep = ""))

sql(paste("COPY INTO ", table_name,
  " FROM '", source_data, "'",
  " FILEFORMAT = ", source_format,
  " FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')",
  " COPY_OPTIONS ('mergeSchema' = 'true')",
  sep = ""
))

Scala (linguagem de programação)

val table_name = "<catalog>.<schema>.booking_updates_schemaless"
val source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
val source_format = "JSON"

spark.sql("CREATE TABLE IF NOT EXISTS " + table_name)

spark.sql("COPY INTO " + table_name +
  " FROM '" + source_data + "'" +
  " FILEFORMAT = " + source_format +
  " FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')" +
  " COPY_OPTIONS ('mergeSchema' = 'true')"
)

Essa instrução SQL é idempotente. Isso significa que você pode agendá-lo para ser executado repetidamente e ele só carregará novos dados em sua tabela Delta.

Observação

A tabela Delta vazia não é utilizável lá fora COPY INTO. INSERT INTO e MERGE INTO não há suporte para gravar dados em tabelas Delta sem esquema. Depois que os dados são inseridos na tabela com COPY INTO, a tabela se torna consultável.

Veja Criar tabelas de destino para COPY INTO.

Definir o esquema e carregar dados em uma tabela do Delta Lake

O exemplo a seguir cria uma tabela Delta e usa o COPY INTO comando SQL para carregar dados de exemplo do conjunto de dados do Wanderbricks na tabela. Os arquivos de origem são arquivos JSON armazenados em um volume do Catálogo do Unity. Você pode executar o exemplo de código Python, R, Scala ou SQL de um notebook anexado a um cluster Azure Databricks. Você também pode executar o código SQL de uma consulta associada a um SQL Warehouse no Databricks SQL. Substitua <catalog>, <schema>e <volume> por um catálogo, esquema e volume em que você tenha CREATE TABLE permissões.

SQL

DROP TABLE IF EXISTS <catalog>.<schema>.booking_updates_upload;

CREATE TABLE <catalog>.<schema>.booking_updates_upload (
  booking_id BIGINT,
  user_id BIGINT,
  status STRING,
  total_amount DOUBLE
);

COPY INTO <catalog>.<schema>.booking_updates_upload
FROM '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
FILEFORMAT = JSON
FORMAT_OPTIONS ('multiLine' = 'true');

SELECT * FROM <catalog>.<schema>.booking_updates_upload;

Python

table_name = '<catalog>.<schema>.booking_updates_upload'
source_data = '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
source_format = 'JSON'

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" \
  "booking_id BIGINT, " + \
  "user_id BIGINT, " + \
  "status STRING, " + \
  "total_amount DOUBLE)"
)

spark.sql("COPY INTO " + table_name + \
  " FROM '" + source_data + "'" + \
  " FILEFORMAT = " + source_format + \
  " FORMAT_OPTIONS ('multiLine' = 'true')"
)

booking_updates_upload_data = spark.sql("SELECT * FROM " + table_name)

display(booking_updates_upload_data)

R

library(SparkR)
sparkR.session()

table_name = "<catalog>.<schema>.booking_updates_upload"
source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
source_format = "JSON"

sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))

sql(paste("CREATE TABLE ", table_name, " (",
  "booking_id BIGINT, ",
  "user_id BIGINT, ",
  "status STRING, ",
  "total_amount DOUBLE)",
  sep = ""
))

sql(paste("COPY INTO ", table_name,
  " FROM '", source_data, "'",
  " FILEFORMAT = ", source_format,
  " FORMAT_OPTIONS ('multiLine' = 'true')",
  sep = ""
))

booking_updates_upload_data = tableToDF(table_name)

display(booking_updates_upload_data)

Scala (linguagem de programação)

val table_name = "<catalog>.<schema>.booking_updates_upload"
val source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
val source_format = "JSON"

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" +
  "booking_id BIGINT, " +
  "user_id BIGINT, " +
  "status STRING, " +
  "total_amount DOUBLE)"
)

spark.sql("COPY INTO " + table_name +
  " FROM '" + source_data + "'" +
  " FILEFORMAT = " + source_format +
  " FORMAT_OPTIONS ('multiLine' = 'true')"
)

val booking_updates_upload_data = spark.table(table_name)

display(booking_updates_upload_data)

Para limpar, execute o código a seguir para excluir a tabela de exemplo.

SQL

DROP TABLE <catalog>.<schema>.booking_updates_upload

Python

spark.sql("DROP TABLE " + table_name)

R

sql(paste("DROP TABLE ", table_name, sep = ""))

Scala (linguagem de programação)

spark.sql("DROP TABLE " + table_name)

Limpar arquivos de metadados

Você pode executar VACUUM para limpar arquivos de metadados não referenciados que foram criados pelo COPY INTO no Databricks Runtime 15.2 e superior.

Recursos adicionais

  • Databricks Runtime 7.x e versões posteriores: COPY INTO