次の方法で共有


コントロール テーブルを使用して For each ジョブを駆動する

多くのソースから取り込む必要がある場合があります。 そのリストが変更されると、ジョブ構成でハードコーディングすることは、コードを変更して再デプロイすることを意味します。 メタデータを使用して、実行時に読み取られ、使用されるテーブルにソースの一覧を格納することで、これに対処します。 ソースを新しい行として追加すると、次のジョブ実行でジョブ自体に変更を加えずに選択されます。

このチュートリアルでは、この方法を使用してジョブを構築する方法について説明します。 SQL タスクは制御テーブルを読み取り、 For each タスクはすべての行を並列で反復処理します。

どのように機能するのか

このパターンでは、次の 3 種類のタスクを順番に結合して使用します。

Task タイプ 動作内容
read_markets SQL 構成テーブルのクエリを実行し、結果を行配列としてキャプチャします
process_markets {{tasks.read_markets.output.rows}}を反復処理し、入れ子になったタスクを行ごとに 1 回実行します。
run_market_analysis_iteration ノートブックまたは For each に入れ子になった SQL パラメーターとして渡された行値を使用してビジネス ロジックを実行し、行ごとに 1 回実行します

SQL タスクの出力 (行オブジェクトの JSON 配列) は、動的値参照For eachを使用して、 タスクの {{tasks.read_markets.output.rows}} フィールドに直接流れます。 その後、 For each タスクは、入れ子になったタスクにパラメーターとして各行を渡し、 {{input.market}} および {{input.currency}}として使用できます。

前提条件

  • ジョブとノートブックを作成するアクセス許可を持つ Databricks ワークスペース
  • Unity カタログでテーブルを作成するためのアクセス許可
  • 構成テーブルを作成できる Unity カタログ スキーマ (たとえば、 config)
  • SQL タスクを実行する SQL ウェアハウス

手順 1: 構成テーブルを作成する

構成テーブルはコントロール プレーンです。 あなたのジョブが処理する値のリストを保持します。 作業を追加または削除する必要がある場合は、ジョブではなく、このテーブルを更新します。

次の SQL を実行して、markets スキーマにconfig テーブルを作成します。

CREATE OR REPLACE TABLE config.markets AS
SELECT * FROM VALUES
  ('NL', 'EUR'),
  ('UK', 'GBP'),
  ('US', 'USD')
AS t(market, currency);

Databricks ノートブック、SQL エディター、または任意の SQL タスクを使用して、このステートメントを実行できます。 この手順の後、 config.markets には、市場ごとに 1 つずつ、通貨コードを含む 3 つの行が含まれます。

手順 2: 処理コードを記述する

For each タスク内の入れ子になったタスクは、行ごとに 1 回実行されます。 ビジネス ロジックに応じて、ノートブック タスクまたは SQL タスクを選択します。

ノートブック タスク

/Workspace/Users/<username>/process_marketなどのパスに新しいノートブックを作成します。 このノートブックは、 For each タスクのイテレーションごとに 1 回実行され、毎回異なる市場価値を受け取ります。

ノートブックに次のコードを追加します。

# Set default values for testing the notebook outside of a job.
# When the notebook runs inside a For each task, the job overrides these defaults.
dbutils.widgets.text("market", "NL", "Market")
dbutils.widgets.text("currency", "EUR", "Currency")

# Read the parameters passed by the For each task
market = dbutils.widgets.get("market")
currency = dbutils.widgets.get("currency")

print(f"Processing market: {market} ({currency})")

# Your business logic goes here. For example:
df = spark.table("sales.transactions").filter(
    f"market = '{market}' AND currency_code = '{currency}'"
)
display(df)

dbutils.widgets.text()呼び出しでは既定値が設定されるため、ノートブックをジョブに接続せずにワークスペースで直接実行できます。 ノートブックが For each タスク内で入れ子になったタスクとして実行されると、ジョブは既定値をそのイテレーションの実際のパラメーター値でオーバーライドします。

dbutils.widgets.text()する前にdbutils.widgets.get()を呼び出します。 gettextより前に呼び出されると、ジョブの外部で実行したときに、ノートブックによってInputWidgetNotDefined エラーが発生します。

既定値を使用すると、ジョブの外部でノートブックをテストできますが、トレードオフに注意してください。 For each タスクが正しく構成されておらず、パラメーターを渡さない場合、ノートブックは既定値を使用し、失敗するのではなくサイレントに成功します。これにより、誤った構成の検出が困難になる可能性があります。

SQL タスク

SQL タスクでは、 :param_name 構文を使用した名前付きパラメーターがサポートされます。 反復値を使用する場合は、クエリ内の :market:currency を参照します。

SELECT *
FROM sales.transactions
WHERE market = :market
  AND currency_code = :currency

このクエリは、手順 5 のタスク エディターで直接構成します。 For each タスクは、現在のイテレーションの値を実行時に:marketおよび:currencyという名前付きパラメーターに渡します。 ノートブック タスクとは異なり、SQL 名前付きパラメーターは既定値をサポートしていません。パラメーターが渡されない場合、クエリはパラメーター解決エラーで失敗します。 クエリを実行する前にパラメーターを検証または既定にするには、代わりにノートブック タスクを使用します。

手順 3: ジョブを作成する

