Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
Podes usar o COPY INTO comando SQL para carregar dados de uma localização de ficheiro para uma tabela Delta.
COPY INTO é retentável e idempotente — ficheiros na localização de origem que já foram carregados são ignorados em execuções subsequentes.
COPY INTO Oferece estas capacidades:
- Filtros de ficheiros ou pastas facilmente configuráveis a partir de armazenamento na cloud, incluindo volumes S3, ADLS, ABFS, GCS e Unity Catalog.
- Suporte para múltiplos formatos de ficheiro fonte: CSV, JSON, XML, Avro, ORC, Parquet, texto e ficheiros binários.
- Processamento de ficheiros uma única vez (idempotente) por defeito.
- Inferência, mapeamento, fusão e evolução do esquema da tabela alvo.
Nota
Para uma experiência de ingestão de ficheiros mais escalável e robusta, a Databricks recomenda que os utilizadores de SQL utilizem tabelas de streaming. Para mais informações, consulte Tabelas de streaming.
Aviso
COPY INTO Respeita a configuração do espaço de trabalho para vetores de eliminação. Se ativado, os vetores de eliminação são habilitados na tabela de destino quando COPY INTO é executado em um SQL warehouse ou uma computação a executar o Databricks Runtime 14.0 ou superior. Depois de os vetores de eliminação estarem ativados, bloqueiam consultas contra uma tabela no Databricks Runtime 11.3 LTS e versões anteriores. Veja Vetores de eliminação em Databricks e Ativar automaticamente vetores de eliminação.
Antes de começar
Um administrador de conta deve seguir os passos em Configurar acesso a dados para ingestão para configurar o acesso a dados em armazenamento de objetos na nuvem antes de os utilizadores poderem carregar dados usando COPY INTO.
Carregar dados numa tabela Delta Lake sem esquema
No Databricks Runtime 11.3 LTS e superiores, pode-se criar tabelas Delta de substituição vazias para que o esquema seja inferido durante o comando COPY INTO, definindo mergeSchema para true em COPY_OPTIONS. O exemplo seguinte utiliza o conjunto de dados Wanderbricks . Substitua <catalog>, <schema>, e <volume> por um catálogo, esquema e volume onde tiver CREATE TABLE permissões.
SQL
CREATE TABLE IF NOT EXISTS <catalog>.<schema>.booking_updates_schemaless;
COPY INTO <catalog>.<schema>.booking_updates_schemaless
FROM '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
Python
table_name = '<catalog>.<schema>.booking_updates_schemaless'
source_data = '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
source_format = 'JSON'
spark.sql("CREATE TABLE IF NOT EXISTS " + table_name)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format + \
" FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')" + \
" COPY_OPTIONS ('mergeSchema' = 'true')"
)
R
library(SparkR)
sparkR.session()
table_name = "<catalog>.<schema>.booking_updates_schemaless"
source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
source_format = "JSON"
sql(paste("CREATE TABLE IF NOT EXISTS ", table_name, sep = ""))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
" FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')",
" COPY_OPTIONS ('mergeSchema' = 'true')",
sep = ""
))
linguagem de programação Scala
val table_name = "<catalog>.<schema>.booking_updates_schemaless"
val source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
val source_format = "JSON"
spark.sql("CREATE TABLE IF NOT EXISTS " + table_name)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format +
" FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')" +
" COPY_OPTIONS ('mergeSchema' = 'true')"
)
Esta instrução SQL é idempotente. Isto significa que pode agendá-lo para correr repetidamente, e ele só carregará novos dados na tua tabela Delta.
Nota
A tabela Delta vazia não é utilizável fora de COPY INTO.
INSERT INTO e MERGE INTO não são suportados para gravar dados em tabelas Delta sem esquema. Depois que os dados são inseridos na tabela com COPY INTO, a tabela se torna consultável.
Consulte para criar tabelas de destino para COPY INTO.
Definir o esquema e carregar os dados numa tabela Delta Lake
O exemplo seguinte cria uma tabela Delta e usa o COPY INTO comando SQL para carregar dados de amostra do conjunto de dados Wanderbricks para a tabela. Os ficheiros de origem são ficheiros JSON armazenados num volume do Unity Catalog. Pode executar o exemplo de código em Python, R, Scala ou SQL a partir de um caderno ligado a um cluster Azure Databricks. Também podes executar o código SQL a partir de uma consulta associada a um SQL warehouse no Databricks SQL. Substitua <catalog>, <schema>, e <volume> por um catálogo, esquema e volume onde tiver CREATE TABLE permissões.
SQL
DROP TABLE IF EXISTS <catalog>.<schema>.booking_updates_upload;
CREATE TABLE <catalog>.<schema>.booking_updates_upload (
booking_id BIGINT,
user_id BIGINT,
status STRING,
total_amount DOUBLE
);
COPY INTO <catalog>.<schema>.booking_updates_upload
FROM '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
FILEFORMAT = JSON
FORMAT_OPTIONS ('multiLine' = 'true');
SELECT * FROM <catalog>.<schema>.booking_updates_upload;
Python
table_name = '<catalog>.<schema>.booking_updates_upload'
source_data = '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
source_format = 'JSON'
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" \
"booking_id BIGINT, " + \
"user_id BIGINT, " + \
"status STRING, " + \
"total_amount DOUBLE)"
)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format + \
" FORMAT_OPTIONS ('multiLine' = 'true')"
)
booking_updates_upload_data = spark.sql("SELECT * FROM " + table_name)
display(booking_updates_upload_data)
R
library(SparkR)
sparkR.session()
table_name = "<catalog>.<schema>.booking_updates_upload"
source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
source_format = "JSON"
sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))
sql(paste("CREATE TABLE ", table_name, " (",
"booking_id BIGINT, ",
"user_id BIGINT, ",
"status STRING, ",
"total_amount DOUBLE)",
sep = ""
))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
" FORMAT_OPTIONS ('multiLine' = 'true')",
sep = ""
))
booking_updates_upload_data = tableToDF(table_name)
display(booking_updates_upload_data)
linguagem de programação Scala
val table_name = "<catalog>.<schema>.booking_updates_upload"
val source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
val source_format = "JSON"
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" +
"booking_id BIGINT, " +
"user_id BIGINT, " +
"status STRING, " +
"total_amount DOUBLE)"
)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format +
" FORMAT_OPTIONS ('multiLine' = 'true')"
)
val booking_updates_upload_data = spark.table(table_name)
display(booking_updates_upload_data)
Para limpar, execute o código seguinte para eliminar a tabela de exemplo.
SQL
DROP TABLE <catalog>.<schema>.booking_updates_upload
Python
spark.sql("DROP TABLE " + table_name)
R
sql(paste("DROP TABLE ", table_name, sep = ""))
linguagem de programação Scala
spark.sql("DROP TABLE " + table_name)
Limpar arquivos de metadados
Você pode executar VACUUM para limpar arquivos de metadados não referenciados criados por COPY INTO no Databricks Runtime 15.2 e superior.
Recursos adicionais
- Carregar dados usando COPY INTO, volumes do Catálogo Unity ou locais externos
-
Padrões comuns de carregamento de dados usando
COPY INTO.
- Databricks Runtime 7.x e versões posteriores:
COPY INTO