適用対象:
Databricks SQL
Databricks Runtime 13.3 LTS 以降
指定された場所にあるファイルを読み取り、表形式でデータを返します。
JSON、CSV、XML、TEXT、BINARYFILE、PARQUET、AVRO、および ORC ファイル形式の読み取りをサポートします。
ファイル形式を自動的に検出し、すべてのファイルで統合スキーマを推論できます。
構文
read_files(path [, option_key => option_value ] [...])
議論
この関数には、オプション キーの名前付きパラメーター呼び出しが必要です。
-
path: データの場所の URI を持つSTRING。 Azure Data Lake Storage ('abfss://')、S3 (s3://) および Google Cloud Storage ('gs://') からの読み取りをサポートします。 glob を含めることができます。 詳細については、「ファイルの検出」を参照してください。 -
option_key: 構成するオプションの名前。 バッククォート () for options that contain dots (.`) を使う必要があります。 -
option_value: オプションを設定する 定数式 。 リテラルとスカラー関数を受け入れます。
戻り値
指定した pathの下で読み取られたファイルからのデータを含むテーブル。 スキーマはファイル形式によって異なります。
BINARYFILE: 固定スキーマを返します。コラム タイプ 説明 pathSTRINGファイルのフルパスです。 modificationTimeTIMESTAMPファイルの最終変更時刻。 lengthLONGファイルのサイズをバイト単位で指定します。 contentBINARYファイルのバイナリ コンテンツ。 * EXCEPT (content)を使用して、ファイル メタデータのクエリを実行するときにバイナリ コンテンツを除外します。TEXT: 1 つのvalue(STRING) 列を持つ固定スキーマを返します。その他のすべての形式 (JSON、CSV、XML、PARQUET、AVRO、ORC): スキーマはファイルの内容から推論されるか、
schemaオプションを使用して明示的に指定されます。
_metadata コラム
read_files は、ファイル レベルのメタデータを含む _metadata 列を公開します。 この列は SELECT * 結果には含まれていないので、明示的に選択する必要があります。 これは次のフィールドが含まれています。
| フィールド | タイプ | 説明 |
|---|---|---|
file_path |
STRING |
ソース ファイルへの完全パス。 |
file_name |
STRING |
ソース ファイルの名前。 |
file_size |
LONG |
ソース ファイルのサイズ (バイト単位)。 |
file_modification_time |
TIMESTAMP |
ソース ファイルの最終変更時刻。 |
file_block_start |
LONG |
読み取られるファイルのブロックの先頭。 |
file_block_length |
LONG |
読み取られるファイルのブロックの長さ。 |
結果に _metadata を含めるには、明示的に選択します。
SELECT * EXCEPT (content), _metadata
FROM read_files('/Volumes/my_catalog/my_schema/my_volume', format => 'binaryFile');
ファイルの検出
read_files では、個々のファイルを読み取ったり、指定されたディレクトリの下にあるファイルを読み取ったりできます。
read_files は、glob が指定されていない限り、指定されたディレクトリ内のすべてのファイルを再帰的に検出します。これは、read_files を特定のディレクトリ パターンに再帰するように指示します。
glob パターンを使用したディレクトリまたはファイルのフィルター処理
glob パターンは、パスに指定されているときに、ディレクトリとファイルのフィルター処理に使用できます。
| パターン | 説明 |
|---|---|
? |
任意の 1 文字と一致します |
* |
0 個以上の文字と一致します |
[abc] |
文字セット {a, b, c} の 1 文字と一致します。 |
[a-z] |
文字範囲 {a…z} の 1 文字と一致します。 |
[^a] |
文字セットまたは範囲 {a} からのものではない 1 文字と一致します。
^ 文字は左角かっこのすぐ右側に表示されることに注意してください。 |
{ab,cd} |
文字列セット {ab, cd} の文字列と一致します。 |
{ab,c{de, fh}} |
文字列セット {ab, cde, cfh} の文字列と一致します。 |
read_files では、glob を使用してファイルを検出する際に、オートローダーの厳密なグロバーが使用されます。 これは、useStrictGlobber オプションで構成されます。 厳密な globber を無効にすると、末尾のスラッシュ (/) が削除され、/*/ などの star パターンが複数のディレクトリを検出するように拡張できます。 動作の違いについては、以下の例を参照してください。
| パターン | ファイルパス | 厳密な globber が無効 | 厳密な globber が有効 |
|---|---|---|---|
/a/b |
/a/b/c/file.txt |
はい | はい |
/a/b |
/a/b_dir/c/file.txt |
いいえ | いいえ |
/a/b |
/a/b.txt |
いいえ | いいえ |
/a/b/ |
/a/b.txt |
いいえ | いいえ |
/a/*/c/ |
/a/b/c/file.txt |
はい | はい |
/a/*/c/ |
/a/b/c/d/file.txt |
はい | はい |
/a/*/d/ |
/a/b/c/d/file.txt |
はい | いいえ |
/a/*/c/ |
/a/b/x/y/c/file.txt |
はい | いいえ |
/a/*/c |
/a/b/c_file.txt |
はい | いいえ |
/a/*/c/ |
/a/b/c_file.txt |
はい | いいえ |
/a/*/c |
/a/b/cookie/file.txt |
はい | いいえ |
/a/b* |
/a/b.txt |
はい | はい |
/a/b* |
/a/b/file.txt |
はい | はい |
/a/{0.txt,1.txt} |
/a/0.txt |
はい | はい |
/a/*/{0.txt,1.txt} |
/a/0.txt |
いいえ | いいえ |
/a/b/[cde-h]/i/ |
/a/b/c/i/file.txt |
はい | はい |
スキーマ推論
ファイルのスキーマは、read_files オプションを使用して明示的に schema に指定できます。 スキーマが指定されていない場合、検出されたファイル全体で read_files が統合スキーマを推論しようとします。ここでは、LIMIT ステートメントを使用しない限り、すべてのファイルを読み取る必要があります。
LIMIT クエリを使用する場合でも、データのより代表的なスキーマを返すために、必要以上に大きなファイル セットを読み取る場合があります。 Databricks は、ユーザーがクエリを指定していない場合、ノートブックと SQL エディターでLIMITクエリのSELECTステートメントを自動的に追加します。
schemaHints オプションは、推論されたスキーマのサブセットの修正に使用できます。 詳細については、「スキーマヒントを使用してスキーマ推論をオーバーライドする」を参照してください。
既定では、スキーマと一致しないデータを復旧するための rescuedDataColumn が提供されます。 詳細については、「復旧されたデータ列とは」を参照してください。 オプション rescuedDataColumn を設定して、schemaEvolutionMode => 'none' を削除できます。
パーティション スキーマの推論
read_files では、ファイルが Hive スタイルのパーティション分割されたディレクトリ () の下に格納されている場合、/column_name=column_value/を推論することもできます。
schema が指定されている場合、検出されたパーティション列では、schema で提供される型が使用されます。 パーティション列が指定された schema の一部でない場合、推論されるパーティション列は無視されます。
パーティション スキーマとデータ列の両方に列が存在する場合は、データ値の代わりにパーティション値から読み取られた値が使用されます。 ディレクトリから生成される値を無視してデータ列を使用する場合は、partitionColumns オプションを使用してコンマ区切りリスト内のパーティション列の一覧を指定できます。
partitionColumns オプションは、最終的な推論スキーマに含める検出列を指定するために read_files を使用することもできます。 空の文字列を指定すると、すべてのパーティション列が無視されます。
パーティション列の推論されたスキーマをオーバーライドする schemaHints オプションを指定することもできます。
TEXT および BINARYFILE 形式には固定スキーマがありますが、可能な場合は、read_files でこれらの形式のパーティション分割の推論も試行されます。
クラウド ストレージの認証
read_files は、Unity カタログの外部の場所または Unity カタログ ボリューム (マネージドと外部の両方) からファイルを読み取ります。 外部の場所に対する READ FILES 特権、または読み取るファイルを含むボリュームに対する READ VOLUME 特権が必要です。
「Unity カタログを使用してクラウド オブジェクト ストレージに接続する」または「Unity カタログ ボリュームとは」を参照してください。
ストリーミング テーブルでの使用状況
read_files は、Delta Lake にファイルを取り込むためのストリーミング テーブルで使用できます。
read_files は、ストリーミング テーブル クエリで使用される場合に自動ローダーを利用します。
STREAM で read_files キーワードを使用する必要があります。 詳細については、「自動ローダーとは」を参照してください。
ストリーミング クエリで使用する場合、read_files ではデータのサンプルを使用してスキーマを推論し、より多くのデータを処理するスキーマを進化させることができます。 詳細については、「自動ローダーでのスキーマの推論と展開の構成」を参照してください。
オプション
[基本] オプション
| オプション |
|---|
format次のコマンドを入力します: Stringソース パスのデータ ファイル形式。 指定されていない場合は自動推論されます。 使用できる値は、以下のとおりです。
既定値: なし |
schema次のコマンドを入力します: String読み取るファイルのスキーマ。 DDL 形式を使用してスキーマ文字列を指定します (例: 'id int, ts timestamp, event string')。 スキーマが指定されていない場合、 read_files は検出されたファイル全体 で統合スキーマの推論 を試みます。既定値: なし |
inferColumnTypes次のコマンドを入力します: Booleanスキーマの推論を利用するときに、正確な列型を推論するかどうか。 既定では、列は JSON および CSV データセットを推論するときに推論されます。 詳細については、スキーマの推論に関する説明を参照してください。 これは、自動ローダーの既定値とは逆であることに注意してください。 既定値: true |
partitionColumns次のコマンドを入力します: Stringファイルのディレクトリ構造から推論する Hive スタイル パーティション列のコンマ区切りリスト。 Hive スタイル パーティション列は、次のような等号で組み合わされたキーと値のペアになります <base-path>/a=x/b=1/c=y/file.format。 この例では、パーティション列は a、b、c です。 既定では、スキーマ推論を使用しており、<base-path> にデータの読み込み元を指定する場合、これらの列は自動的にスキーマに追加されます。 スキーマを指定すると、自動ローダーにより、これらの列がこのスキーマに含まれると想定されます。 これらの列をスキーマの一部に含めない場合は、"" を指定して、これらの列を無視することができます。 さらに、下の例のような複雑なディレクトリ構造のファイル パスから列を推論するときに、このオプションを使用できます。<base-path>/year=2022/week=1/file1.csv<base-path>/year=2022/month=2/day=3/file2.csv<base-path>/year=2022/month=2/day=4/file3.csvcloudFiles.partitionColumns を year,month,day に指定すると、year=2022 に file1.csv が返されますが、month および day 列は null になります。month と day は、file2.csv および file3.csv に対して正しく解析されます。既定値: なし |
schemaHints次のコマンドを入力します: Stringスキーマの推論中に自動ローダーに提供するスキーマ情報。 詳細については、スキーマ ヒントに関するページを参照してください。 既定値: なし |
useStrictGlobber次のコマンドを入力します: BooleanApache Spark の他のファイル ソースの既定のグロビング動作に一致する厳密な globber を使用するかどうか。 詳細については、「一般的なデータ読み込みパターン」を参照してください。 Databricks Runtime 12.2 LTS 以降で使用できます。 これは、自動ローダーの既定値とは逆であることに注意してください。 既定値: true |
書式固有のオプション
各ファイル形式 (JSON、CSV、XML、Parquet、Avro、テキスト、ORC、バイナリ) に固有のオプションについては、「 DataFrameReader オプション」を参照してください。
ストリーミング オプション
これらのオプションは、read_filesまたはストリーミング クエリ内でを使用する場合に適用されます。
| オプション |
|---|
allowOverwrites次のコマンドを入力します: Boolean検出後に変更されたファイルを再処理するかどうか。 成功した前回の更新クエリの開始時刻以降にファイルが変更された場合、ファイルの利用可能な最新バージョンは更新中に処理されます。 既定値: false |
includeExistingFiles次のコマンドを入力します: Booleanストリーム処理入力パスに既存のファイルを含めるか、初期セットアップ後に到着した新しいファイルのみを処理するか。 このオプションは、初めてストリームを開始するときにのみ評価されます。 ストリームの再起動後にこのオプションを変更した場合、効果はありません。 既定値: true |
maxBytesPerTrigger次のコマンドを入力します: Byte String各トリガーで処理される新しいバイトの最大数。 10g などのバイト文字列を指定して、各マイクロバッチを 10 GB のデータに制限できます。 これはソフト最大値です。 それぞれ 3 GB のファイルがある場合、Azure Databricksはマイクロバッチで 12 GB を処理します。
maxFilesPerTriggerと共に使用すると、Azure Databricksは、maxFilesPerTrigger または maxBytesPerTrigger の下限のいずれか早い方に消費されます。注: サーバーレス SQL ウェアハウスで作成されたストリーミング テーブルの場合、このオプションと maxFilesPerTrigger は、最良の待ち時間とパフォーマンスを与える目的でワークロード サイズとサーバーレス コンピューティング リソースによってスケーリングする動的承認制御を活用するように設定しないでください。既定値: なし |
maxFilesPerTrigger次のコマンドを入力します: Integer各トリガーで処理される新しいファイルの最大数。 maxBytesPerTriggerと共に使用すると、Azure Databricksは、maxFilesPerTrigger または maxBytesPerTrigger の下限のいずれか早い方に消費されます。注: サーバーレス SQL ウェアハウスで作成されたストリーミング テーブルの場合、このオプションと maxBytesPerTrigger は、最良の待ち時間とパフォーマンスを与える目的でワークロード サイズとサーバーレス コンピューティング リソースによってスケーリングする動的承認制御を活用するように設定しないでください。既定値: 1000 |
schemaEvolutionMode次のコマンドを入力します: String新しい列がデータで検出された場合にスキーマを展開するモード。 既定では、列は JSON データセットを推論するときに文字列として推論されます。 詳細については、スキーマの展開に関する説明を参照してください。 このオプションは、ファイルの text と binaryFile には適用されません。既定値: スキーマが指定されていない場合は "addNewColumns"。"none" それ以外の場合。 |
schemaLocation次のコマンドを入力します: String推論されたスキーマとそれ以降の変更を保存する場所。 詳細については、スキーマの推論に関する説明を参照してください。 ストリーミング テーブル クエリで使用する場合、スキーマの場所は必要ありません。 既定値: なし |
例
-- Reads the files available in the given path. Auto-detects the format and schema of the data.
> SELECT * FROM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');
-- Reads the headerless CSV files in the given path with the provided schema.
> SELECT * FROM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Infers the schema of CSV files with headers. Because the schema is not provided,
-- the CSV files are assumed to have headers.
> SELECT * FROM read_files(
's3://bucket/path',
format => 'csv')
-- Reads files that have a csv suffix.
> SELECT * FROM read_files('s3://bucket/path/*.csv')
-- Reads a single JSON file
> SELECT * FROM read_files(
'abfss://container@storageAccount.dfs.core.windows.net/path/single.json')
-- Reads JSON files and overrides the data type of the column `id` to integer.
> SELECT * FROM read_files(
's3://bucket/path',
format => 'json',
schemaHints => 'id int')
-- Reads files that have been uploaded or modified yesterday.
> SELECT * FROM read_files(
'gs://my-bucket/avroData',
modifiedAfter => date_sub(current_date(), 1),
modifiedBefore => current_date())
-- Creates a Delta table and stores the source file path as part of the data
> CREATE TABLE my_avro_data
AS SELECT *, _metadata.file_path
FROM read_files('gs://my-bucket/avroData')
-- Creates a streaming table that processes files that appear only after the table's creation.
-- The table will most likely be empty (if there's no clock skew) after being first created,
-- and future refreshes will bring new data in.
> CREATE OR REFRESH STREAMING TABLE avro_data
AS SELECT * FROM STREAM read_files('gs://my-bucket/avroData', includeExistingFiles => false);
非構造化ファイルの操作
次の例では、 BINARYFILE 形式を使用して、Unity カタログ ボリュームに格納されている非構造化ファイルの読み取りとフィルター処理を行い、 read_files と AI 関数を組み合わせてファイルの内容を処理します。
ボリューム内のすべてのファイルを一覧表示する: * EXCEPT (content) を使用してバイナリ コンテンツを読み込まずにファイル メタデータを返し、 _metadata を明示的に選択してファイル レベルのメタデータ フィールドを含めます。
SELECT
* EXCEPT (content),
_metadata
FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>',
format => 'binaryFile'
);
サイズでフィルター処理された画像ファイルを一覧表示する: fileNamePattern を使用して特定のイメージ ファイルの種類を対象とし、 _metadata.file_size でフィルター処理して、特定のサイズ範囲内のファイルのみを返します。
SELECT
* EXCEPT (content),
_metadata
FROM read_files(
'/Volumes/my_catalog/my_schema/my_volume',
format => 'binaryFile',
fileNamePattern => '*.{jpg,jpeg,png,JPG,JPEG,PNG}'
)
WHERE _metadata.file_size BETWEEN 20000 AND 1000000;
過去 1 日以内に変更された PDF ファイルを一覧表示する: fileNamePattern を使用して PDF ファイルを対象にし、 modificationTime でフィルター処理して、過去 1 日以内に変更されたファイルのみを返します。
SELECT
* EXCEPT (content),
_metadata
FROM read_files(
'/Volumes/my_catalog/my_schema/my_volume',
format => 'binaryFile',
fileNamePattern => '*.{pdf,PDF}'
)
WHERE modificationTime >= current_timestamp() - INTERVAL 1 DAY;
イメージ ファイルに対して AI 関数を実行する: ai_query を使用して、クラウド ストレージ パスから読み取られたイメージ ファイルを処理します。
_metadataフィールドをフィルター処理して、特定のファイルを対象とします。
SELECT
path AS file_path,
ai_query(
'databricks-llama-4-maverick',
'Describe this image in ten words or less: ',
files => content
) AS result
FROM read_files(
's3://my-s3-bucket/path/to/images/',
format => 'binaryFile',
fileNamePattern => '*.{jpg,jpeg,png,JPG,JPEG,PNG}'
)
WHERE _metadata.file_size < 1000000
AND _metadata.file_name LIKE '%robots%';
ファイル名パターンに一致するドキュメントを解析する: ai_parse_document を使用して、PDF と画像から構造化コンテンツを抽出します。 特定のファイルを対象とする _metadata.file_name でフィルター処理します。
SELECT
path AS file_path,
ai_parse_document(
content,
map('version', '2.0')
) AS result
FROM read_files(
'/Volumes/main/public/my_files/',
format => 'binaryFile',
fileNamePattern => '*.{jpg,jpeg,pdf,png}'
)
WHERE _metadata.file_name ILIKE '%receipt%';
構造化テーブルを使用してファイルを結合する: 非構造化ワークフローでは、多くの場合、テーブルに格納されている構造化データを非構造化ファイルとマージする必要があります。 次の例では、クラウド ストレージ パス内のファイルを 2 つの構造化テーブルと結合し、ファイル サイズとユーザー属性でフィルター処理します。
user_filesとの結合は、splitとelement_atを使用してファイル パスからファイル ID を抽出することによって行われます。
SELECT
users.user_id,
user_files.file_id,
files._metadata.file_name AS file_name,
files.* EXCEPT (content),
ai_parse_document(files.content, map('version', '2.0')) AS parsed_document
FROM read_files(
's3://my-bucket-name/files/',
format => 'binaryFile',
fileNamePattern => '*.{pdf,doc,docx,ppt,pptx,png,jpg,jpeg}'
) AS files
JOIN user_files
ON user_files.file_id = element_at(split(files.path, '/'), -2)
JOIN users
ON users.user_id = user_files.user_id
WHERE users.email LIKE '%@databricks.com'
AND files._metadata.file_size < 10000000;
関連記事
- CREATE STREAMING TABLE
- テーブル値関数
read_kafka