Databricks では、数千のファイルを含むデータ ソースの増分データ読み込みと一括データ読み込みには、COPY INTO コマンドを使用することをお勧めします。
このチュートリアルでは、COPY INTO コマンドを使用して、Unity カタログ ボリュームから Azure Databricks ワークスペースの Delta テーブルに JSON データを読み込みます。
Wanderbricks サンプル データセットをデータ ソースとして使用します。 より高度なインジェストのユース ケースについては、「 自動ローダーとは」を参照してください。
必要条件
- コンピューティング リソースへのアクセス。 「コンピューティング」を参照してください。
- カタログ内にスキーマとボリュームを作成するアクセス許可を持つ Unity カタログ対応ワークスペース。 Unity カタログを使用したクラウド オブジェクト ストレージへの接続を参照してください。
手順 1: 環境を構成する
このチュートリアルのコードでは、Unity カタログ ボリュームを使用して JSON ソース ファイルを格納します。
<catalog>を、CREATE SCHEMAとCREATE VOLUMEのアクセス許可を持つカタログに置き換えます。 コードを実行できない場合は、ワークスペース管理者に問い合わせてください。
ノートブックを作成 し、コンピューティング リソースにアタッチします。 次のコードを実行して、このチュートリアルのスキーマとボリュームを設定します。
Python
# Set parameters and reset demo environment
catalog = "<catalog>"
username = spark.sql("SELECT regexp_replace(session_user(), '[^a-zA-Z0-9]', '_')").first()[0]
schema = f"copyinto_{username}_db"
volume = "copy_into_source"
source = f"/Volumes/{catalog}/{schema}/{volume}"
spark.sql(f"SET c.catalog={catalog}")
spark.sql(f"SET c.schema={schema}")
spark.sql(f"SET c.volume={volume}")
spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")
spark.sql(f"CREATE SCHEMA {catalog}.{schema}")
spark.sql(f"CREATE VOLUME {catalog}.{schema}.{volume}")
SQL
-- Reset demo environment
DROP SCHEMA IF EXISTS <catalog>.copy_into_tutorial CASCADE;
CREATE SCHEMA <catalog>.copy_into_tutorial;
CREATE VOLUME <catalog>.copy_into_tutorial.copy_into_source;
手順 2: サンプル データを JSON としてボリュームに書き込む
COPY INTO コマンドは、ファイル ベースのソースからデータを読み込みます。
Wanderbricksbookings サンプル テーブルから読み取り、JSON ファイルとしてレコードのバッチをボリュームに書き込み、外部システムから到着するデータをシミュレートします。
Python
# Write a batch of Wanderbricks bookings data as JSON to the volume
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json(f"{source}/bookings")
SQL
ボリュームにファイルを書き込むには、Pythonが必要です。 実際のワークフローでは、このデータは外部システムから到着します。
%python
# Write a batch of Wanderbricks bookings data as JSON to the volume
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json("/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings")
手順 3: COPY INTO を使用して JSON データをべき等に読み込む
COPY INTOを使用する前に、ターゲット Delta テーブルを作成します。
CREATE TABLE ステートメントでテーブル名以外の情報を指定する必要はありません。 このアクションはべき等であるため、コードを複数回実行した場合でも、Databricks はデータを 1 回だけ読み込みます。
Python
# Create target table and load data
spark.sql(f"CREATE TABLE IF NOT EXISTS {catalog}.{schema}.bookings_target")
spark.sql(f"""
COPY INTO {catalog}.{schema}.bookings_target
FROM '/Volumes/{catalog}/{schema}/{volume}/bookings'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')
""")
SQL
-- Create target table and load data
CREATE TABLE IF NOT EXISTS <catalog>.copy_into_tutorial.bookings_target;
COPY INTO <catalog>.copy_into_tutorial.bookings_target
FROM '/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')
手順 4: テーブルの内容をプレビューする
Wanderbricks 予約データの最初のバッチからテーブルに 20 行が含まれていること、および JSON ソース ファイルからスキーマが正しく推論されたことを確認します。
Python
# Review loaded data
display(spark.sql(f"SELECT * FROM {catalog}.{schema}.bookings_target"))
SQL
-- Review loaded data
SELECT * FROM <catalog>.copy_into_tutorial.bookings_target
手順 5: より多くのデータを読み込み、結果をプレビューする
レコードの別のバッチを書き込み、 COPY INTO をもう一度実行することで、外部システムから到着する追加のデータをシミュレートできます。 次のコードを実行して、データの 2 番目のバッチを書き込みます。
Python
# Write another batch of Wanderbricks bookings data as JSON
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json(f"{source}/bookings")
SQL
ボリュームにファイルを書き込むには、Pythonが必要です。 実際のワークフローでは、このデータは外部システムから到着します。
%python
# Write another batch of Wanderbricks bookings data as JSON
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json("/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings")
次に、手順 3 の COPY INTO コマンドをもう一度実行し、テーブルをプレビューして新しいレコードを確認します。 新しいファイルのみが読み込まれます。
Python
# Confirm new data was loaded
display(spark.sql(f"SELECT COUNT(*) AS total_rows FROM {catalog}.{schema}.bookings_target"))
SQL
-- Confirm new data was loaded
SELECT COUNT(*) AS total_rows FROM <catalog>.copy_into_tutorial.bookings_target
手順 6: チュートリアルをクリーンアップする
このチュートリアルが完了したら、関連付けられているリソースを保持する必要がなくなったら、クリーンアップできます。 スキーマ、テーブル、ボリュームを削除し、すべてのデータを削除します。
Python
# Drop schema and all associated objects
spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")
SQL
-- Drop schema and all associated objects
DROP SCHEMA IF EXISTS <catalog>.copy_into_tutorial CASCADE;