Databricks ワークスペースで、サイドバーの [ワークフロー ] をクリックし、[ ジョブの作成] をクリックします。 ジョブにわかりやすい名前を付けます (例: Market Analysis)。

手順 4: SQL 参照タスクを構成する

SQL タスクは、構成クエリを実行し、その出力をダウンストリーム タスクで使用できるようにします。

  1. ジョブ エディターで、[タスクの 追加] をクリックします。

  2. タスク名read_marketsに設定します。

  3. SQL に設定します。

  4. [SQL] フィールドに、次のクエリを入力します。

    SELECT market, currency FROM config.markets
    
  5. SQL ウェアハウスをワークスペース内のウェアハウスに設定します。

  6. [ タスクの作成] をクリックします。

このタスクを実行すると、Databricks はクエリを実行し、結果を tasks.read_markets.output.rows の JSON 配列としてキャプチャします。 SQL タスクの出力は常に JSON 配列として返されます。追加の構成は必要ありません。 この参照の一般的な形式は tasks.<task-name>.output.rowsであり、 <task-name> はジョブ エディターで設定したタスク キーと一致します。 出力は次のようになります。

[
  { "market": "NL", "currency": "EUR" },
  { "market": "UK", "currency": "GBP" },
  { "market": "US", "currency": "USD" }
]

手順 5: For each タスクを構成する

For each タスクは SQL 出力を読み取り、行ごとに 1 つの入れ子になったタスク実行を起動します。

  1. [ タスクの追加] をクリックし、[ 依存 ] を [ read_marketsに設定します。

  2. タスク名process_marketsに設定します。

  3. [種類][For each] に設定します。

  4. [ 入力 ] フィールドに、次のように入力します。

    {{tasks.read_markets.output.rows}}
    

    これにより、SQL タスクによってキャプチャされた行配列が参照されます。

  5. コンカレンシー2 に設定して、2 つのイテレーションを並列で実行できるようにします。 スループットを向上させるか、入れ子になったタスクで並列処理がサポートされている場合は、この値を大きくします。

  6. タスクを追加をクリックし、ステップ2で選択した種類に基づいてネストされたタスクを構成します。

ノートブック タスク

  1. タスク名run_market_analysis_iterationに設定します。

  2. 種類ノートブック に設定します。

  3. 手順 2 で作成したノートブックのパスに パス を設定します。

  4. [ パラメーター] をクリックし、[ 追加 ] をクリックして、次の各パラメーターを追加します。

    • キー: market: {{input.market}}
    • キー: currency: {{input.currency}}

    {{input.<key>}} 参照は、現在のイテレーションの行オブジェクトから対応するフィールドに解決されます。

  5. [ タスクの作成] をクリックします。

SQL タスク

  1. タスク名run_market_analysis_iterationに設定します。

  2. TypeSQL に設定します。

  3. [SQL] フィールドに、名前付きパラメーターを含むクエリを入力します。次に例を示します。

    SELECT *
    FROM sales.transactions
    WHERE market = :market
      AND currency_code = :currency
    
  4. SQL ウェアハウスをワークスペース内のウェアハウスに設定します。

  5. [ パラメーター] をクリックし、[ 追加 ] をクリックして、次の各パラメーターを追加します。

    • キー: market: {{input.market}}
    • キー: currency: {{input.currency}}

    {{input.<key>}} 参照は、現在のイテレーションの行オブジェクトから対応するフィールドに解決されます。

  6. [ タスクの作成] をクリックします。

現在、あなたのジョブ DAG では、read_marketsprocess_markets に流れ込み、入れ子になったタスクが For each ノード内に表示されています。

手順 6: ジョブを実行して確認する

  1. [ 今すぐ実行 ] をクリックしてジョブをトリガーします。
  2. ジョブの実行ページで、 process_markets ノードをクリックして、 For each タスクを展開します。
  3. ジョブ実行ページには、イテレーションのテーブル (市場価値ごとに 1 行) が表示され、それぞれに状態、開始時刻、期間が表示されます。
  4. イテレーション行をクリックしてタスクの実行出力を開き、正しい市場値を受け取ったかどうか確認します。

特定のイテレーションが失敗した場合は、ジョブ全体を再実行せずに、ジョブ実行ページからそのイテレーションのみを再実行できます。

パターンを拡張する

新しい市場を追加するには、構成テーブルに行を挿入します。

INSERT INTO config.markets VALUES ('DE', 'EUR');

次のジョブの実行にはドイツが自動的に含まれます。ジョブ構成の変更やノートブックの編集は必要ありません。

この同じパターンは、データの反復を促進するユース ケースに対して機能します。

  • 顧客ごとの処理: 顧客 ID ごとに 1 行。ノートブックは、顧客固有の変換を適用するか、顧客固有の変換先に配信します。
  • テーブル インジェスト: ソース テーブル名ごとに 1 行。ノートブックは各テーブルを読み取り、取り込みます。
  • バックフィル処理: 日付パーティションごとに 1 行。ノートブックは、そのパーティションの履歴データを再処理します。
  • 機能フラグドリブン実行: 有効な機能または実験ごとに 1 行。ノートブックによって、対応するロジックがアクティブ化されます。

処理から項目を削除するには、その行を削除するか、SQL クエリで active フラグ列とフィルターを追加します。

SELECT market, currency FROM config.markets WHERE active = TRUE

次のステップ