Partilhar via


Introdução ao uso do COPY INTO para carregar dados

Podes usar o COPY INTO comando SQL para carregar dados de uma localização de ficheiro para uma tabela Delta. COPY INTO é retentável e idempotente — ficheiros na localização de origem que já foram carregados são ignorados em execuções subsequentes.

COPY INTO Oferece estas capacidades:

  • Filtros de ficheiros ou pastas facilmente configuráveis a partir de armazenamento na cloud, incluindo volumes S3, ADLS, ABFS, GCS e Unity Catalog.
  • Suporte para múltiplos formatos de ficheiro fonte: CSV, JSON, XML, Avro, ORC, Parquet, texto e ficheiros binários.
  • Processamento de ficheiros uma única vez (idempotente) por defeito.
  • Inferência, mapeamento, fusão e evolução do esquema da tabela alvo.

Nota

Para uma experiência de ingestão de ficheiros mais escalável e robusta, a Databricks recomenda que os utilizadores de SQL utilizem tabelas de streaming. Para mais informações, consulte Tabelas de streaming.

Aviso

COPY INTO Respeita a configuração do espaço de trabalho para vetores de eliminação. Se ativado, os vetores de eliminação são habilitados na tabela de destino quando COPY INTO é executado em um SQL warehouse ou uma computação a executar o Databricks Runtime 14.0 ou superior. Depois de os vetores de eliminação estarem ativados, bloqueiam consultas contra uma tabela no Databricks Runtime 11.3 LTS e versões anteriores. Veja Vetores de eliminação em Databricks e Ativar automaticamente vetores de eliminação.

Antes de começar

Um administrador de conta deve seguir os passos em Configurar acesso a dados para ingestão para configurar o acesso a dados em armazenamento de objetos na nuvem antes de os utilizadores poderem carregar dados usando COPY INTO.

Carregar dados numa tabela Delta Lake sem esquema

No Databricks Runtime 11.3 LTS e superiores, pode-se criar tabelas Delta de substituição vazias para que o esquema seja inferido durante o comando COPY INTO, definindo mergeSchema para true em COPY_OPTIONS. O exemplo seguinte utiliza o conjunto de dados Wanderbricks . Substitua <catalog>, <schema>, e <volume> por um catálogo, esquema e volume onde tiver CREATE TABLE permissões.

SQL

CREATE TABLE IF NOT EXISTS <catalog>.<schema>.booking_updates_schemaless;

COPY INTO <catalog>.<schema>.booking_updates_schemaless
FROM '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');

Python

table_name = '<catalog>.<schema>.booking_updates_schemaless'
source_data = '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
source_format = 'JSON'

spark.sql("CREATE TABLE IF NOT EXISTS " + table_name)

spark.sql("COPY INTO " + table_name + \
  " FROM '" + source_data + "'" + \
  " FILEFORMAT = " + source_format + \
  " FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')" + \
  " COPY_OPTIONS ('mergeSchema' = 'true')"
)

R

library(SparkR)
sparkR.session()

table_name = "<catalog>.<schema>.booking_updates_schemaless"
source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
source_format = "JSON"

sql(paste("CREATE TABLE IF NOT EXISTS ", table_name, sep = ""))

sql(paste("COPY INTO ", table_name,
  " FROM '", source_data, "'",
  " FILEFORMAT = ", source_format,
  " FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')",
  " COPY_OPTIONS ('mergeSchema' = 'true')",
  sep = ""
))

linguagem de programação Scala

val table_name = "<catalog>.<schema>.booking_updates_schemaless"
val source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
val source_format = "JSON"

spark.sql("CREATE TABLE IF NOT EXISTS " + table_name)

spark.sql("COPY INTO " + table_name +
  " FROM '" + source_data + "'" +
  " FILEFORMAT = " + source_format +
  " FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')" +
  " COPY_OPTIONS ('mergeSchema' = 'true')"
)

Esta instrução SQL é idempotente. Isto significa que pode agendá-lo para correr repetidamente, e ele só carregará novos dados na tua tabela Delta.

