Kommentar
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Använda en kontrolltabell för att köra ett
Du kan behöva mata in från många källor. När listan ändras innebär hårdkodning av den i jobbkonfigurationen att kod ändras och distribueras om. Använd metadata för att åtgärda detta genom att lagra listan över källor i en tabell som läss och används vid körning. Lägg till en källa som en ny rad och nästa jobbkörning hämtar den utan ändringar i själva jobbet.
Den här handledningen instruerar dig hur du bygger ett jobb med den här metoden. En SQL-uppgift läser kontrolltabellen och en For each uppgift itererar över varje rad parallellt.
Så här fungerar det
Mönstret använder tre aktivitetstyper kopplade i följd:
| Uppgift | Type | Vad det gör |
|---|---|---|
read_markets |
SQL | Frågar en konfigurationstabell och avbildar resultatet som en radmatris |
process_markets |
För varje | Itererar över {{tasks.read_markets.output.rows}} och kör det kapslade uppdraget en gång per rad |
run_market_analysis_iteration |
Notebook-fil eller SQL (kapslad inuti För varje) | Körs en gång per rad med hjälp av radvärden som skickas som parametrar för att köra affärslogik |
SQL-aktivitetens utdata – en JSON-matris med radobjekt – flödar direkt till For each aktivitetens indatafält med hjälp av referensen {{tasks.read_markets.output.rows}}för dynamiskt värde . Uppgiften For each skickar sedan varje rad till den kapslade uppgiften som parametrar, tillgängliga som {{input.market}} och {{input.currency}}.
Förutsättningar
- En Databricks-arbetsyta med behörighet att skapa jobb och notebook-filer
- Behörighet att skapa tabeller i Unity Catalog
- Ett Unity Catalog-schema där du kan skapa konfigurationstabellen (till exempel
config) - Ett SQL-lager för att köra SQL-uppgifter
Steg 1: Skapa konfigurationstabellen
Konfigurationstabellen är ditt kontrollplan. Den innehåller listan med värden som dina arbetsprocesser. När du behöver lägga till eller ta bort arbete uppdaterar du den här tabellen – inte jobbet.
Kör följande SQL för att skapa en markets tabell i schemat config :
CREATE OR REPLACE TABLE config.markets AS
SELECT * FROM VALUES
('NL', 'EUR'),
('UK', 'GBP'),
('US', 'USD')
AS t(market, currency);
Du kan använda en Databricks-notebook-fil, SQL-redigeraren eller valfri SQL-uppgift för att köra den här instruktionen. Efter det här steget config.markets innehåller tre rader, en per marknad, var och en med sin valutakod.
Steg 2: Skriv bearbetningskoden
Den kapslade aktiviteten i For each aktiviteten körs en gång per rad. Välj en notebook-uppgift eller en SQL-uppgift beroende på din affärslogik.
Notebook-uppgift
Skapa en ny notebook-fil på en sökväg, till exempel /Workspace/Users/<username>/process_market. Den här anteckningsboken körs en gång per iteration av uppgiften For each och får ett annat marknadsvärde varje gång.
Lägg till följande kod i notebook-filen:
# Set default values for testing the notebook outside of a job.
# When the notebook runs inside a For each task, the job overrides these defaults.
dbutils.widgets.text("market", "NL", "Market")
dbutils.widgets.text("currency", "EUR", "Currency")
# Read the parameters passed by the For each task
market = dbutils.widgets.get("market")
currency = dbutils.widgets.get("currency")
print(f"Processing market: {market} ({currency})")
# Your business logic goes here. For example:
df = spark.table("sales.transactions").filter(
f"market = '{market}' AND currency_code = '{currency}'"
)
display(df)
Anropen dbutils.widgets.text() anger standardvärden så att du kan köra anteckningsboken direkt på din arbetsyta utan att ansluta den till ett jobb. När notebook-filen körs som en kapslad uppgift i ett For each-jobb, åsidosätter jobbet standardvärdena med de faktiska parametervärdena för den iterationen.
Ring dbutils.widgets.text() före dbutils.widgets.get(). Om get anropas före text genereras ett InputWidgetNotDefined fel i notebooken när du kör den utanför ett jobb.
Med standardvärden kan du testa notebook-filen utanför ett jobb, men observera kompromissen For each : om uppgiften är felkonfigurerad och inte skickar parametrar använder notebook-filen standardvärdena och lyckas tyst i stället för att misslyckas – vilket kan göra felkonfigurationen svårare att identifiera.
SQL-uppgift
SQL-uppgifter stöder namngivna parametrar med hjälp av syntaxen :param_name . Referera till :market och :currency i din fråga där du vill använda iterationsvärdena.
SELECT *
FROM sales.transactions
WHERE market = :market
AND currency_code = :currency
Du konfigurerar den här frågan direkt i aktivitetsredigeraren i steg 5. Uppgiften For each skickar den aktuella iterationens värden till de namngivna parametrarna :market och :currency vid körning. Till skillnad från notebook-uppgifter stöder INTE SQL-namngivna parametrar standardvärden – om en parameter inte skickas misslyckas frågan med ett parametermatchningsfel. För att validera eller ställa in standardvärden för parametrar innan frågan körs, använd en notebook-uppgift i stället.
Steg 3: Skapa jobbet
I databricks-arbetsytan klickar du på Arbetsflöden i sidofältet och klickar sedan på Skapa jobb. Ge jobbet ett beskrivande namn, till exempel Market Analysis.
Steg 4: Konfigurera SQL-sökningsaktiviteten
SQL-aktiviteten kör konfigurationsfrågan och gör dess utdata tillgängliga för underordnade aktiviteter.
I jobbredigeraren klickar du på Lägg till uppgift.
Ange Aktivitetsnamn till
read_markets.Ange Typ till SQL.
I SQL-fältet anger du följande fråga:
SELECT market, currency FROM config.marketsStäll in SQL Warehouse till ett datavaruhus på din arbetsyta.
Klicka på Skapa uppgift.
När den här aktiviteten körs kör Databricks frågan och samlar in resultatet som en JSON-matris i tasks.read_markets.output.rows. SQL-aktivitetsutdata returneras alltid som en JSON-matris – ingen ytterligare konfiguration krävs. Den allmänna formen för den här referensen är tasks.<task-name>.output.rows, där <task-name> matchar den aktivitetsnyckel som du angav i jobbredigeraren. Utdata ser ut så här:
[
{ "market": "NL", "currency": "EUR" },
{ "market": "UK", "currency": "GBP" },
{ "market": "US", "currency": "USD" }
]
Steg 5: Konfigurera för varje uppgift
Uppgiften For each läser SQL-utdata och startar en kapslad aktivitetskörning per rad.
Klicka på Lägg till aktivitet och ange Beror på till
read_markets.Ange Aktivitetsnamn till
process_markets.Ange Typ till För var och en.
I fältet Indata anger du:
{{tasks.read_markets.output.rows}}Detta refererar till radmatrisen som fångas av SQL-aktiviteten.
Ställ in Concurrency så att
2två iterationer kan köras parallellt. Öka det här värdet för att förbättra dataflödet eller om din kapslade uppgift stöder högre parallellitet.Klicka på Lägg till en aktivitet för att loopa över och konfigurera den kapslade aktiviteten baserat på vilken typ du valde i steg 2:
Notebook-uppgift
Ange Aktivitetsnamn till
run_market_analysis_iteration.Ställ in Typ på Notebook.
Ange Sökväg till sökvägen till anteckningsboken som du skapade i steg 2.
Klicka på Parametrar och klicka sedan på Lägg till för att lägga till var och en av följande parametrar:
-
Nyckel:
market, Värde:{{input.market}} -
Nyckel:
currency, Värde:{{input.currency}}
Varje
{{input.<key>}}referens matchar motsvarande fält från den aktuella iterationens radobjekt.-
Nyckel:
Klicka på Skapa uppgift.
SQL-uppgift
Ange Aktivitetsnamn till
run_market_analysis_iteration.Ange Typ till SQL.
I SQL-fältet anger du din fråga med de namngivna parametrarna, till exempel:
SELECT * FROM sales.transactions WHERE market = :market AND currency_code = :currencyStäll in SQL Warehouse till ett datavaruhus på din arbetsyta.
Klicka på Parametrar och klicka sedan på Lägg till för att lägga till var och en av följande parametrar:
-
Nyckel:
market, Värde:{{input.market}} -
Nyckel:
currency, Värde:{{input.currency}}
Varje
{{input.<key>}}referens matchar motsvarande fält från den aktuella iterationens radobjekt.-
Nyckel:
Klicka på Skapa uppgift.
Jobbet DAG visar read_markets nu flödande in i process_markets, med den kapslade uppgiften synlig inuti For each noden.
Steg 6: Kör jobbet och verifiera
- Klicka på Kör nu för att utlösa jobbet.
- På jobbkörningssidan klickar du på
process_marketsnoden för att expanderaFor eachaktiviteten. - Jobbkörningssidan visar en tabell med iterationer – en rad per marknadsvärde – som var och en visar dess status, starttid och varaktighet.
- Klicka på en iterationsrad för att öppna aktivitetskörningens utdata och bekräfta att den har fått rätt marknadsvärde.
Om en specifik iteration misslyckas kan du bara köra iterationen igen från jobbkörningssidan utan att köra hela jobbet igen.
Utöka mönstret
Om du vill lägga till en ny marknad infogar du en rad i konfigurationstabellen:
INSERT INTO config.markets VALUES ('DE', 'EUR');
Nästa jobbkörning inkluderar automatiskt Tyskland, utan att det krävs några ändringar i jobbkonfigurationen eller redigering av notebook-filer.
Samma mönster fungerar för alla användningsfall där du vill att data ska driva iteration:
- Bearbetning per kund: En rad per kund-ID; anteckningsboken tillämpar kundspecifika transformeringar eller levererar till kundspecifika destinationer.
- Tabellinmatning: En rad per källtabellnamn; notebook-filen läser och matar in varje tabell.
- Återfyllnadsbearbetning: En rad per datumpartition; anteckningsboken bearbetar om historiska datan för partitionen.
- Funktionsflaggdriven körning: En rad per aktiverad funktion eller experiment; notebooken initierar motsvarande logik.
Om du vill ta bort ett objekt från bearbetningen tar du bort dess rad eller lägger till en active flaggkolumn och filtrerar i SQL-frågan:
SELECT market, currency FROM config.markets WHERE active = TRUE
Nästa steg
-
Använd en
For eachaktivitet för att köra en annan aktivitet i en loop – Fullständig referens för attFor eachkonfigurera uppgifter, inklusive parametertyper och samtidighetsalternativ -
Använd en uppslagstabell för stora parametermatriser i en
For eachuppgift – Hantera stora parametermatriser som överskrider aktivitetsvärdegränsen på 48 KB - Åtkomst till parametervärden från en uppgift – Alla metoder för att komma åt parametervärden i notebook-filer, Python-skript och SQL-uppgifter