Partilhar via


Use uma mesa de controlo para gerir um For each trabalho

Pode precisar de ingerir de várias fontes. Quando essa lista muda, codifica-la fixamente na configuração do job significa alterar o código e depois voltar a implementar. Use metadados para resolver isto, armazenando a lista de fontes numa tabela que é lida e utilizada em tempo de execução. Adicionar uma fonte como uma nova linha e a próxima execução do trabalho processa-a sem alterações ao próprio trabalho.

Este tutorial mostra-lhe como construir um trabalho usando esta abordagem. Uma tarefa SQL lê a tabela de controlo, e uma tarefa For each itera por cada linha em paralelo.

Como funciona

O padrão utiliza três tipos de tarefas ligados em sequência:

Tarefa Tipo O que faz
read_markets SQL Consulta uma tabela de configuração e captura o resultado como um array de linhas
process_markets Para cada Itera sobre {{tasks.read_markets.output.rows}}, executando a tarefa incorporada uma vez por cada linha
run_market_analysis_iteration Notebook ou SQL (aninhado dentro Para cada um) Executa-se uma vez por linha, utilizando os valores da linha fornecidos como parâmetros para executar a lógica de negócio.

A saída da tarefa SQL—um array JSON de objetos de linha—flui diretamente para o campo For each da tarefa usando a referência dinâmica de valor {{tasks.read_markets.output.rows}}. A For each tarefa passa então cada linha para a tarefa aninhada como parâmetros, disponíveis como {{input.market}} e {{input.currency}}.

Pré-requisitos

  • Um espaço de trabalho Databricks com permissão para criar trabalhos e cadernos
  • Permissão para criar tabelas no Unity Catalog
  • Um esquema do Unity Catalog onde podes criar a tabela de configuração (por exemplo, config)
  • Um armazém SQL para executar as tarefas SQL

Passo 1: Criar a tabela de configuração

A tabela de configuração é o teu plano de controlo. Contém a lista de valores que o seu trabalho processa. Quando precisas de adicionar ou remover trabalho, atualizas esta tabela — não o trabalho.

Execute o seguinte SQL para criar uma markets tabela no seu config esquema:

CREATE OR REPLACE TABLE config.markets AS
SELECT * FROM VALUES
  ('NL', 'EUR'),
  ('UK', 'GBP'),
  ('US', 'USD')
AS t(market, currency);

Pode usar um bloco de notas Databricks, o editor SQL ou qualquer tarefa SQL para executar esta instrução. Após este passo, config.markets contém três linhas, uma por mercado, cada uma com o seu código de moeda.

Passo 2: Escrever o código de processamento

A tarefa aninhada dentro da For each tarefa executa-se uma vez por linha. Escolhe uma tarefa de caderno ou SQL, dependendo da lógica do teu negócio.

Tarefa do caderno

Crie um novo caderno num caminho como /Workspace/Users/<username>/process_market. Este notebook executa uma vez por cada iteração da tarefa For each, recebendo um valor de mercado diferente a cada vez.

Adicione o seguinte código ao caderno:

# Set default values for testing the notebook outside of a job.
# When the notebook runs inside a For each task, the job overrides these defaults.
dbutils.widgets.text("market", "NL", "Market")
dbutils.widgets.text("currency", "EUR", "Currency")

# Read the parameters passed by the For each task
market = dbutils.widgets.get("market")
currency = dbutils.widgets.get("currency")

print(f"Processing market: {market} ({currency})")

# Your business logic goes here. For example:
df = spark.table("sales.transactions").filter(
    f"market = '{market}' AND currency_code = '{currency}'"
)
display(df)

As dbutils.widgets.text() chamadas definem valores predefinidos para que possas executar o caderno diretamente no teu espaço de trabalho sem o ligar a um trabalho. Quando o notebook é executado como uma tarefa aninhada dentro de uma For each tarefa, o processo substitui os valores predefinidos pelos valores reais dos parâmetros para essa iteração.

Ligue dbutils.widgets.text() antes de dbutils.widgets.get(). Se get for chamado antes de text, o notebook gera um erro de InputWidgetNotDefined quando o executa fora de uma tarefa.

Usar os predefinidos permite-lhe testar o caderno fora de um trabalho, mas note-se a contrapartida: se a For each tarefa estiver mal configurada e não passar parâmetros, o caderno usa os valores definidos e tem sucesso silenciosamente em vez de falhar — o que pode dificultar a deteção da má configuração.

Tarefa SQL

As tarefas SQL suportam parâmetros nomeados usando a :param_name sintaxe. Faça referência :market e :currency na sua consulta onde quiser usar os valores da iteração:

SELECT *
FROM sales.transactions
WHERE market = :market
  AND currency_code = :currency

Configura esta consulta diretamente no editor de tarefas no Passo 5. A For each tarefa passa os valores da iteração atual para os parâmetros nomeados :market e :currency em tempo de execução. Ao contrário das tarefas do caderno, os parâmetros nomeados SQL não suportam valores predefinidos — se um parâmetro não for passado, a consulta falha com um erro de resolução de parâmetro. Para validar ou definir parâmetros por defeito antes da consulta ser executada, use antes uma tarefa de caderno.

Passo 3: Criar o trabalho

