Tutoriel : Créer plusieurs flux avec différents paramètres

Un pipeline peut contenir plusieurs flux presque identiques, qui diffèrent uniquement par quelques paramètres. La définition explicite de ces flux est sujette à des erreurs, redondantes et difficiles à gérer. La métaprogrammation avec les fonctions internes de Python génère des flux répétitifs dynamiquement, à chaque appel fournissant un ensemble de paramètres différent.

Aperçu

La métaprogrammation dans les pipelines déclaratifs Lakeflow Spark utilise les fonctions internes de Python. Étant donné que ces fonctions sont évaluées de manière différée par l'environnement d'exécution du pipeline, vous pouvez envelopper les décorateurs @dp.table à l’intérieur d’une fonction usine et appeler cette fonction plusieurs fois avec différents arguments. Chaque appel inscrit un nouveau flux sans dupliquer du code.

Pour plus d'informations sur l'utilisation des boucles for avec des pipelines déclaratifs Lakeflow Spark, voir Créer des tables dans une boucle for.

Exemple : temps de réponse du service d’incendie

L’exemple suivant utilise le jeu de données du service d’incendie intégré pour rechercher les quartiers avec les temps de réponse d’urgence les plus rapides pour chaque type d’appel. Sans métagrammation, vous devez écrire des définitions de table presque identiques pour chaque type d’appel (alarmes, incendie de structure, incident médical). Avec la métaprogrammation, une seule fonction de fabrique génère toutes ces fonctions.

Étape 1 : Définir la table d’ingestion brute

import functools
from pyspark import pipelines as dp
from pyspark.sql.functions import *

@dp.table(
  name="raw_fire_department",
  comment="raw table for fire department response"
)
@dp.expect_or_drop("valid_received", "received IS NOT NULL")
@dp.expect_or_drop("valid_response", "responded IS NOT NULL")
@dp.expect_or_drop("valid_neighborhood", "neighborhood != 'None'")
def get_raw_fire_department():
  return (
    spark.read.format('csv')
      .option('header', 'true')
      .option('multiline', 'true')
      .load('/databricks-datasets/timeseries/Fires/Fire_Department_Calls_for_Service.csv')
      .withColumnRenamed('Call Type', 'call_type')
      .withColumnRenamed('Received DtTm', 'received')
      .withColumnRenamed('Response DtTm', 'responded')
      .withColumnRenamed('Neighborhooods - Analysis Boundaries', 'neighborhood')
      .select('call_type', 'received', 'responded', 'neighborhood')
  )

Étape 2 : Définir la fonction de fabrique de flux

La generate_tables fonction de fabrique inscrit deux tables pour chaque type d’appel : une table d’appels filtrée et une table de temps de réponse classée. Les deux sont créés en tant que fonctions internes décorées avec @dp.table.

all_tables = []

def generate_tables(call_table, response_table, filter):
  @dp.table(
    name=call_table,
    comment="top level tables by call type"
  )
  def create_call_table():
    return spark.sql("""
      SELECT
        unix_timestamp(received,'M/d/yyyy h:m:s a') as ts_received,
        unix_timestamp(responded,'M/d/yyyy h:m:s a') as ts_responded,
        neighborhood
      FROM raw_fire_department
      WHERE call_type = '{filter}'
    """.format(filter=filter))

  @dp.table(
    name=response_table,
    comment="top 10 neighborhoods with fastest response time"
  )
  def create_response_table():
    return spark.sql("""
      SELECT
        neighborhood,
        AVG((ts_received - ts_responded)) as response_time
      FROM {call_table}
      GROUP BY 1
      ORDER BY response_time
      LIMIT 10
    """.format(call_table=call_table))

  all_tables.append(response_table)

Étape 3 : Appeler la fabrique et définir la table récapitulative

Appelez la fabrique une fois pour chaque type d’appel, puis définissez une table récapitulative qui unione les résultats pour trouver les quartiers qui apparaissent le plus souvent dans toutes les catégories.

generate_tables("alarms_table", "alarms_response", "Alarms")
generate_tables("fire_table", "fire_response", "Structure Fire")
generate_tables("medical_table", "medical_response", "Medical Incident")

@dp.table(
  name="best_neighborhoods",
  comment="which neighbor appears in the best response time list the most"
)
def summary():
  target_tables = [dp.read(t) for t in all_tables]
  unioned = functools.reduce(lambda x, y: x.union(y), target_tables)
  return (
    unioned.groupBy(col("neighborhood"))
      .agg(count("*").alias("score"))
      .orderBy(desc("score"))
  )

Après avoir exécuté ce pipeline, vous allez créer un ensemble de tables similaires, comme ce graphique :

Graphique des tables générées par ce didacticiel.

Concepts clés

  • Les fonctions internes sont enregistrées de manière différée : le @dp.table décorateur n’exécute pas immédiatement la fonction. Il inscrit la fonction auprès du runtime de pipeline, qui résout le graphique de flux de données complet avant le début de l’exécution.
  • Les fermetures capturent les paramètres : chaque fonction interne capture les paramètres passés à la fabrique (call_table, response_table, filter), de sorte que chaque flux inscrit utilise son propre ensemble isolé de valeurs.
  • Listes de tables dynamiques : utiliser une liste telle que all_tables pour suivre les noms de tables générés de manière programmatique facilite leur référence ultérieure (par exemple, dans une union ou une jointure).

Ressources additionnelles