次の方法で共有


チュートリアル: Spark SQL を使用した COPY INTO

Databricks では、数千のファイルを含むデータ ソースの増分データ読み込みと一括データ読み込みには、COPY INTO コマンドを使用することをお勧めします。

このチュートリアルでは、COPY INTO コマンドを使用して、Unity カタログ ボリュームから Azure Databricks ワークスペースの Delta テーブルに JSON データを読み込みます。 Wanderbricks サンプル データセットをデータ ソースとして使用します。 より高度なインジェストのユース ケースについては、「 自動ローダーとは」を参照してください。

必要条件

手順 1: 環境を構成する

このチュートリアルのコードでは、Unity カタログ ボリュームを使用して JSON ソース ファイルを格納します。 <catalog>を、CREATE SCHEMACREATE VOLUMEのアクセス許可を持つカタログに置き換えます。 コードを実行できない場合は、ワークスペース管理者に問い合わせてください。

ノートブックを作成 し、コンピューティング リソースにアタッチします。 次のコードを実行して、このチュートリアルのスキーマとボリュームを設定します。

Python

# Set parameters and reset demo environment

catalog = "<catalog>"

username = spark.sql("SELECT regexp_replace(session_user(), '[^a-zA-Z0-9]', '_')").first()[0]
schema = f"copyinto_{username}_db"
volume = "copy_into_source"
source = f"/Volumes/{catalog}/{schema}/{volume}"

spark.sql(f"SET c.catalog={catalog}")
spark.sql(f"SET c.schema={schema}")
spark.sql(f"SET c.volume={volume}")

spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")
spark.sql(f"CREATE SCHEMA {catalog}.{schema}")
spark.sql(f"CREATE VOLUME {catalog}.{schema}.{volume}")

SQL

-- Reset demo environment

DROP SCHEMA IF EXISTS <catalog>.copy_into_tutorial CASCADE;
CREATE SCHEMA <catalog>.copy_into_tutorial;
CREATE VOLUME <catalog>.copy_into_tutorial.copy_into_source;

手順 2: サンプル データを JSON としてボリュームに書き込む

COPY INTO コマンドは、ファイル ベースのソースからデータを読み込みます。 Wanderbricksbookings サンプル テーブルから読み取り、JSON ファイルとしてレコードのバッチをボリュームに書き込み、外部システムから到着するデータをシミュレートします。

Python

# Write a batch of Wanderbricks bookings data as JSON to the volume

bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json(f"{source}/bookings")

SQL

ボリュームにファイルを書き込むには、Pythonが必要です。 実際のワークフローでは、このデータは外部システムから到着します。

%python
# Write a batch of Wanderbricks bookings data as JSON to the volume

bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json("/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings")

手順 3: COPY INTO を使用して JSON データをべき等に読み込む

COPY INTOを使用する前に、ターゲット Delta テーブルを作成します。 CREATE TABLE ステートメントでテーブル名以外の情報を指定する必要はありません。 このアクションはべき等であるため、コードを複数回実行した場合でも、Databricks はデータを 1 回だけ読み込みます。

Python

# Create target table and load data

spark.sql(f"CREATE TABLE IF NOT EXISTS {catalog}.{schema}.bookings_target")

spark.sql(f"""
  COPY INTO {catalog}.{schema}.bookings_target
  FROM '/Volumes/{catalog}/{schema}/{volume}/bookings'
  FILEFORMAT = JSON
  FORMAT_OPTIONS ('mergeSchema' = 'true')
  COPY_OPTIONS ('mergeSchema' = 'true')
""")

SQL

-- Create target table and load data

CREATE TABLE IF NOT EXISTS <catalog>.copy_into_tutorial.bookings_target;

COPY INTO <catalog>.copy_into_tutorial.bookings_target
FROM '/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')

手順 4: テーブルの内容をプレビューする

Wanderbricks 予約データの最初のバッチからテーブルに 20 行が含まれていること、および JSON ソース ファイルからスキーマが正しく推論されたことを確認します。

Python

# Review loaded data

display(spark.sql(f"SELECT * FROM {catalog}.{schema}.bookings_target"))

SQL

-- Review loaded data

SELECT * FROM <catalog>.copy_into_tutorial.bookings_target

手順 5: より多くのデータを読み込み、結果をプレビューする

レコードの別のバッチを書き込み、 COPY INTO をもう一度実行することで、外部システムから到着する追加のデータをシミュレートできます。 次のコードを実行して、データの 2 番目のバッチを書き込みます。

Python

# Write another batch of Wanderbricks bookings data as JSON

bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json(f"{source}/bookings")

SQL

ボリュームにファイルを書き込むには、Pythonが必要です。 実際のワークフローでは、このデータは外部システムから到着します。

%python
# Write another batch of Wanderbricks bookings data as JSON

bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json("/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings")

次に、手順 3 の COPY INTO コマンドをもう一度実行し、テーブルをプレビューして新しいレコードを確認します。 新しいファイルのみが読み込まれます。

Python

# Confirm new data was loaded

display(spark.sql(f"SELECT COUNT(*) AS total_rows FROM {catalog}.{schema}.bookings_target"))

SQL

-- Confirm new data was loaded

SELECT COUNT(*) AS total_rows FROM <catalog>.copy_into_tutorial.bookings_target

手順 6: チュートリアルをクリーンアップする

このチュートリアルが完了したら、関連付けられているリソースを保持する必要がなくなったら、クリーンアップできます。 スキーマ、テーブル、ボリュームを削除し、すべてのデータを削除します。

Python

# Drop schema and all associated objects

spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")

SQL

-- Drop schema and all associated objects

DROP SCHEMA IF EXISTS <catalog>.copy_into_tutorial CASCADE;

その他のリソース