Utiliser une table de contrôles pour piloter un For each travail

Vous devrez peut-être ingérer à partir de nombreuses sources. Lorsque cette liste change, le codage en dur dans la configuration du travail signifie modifier le code et redéployer. Utilisez les métadonnées pour résoudre ce problème en stockant la liste des sources dans une table qui est lue et utilisée au moment de l’exécution. Ajoutez une source en tant que nouvelle entrée. Lors de l'exécution du travail suivant, cette source est récupérée sans aucune modification du travail lui-même.

Ce tutoriel vous montre comment créer un travail à l’aide de cette approche. Une tâche SQL lit la table de contrôle, et une tâche For each itère sur chaque ligne en parallèle.

Fonctionnement

Le modèle utilise trois types de tâches câblés ensemble dans la séquence :

Tâche Type Qu’est-ce que cela fait ?
read_markets SQL Interroge une table de configuration et capture le résultat sous la forme d’un tableau de lignes
process_markets Pour chaque Itère sur {{tasks.read_markets.output.rows}}, exécutant la tâche imbriquée une fois par rangée
run_market_analysis_iteration Notebook ou SQL (imbriqué dans un "For each") S’exécute une fois par ligne, à l’aide de valeurs de ligne passées en tant que paramètres pour exécuter votre logique métier

La sortie de la tâche SQL, constituée d'un tableau JSON d'objets de ligne, circule directement dans le champ For each de la tâche à l’aide de la référence de valeur dynamique {{tasks.read_markets.output.rows}}. La tâche For each transfère ensuite chaque ligne à la tâche imbriquée comme paramètres, disponibles comme {{input.market}} et {{input.currency}}.

Prerequisites

  • Un espace de travail Databricks avec l’autorisation de créer des jobs et des notebooks
  • Autorisation de créer des tables dans le catalogue Unity
  • Schéma de catalogue Unity dans lequel vous pouvez créer la table de configuration (par exemple, config)
  • Un entrepôt SQL pour exécuter les tâches SQL

Étape 1 : Créer la table de configuration

La table de configuration est votre plan de contrôle. Elle contient la liste des valeurs de vos processus de travail. Lorsque vous devez ajouter ou supprimer du travail, vous mettez à jour cette table, et non le travail.

Exécutez le code SQL suivant pour créer une markets table dans votre config schéma :

CREATE OR REPLACE TABLE config.markets AS
SELECT * FROM VALUES
  ('NL', 'EUR'),
  ('UK', 'GBP'),
  ('US', 'USD')
AS t(market, currency);

Vous pouvez utiliser un notebook Databricks, l’éditeur SQL ou toute tâche SQL pour exécuter cette instruction. Après cette étape, config.markets contient trois lignes, une par marché, chacune avec son code monétaire.

Étape 2 : Écrire le code de traitement

La tâche imbriquée à l’intérieur de la For each tâche s’exécute une fois par ligne. Choisissez une tâche de notebook ou une tâche SQL en fonction de votre logique métier.

Tâche de notebook

Créez un bloc-notes sur un chemin d’accès tel que /Workspace/Users/<username>/process_market. Ce notebook s’exécute une fois à chaque itération de la tâche For each, recevant à chaque exécution une valeur de marché différente.

Ajoutez le code suivant au notebook :

# Set default values for testing the notebook outside of a job.
# When the notebook runs inside a For each task, the job overrides these defaults.
dbutils.widgets.text("market", "NL", "Market")
dbutils.widgets.text("currency", "EUR", "Currency")

# Read the parameters passed by the For each task
market = dbutils.widgets.get("market")
currency = dbutils.widgets.get("currency")

print(f"Processing market: {market} ({currency})")

# Your business logic goes here. For example:
df = spark.table("sales.transactions").filter(
    f"market = '{market}' AND currency_code = '{currency}'"
)
display(df)

Les appels dbutils.widgets.text() définissent des valeurs par défaut pour que vous puissiez exécuter le notebook directement dans votre espace de travail sans le connecter à une tâche. Lorsque le notebook s’exécute en tant que tâche imbriquée à l’intérieur d’une For each tâche, le travail remplace les valeurs par défaut par les valeurs de paramètre réelles pour cette itération.

Appelez dbutils.widgets.text() avant dbutils.widgets.get(). S’il get est appelé avant text, le notebook génère une erreur InputWidgetNotDefined lorsque vous l’exécutez en dehors d’une tâche.

L’utilisation des valeurs par défaut vous permet de tester le notebook en dehors d’un travail, mais notez le compromis : si la For each tâche est mal configurée et ne réussit pas les paramètres, le notebook utilise les valeurs par défaut et réussit silencieusement au lieu d’échouer, ce qui peut rendre la configuration incorrecte plus difficile à détecter.

Tâche SQL

Les tâches SQL prennent en charge les paramètres nommés à l'aide de la syntaxe :param_name. Référencez :market et :currency , dans votre requête, où que vous souhaitiez utiliser les valeurs d’itération :

SELECT *
FROM sales.transactions
WHERE market = :market
  AND currency_code = :currency

Vous configurez cette requête directement dans l’éditeur de tâches à l’étape 5. La For each tâche transmet les valeurs de l'itération actuelle aux paramètres nommés :market et :currency au moment de l'exécution. Contrairement aux tâches de notebook, les paramètres nommés SQL ne prennent pas en charge les valeurs par défaut : si un paramètre n’est pas passé, la requête échoue avec une erreur de résolution de paramètre. Pour valider ou configurer les paramètres par défaut avant l’exécution de la requête, utilisez plutôt une tâche de notebook.

Étape 3 : Créer le travail

