Tutorial: Creación de varios flujos con distintos parámetros

Una canalización puede contener varios flujos casi idénticos, que solo difieren en unos pocos parámetros. Definir estos flujos explícitamente es propenso a errores, redundante y difícil de mantener. La metaprogramación con funciones internas de Python genera flujos repetitivos dinámicamente, proporcionando cada invocación un conjunto diferente de parámetros.

Visión general

La metaprogramación en las canalizaciones declarativas de Spark de Lakeflow utiliza funciones internas de Python. Puesto que estas funciones se evalúan perezosamente por el tiempo de ejecución de la canalización, puede encapsular @dp.table decoradores dentro de una función factoría y llamar a esa función varias veces con argumentos diferentes. Cada llamada registra un nuevo flujo sin duplicar el código.

Para más información sobre el uso for de bucles con canalizaciones declarativas de Spark de Lakeflow, consulte Creación de tablas en un for bucle.

Ejemplo: tiempos de respuesta del departamento de bomberos

En el ejemplo siguiente se usa el conjunto de datos integrado del departamento de bomberos para buscar los vecindarios con los tiempos de respuesta de emergencia más rápidos para cada tipo de llamada. Sin metaprogramación, debe escribir definiciones de tabla casi idénticas para cada tipo de llamada (Alarmas, Fuego de estructura, Incidente médico). Con la metaprogramación, una sola función de fábrica genera todas ellas.

Paso 1: Definir la tabla de ingesta sin procesar

import functools
from pyspark import pipelines as dp
from pyspark.sql.functions import *

@dp.table(
  name="raw_fire_department",
  comment="raw table for fire department response"
)
@dp.expect_or_drop("valid_received", "received IS NOT NULL")
@dp.expect_or_drop("valid_response", "responded IS NOT NULL")
@dp.expect_or_drop("valid_neighborhood", "neighborhood != 'None'")
def get_raw_fire_department():
  return (
    spark.read.format('csv')
      .option('header', 'true')
      .option('multiline', 'true')
      .load('/databricks-datasets/timeseries/Fires/Fire_Department_Calls_for_Service.csv')
      .withColumnRenamed('Call Type', 'call_type')
      .withColumnRenamed('Received DtTm', 'received')
      .withColumnRenamed('Response DtTm', 'responded')
      .withColumnRenamed('Neighborhooods - Analysis Boundaries', 'neighborhood')
      .select('call_type', 'received', 'responded', 'neighborhood')
  )

Paso 2: Definición de la función de generador de flujo

La generate_tables función factory registra dos tablas para cada tipo de llamada: una tabla de llamadas filtrada y una tabla de tiempo de respuesta clasificada. Ambos se crean como funciones internas decoradas con @dp.table.

all_tables = []

def generate_tables(call_table, response_table, filter):
  @dp.table(
    name=call_table,
    comment="top level tables by call type"
  )
  def create_call_table():
    return spark.sql("""
      SELECT
        unix_timestamp(received,'M/d/yyyy h:m:s a') as ts_received,
        unix_timestamp(responded,'M/d/yyyy h:m:s a') as ts_responded,
        neighborhood
      FROM raw_fire_department
      WHERE call_type = '{filter}'
    """.format(filter=filter))

  @dp.table(
    name=response_table,
    comment="top 10 neighborhoods with fastest response time"
  )
  def create_response_table():
    return spark.sql("""
      SELECT
        neighborhood,
        AVG((ts_received - ts_responded)) as response_time
      FROM {call_table}
      GROUP BY 1
      ORDER BY response_time
      LIMIT 10
    """.format(call_table=call_table))

  all_tables.append(response_table)

Paso 3: Invocación del generador y definición de la tabla de resumen

Llame a la fábrica una vez para cada tipo de llamada y, a continuación, defina una tabla de resumen que unione los resultados para encontrar los barrios que aparecen con más frecuencia en todas las categorías.

generate_tables("alarms_table", "alarms_response", "Alarms")
generate_tables("fire_table", "fire_response", "Structure Fire")
generate_tables("medical_table", "medical_response", "Medical Incident")

@dp.table(
  name="best_neighborhoods",
  comment="which neighbor appears in the best response time list the most"
)
def summary():
  target_tables = [dp.read(t) for t in all_tables]
  unioned = functools.reduce(lambda x, y: x.union(y), target_tables)
  return (
    unioned.groupBy(col("neighborhood"))
      .agg(count("*").alias("score"))
      .orderBy(desc("score"))
  )

Después de ejecutar esta canalización, creará un conjunto de tablas similares a las de este diagrama:

Gráfico de las tablas generadas por este tutorial.

Conceptos clave

  • Las funciones internas se registran perezosamente: el @dp.table decorador no ejecuta la función inmediatamente. Registra la función con el tiempo de ejecución de la canalización, que resuelve el gráfico de flujo de datos completo antes de que comience la ejecución.
  • <;c0>Los cierres capturan parámetros: cada función interna encierra los parámetros pasados al generador (<;c1 />, <;c2 />, <;c3 />), por lo que cada flujo registrado utiliza su propio conjunto aislado de valores.
  • Listas de tablas dinámicas: el uso de una lista como all_tables para realizar un seguimiento de los nombres de tabla generados mediante programación facilita su referencia más adelante (por ejemplo, en una unión o combinación).

Recursos adicionales