Nota

A tabela Delta vazia não é utilizável fora de COPY INTO. INSERT INTO e MERGE INTO não são suportados para gravar dados em tabelas Delta sem esquema. Depois que os dados são inseridos na tabela com COPY INTO, a tabela se torna consultável.

Consulte para criar tabelas de destino para COPY INTO.

Definir o esquema e carregar os dados numa tabela Delta Lake

O exemplo seguinte cria uma tabela Delta e usa o COPY INTO comando SQL para carregar dados de amostra do conjunto de dados Wanderbricks para a tabela. Os ficheiros de origem são ficheiros JSON armazenados num volume do Unity Catalog. Pode executar o exemplo de código em Python, R, Scala ou SQL a partir de um caderno ligado a um cluster Azure Databricks. Também podes executar o código SQL a partir de uma consulta associada a um SQL warehouse no Databricks SQL. Substitua <catalog>, <schema>, e <volume> por um catálogo, esquema e volume onde tiver CREATE TABLE permissões.

SQL

DROP TABLE IF EXISTS <catalog>.<schema>.booking_updates_upload;

CREATE TABLE <catalog>.<schema>.booking_updates_upload (
  booking_id BIGINT,
  user_id BIGINT,
  status STRING,
  total_amount DOUBLE
);

COPY INTO <catalog>.<schema>.booking_updates_upload
FROM '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
FILEFORMAT = JSON
FORMAT_OPTIONS ('multiLine' = 'true');

SELECT * FROM <catalog>.<schema>.booking_updates_upload;

Python

table_name = '<catalog>.<schema>.booking_updates_upload'
source_data = '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
source_format = 'JSON'

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" \
  "booking_id BIGINT, " + \
  "user_id BIGINT, " + \
  "status STRING, " + \
  "total_amount DOUBLE)"
)

spark.sql("COPY INTO " + table_name + \
  " FROM '" + source_data + "'" + \
  " FILEFORMAT = " + source_format + \
  " FORMAT_OPTIONS ('multiLine' = 'true')"
)

booking_updates_upload_data = spark.sql("SELECT * FROM " + table_name)

display(booking_updates_upload_data)

R

library(SparkR)
sparkR.session()

table_name = "<catalog>.<schema>.booking_updates_upload"
source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
source_format = "JSON"

sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))

sql(paste("CREATE TABLE ", table_name, " (",
  "booking_id BIGINT, ",
  "user_id BIGINT, ",
  "status STRING, ",
  "total_amount DOUBLE)",
  sep = ""
))

sql(paste("COPY INTO ", table_name,
  " FROM '", source_data, "'",
  " FILEFORMAT = ", source_format,
  " FORMAT_OPTIONS ('multiLine' = 'true')",
  sep = ""
))

booking_updates_upload_data = tableToDF(table_name)

display(booking_updates_upload_data)

linguagem de programação Scala

val table_name = "<catalog>.<schema>.booking_updates_upload"
val source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
val source_format = "JSON"

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" +
  "booking_id BIGINT, " +
  "user_id BIGINT, " +
  "status STRING, " +
  "total_amount DOUBLE)"
)

spark.sql("COPY INTO " + table_name +
  " FROM '" + source_data + "'" +
  " FILEFORMAT = " + source_format +
  " FORMAT_OPTIONS ('multiLine' = 'true')"
)

val booking_updates_upload_data = spark.table(table_name)

display(booking_updates_upload_data)

Para limpar, execute o código seguinte para eliminar a tabela de exemplo.

SQL

DROP TABLE <catalog>.<schema>.booking_updates_upload

Python

spark.sql("DROP TABLE " + table_name)

R

sql(paste("DROP TABLE ", table_name, sep = ""))

linguagem de programação Scala

spark.sql("DROP TABLE " + table_name)

Limpar arquivos de metadados

Você pode executar VACUUM para limpar arquivos de metadados não referenciados criados por COPY INTO no Databricks Runtime 15.2 e superior.

Recursos adicionais

  • Databricks Runtime 7.x e versões posteriores: COPY INTO