Zelfstudie: Meerdere stromen maken met verschillende parameters

Een pijplijn kan meerdere stromen bevatten die bijna identiek zijn, die slechts door een paar parameters verschillen. Het expliciet definiëren van deze stromen is foutgevoelig, redundant en moeilijk te onderhouden. Met metaprogrammering in Python met behulp van innerlijke functies worden herhalende processen dynamisch aangemaakt, waarbij elke aanroep een andere set parameters gebruikt.

Overzicht

Metaprogrammering in Lakeflow Spark-declaratieve pijplijnen maakt gebruik van Python binnenste functies. Omdat deze functies uitgesteld worden geëvalueerd door de pipeline-omgeving, kunt u @dp.table decorators binnen een fabrieksfunctie verpakken en die fabrieksfunctie meerdere keren aanroepen met verschillende argumenten. Elke aanroep registreert een nieuwe stroom zonder code te dupliceren.

Zie for voor meer informatie over het gebruik van for lussen met declaratieve Lakeflow Spark-pijplijnen.

Voorbeeld: reactietijden van brandweer

In het volgende voorbeeld wordt de ingebouwde gegevensset van de brandweer gebruikt om de buurten te vinden met de snelste reactietijden voor noodgevallen voor elk oproeptype. Zonder metaprogrammering moet u bijna identieke tabeldefinities schrijven voor elk oproeptype (Alarmen, Structuurbrand, Medisch Incident). Met metaprogrammering genereert een enkele fabrieksfunctie ze allemaal.

Stap 1: De tabel met ruwe gegevensinvoer definiëren

import functools
from pyspark import pipelines as dp
from pyspark.sql.functions import *

@dp.table(
  name="raw_fire_department",
  comment="raw table for fire department response"
)
@dp.expect_or_drop("valid_received", "received IS NOT NULL")
@dp.expect_or_drop("valid_response", "responded IS NOT NULL")
@dp.expect_or_drop("valid_neighborhood", "neighborhood != 'None'")
def get_raw_fire_department():
  return (
    spark.read.format('csv')
      .option('header', 'true')
      .option('multiline', 'true')
      .load('/databricks-datasets/timeseries/Fires/Fire_Department_Calls_for_Service.csv')
      .withColumnRenamed('Call Type', 'call_type')
      .withColumnRenamed('Received DtTm', 'received')
      .withColumnRenamed('Response DtTm', 'responded')
      .withColumnRenamed('Neighborhooods - Analysis Boundaries', 'neighborhood')
      .select('call_type', 'received', 'responded', 'neighborhood')
  )

Stap 2: De functie voor de verwerkingsstroom definiëren

De generate_tables factory-functie registreert twee tabellen voor elk aanroeptype: een gefilterde aanroeptabel en een gerangschikte tabel met reactietijd. Beide worden gecreëerd als binnenste functies versierd met @dp.table.

all_tables = []

def generate_tables(call_table, response_table, filter):
  @dp.table(
    name=call_table,
    comment="top level tables by call type"
  )
  def create_call_table():
    return spark.sql("""
      SELECT
        unix_timestamp(received,'M/d/yyyy h:m:s a') as ts_received,
        unix_timestamp(responded,'M/d/yyyy h:m:s a') as ts_responded,
        neighborhood
      FROM raw_fire_department
      WHERE call_type = '{filter}'
    """.format(filter=filter))

  @dp.table(
    name=response_table,
    comment="top 10 neighborhoods with fastest response time"
  )
  def create_response_table():
    return spark.sql("""
      SELECT
        neighborhood,
        AVG((ts_received - ts_responded)) as response_time
      FROM {call_table}
      GROUP BY 1
      ORDER BY response_time
      LIMIT 10
    """.format(call_table=call_table))

  all_tables.append(response_table)

Stap 3: De factory aanroepen en de samenvattingstabel definiëren

Roep de fabriek eenmaal aan voor elk oproeptype en definieer vervolgens een samenvattingstabel die de resultaten samenvoegt om de buurten te vinden die het vaakst in alle categorieën worden weergegeven.

generate_tables("alarms_table", "alarms_response", "Alarms")
generate_tables("fire_table", "fire_response", "Structure Fire")
generate_tables("medical_table", "medical_response", "Medical Incident")

@dp.table(
  name="best_neighborhoods",
  comment="which neighbor appears in the best response time list the most"
)
def summary():
  target_tables = [dp.read(t) for t in all_tables]
  unioned = functools.reduce(lambda x, y: x.union(y), target_tables)
  return (
    unioned.groupBy(col("neighborhood"))
      .agg(count("*").alias("score"))
      .orderBy(desc("score"))
  )

Nadat u deze pijplijn hebt uitgevoerd, maakt u een set vergelijkbare tabellen, zoals deze grafiek:

Grafiek van de tabellen die door deze zelfstudie zijn gegenereerd.

belangrijke concepten

  • Binnenste functies worden lazily geregistreerd: de @dp.table decorator voert de functie niet onmiddellijk uit. Hiermee wordt de functie geregistreerd bij het runtime-systeem van de pijplijn, dat de volledige gegevensstroomgrafiek oplost voordat de uitvoering begint.
  • Sluitingen leggen parameters vast: Elke binnenste functie wordt gesloten over de parameters die worden doorgegeven aan de fabriek (call_table, response_table, filter), zodat elke geregistreerde stroom een eigen geïsoleerde set waarden gebruikt.
  • Dynamische tabellijsten: Door een lijst te gebruiken zoals all_tables om programmatisch gegenereerde tabelnamen bij te houden, kunt u deze later eenvoudig raadplegen (bijvoorbeeld in een UNION of join).

Aanvullende bronnen