Uso de una tabla de control para controlar un For each trabajo

Es posible que tenga que integrar desde muchas fuentes. Cuando esa lista cambia, codificarla de forma rígida en la configuración del trabajo significa cambiar el código y volver a implementarlo. Use metadatos para solucionarlo almacenando la lista de orígenes en una tabla que se lee y se usa en tiempo de ejecución. Agregue un origen como una nueva fila y la siguiente ejecución del trabajo la recoge sin cambios en el propio trabajo.

En este tutorial se muestra cómo crear un trabajo mediante este enfoque. Una tarea SQL lee la tabla de control y una For each tarea recorre en iteración cada fila en paralelo.

Cómo funciona

El patrón usa tres tipos de tareas conectados conjuntamente en secuencia:

tarea Tipo Qué hace
read_markets SQL Consulta una tabla de configuración y captura el resultado como una matriz de filas
process_markets For Each Itera sobre {{tasks.read_markets.output.rows}}, ejecutando la tarea anidada una vez por fila
run_market_analysis_iteration Cuaderno o SQL (anidado dentro de Para cada uno) Se ejecuta una vez por fila, mediante el uso de valores de fila pasados como parámetros para ejecutar la lógica de negocios.

La salida de la tarea SQL, una matriz JSON de objetos de fila, fluye directamente al campo For each de la tarea utilizando la referencia {{tasks.read_markets.output.rows}} de valor dinámico. A For each continuación, la tarea pasa cada fila a la tarea anidada como parámetros, disponibles como {{input.market}} y {{input.currency}}.

Prerrequisitos

  • Un área de trabajo de Databricks con permiso para crear trabajos y cuadernos
  • Permiso para crear tablas en el catálogo de Unity
  • Un esquema de catálogo de Unity donde puede crear la tabla de configuración (por ejemplo, config)
  • Una instancia de SQL Warehouse para ejecutar las tareas de SQL

Paso 1: Crear la tabla de configuración

La tabla de configuración es el plano de control. Contiene la lista de valores que procesa el trabajo. Cuando necesite agregar o quitar trabajo, actualice esta tabla, no el trabajo.

Ejecute el siguiente código SQL para crear una tabla markets en el esquema config.

CREATE OR REPLACE TABLE config.markets AS
SELECT * FROM VALUES
  ('NL', 'EUR'),
  ('UK', 'GBP'),
  ('US', 'USD')
AS t(market, currency);

Puede usar un cuaderno de Databricks, el editor de SQL o cualquier tarea de SQL para ejecutar esta instrucción. Después de este paso, config.markets contiene tres filas, una por mercado, cada una con su código de moneda.

Paso 2: Escribir el código de procesamiento

La tarea anidada dentro de la tarea For each se ejecuta una vez por cada fila. Elija una tarea de cuaderno o una tarea DE SQL en función de la lógica de negocios.

Tarea de cuaderno

Cree un nuevo cuaderno en una ruta como /Workspace/Users/<username>/process_market. Este cuaderno se ejecuta una vez por iteración de la For each tarea, recibiendo un valor de mercado diferente cada vez.

Agregue el código siguiente al cuaderno:

# Set default values for testing the notebook outside of a job.
# When the notebook runs inside a For each task, the job overrides these defaults.
dbutils.widgets.text("market", "NL", "Market")
dbutils.widgets.text("currency", "EUR", "Currency")

# Read the parameters passed by the For each task
market = dbutils.widgets.get("market")
currency = dbutils.widgets.get("currency")

print(f"Processing market: {market} ({currency})")

# Your business logic goes here. For example:
df = spark.table("sales.transactions").filter(
    f"market = '{market}' AND currency_code = '{currency}'"
)
display(df)

Las dbutils.widgets.text() llamadas establecen valores predeterminados para que pueda ejecutar el cuaderno directamente en el área de trabajo sin conectarlo a un trabajo. Cuando el cuaderno se ejecuta como una tarea anidada dentro de una For each tarea, el trabajo invalida los valores predeterminados con los valores de parámetro reales para esa iteración.

Llame a dbutils.widgets.text() antes de dbutils.widgets.get(). Si get se llama antes de text, el notebook produce un InputWidgetNotDefined error al ejecutarlo fuera de un trabajo.

El uso de valores predeterminados le permite probar el cuaderno fuera de un trabajo, pero tenga en cuenta la compensación: si la For each tarea está mal configurada y no pasa parámetros, el cuaderno usa los valores predeterminados y tiene éxito silenciosamente en lugar de fallar, lo que puede dificultar la detección de la configuración incorrecta.

Tarea SQL

Las tareas de SQL admiten parámetros con nombre mediante la :param_name sintaxis . Haga referencia :market y :currency en la consulta donde quiera usar los valores de iteración:

SELECT *
FROM sales.transactions
WHERE market = :market
  AND currency_code = :currency

Esta consulta se configura directamente en el editor de tareas del paso 5. La For each tarea pasa los valores de la iteración actual a los parámetros :market y :currency nombrados en tiempo de ejecución. A diferencia de las tareas del cuaderno, los parámetros con nombre de SQL no admiten valores predeterminados; si no se pasa un parámetro, la consulta produce un error de resolución de parámetros. Para validar o establecer parámetros predeterminados antes de que se ejecute la consulta, use en su lugar una tarea de cuaderno.

Paso 3: Crear el trabajo

En el área de trabajo de Databricks, haga clic en Flujos de trabajo en la barra lateral y, a continuación, haga clic en Crear trabajo. Asigne al trabajo un nombre descriptivo, por ejemplo Market Analysis.

