ストリーミング テーブルは、ストリーミングまたは増分データ処理をサポートするテーブルです。 ストリーミング テーブルはパイプラインによってサポートされます。 ストリーミング テーブルが更新されるたびに、ソース テーブルに追加されたデータがストリーミング テーブルに追加されます。 ストリーミング テーブルは、手動で、またはスケジュールに従って更新できます。
更新を実行またはスケジュールする方法の詳細については、「 パイプライン更新の実行」を参照してください。
構文
CREATE [OR REFRESH] [PRIVATE] STREAMING TABLE
table_name
[ table_specification ]
[ table_clauses ]
[ {flow_clause | AS query} ]
table_specification
( { column_identifier column_type [column_properties] } [, ...]
[ column_constraint ] [, ...]
[ , table_constraint ] [...] )
column_properties
{ NOT NULL | COMMENT column_comment | column_constraint | MASK clause } [ ... ]
table_clauses
{ USING DELTA
PARTITIONED BY (col [, ...]) |
CLUSTER BY clause |
LOCATION path |
COMMENT view_comment |
TBLPROPERTIES clause |
WITH { ROW FILTER clause } } [ ... ]
} [ ... ]
flow_clause
FLOW { { INSERT [ONCE] BY NAME query } |
{ AUTO CDC auto_cdc_flow_spec } }
パラメーター
REFRESH
指定されている場合、テーブルを作成するか、既存のテーブルとその内容を更新します。
プライベート
プライベート ストリーミング テーブルを作成します。
- これらはカタログに追加されず、定義したパイプライン内でのみアクセスできます。
- カタログ内の既存のオブジェクトと同じ名前を持つことができます。 パイプライン内で、プライベート ストリーミング テーブルとカタログ内のオブジェクトの名前が同じである場合、この名前への参照はプライベート ストリーミング テーブルに解決されます。
- プライベート ストリーミング テーブルは、1 回の更新中ではなく、パイプラインの有効期間全体でのみ保持されます。
プライベート ストリーミング テーブルは、以前は
TEMPORARYパラメーターを使用して作成されました。table_name
新しく作成されたテーブルの名前。 完全修飾のテーブル名は一意にする必要があります。
テーブル仕様
この省略可能な句で、列、その型、プロパティ、説明、および列制約の一覧を定義します。
-
列名は一意である必要があり、かつクエリの出力列にマップされている必要があります。
-
列のデータ型を指定します。 Azure Databricks でサポートされているすべてのデータ型が、ストリーミング テーブルでサポートされているわけではありません。
column_comment
列を記述する任意の
STRINGリテラル。 このオプションは、column_typeと共に指定する必要があります。 列の種類が指定されていない場合、列コメントはスキップされます。-
テーブルに流入するデータを検証する制約を追加します。 パイプラインの期待を使用してデータ品質を管理する方法については、を参照してください。
-
列マスク関数を追加して、機密データを匿名化します。
「行フィルターと列マスク」を参照してください。
-
テーブル制約
スキーマを指定するときに、主キーと外部キーを定義できます。 制約は情報提供のみを目的としており、強制されるものではありません。 SQL 言語リファレンスの CONSTRAINT 句 を参照してください。
注
テーブル制約を定義するには、パイプラインが Unity Catalog 対応のパイプラインである必要があります。
テーブル条項
必要に応じて、テーブルのパーティション分割、コメント、ユーザー定義プロパティを指定します。 各サブ句は、1 回だけ指定できます。
DELTA の使用
データ形式を指定します。 オプションは DELTA のみです。
この句は省略可能で、既定値は DELTA です。
でパーティション分割
テーブル内のパーティション分割に使用する 1 つ以上の列のリスト (省略可能)。
CLUSTER BYと相互に排他的です。液体クラスタリングは、クラスタリング用の柔軟で最適化されたソリューションを提供します。 パイプラインに
CLUSTER BYするのではなく、PARTITIONED BYを使用することを検討してください。CLUSTER BY
リキッド クラスタリングをテーブルに対して有効化し、クラスタリング キーとして使用する列を定義します。
CLUSTER BY AUTOで自動液体クラスタリングを使用し、Databricks はクエリのパフォーマンスを最適化するためにクラスタリング キーをインテリジェントに選択します。PARTITIONED BYと相互に排他的です。表に液体クラスタリングを使用するを参照してください。
場所
テーブル データの保存場所 (省略可能)。 設定されていない場合のシステムの既定値はパイプラインの保存場所となります。
コメント
テーブルについて説明するオプションの
STRINGリテラル。TBLPROPERTIES
テーブルのテーブル プロパティのリスト (省略可能)。
で ROW FILTER
行フィルター関数をテーブルに追加します。 それ以降のそのテーブルに対するクエリでは、行のうち、この関数による評価の結果が TRUE であるものだけが返されます。 これは、細粒度のアクセス制御に役立ちます。呼び出し元ユーザーの ID とグループ メンバーシップをその関数で検査した結果として、特定の行をフィルター処理するかどうかを決定できるからです。
ROW FILTERの条項を参照してください。フロー
必要に応じて、テーブルを作成する フロー をインラインで定義します。 フローは、テーブルの内容を更新するステートフル クエリです。
FLOWを指定しない場合は、代わりにAS queryを使用するか、CREATE FLOWを使用してフローを個別に定義できます。 次のいずれかのフローの種類を指定できます。INSERT 名前別
列名でテーブルにデータを挿入します。
ONCEオプションを指定しない場合、クエリはストリーミング クエリである必要があります。 ストリーミング セマンティクスを使用してソースから読み取る場合は、STREAMキーワードを使用します。 読み取りで既存のレコードの変更または削除が発生した場合は、エラーがスローされます。 静的ソースまたは追加専用ソースから読み取るのが最も安全です。注
FLOW INSERT BY NAMEは、AS queryの使用と同じです。 次の 2 つのステートメントの動作は同じです。CREATE OR REFRESH STREAMING TABLE raw_data AS SELECT * FROM STREAM read_files('abfss://my_path'); CREATE OR REFRESH STREAMING TABLE raw_data FLOW INSERT BY NAME SELECT * FROM STREAM read_files('abfss://my_path');ある時
必要に応じて、バックフィルなどの 1 回限りのフローとしてフローを定義します。
ONCEが指定されると、クエリはストリーミング クエリではなく、フローは既定で 1 回実行されます。 テーブルが完全更新で更新されると、ONCEフローが再度実行され、データが再作成されます。ONCEは、INSERT BY NAMEフローにのみ適用されます。AUTO CDCImportant
Databricks Runtime 17.3 以降および
PREVIEWPipelines チャネルで使用できます。ソースからテーブルへの変更データ キャプチャ (CDC) レコードを処理する
AUTO CDCフローを定義します。 ソース データに CDC セマンティクスが含まれている場合は、AUTO CDCを使用します。 「AUTO CDC API: パイプラインを使用して変更データ キャプチャを簡略化する」を参照してください。
-
この句により、
queryからデータがテーブルに入力されます。 このクエリはストリーミング クエリにする必要があります。 STREAM キーワードを使用して、ストリーミング セマンティクスを使用してソースから読み取ります。 読み取りで既存のレコードの変更または削除が発生した場合は、エラーがスローされます。 静的ソースまたは追加専用ソースから読み取るのが最も安全です。 変更コミットがあるデータを取り込むには、SkipChangeCommits読み取りオプションを追加してエラーを処理できます。queryとtable_specificationを一緒に指定するとき、table_specificationに指定されているテーブル スキーマに、queryから返される列をすべて含める必要があります。含まれていない場合、エラーが出ます。table_specificationで指定されているが、queryから返されない列はクエリ時にnull値を返します。ストリーミング データの詳細については、「パイプラインを使用してデータを変換する」を参照してください。
読み取りオプション
クエリで読み取りオプションを指定して、ソースからデータを読み取る方法を構成できます。 たとえば、ソース データ内の変更コミットをスキップする
skipChangeCommitsを指定できます。 読み取りオプションは、クエリのWITH句でマップとして指定されます。 例えば次が挙げられます。SELECT * FROM STREAM source_table WITH (SKIPCHANGECOMMITS=TRUE, STARTINGVERSION=X)=TRUEは省略可能であるため、次のようなブール値オプションを指定することもできます。SELECT * FROM STREAM source_table WITH (SKIPCHANGECOMMITS)注
読み取りオプションは、Databricks Runtime 17.3 以降でのみサポートされています。
以下の読み取りオプションは Delta でサポートされています。各オプションの詳細については、 Delta テーブルストリーミングの読み取りと書き込みを参照してください。
maxFilesPerTriggermaxBytesPerTriggerstartingVersionstartingTimestampreadChangeFeedwithEventTimeOrderskipChangeCommits
必要なアクセス許可
パイプラインの実行時のユーザーには、次のアクセス許可が必要です。
- ストリーミング テーブルによって参照されるベース テーブルに対する
SELECT権限。 - 親カタログに対する
USE CATALOG特権と、親スキーマに対するUSE SCHEMA特権。 - ストリーミング テーブルのスキーマに対する
CREATE MATERIALIZED VIEW権限。
ストリーミング テーブルが定義されているパイプラインをユーザーが更新できるようにするには、次が必要です。
- 親カタログに対する
USE CATALOG特権と、親スキーマに対するUSE SCHEMA特権。 - ストリーミング テーブルの所有権またはストリーミング テーブルに対する
REFRESH権限。 - ストリーミング テーブルの所有者は、ストリーミング テーブルによって参照されるベース テーブルに対する
SELECT権限を持っている必要があります。
ユーザーが結果のストリーミング テーブルに対してクエリを実行できるようにするには、次が必要です。
- 親カタログに対する
USE CATALOG特権と、親スキーマに対するUSE SCHEMA特権。 - ストリーミング テーブルに対する
SELECT権限。
制限事項
- テーブル所有者だけがストリーミング テーブルを更新して最新のデータを取得できます。
-
ALTER TABLEコマンドはストリーミング テーブルでは許可されません。 テーブルの定義とプロパティは、CREATE OR REFRESHまたは ALTER STREAMING TABLE ステートメントを使用して変更する必要があります。 -
INSERT INTOやMERGEなどの DML コマンドを利用してテーブル スキーマを導き出すことはできません。 - 次のコマンドは、ストリーミング テーブルではサポートされていません。
CREATE TABLE ... CLONE <streaming_table>COPY INTOANALYZE TABLERESTORETRUNCATEGENERATE MANIFEST[CREATE OR] REPLACE TABLE
- テーブルの名前変更や所有者の変更はサポートされていません。
- 生成された列、ID 列、既定の列はサポートされていません。
例示
-- Define a streaming table from a volume of files:
CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/customers/*", format => "csv")
-- Define a streaming table from a streaming source table:
CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)
-- Define a table with a row filter and column mask:
CREATE OR REFRESH STREAMING TABLE customers_silver (
id int COMMENT 'This is the customer ID',
name string,
region string,
ssn string MASK catalog.schema.ssn_mask_fn COMMENT 'SSN masked for privacy'
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT * FROM STREAM(customers_bronze)
-- Define a streaming table that you can add flows into:
CREATE OR REFRESH STREAMING TABLE orders;
-- Define a streaming table with an inline append flow:
CREATE OR REFRESH STREAMING TABLE raw_data
FLOW INSERT BY NAME SELECT * FROM STREAM read_files('abfss://my_path');
-- Define a streaming table with an inline AUTO CDC flow:
CREATE OR REFRESH STREAMING TABLE target
FLOW AUTO CDC
FROM stream(cdc_data.users)
KEYS (userId)
SEQUENCE BY sequenceNum
STORED AS SCD TYPE 1;