No seu espaço de trabalho Databricks, clique em Fluxos de Trabalho na barra lateral e depois clique em Criar trabalho. Dê ao trabalho um nome descritivo, por Market Analysisexemplo.

Passo 4: Configurar a tarefa de consulta SQL

A tarefa SQL executa a tua consulta de configuração e disponibiliza a sua saída para tarefas posteriores.

  1. No editor de tarefas, clique em Adicionar tarefa.

  2. Defina o nome da Tarefa para read_markets.

  3. Definir o Tipo como SQL.

  4. No campo SQL , introduza a seguinte consulta:

    SELECT market, currency FROM config.markets
    
  5. Definir SQL warehouse como um armazém no teu espaço de trabalho.

  6. Clique em Criar tarefa.

Quando esta tarefa é executada, o Databricks executa a consulta e captura o resultado como um array JSON em tasks.read_markets.output.rows. A saída da tarefa SQL é sempre devolvida como um array JSON — não é necessária qualquer configuração adicional. A forma genérica desta referência é tasks.<task-name>.output.rows, onde <task-name> corresponde à chave de tarefa que definiste no editor de tarefas. O resultado tem o seguinte aspeto:

[
  { "market": "NL", "currency": "EUR" },
  { "market": "UK", "currency": "GBP" },
  { "market": "US", "currency": "USD" }
]

Passo 5: Configurar o Para cada tarefa

A tarefa For each lê a saída SQL e inicia uma execução de tarefa encadeada por linha.

  1. Clique em Adicionar tarefa e defina o campo Dependente de para read_markets.

  2. Defina o nome da Tarefa para process_markets.

  3. Defina Tipo para Para cada.

  4. No campo Inputs , introduza:

    {{tasks.read_markets.output.rows}}
    

    Isto refere-se ao array de linhas capturado pela tarefa SQL.

  5. Defina Concorrência para 2 permitir que duas iterações corram em paralelo. Aumente este valor para melhorar o throughput ou se a sua tarefa aninhada suportar maior paralelismo.

  6. Clique Adicionar uma tarefa a iterar e configure a tarefa aninhada com base no tipo escolhido no Passo 2:

Tarefa do caderno

  1. Defina o nome da Tarefa para run_market_analysis_iteration.

  2. Definir Tipo como Portátil.

  3. Define o Caminho para o caminho do caderno que criaste no Passo 2.

  4. Clique em Parâmetros, depois clique em Adicionar para adicionar cada um dos seguintes parâmetros:

    • Chave: market, Valor: {{input.market}}
    • Chave: currency, Valor: {{input.currency}}

    Cada referência {{input.<key>}} resolve para o campo correspondente do objeto de linha da iteração atual.

  5. Clique em Criar tarefa.

Tarefa SQL

  1. Defina o nome da Tarefa para run_market_analysis_iteration.

  2. Definir o Tipo como SQL.

  3. No campo SQL , introduza a sua consulta com os parâmetros nomeados, por exemplo:

    SELECT *
    FROM sales.transactions
    WHERE market = :market
      AND currency_code = :currency
    
  4. Definir SQL warehouse como um armazém no teu espaço de trabalho.

  5. Clique em Parâmetros, depois clique em Adicionar para adicionar cada um dos seguintes parâmetros:

    • Chave: market, Valor: {{input.market}}
    • Chave: currency, Valor: {{input.currency}}

    Cada referência {{input.<key>}} resolve para o campo correspondente do objeto de linha da iteração atual.

  6. Clique em Criar tarefa.

O seu DAG de trabalho agora mostra read_markets a fluir para process_markets, com a tarefa aninhada visível dentro do For each nó.

Passo 6: Execute o trabalho e verifique

  1. Clique em Executar agora para ativar a tarefa.
  2. Na página de execução do trabalho, clique no process_markets nó para expandir a For each tarefa.
  3. A página de execução do trabalho mostra uma tabela de iterações — uma linha por valor de mercado — cada uma mostrando o seu estado, hora de início e duração.
  4. Clique em qualquer linha de iteração para abrir a saída da tarefa e confirmar que recebeu o valor de mercado correto.

Se uma iteração específica falhar, só podes reexecutar essa iteração a partir da página de execução do trabalho sem repetir o trabalho inteiro.

Estende o padrão

Para adicionar um novo mercado, insira uma linha na tabela de configuração:

INSERT INTO config.markets VALUES ('DE', 'EUR');

A próxima execução inclui automaticamente a Alemanha, sem alterações na configuração do trabalho ou alterações no caderno.

Este mesmo padrão funciona em qualquer caso de uso em que queiras que os dados impulsionem a iteração:

  • Processamento por cliente: Uma linha por ID de cliente; O Notebook aplica transformações específicas para o cliente ou entrega a destinos específicos para o cliente.
  • Ingestão de tabelas: Uma linha por nome de tabela de origem; o caderno lê e ingere cada tabela.
  • Processamento de preenchimento: Uma linha por partição de data; o notebook reprocessa os dados históricos dessa partição.
  • Execução orientada por flag de funcionalidades: Uma linha por funcionalidade ou experiência ativada; o notebook ativa a lógica correspondente.

Para remover um item do processamento, elimine a sua linha ou adicione uma coluna de sinalização active e filtre na consulta SQL.

SELECT market, currency FROM config.markets WHERE active = TRUE

Passos seguintes