コントロール テーブルを使用して
多くのソースから取り込む必要がある場合があります。 そのリストが変更されると、ジョブ構成でハードコーディングすることは、コードを変更して再デプロイすることを意味します。 メタデータを使用して、実行時に読み取られ、使用されるテーブルにソースの一覧を格納することで、これに対処します。 ソースを新しい行として追加すると、次のジョブ実行でジョブ自体に変更を加えずに選択されます。
このチュートリアルでは、この方法を使用してジョブを構築する方法について説明します。 SQL タスクは制御テーブルを読み取り、 For each タスクはすべての行を並列で反復処理します。
どのように機能するのか
このパターンでは、次の 3 種類のタスクを順番に結合して使用します。
| Task | タイプ | 動作内容 |
|---|---|---|
read_markets |
SQL | 構成テーブルのクエリを実行し、結果を行配列としてキャプチャします |
process_markets |
各 |
{{tasks.read_markets.output.rows}}を反復処理し、入れ子になったタスクを行ごとに 1 回実行します。 |
run_market_analysis_iteration |
ノートブックまたは For each に入れ子になった SQL | パラメーターとして渡された行値を使用してビジネス ロジックを実行し、行ごとに 1 回実行します |
SQL タスクの出力 (行オブジェクトの JSON 配列) は、動的値参照For eachを使用して、 タスクの {{tasks.read_markets.output.rows}} フィールドに直接流れます。 その後、 For each タスクは、入れ子になったタスクにパラメーターとして各行を渡し、 {{input.market}} および {{input.currency}}として使用できます。
前提条件
- ジョブとノートブックを作成するアクセス許可を持つ Databricks ワークスペース
- Unity カタログでテーブルを作成するためのアクセス許可
- 構成テーブルを作成できる Unity カタログ スキーマ (たとえば、
config) - SQL タスクを実行する SQL ウェアハウス
手順 1: 構成テーブルを作成する
構成テーブルはコントロール プレーンです。 あなたのジョブが処理する値のリストを保持します。 作業を追加または削除する必要がある場合は、ジョブではなく、このテーブルを更新します。
次の SQL を実行して、markets スキーマにconfig テーブルを作成します。
CREATE OR REPLACE TABLE config.markets AS
SELECT * FROM VALUES
('NL', 'EUR'),
('UK', 'GBP'),
('US', 'USD')
AS t(market, currency);
Databricks ノートブック、SQL エディター、または任意の SQL タスクを使用して、このステートメントを実行できます。 この手順の後、 config.markets には、市場ごとに 1 つずつ、通貨コードを含む 3 つの行が含まれます。
手順 2: 処理コードを記述する
For each タスク内の入れ子になったタスクは、行ごとに 1 回実行されます。 ビジネス ロジックに応じて、ノートブック タスクまたは SQL タスクを選択します。
ノートブック タスク
/Workspace/Users/<username>/process_marketなどのパスに新しいノートブックを作成します。 このノートブックは、 For each タスクのイテレーションごとに 1 回実行され、毎回異なる市場価値を受け取ります。
ノートブックに次のコードを追加します。
# Set default values for testing the notebook outside of a job.
# When the notebook runs inside a For each task, the job overrides these defaults.
dbutils.widgets.text("market", "NL", "Market")
dbutils.widgets.text("currency", "EUR", "Currency")
# Read the parameters passed by the For each task
market = dbutils.widgets.get("market")
currency = dbutils.widgets.get("currency")
print(f"Processing market: {market} ({currency})")
# Your business logic goes here. For example:
df = spark.table("sales.transactions").filter(
f"market = '{market}' AND currency_code = '{currency}'"
)
display(df)
dbutils.widgets.text()呼び出しでは既定値が設定されるため、ノートブックをジョブに接続せずにワークスペースで直接実行できます。 ノートブックが For each タスク内で入れ子になったタスクとして実行されると、ジョブは既定値をそのイテレーションの実際のパラメーター値でオーバーライドします。
dbutils.widgets.text()する前にdbutils.widgets.get()を呼び出します。
getがtextより前に呼び出されると、ジョブの外部で実行したときに、ノートブックによってInputWidgetNotDefined エラーが発生します。
既定値を使用すると、ジョブの外部でノートブックをテストできますが、トレードオフに注意してください。 For each タスクが正しく構成されておらず、パラメーターを渡さない場合、ノートブックは既定値を使用し、失敗するのではなくサイレントに成功します。これにより、誤った構成の検出が困難になる可能性があります。
SQL タスク
SQL タスクでは、 :param_name 構文を使用した名前付きパラメーターがサポートされます。 反復値を使用する場合は、クエリ内の :market と :currency を参照します。
SELECT *
FROM sales.transactions
WHERE market = :market
AND currency_code = :currency
このクエリは、手順 5 のタスク エディターで直接構成します。
For each タスクは、現在のイテレーションの値を実行時に:marketおよび:currencyという名前付きパラメーターに渡します。 ノートブック タスクとは異なり、SQL 名前付きパラメーターは既定値をサポートしていません。パラメーターが渡されない場合、クエリはパラメーター解決エラーで失敗します。 クエリを実行する前にパラメーターを検証または既定にするには、代わりにノートブック タスクを使用します。
手順 3: ジョブを作成する
Databricks ワークスペースで、サイドバーの [ワークフロー ] をクリックし、[ ジョブの作成] をクリックします。 ジョブにわかりやすい名前を付けます (例: Market Analysis)。
手順 4: SQL 参照タスクを構成する
SQL タスクは、構成クエリを実行し、その出力をダウンストリーム タスクで使用できるようにします。
ジョブ エディターで、[タスクの 追加] をクリックします。
タスク名を
read_marketsに設定します。型 を SQL に設定します。
[SQL] フィールドに、次のクエリを入力します。
SELECT market, currency FROM config.marketsSQL ウェアハウスをワークスペース内のウェアハウスに設定します。
[ タスクの作成] をクリックします。
このタスクを実行すると、Databricks はクエリを実行し、結果を tasks.read_markets.output.rows の JSON 配列としてキャプチャします。 SQL タスクの出力は常に JSON 配列として返されます。追加の構成は必要ありません。 この参照の一般的な形式は tasks.<task-name>.output.rowsであり、 <task-name> はジョブ エディターで設定したタスク キーと一致します。 出力は次のようになります。
[
{ "market": "NL", "currency": "EUR" },
{ "market": "UK", "currency": "GBP" },
{ "market": "US", "currency": "USD" }
]
手順 5: For each タスクを構成する
For each タスクは SQL 出力を読み取り、行ごとに 1 つの入れ子になったタスク実行を起動します。
[ タスクの追加] をクリックし、[ 依存 ] を [
read_marketsに設定します。タスク名を
process_marketsに設定します。[種類] を [For each] に設定します。
[ 入力 ] フィールドに、次のように入力します。
{{tasks.read_markets.output.rows}}これにより、SQL タスクによってキャプチャされた行配列が参照されます。
コンカレンシーを
2に設定して、2 つのイテレーションを並列で実行できるようにします。 スループットを向上させるか、入れ子になったタスクで並列処理がサポートされている場合は、この値を大きくします。タスクを追加をクリックし、ステップ2で選択した種類に基づいてネストされたタスクを構成します。
ノートブック タスク
タスク名を
run_market_analysis_iterationに設定します。種類 を ノートブック に設定します。
手順 2 で作成したノートブックのパスに パス を設定します。
[ パラメーター] をクリックし、[ 追加 ] をクリックして、次の各パラメーターを追加します。
-
キー:
market、 値:{{input.market}} -
キー:
currency、 値:{{input.currency}}
各
{{input.<key>}}参照は、現在のイテレーションの行オブジェクトから対応するフィールドに解決されます。-
キー:
[ タスクの作成] をクリックします。
SQL タスク
タスク名を
run_market_analysis_iterationに設定します。Type を SQL に設定します。
[SQL] フィールドに、名前付きパラメーターを含むクエリを入力します。次に例を示します。
SELECT * FROM sales.transactions WHERE market = :market AND currency_code = :currencySQL ウェアハウスをワークスペース内のウェアハウスに設定します。
[ パラメーター] をクリックし、[ 追加 ] をクリックして、次の各パラメーターを追加します。
-
キー:
market、 値:{{input.market}} -
キー:
currency、 値:{{input.currency}}
各
{{input.<key>}}参照は、現在のイテレーションの行オブジェクトから対応するフィールドに解決されます。-
キー:
[ タスクの作成] をクリックします。
現在、あなたのジョブ DAG では、read_markets が process_markets に流れ込み、入れ子になったタスクが For each ノード内に表示されています。
手順 6: ジョブを実行して確認する
- [ 今すぐ実行 ] をクリックしてジョブをトリガーします。
- ジョブの実行ページで、
process_marketsノードをクリックして、For eachタスクを展開します。 - ジョブ実行ページには、イテレーションのテーブル (市場価値ごとに 1 行) が表示され、それぞれに状態、開始時刻、期間が表示されます。
- イテレーション行をクリックしてタスクの実行出力を開き、正しい市場値を受け取ったかどうか確認します。
特定のイテレーションが失敗した場合は、ジョブ全体を再実行せずに、ジョブ実行ページからそのイテレーションのみを再実行できます。
パターンを拡張する
新しい市場を追加するには、構成テーブルに行を挿入します。
INSERT INTO config.markets VALUES ('DE', 'EUR');
次のジョブの実行にはドイツが自動的に含まれます。ジョブ構成の変更やノートブックの編集は必要ありません。
この同じパターンは、データの反復を促進するユース ケースに対して機能します。
- 顧客ごとの処理: 顧客 ID ごとに 1 行。ノートブックは、顧客固有の変換を適用するか、顧客固有の変換先に配信します。
- テーブル インジェスト: ソース テーブル名ごとに 1 行。ノートブックは各テーブルを読み取り、取り込みます。
- バックフィル処理: 日付パーティションごとに 1 行。ノートブックは、そのパーティションの履歴データを再処理します。
- 機能フラグドリブン実行: 有効な機能または実験ごとに 1 行。ノートブックによって、対応するロジックがアクティブ化されます。
処理から項目を削除するには、その行を削除するか、SQL クエリで active フラグ列とフィルターを追加します。
SELECT market, currency FROM config.markets WHERE active = TRUE
次のステップ
-
For eachタスクを使用して別のタスクをループで実行する - パラメーターの種類やコンカレンシー オプションなど、For eachタスクを構成するための完全なリファレンス -
For eachタスクで大きなパラメーター配列にルックアップ テーブルを使用する - 48 KB のタスク値の制限を超える大きなパラメーター配列を処理する方法 - タスクからのパラメーター値のアクセス — ノートブック、Python スクリプト、および SQL タスクのパラメーター値にアクセスするためのすべてのメソッド