Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Een pijplijn kan meerdere stromen bevatten die bijna identiek zijn, die slechts door een paar parameters verschillen. Het expliciet definiëren van deze stromen is foutgevoelig, redundant en moeilijk te onderhouden. Met metaprogrammering in Python met behulp van innerlijke functies worden herhalende processen dynamisch aangemaakt, waarbij elke aanroep een andere set parameters gebruikt.
Overzicht
Metaprogrammering in Lakeflow Spark-declaratieve pijplijnen maakt gebruik van Python binnenste functies. Omdat deze functies uitgesteld worden geëvalueerd door de pipeline-omgeving, kunt u @dp.table decorators binnen een fabrieksfunctie verpakken en die fabrieksfunctie meerdere keren aanroepen met verschillende argumenten. Elke aanroep registreert een nieuwe stroom zonder code te dupliceren.
Zie for voor meer informatie over het gebruik van for lussen met declaratieve Lakeflow Spark-pijplijnen.
Voorbeeld: reactietijden van brandweer
In het volgende voorbeeld wordt de ingebouwde gegevensset van de brandweer gebruikt om de buurten te vinden met de snelste reactietijden voor noodgevallen voor elk oproeptype. Zonder metaprogrammering moet u bijna identieke tabeldefinities schrijven voor elk oproeptype (Alarmen, Structuurbrand, Medisch Incident). Met metaprogrammering genereert een enkele fabrieksfunctie ze allemaal.
Stap 1: De tabel met ruwe gegevensinvoer definiëren
import functools
from pyspark import pipelines as dp
from pyspark.sql.functions import *
@dp.table(
name="raw_fire_department",
comment="raw table for fire department response"
)
@dp.expect_or_drop("valid_received", "received IS NOT NULL")
@dp.expect_or_drop("valid_response", "responded IS NOT NULL")
@dp.expect_or_drop("valid_neighborhood", "neighborhood != 'None'")
def get_raw_fire_department():
return (
spark.read.format('csv')
.option('header', 'true')
.option('multiline', 'true')
.load('/databricks-datasets/timeseries/Fires/Fire_Department_Calls_for_Service.csv')
.withColumnRenamed('Call Type', 'call_type')
.withColumnRenamed('Received DtTm', 'received')
.withColumnRenamed('Response DtTm', 'responded')
.withColumnRenamed('Neighborhooods - Analysis Boundaries', 'neighborhood')
.select('call_type', 'received', 'responded', 'neighborhood')
)
Stap 2: De functie voor de verwerkingsstroom definiëren
De generate_tables factory-functie registreert twee tabellen voor elk aanroeptype: een gefilterde aanroeptabel en een gerangschikte tabel met reactietijd. Beide worden gecreëerd als binnenste functies versierd met @dp.table.
all_tables = []
def generate_tables(call_table, response_table, filter):
@dp.table(
name=call_table,
comment="top level tables by call type"
)
def create_call_table():
return spark.sql("""
SELECT
unix_timestamp(received,'M/d/yyyy h:m:s a') as ts_received,
unix_timestamp(responded,'M/d/yyyy h:m:s a') as ts_responded,
neighborhood
FROM raw_fire_department
WHERE call_type = '{filter}'
""".format(filter=filter))
@dp.table(
name=response_table,
comment="top 10 neighborhoods with fastest response time"
)
def create_response_table():
return spark.sql("""
SELECT
neighborhood,
AVG((ts_received - ts_responded)) as response_time
FROM {call_table}
GROUP BY 1
ORDER BY response_time
LIMIT 10
""".format(call_table=call_table))
all_tables.append(response_table)
Stap 3: De factory aanroepen en de samenvattingstabel definiëren
Roep de fabriek eenmaal aan voor elk oproeptype en definieer vervolgens een samenvattingstabel die de resultaten samenvoegt om de buurten te vinden die het vaakst in alle categorieën worden weergegeven.
generate_tables("alarms_table", "alarms_response", "Alarms")
generate_tables("fire_table", "fire_response", "Structure Fire")
generate_tables("medical_table", "medical_response", "Medical Incident")
@dp.table(
name="best_neighborhoods",
comment="which neighbor appears in the best response time list the most"
)
def summary():
target_tables = [dp.read(t) for t in all_tables]
unioned = functools.reduce(lambda x, y: x.union(y), target_tables)
return (
unioned.groupBy(col("neighborhood"))
.agg(count("*").alias("score"))
.orderBy(desc("score"))
)
Nadat u deze pijplijn hebt uitgevoerd, maakt u een set vergelijkbare tabellen, zoals deze grafiek:
belangrijke concepten
-
Binnenste functies worden lazily geregistreerd: de
@dp.tabledecorator voert de functie niet onmiddellijk uit. Hiermee wordt de functie geregistreerd bij het runtime-systeem van de pijplijn, dat de volledige gegevensstroomgrafiek oplost voordat de uitvoering begint. -
Sluitingen leggen parameters vast: Elke binnenste functie wordt gesloten over de parameters die worden doorgegeven aan de fabriek (
call_table,response_table,filter), zodat elke geregistreerde stroom een eigen geïsoleerde set waarden gebruikt. -
Dynamische tabellijsten: Door een lijst te gebruiken zoals
all_tablesom programmatisch gegenereerde tabelnamen bij te houden, kunt u deze later eenvoudig raadplegen (bijvoorbeeld in een UNION of join).