Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
Use uma mesa de controlo para gerir um
Pode precisar de ingerir de várias fontes. Quando essa lista muda, codifica-la fixamente na configuração do job significa alterar o código e depois voltar a implementar. Use metadados para resolver isto, armazenando a lista de fontes numa tabela que é lida e utilizada em tempo de execução. Adicionar uma fonte como uma nova linha e a próxima execução do trabalho processa-a sem alterações ao próprio trabalho.
Este tutorial mostra-lhe como construir um trabalho usando esta abordagem. Uma tarefa SQL lê a tabela de controlo, e uma tarefa For each itera por cada linha em paralelo.
Como funciona
O padrão utiliza três tipos de tarefas ligados em sequência:
| Tarefa | Tipo | O que faz |
|---|---|---|
read_markets |
SQL | Consulta uma tabela de configuração e captura o resultado como um array de linhas |
process_markets |
Para cada | Itera sobre {{tasks.read_markets.output.rows}}, executando a tarefa incorporada uma vez por cada linha |
run_market_analysis_iteration |
Notebook ou SQL (aninhado dentro Para cada um) | Executa-se uma vez por linha, utilizando os valores da linha fornecidos como parâmetros para executar a lógica de negócio. |
A saída da tarefa SQL—um array JSON de objetos de linha—flui diretamente para o campo For each da tarefa usando a referência dinâmica de valor {{tasks.read_markets.output.rows}}. A For each tarefa passa então cada linha para a tarefa aninhada como parâmetros, disponíveis como {{input.market}} e {{input.currency}}.
Pré-requisitos
- Um espaço de trabalho Databricks com permissão para criar trabalhos e cadernos
- Permissão para criar tabelas no Unity Catalog
- Um esquema do Unity Catalog onde podes criar a tabela de configuração (por exemplo,
config) - Um armazém SQL para executar as tarefas SQL
Passo 1: Criar a tabela de configuração
A tabela de configuração é o teu plano de controlo. Contém a lista de valores que o seu trabalho processa. Quando precisas de adicionar ou remover trabalho, atualizas esta tabela — não o trabalho.
Execute o seguinte SQL para criar uma markets tabela no seu config esquema:
CREATE OR REPLACE TABLE config.markets AS
SELECT * FROM VALUES
('NL', 'EUR'),
('UK', 'GBP'),
('US', 'USD')
AS t(market, currency);
Pode usar um bloco de notas Databricks, o editor SQL ou qualquer tarefa SQL para executar esta instrução. Após este passo, config.markets contém três linhas, uma por mercado, cada uma com o seu código de moeda.
Passo 2: Escrever o código de processamento
A tarefa aninhada dentro da For each tarefa executa-se uma vez por linha. Escolhe uma tarefa de caderno ou SQL, dependendo da lógica do teu negócio.
Tarefa do caderno
Crie um novo caderno num caminho como /Workspace/Users/<username>/process_market. Este notebook executa uma vez por cada iteração da tarefa For each, recebendo um valor de mercado diferente a cada vez.
Adicione o seguinte código ao caderno:
# Set default values for testing the notebook outside of a job.
# When the notebook runs inside a For each task, the job overrides these defaults.
dbutils.widgets.text("market", "NL", "Market")
dbutils.widgets.text("currency", "EUR", "Currency")
# Read the parameters passed by the For each task
market = dbutils.widgets.get("market")
currency = dbutils.widgets.get("currency")
print(f"Processing market: {market} ({currency})")
# Your business logic goes here. For example:
df = spark.table("sales.transactions").filter(
f"market = '{market}' AND currency_code = '{currency}'"
)
display(df)
As dbutils.widgets.text() chamadas definem valores predefinidos para que possas executar o caderno diretamente no teu espaço de trabalho sem o ligar a um trabalho. Quando o notebook é executado como uma tarefa aninhada dentro de uma For each tarefa, o processo substitui os valores predefinidos pelos valores reais dos parâmetros para essa iteração.
Ligue dbutils.widgets.text() antes de dbutils.widgets.get(). Se get for chamado antes de text, o notebook gera um erro de InputWidgetNotDefined quando o executa fora de uma tarefa.
Usar os predefinidos permite-lhe testar o caderno fora de um trabalho, mas note-se a contrapartida: se a For each tarefa estiver mal configurada e não passar parâmetros, o caderno usa os valores definidos e tem sucesso silenciosamente em vez de falhar — o que pode dificultar a deteção da má configuração.
Tarefa SQL
As tarefas SQL suportam parâmetros nomeados usando a :param_name sintaxe. Faça referência :market e :currency na sua consulta onde quiser usar os valores da iteração:
SELECT *
FROM sales.transactions
WHERE market = :market
AND currency_code = :currency
Configura esta consulta diretamente no editor de tarefas no Passo 5. A For each tarefa passa os valores da iteração atual para os parâmetros nomeados :market e :currency em tempo de execução. Ao contrário das tarefas do caderno, os parâmetros nomeados SQL não suportam valores predefinidos — se um parâmetro não for passado, a consulta falha com um erro de resolução de parâmetro. Para validar ou definir parâmetros por defeito antes da consulta ser executada, use antes uma tarefa de caderno.
Passo 3: Criar o trabalho
No seu espaço de trabalho Databricks, clique em Fluxos de Trabalho na barra lateral e depois clique em Criar trabalho. Dê ao trabalho um nome descritivo, por Market Analysisexemplo.
Passo 4: Configurar a tarefa de consulta SQL
A tarefa SQL executa a tua consulta de configuração e disponibiliza a sua saída para tarefas posteriores.
No editor de tarefas, clique em Adicionar tarefa.
Defina o nome da Tarefa para
read_markets.Definir o Tipo como SQL.
No campo SQL , introduza a seguinte consulta:
SELECT market, currency FROM config.marketsDefinir SQL warehouse como um armazém no teu espaço de trabalho.
Clique em Criar tarefa.
Quando esta tarefa é executada, o Databricks executa a consulta e captura o resultado como um array JSON em tasks.read_markets.output.rows. A saída da tarefa SQL é sempre devolvida como um array JSON — não é necessária qualquer configuração adicional. A forma genérica desta referência é tasks.<task-name>.output.rows, onde <task-name> corresponde à chave de tarefa que definiste no editor de tarefas. O resultado tem o seguinte aspeto:
[
{ "market": "NL", "currency": "EUR" },
{ "market": "UK", "currency": "GBP" },
{ "market": "US", "currency": "USD" }
]
Passo 5: Configurar o Para cada tarefa
A tarefa For each lê a saída SQL e inicia uma execução de tarefa encadeada por linha.
Clique em Adicionar tarefa e defina o campo Dependente de para
read_markets.Defina o nome da Tarefa para
process_markets.Defina Tipo para Para cada.
No campo Inputs , introduza:
{{tasks.read_markets.output.rows}}Isto refere-se ao array de linhas capturado pela tarefa SQL.
Defina Concorrência para
2permitir que duas iterações corram em paralelo. Aumente este valor para melhorar o throughput ou se a sua tarefa aninhada suportar maior paralelismo.Clique Adicionar uma tarefa a iterar e configure a tarefa aninhada com base no tipo escolhido no Passo 2:
Tarefa do caderno
Defina o nome da Tarefa para
run_market_analysis_iteration.Definir Tipo como Portátil.
Define o Caminho para o caminho do caderno que criaste no Passo 2.
Clique em Parâmetros, depois clique em Adicionar para adicionar cada um dos seguintes parâmetros:
-
Chave:
market, Valor:{{input.market}} -
Chave:
currency, Valor:{{input.currency}}
Cada referência
{{input.<key>}}resolve para o campo correspondente do objeto de linha da iteração atual.-
Chave:
Clique em Criar tarefa.
Tarefa SQL
Defina o nome da Tarefa para
run_market_analysis_iteration.Definir o Tipo como SQL.
No campo SQL , introduza a sua consulta com os parâmetros nomeados, por exemplo:
SELECT * FROM sales.transactions WHERE market = :market AND currency_code = :currencyDefinir SQL warehouse como um armazém no teu espaço de trabalho.
Clique em Parâmetros, depois clique em Adicionar para adicionar cada um dos seguintes parâmetros:
-
Chave:
market, Valor:{{input.market}} -
Chave:
currency, Valor:{{input.currency}}
Cada referência
{{input.<key>}}resolve para o campo correspondente do objeto de linha da iteração atual.-
Chave:
Clique em Criar tarefa.
O seu DAG de trabalho agora mostra read_markets a fluir para process_markets, com a tarefa aninhada visível dentro do For each nó.
Passo 6: Execute o trabalho e verifique
- Clique em Executar agora para ativar a tarefa.
- Na página de execução do trabalho, clique no
process_marketsnó para expandir aFor eachtarefa. - A página de execução do trabalho mostra uma tabela de iterações — uma linha por valor de mercado — cada uma mostrando o seu estado, hora de início e duração.
- Clique em qualquer linha de iteração para abrir a saída da tarefa e confirmar que recebeu o valor de mercado correto.
Se uma iteração específica falhar, só podes reexecutar essa iteração a partir da página de execução do trabalho sem repetir o trabalho inteiro.
Estende o padrão
Para adicionar um novo mercado, insira uma linha na tabela de configuração:
INSERT INTO config.markets VALUES ('DE', 'EUR');
A próxima execução inclui automaticamente a Alemanha, sem alterações na configuração do trabalho ou alterações no caderno.
Este mesmo padrão funciona em qualquer caso de uso em que queiras que os dados impulsionem a iteração:
- Processamento por cliente: Uma linha por ID de cliente; O Notebook aplica transformações específicas para o cliente ou entrega a destinos específicos para o cliente.
- Ingestão de tabelas: Uma linha por nome de tabela de origem; o caderno lê e ingere cada tabela.
- Processamento de preenchimento: Uma linha por partição de data; o notebook reprocessa os dados históricos dessa partição.
- Execução orientada por flag de funcionalidades: Uma linha por funcionalidade ou experiência ativada; o notebook ativa a lógica correspondente.
Para remover um item do processamento, elimine a sua linha ou adicione uma coluna de sinalização active e filtre na consulta SQL.
SELECT market, currency FROM config.markets WHERE active = TRUE
Passos seguintes
-
Use uma
For eachtarefa para executar outra tarefa num ciclo — Referência completa para configuraçãoFor eachde tarefas, incluindo tipos de parâmetros e opções de concorrência -
Use uma tabela de consulta para arrays de parâmetros grandes numa
For eachtarefa — Como lidar com arrays de parâmetros grandes que excedam o limite de valor de tarefa de 48 KB - Aceder a valores de parâmetros de uma tarefa — Todos os métodos para aceder a valores de parâmetros em notebooks, scripts de Python e tarefas SQL