Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Usar uma tabela de controle para executar um
Talvez seja necessário ingerir de várias fontes. Quando essa lista é alterada, codificar-a na configuração do trabalho significa alterar o código e reimplantar. Use metadados para resolver isso armazenando a lista de fontes em uma tabela que é lida e usada em tempo de execução. Adicione uma origem como uma nova linha e a próxima execução do trabalho a incorpora automaticamente, sem necessidade de alterações no trabalho.
Este tutorial mostra como criar um trabalho usando essa abordagem. Uma tarefa SQL lê a tabela de controle, e uma tarefa For each itera sobre cada linha paralelamente.
Como funciona
O padrão usa três tipos de tarefa conectados em sequência:
| Tarefa | Tipo | O que faz |
|---|---|---|
read_markets |
SQL | Consulta uma tabela de configuração e captura o resultado como uma matriz de linhas |
process_markets |
Para cada | Itera sobre {{tasks.read_markets.output.rows}}, executando a tarefa aninhada uma vez por linha |
run_market_analysis_iteration |
Notebook ou SQL (aninhado dentro para cada um) | É executado uma vez por linha, usando valores de linha passados como parâmetros para executar sua lógica de negócios |
A saída da tarefa SQL — uma matriz JSON de objetos de linha — flui diretamente para o For each campo Entradas da tarefa usando a referência {{tasks.read_markets.output.rows}}de valor dinâmico. A tarefa For each então passa cada linha para a tarefa aninhada como parâmetros, disponíveis como {{input.market}} e {{input.currency}}.
Pré-requisitos
- Um workspace do Databricks com permissão para criar trabalhos e notebooks
- Permissão para criar tabelas no Catálogo do Unity
- Um esquema do Catálogo do Unity em que você pode criar a tabela de configuração (por exemplo,
config) - Um SQL Warehouse para executar as tarefas do SQL
Etapa 1: Criar a tabela de configuração
A tabela de configuração é o plano de controle. Ele contém a lista de valores que os processos do seu trabalho utilizam. Quando você precisar adicionar ou remover o trabalho, atualize esta tabela , não o trabalho.
Execute o SEGUINTE SQL para criar uma markets tabela em seu config esquema:
CREATE OR REPLACE TABLE config.markets AS
SELECT * FROM VALUES
('NL', 'EUR'),
('UK', 'GBP'),
('US', 'USD')
AS t(market, currency);
Você pode usar um notebook do Databricks, o editor do SQL ou qualquer tarefa SQL para executar essa instrução. Após esta etapa, config.markets contém três linhas, uma por mercado, cada uma com seu código de moeda.
Etapa 2: escrever o código de processamento
A tarefa aninhada dentro da For each tarefa é executada uma vez por linha. Escolha uma tarefa de bloco de anotações ou uma tarefa SQL, dependendo da lógica de negócios.
Tarefa do notebook
Crie um novo notebook em um caminho como /Workspace/Users/<username>/process_market. Este notebook é executado uma vez por iteração da tarefa For each, recebendo um valor de mercado diferente a cada execução.
Adicione o seguinte código ao notebook:
# Set default values for testing the notebook outside of a job.
# When the notebook runs inside a For each task, the job overrides these defaults.
dbutils.widgets.text("market", "NL", "Market")
dbutils.widgets.text("currency", "EUR", "Currency")
# Read the parameters passed by the For each task
market = dbutils.widgets.get("market")
currency = dbutils.widgets.get("currency")
print(f"Processing market: {market} ({currency})")
# Your business logic goes here. For example:
df = spark.table("sales.transactions").filter(
f"market = '{market}' AND currency_code = '{currency}'"
)
display(df)
As dbutils.widgets.text() chamadas definem valores padrão para que você possa executar o bloco de anotações diretamente em seu workspace sem conectá-lo a um trabalho. Quando o notebook é executado como uma tarefa aninhada dentro de uma For each tarefa, a execução substitui os padrões com os valores reais dos parâmetros para essa iteração.
Chamar dbutils.widgets.text() antes dbutils.widgets.get(). Se get for chamado antes do text, o bloco de anotações gerará um erro InputWidgetNotDefined ao executá-lo fora de um trabalho.
Usar padrões permite testar o notebook fora de um trabalho, mas observe o compromisso: se a For each tarefa estiver configurada incorretamente e não passar parâmetros, o notebook usará os padrões e terá êxito silenciosamente em vez de falhar — o que pode dificultar a detecção do erro.
Tarefa SQL
As tarefas SQL dão suporte a parâmetros nomeados usando a :param_name sintaxe. Referência :market e :currency na consulta onde você quiser usar os valores de iteração:
SELECT *
FROM sales.transactions
WHERE market = :market
AND currency_code = :currency
Configure essa consulta diretamente no editor de tarefas na Etapa 5. A For each tarefa passa os valores da iteração atual para os parâmetros nomeados :market e :currency em tempo de execução. Ao contrário das tarefas do notebook, os parâmetros nomeados por SQL não dão suporte a valores padrão. Se um parâmetro não for passado, a consulta falhará com um erro de resolução de parâmetro. Para validar ou definir parâmetros padrão antes que a consulta seja executada, use uma tarefa em notebook.
Etapa 3: Criar o trabalho
No workspace do Databricks, clique em Fluxos de Trabalho na barra lateral e clique em Criar trabalho. Dê ao trabalho um nome descritivo, por exemplo Market Analysis.
Etapa 4: Configurar a tarefa de pesquisa do SQL
A tarefa SQL executa sua consulta de configuração e disponibiliza sua saída para tarefas downstream.
No editor de trabalhos, clique em Adicionar tarefa.
Definir o nome da tarefa como
read_markets.Defina o Tipo como SQL.
No campo SQL , insira a seguinte consulta:
SELECT market, currency FROM config.marketsDefina o SQL Warehouse para um armazém em seu workspace.
Clique em Criar tarefa.
Quando essa tarefa é executada, o Databricks executa a consulta e captura o resultado como uma matriz JSON em tasks.read_markets.output.rows. A saída da tarefa SQL sempre é retornada como uma matriz JSON. Nenhuma configuração adicional é necessária. A forma genérica dessa referência é tasks.<task-name>.output.rows, em que <task-name> corresponde à chave de tarefa definida no editor de trabalho. A saída tem esta aparência:
[
{ "market": "NL", "currency": "EUR" },
{ "market": "UK", "currency": "GBP" },
{ "market": "US", "currency": "USD" }
]
Etapa 5: Configurar a tarefa For each
A For each tarefa lê a saída do SQL e inicia uma execução de tarefa aninhada por linha.
Clique em Adicionar tarefa e defina Depende de para
read_markets.Definir o nome da tarefa como
process_markets.Configure Tipo para Para cada.
No campo Entradas, insira :
{{tasks.read_markets.output.rows}}Isso faz referência à matriz de linhas capturada pela tarefa SQL.
Defina a Simultaneidade para
2permitir que duas iterações são executadas em paralelo. Aumente esse valor para melhorar a taxa de transferência ou se sua tarefa aninhada der suporte a paralelismo mais alto.Clique em Adicionar uma tarefa para repetir e configurar a tarefa aninhada com base no tipo escolhido na Etapa 2:
Tarefa do notebook
Definir o nome da tarefa como
run_market_analysis_iteration.Defina Tipo como Notebook.
Defina Caminho para o caminho do bloco de anotações que você criou na Etapa 2.
Clique em Parâmetros e clique em Adicionar para adicionar cada um dos seguintes parâmetros:
-
Chave:
market, Valor:{{input.market}} -
Chave:
currency, Valor:{{input.currency}}
Cada referência
{{input.<key>}}é resolvida para o campo correspondente do objeto de linha da iteração atual.-
Chave:
Clique em Criar tarefa.
Tarefa SQL
Definir o nome da tarefa como
run_market_analysis_iteration.Defina o Tipo como SQL.
No campo SQL , insira sua consulta com os parâmetros nomeados, por exemplo:
SELECT * FROM sales.transactions WHERE market = :market AND currency_code = :currencyDefina o SQL Warehouse para um armazém em seu workspace.
Clique em Parâmetros e clique em Adicionar para adicionar cada um dos seguintes parâmetros:
-
Chave:
market, Valor:{{input.market}} -
Chave:
currency, Valor:{{input.currency}}
Cada
{{input.<key>}}referência se resolve para o campo correspondente do objeto de linhas da iteração atual.-
Chave:
Clique em Criar tarefa.
Seu DAG de trabalho agora mostra read_markets fluindo para process_markets, com a tarefa aninhada visível dentro do nó For each.
Etapa 6: Executar o trabalho e verificar
- Clique em Executar agora para disparar o trabalho.
- Na página de execução do trabalho, clique no
process_marketsnó para expandir aFor eachtarefa. - A página de execução do trabalho mostra uma tabela de iterações – uma linha por valor de mercado – cada uma mostrando seu status, hora de início e duração.
- Clique em qualquer linha de iteração para abrir a saída da execução da tarefa e confirmar se ela recebeu o valor de mercado correto.
Se uma iteração específica falhar, você poderá executar novamente somente essa iteração da página de execução do trabalho sem executar novamente todo o trabalho.
Estender o padrão
Para adicionar um novo mercado, insira uma linha na tabela de configuração:
INSERT INTO config.markets VALUES ('DE', 'EUR');
A próxima execução de trabalho inclui automaticamente a Alemanha, sem nenhuma alteração de configuração de trabalho ou edições de notebook necessárias.
Esse mesmo padrão funciona para qualquer caso de uso em que você deseja que os dados conduzam a iteração:
- Processamento por cliente: uma linha para cada ID de cliente; o notebook aplica transformações específicas para cada cliente ou entrega a destinos específicos de cada cliente.
- Ingestão de tabela: uma linha por nome de tabela de origem; o notebook lê e ingere cada tabela.
- Processamento de backfill: uma linha por partição de data; o notebook reprocessa dados históricos para essa partição.
- Execução controlada por sinalizador de recurso: uma linha por recurso ou experimento habilitado; o notebook ativa a lógica correspondente.
Para remover um item do processamento, exclua sua linha ou adicione uma active coluna de sinalizador e filtre na consulta SQL:
SELECT market, currency FROM config.markets WHERE active = TRUE
Próximas Etapas
-
Usar uma
For eachtarefa para executar outra tarefa em um loop – Referência completa para configurarFor eachtarefas, incluindo tipos de parâmetro e opções de simultaneidade -
Usar uma tabela de pesquisa para grandes matrizes de parâmetros em uma
For eachtarefa – Como lidar com grandes matrizes de parâmetros que excedem o limite de valor da tarefa de 48 KB - Acessar valores de parâmetro de uma tarefa — Todos os métodos para acessar valores de parâmetro em notebooks, scripts Python e tarefas SQL