Dans votre espace de travail Databricks, cliquez sur Flux de travail dans la barre latérale, puis sur Créer un travail. Donnez au travail un nom descriptif, par exemple Market Analysis.

Étape 4 : Configurer la tâche de recherche SQL

La tâche SQL exécute votre requête de configuration et rend sa sortie disponible pour les tâches en aval.

  1. Dans l’éditeur de travaux, cliquez sur Ajouter une tâche.

  2. Définissez le nom de la tâche sur read_markets.

  3. Définissez le type sur SQL.

  4. Dans le champ SQL , entrez la requête suivante :

    SELECT market, currency FROM config.markets
    
  5. Définissez SQL warehouse comme un entrepôt dans votre espace de travail.

  6. Cliquez sur Create task.

Lorsque cette tâche s’exécute, Databricks exécute la requête et capture le résultat sous la forme d’un tableau JSON dans tasks.read_markets.output.rows. La sortie de tâche SQL est toujours retournée sous la forme d’un tableau JSON . Aucune configuration supplémentaire n’est requise. La forme générique de cette référence est tasks.<task-name>.output.rows, où <task-name> correspond à la clé de tâche que vous définissez dans l’éditeur de travail. Le résultat se présente ainsi :

[
  { "market": "NL", "currency": "EUR" },
  { "market": "UK", "currency": "GBP" },
  { "market": "US", "currency": "USD" }
]

Étape 5 : Configurer la tâche Pour chaque tâche

La For each tâche lit la sortie SQL et lance une tâche imbriquée exécutée par ligne.

  1. Cliquez sur Ajouter une tâche et définissez Dépend deread_markets.

  2. Définissez le nom de la tâche sur process_markets.

  3. Définissez Type sur Pour chaque.

  4. Dans le champ Entrées , entrez :

    {{tasks.read_markets.output.rows}}
    

    Cela fait référence au tableau de lignes capturé par la tâche SQL.

  5. Définissez Concurrency sur 2 pour permettre à deux itérations de s'exécuter en parallèle. Augmentez cette valeur pour améliorer le débit ou si votre tâche imbriquée prend en charge un parallélisme plus élevé.

  6. Cliquez sur Ajouter une tâche pour effectuer une boucle et configurer la tâche imbriquée en fonction du type que vous avez choisi à l’étape 2 :

Tâche de notebook

  1. Définissez le nom de la tâche sur run_market_analysis_iteration.

  2. Définissez le type sur Notebook.

  3. Définissez le chemin d’accès au bloc-notes que vous avez créé à l’étape 2.

  4. Cliquez sur Paramètres, puis cliquez sur Ajouter pour ajouter chacun des paramètres suivants :

    • Clé : market, Valeur : {{input.market}}
    • Clé : currency, Valeur : {{input.currency}}

    Chaque {{input.<key>}} référence se résout en champ correspondant à partir de l’objet de ligne de l’itération actuelle.

  5. Cliquez sur Create task.

Tâche SQL

  1. Définissez le nom de la tâche sur run_market_analysis_iteration.

  2. Définissez le type sur SQL.

  3. Dans le champ SQL , entrez votre requête avec les paramètres nommés, par exemple :

    SELECT *
    FROM sales.transactions
    WHERE market = :market
      AND currency_code = :currency
    
  4. Définissez SQL warehouse comme un entrepôt dans votre espace de travail.

  5. Cliquez sur Paramètres, puis cliquez sur Ajouter pour ajouter chacun des paramètres suivants :

    • Clé : market, Valeur : {{input.market}}
    • Clé : currency, Valeur : {{input.currency}}

    Chaque {{input.<key>}} référence se résout en champ correspondant à partir de l’objet de ligne de l’itération actuelle.

  6. Cliquez sur Create task.

Votre DAG de travail affiche read_markets maintenant le flux vers process_markets, avec la tâche imbriquée visible à l’intérieur du For each nœud.

Étape 6 : Exécuter le travail et vérifier

  1. Cliquez sur Exécuter maintenant pour déclencher le travail.
  2. Sur la page d’exécution du travail, cliquez sur le process_markets nœud pour développer la For each tâche.
  3. La page d’exécution du travail affiche une table des itérations ( une ligne par valeur de marché), chacune affichant son état, son heure de début et sa durée.
  4. Cliquez sur n’importe quelle ligne d’itération pour ouvrir la sortie de l’exécution de la tâche et confirmez qu’elle a reçu la valeur de marché correcte.

Si une itération spécifique échoue, vous ne pouvez réexécuter que cette itération à partir de la page d’exécution du travail sans réexécuter l’intégralité du travail.

Étendre le modèle

Pour ajouter un nouveau marché, insérez une ligne dans la table de configuration :

INSERT INTO config.markets VALUES ('DE', 'EUR');

L’exécution de la tâche suivante inclut automatiquement l’Allemagne, sans qu’aucune modification de configuration de tâche ni édits de notebook ne soient nécessaires.

Ce même modèle fonctionne pour n'importe quel cas d'usage où vous souhaitez que les données pilotent l'itération :

  • Traitement par client : une ligne par ID client ; le notebook applique des transformations spécifiques au client ou livre à des destinations spécifiques au client.
  • Ingestion de table : une ligne par nom de table source ; le bloc-notes lit et ingère chaque table.
  • Traitement du remplissage : une ligne par partition de date ; le notebook retraite les données historiques pour cette partition.
  • Exécution basée sur des indicateurs de fonctionnalité : une ligne pour chaque fonctionnalité ou expérience activée ; le notebook active la logique correspondante.

Pour supprimer un élément du traitement, supprimez sa ligne ou ajoutez une colonne d’indicateur active et filtrez dans la requête SQL :

SELECT market, currency FROM config.markets WHERE active = TRUE

Étapes suivantes