COPY INTO SQL コマンドを使用して、ファイルの場所から Delta テーブルにデータを読み込むことができます。
COPY INTO はリトライ可能で、べき等性を持ちます。ソースロケーションにある既に読み込まれているファイルは、後続の実行時にスキップされます。
COPY INTO には、次の機能が用意されています。
- S3、ADLS、ABFS、GCS、Unity カタログ ボリュームなど、クラウド ストレージから簡単に構成できるファイルまたはフォルダー フィルター。
- CSV、JSON、XML、 Avro、 ORC、 Parquet、テキスト、バイナリ ファイルなど、複数のソース ファイル形式のサポート。
- 既定では、べき等性を持つファイルの1回限りの処理です。
- ターゲット テーブル スキーマの推論、マッピング、マージ、および進化。
注
よりスケーラブルで堅牢なファイル インジェスト エクスペリエンスを実現するために、Databricks では SQL ユーザーがストリーミング テーブルを使用することをお勧めします。 詳細については、「 ストリーミング テーブル」を参照してください。
警告
COPY INTO は、削除ベクトルのワークスペース設定を考慮します。 有効にした場合、Databricks Runtime 14.0 以降を実行しているコンピューティングまたは SQL ウェアハウスで COPY INTO が実行されている場合、ターゲット テーブルで削除ベクトルが有効になります。 削除ベクターを有効にすると、Databricks Runtime 11.3 LTS 以降のテーブルに対するクエリがブロックされます。
Databricks の削除ベクトルと自動有効化削除ベクトルを参照してください。
開始する前に
ユーザーがを使用してデータを読み込む前に、アカウント管理者がCOPY INTOを構成してクラウド オブジェクト ストレージ内のデータへのアクセスを構成する手順に従う必要があります。
スキーマがないDelta Lakeテーブルにデータを読み込む
Databricks Runtime 11.3 LTS 以降では、COPY INTOでmergeSchemaにtrueを設定することで、COPY_OPTIONS コマンド中にスキーマが推論されるように、空のプレースホルダー Delta テーブルを作成できます。 次の例では、 Wanderbricks データ セットを使用します。
<catalog>、<schema>、および<volume>を、CREATE TABLEアクセス許可を持つカタログ、スキーマ、ボリュームに置き換えます。
SQL
CREATE TABLE IF NOT EXISTS <catalog>.<schema>.booking_updates_schemaless;
COPY INTO <catalog>.<schema>.booking_updates_schemaless
FROM '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
Python
table_name = '<catalog>.<schema>.booking_updates_schemaless'
source_data = '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
source_format = 'JSON'
spark.sql("CREATE TABLE IF NOT EXISTS " + table_name)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format + \
" FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')" + \
" COPY_OPTIONS ('mergeSchema' = 'true')"
)
R
library(SparkR)
sparkR.session()
table_name = "<catalog>.<schema>.booking_updates_schemaless"
source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
source_format = "JSON"
sql(paste("CREATE TABLE IF NOT EXISTS ", table_name, sep = ""))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
" FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')",
" COPY_OPTIONS ('mergeSchema' = 'true')",
sep = ""
))
スカラ (プログラミング言語)
val table_name = "<catalog>.<schema>.booking_updates_schemaless"
val source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
val source_format = "JSON"
spark.sql("CREATE TABLE IF NOT EXISTS " + table_name)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format +
" FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')" +
" COPY_OPTIONS ('mergeSchema' = 'true')"
)
この SQL ステートメントは冪等性があります。 つまり、繰り返し実行するようにスケジュールすることができ、Delta テーブルに新しいデータのみが読み込まれます。
注
空の Delta テーブルは、 COPY INTO外では使用できません。
INSERT INTO および MERGE INTO では、スキーマレス Delta テーブルへのデータの書き込みがサポートされていません。 データが COPY INTO でテーブルに挿入されると、テーブルはクエリを実行することが可能になります。
COPY INTO
のターゲット テーブルの作成を参照してください。
スキーマを設定し、Delta Lake テーブルにデータを読み込む
次の例では、Delta テーブルを作成し、 COPY INTO SQL コマンドを使用して Wanderbricks データ セットからテーブルにサンプル データを読み込みます。 ソース ファイルは、Unity カタログ ボリュームに格納されている JSON ファイルです。 Azure Databricks クラスターに接続されているノートブックから、Python、R、Scala、または SQL コードの例を実行できます。 Databricks SQL の SQL ウェアハウスに関連付けられているクエリから SQL コードを実行することもできます。
<catalog>、<schema>、および<volume>を、CREATE TABLEアクセス許可を持つカタログ、スキーマ、ボリュームに置き換えます。
SQL
DROP TABLE IF EXISTS <catalog>.<schema>.booking_updates_upload;
CREATE TABLE <catalog>.<schema>.booking_updates_upload (
booking_id BIGINT,
user_id BIGINT,
status STRING,
total_amount DOUBLE
);
COPY INTO <catalog>.<schema>.booking_updates_upload
FROM '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
FILEFORMAT = JSON
FORMAT_OPTIONS ('multiLine' = 'true');
SELECT * FROM <catalog>.<schema>.booking_updates_upload;
Python
table_name = '<catalog>.<schema>.booking_updates_upload'
source_data = '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
source_format = 'JSON'
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" \
"booking_id BIGINT, " + \
"user_id BIGINT, " + \
"status STRING, " + \
"total_amount DOUBLE)"
)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format + \
" FORMAT_OPTIONS ('multiLine' = 'true')"
)
booking_updates_upload_data = spark.sql("SELECT * FROM " + table_name)
display(booking_updates_upload_data)
R
library(SparkR)
sparkR.session()
table_name = "<catalog>.<schema>.booking_updates_upload"
source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
source_format = "JSON"
sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))
sql(paste("CREATE TABLE ", table_name, " (",
"booking_id BIGINT, ",
"user_id BIGINT, ",
"status STRING, ",
"total_amount DOUBLE)",
sep = ""
))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
" FORMAT_OPTIONS ('multiLine' = 'true')",
sep = ""
))
booking_updates_upload_data = tableToDF(table_name)
display(booking_updates_upload_data)
スカラ (プログラミング言語)
val table_name = "<catalog>.<schema>.booking_updates_upload"
val source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
val source_format = "JSON"
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" +
"booking_id BIGINT, " +
"user_id BIGINT, " +
"status STRING, " +
"total_amount DOUBLE)"
)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format +
" FORMAT_OPTIONS ('multiLine' = 'true')"
)
val booking_updates_upload_data = spark.table(table_name)
display(booking_updates_upload_data)
クリーンアップするには、次のコードを実行してサンプル テーブルを削除します。
SQL
DROP TABLE <catalog>.<schema>.booking_updates_upload
Python
spark.sql("DROP TABLE " + table_name)
R
sql(paste("DROP TABLE ", table_name, sep = ""))
スカラ (プログラミング言語)
spark.sql("DROP TABLE " + table_name)
メタデータ ファイルをクリーンアップする
VACUUMを実行して、Databricks Runtime 15.2 以降でCOPY INTOによって作成された参照されていないメタデータ ファイルをクリーンアップできます。
その他のリソース
- Databricks Runtime 7.x 以降:
COPY INTO