Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Un pipeline peut contenir plusieurs flux presque identiques, qui diffèrent uniquement par quelques paramètres. La définition explicite de ces flux est sujette à des erreurs, redondantes et difficiles à gérer. La métaprogrammation avec les fonctions internes de Python génère des flux répétitifs dynamiquement, à chaque appel fournissant un ensemble de paramètres différent.
Aperçu
La métaprogrammation dans les pipelines déclaratifs Lakeflow Spark utilise les fonctions internes de Python. Étant donné que ces fonctions sont évaluées de manière différée par l'environnement d'exécution du pipeline, vous pouvez envelopper les décorateurs @dp.table à l’intérieur d’une fonction usine et appeler cette fonction plusieurs fois avec différents arguments. Chaque appel inscrit un nouveau flux sans dupliquer du code.
Pour plus d'informations sur l'utilisation des boucles for avec des pipelines déclaratifs Lakeflow Spark, voir Créer des tables dans une boucle for.
Exemple : temps de réponse du service d’incendie
L’exemple suivant utilise le jeu de données du service d’incendie intégré pour rechercher les quartiers avec les temps de réponse d’urgence les plus rapides pour chaque type d’appel. Sans métagrammation, vous devez écrire des définitions de table presque identiques pour chaque type d’appel (alarmes, incendie de structure, incident médical). Avec la métaprogrammation, une seule fonction de fabrique génère toutes ces fonctions.
Étape 1 : Définir la table d’ingestion brute
import functools
from pyspark import pipelines as dp
from pyspark.sql.functions import *
@dp.table(
name="raw_fire_department",
comment="raw table for fire department response"
)
@dp.expect_or_drop("valid_received", "received IS NOT NULL")
@dp.expect_or_drop("valid_response", "responded IS NOT NULL")
@dp.expect_or_drop("valid_neighborhood", "neighborhood != 'None'")
def get_raw_fire_department():
return (
spark.read.format('csv')
.option('header', 'true')
.option('multiline', 'true')
.load('/databricks-datasets/timeseries/Fires/Fire_Department_Calls_for_Service.csv')
.withColumnRenamed('Call Type', 'call_type')
.withColumnRenamed('Received DtTm', 'received')
.withColumnRenamed('Response DtTm', 'responded')
.withColumnRenamed('Neighborhooods - Analysis Boundaries', 'neighborhood')
.select('call_type', 'received', 'responded', 'neighborhood')
)
Étape 2 : Définir la fonction de fabrique de flux
La generate_tables fonction de fabrique inscrit deux tables pour chaque type d’appel : une table d’appels filtrée et une table de temps de réponse classée. Les deux sont créés en tant que fonctions internes décorées avec @dp.table.
all_tables = []
def generate_tables(call_table, response_table, filter):
@dp.table(
name=call_table,
comment="top level tables by call type"
)
def create_call_table():
return spark.sql("""
SELECT
unix_timestamp(received,'M/d/yyyy h:m:s a') as ts_received,
unix_timestamp(responded,'M/d/yyyy h:m:s a') as ts_responded,
neighborhood
FROM raw_fire_department
WHERE call_type = '{filter}'
""".format(filter=filter))
@dp.table(
name=response_table,
comment="top 10 neighborhoods with fastest response time"
)
def create_response_table():
return spark.sql("""
SELECT
neighborhood,
AVG((ts_received - ts_responded)) as response_time
FROM {call_table}
GROUP BY 1
ORDER BY response_time
LIMIT 10
""".format(call_table=call_table))
all_tables.append(response_table)
Étape 3 : Appeler la fabrique et définir la table récapitulative
Appelez la fabrique une fois pour chaque type d’appel, puis définissez une table récapitulative qui unione les résultats pour trouver les quartiers qui apparaissent le plus souvent dans toutes les catégories.
generate_tables("alarms_table", "alarms_response", "Alarms")
generate_tables("fire_table", "fire_response", "Structure Fire")
generate_tables("medical_table", "medical_response", "Medical Incident")
@dp.table(
name="best_neighborhoods",
comment="which neighbor appears in the best response time list the most"
)
def summary():
target_tables = [dp.read(t) for t in all_tables]
unioned = functools.reduce(lambda x, y: x.union(y), target_tables)
return (
unioned.groupBy(col("neighborhood"))
.agg(count("*").alias("score"))
.orderBy(desc("score"))
)
Après avoir exécuté ce pipeline, vous allez créer un ensemble de tables similaires, comme ce graphique :
Concepts clés
-
Les fonctions internes sont enregistrées de manière différée : le
@dp.tabledécorateur n’exécute pas immédiatement la fonction. Il inscrit la fonction auprès du runtime de pipeline, qui résout le graphique de flux de données complet avant le début de l’exécution. -
Les fermetures capturent les paramètres : chaque fonction interne capture les paramètres passés à la fabrique (
call_table,response_table,filter), de sorte que chaque flux inscrit utilise son propre ensemble isolé de valeurs. -
Listes de tables dynamiques : utiliser une liste telle que
all_tablespour suivre les noms de tables générés de manière programmatique facilite leur référence ultérieure (par exemple, dans une union ou une jointure).