Paso 4: Configurar la tarea de búsqueda de SQL

La tarea SQL ejecuta la consulta de configuración y hace que su salida esté disponible para las tareas posteriores.

  1. En el editor de trabajos, haga clic en Agregar tarea.

  2. Establezca Nombre de tarea en read_markets.

  3. Establezca Tipo en SQL.

  4. En el campo SQL , escriba la siguiente consulta:

    SELECT market, currency FROM config.markets
    
  5. Establezca SQL Warehouse para un almacén en su área de trabajo.

  6. Haga clic en Create task (Crear tarea).

Cuando se ejecuta esta tarea, Databricks ejecuta la consulta y captura el resultado como una matriz JSON en tasks.read_markets.output.rows. La salida de la tarea SQL siempre se devuelve como una matriz JSON; no se requiere ninguna configuración adicional. La forma genérica de esta referencia es tasks.<task-name>.output.rows, donde <task-name> coincide con la clave de tarea que estableció en el editor de trabajos. La salida es similar a esta:

[
  { "market": "NL", "currency": "EUR" },
  { "market": "UK", "currency": "GBP" },
  { "market": "US", "currency": "USD" }
]

Paso 5: Configurar para cada tarea

La For each tarea lee la salida de SQL e inicia una ejecución de tarea anidada por fila.

  1. Haga clic en Agregar tarea y establezca Depende de en read_markets.

  2. Establezca Nombre de tarea en process_markets.

  3. Establezca Tipo en Para cada uno.

  4. En el campo Entradas , escriba:

    {{tasks.read_markets.output.rows}}
    

    Esto hace referencia a la matriz de filas capturada por la tarea SQL.

  5. Establezca Simultaneidad en 2 para permitir que dos iteraciones se ejecuten en paralelo. Aumente este valor para mejorar el rendimiento de procesamiento, o si la tarea anidada admite un mayor grado de paralelismo.

  6. Haga clic en Agregar tarea al bucle y configure la tarea anidada según el tipo elegido en el paso 2:

Tarea de cuaderno

  1. Establezca Nombre de tarea en run_market_analysis_iteration.

  2. Establezca Tipo en Notebook.

  3. Establezca Ruta a la ruta del cuaderno que creó en el paso 2.

  4. Haga clic en Parámetros y, a continuación, haga clic en Agregar para agregar cada uno de los parámetros siguientes:

    • Clave: market, Valor: {{input.market}}
    • Clave: currency, Valor: {{input.currency}}

    Cada {{input.<key>}} referencia se refiere al campo correspondiente del objeto fila de la iteración actual.

  5. Haga clic en Create task (Crear tarea).

Tarea SQL

  1. Establezca Nombre de tarea en run_market_analysis_iteration.

  2. Establezca Tipo en SQL.

  3. En el campo SQL , escriba la consulta con los parámetros con nombre, por ejemplo:

    SELECT *
    FROM sales.transactions
    WHERE market = :market
      AND currency_code = :currency
    
  4. Establezca SQL Warehouse para un almacén en su área de trabajo.

  5. Haga clic en Parámetros y, a continuación, haga clic en Agregar para agregar cada uno de los parámetros siguientes:

    • Clave: market, Valor: {{input.market}}
    • Clave: currency, Valor: {{input.currency}}

    Cada {{input.<key>}} referencia se refiere al campo correspondiente del objeto fila de la iteración actual.

  6. Haga clic en Create task (Crear tarea).

El DAG de tu tarea ahora muestra read_markets fluyendo hacia process_markets, con la tarea anidada visible dentro del nodo For each.

Paso 6: Ejecución del trabajo y comprobación

  1. Haga clic en Ejecutar ahora para desencadenar el trabajo.
  2. En la página de ejecución del trabajo, haga clic en el nodo process_markets para ampliar la tarea For each.
  3. La página de ejecución del trabajo muestra una tabla de iteraciones ( una fila por valor de mercado) cada una de las cuales muestra su estado, hora de inicio y duración.
  4. Haga clic en cualquier fila de iteración para abrir el resultado de la ejecución de la tarea y confirme que recibió el valor de mercado correcto.

Si se produce un error en una iteración específica, solo puede volver a ejecutar esa iteración desde la página de ejecución del trabajo sin volver a ejecutar todo el trabajo.

Extender el patrón

Para agregar un nuevo mercado, inserte una fila en la tabla de configuración:

INSERT INTO config.markets VALUES ('DE', 'EUR');

La siguiente ejecución del trabajo incluye automáticamente Alemania, sin cambios de configuración de trabajo ni modificaciones de cuaderno necesarias.

Este mismo patrón funciona para cualquier caso de uso en el que desee que los datos impulsen la iteración:

  • Procesamiento por cliente: una fila por identificador de cliente; el cuaderno aplica transformaciones específicas del cliente o entrega a destinos específicos del cliente.
  • Ingesta de tabla: una fila por cada nombre de tabla de origen; el cuaderno lee e ingiere cada tabla.
  • Procesamiento de reposición: una fila por partición de fecha; el cuaderno vuelve a procesar los datos históricos de esa partición.
  • Ejecución controlada por marcas de características: una fila por característica o experimento habilitado; el cuaderno activa la lógica correspondiente.

Para quitar un elemento del procesamiento, elimine su fila o agregue una active columna de marca y filtre en la consulta SQL:

SELECT market, currency FROM config.markets WHERE active = TRUE

Pasos siguientes