Compartilhar via


Tutorial: Criar vários fluxos com parâmetros diferentes

Um pipeline pode conter vários fluxos quase idênticos, diferindo apenas por alguns parâmetros. Definir esses fluxos explicitamente é propenso a erros, redundante e difícil de manter. A metaprogramação com funções internas Python gera fluxos repetitivos dinamicamente, com cada invocação fornecendo um conjunto diferente de parâmetros.

Visão geral

Metaprogramação em Lakeflow Spark Declarative Pipelines usa funções internas de Python. Como essas funções são avaliadas lentamente pelo runtime do pipeline, você pode encapsular @dp.table decoradores dentro de uma função de fábrica e chamar essa fábrica várias vezes com argumentos diferentes. Cada chamada registra um novo fluxo sem duplicar o código.

Para obter detalhes sobre como usar for loops com pipelines declarativos do Lakeflow Spark, consulte Criar tabelas em um for loop.

Exemplo: tempos de resposta do corpo de bombeiros

O exemplo a seguir usa o conjunto de dados interno do Corpo de Bombeiros para localizar os bairros com os tempos de resposta de emergência mais rápidos para cada tipo de chamada. Sem metaprogramação, você deve escrever definições de tabela quase idênticas para cada tipo de chamada (Alarmes, Incêndio de Estrutura, Incidente Médico). Com a metaprogramação, uma única função de fábrica gera todas elas.

Etapa 1: Definir a tabela de ingestão bruta

import functools
from pyspark import pipelines as dp
from pyspark.sql.functions import *

@dp.table(
  name="raw_fire_department",
  comment="raw table for fire department response"
)
@dp.expect_or_drop("valid_received", "received IS NOT NULL")
@dp.expect_or_drop("valid_response", "responded IS NOT NULL")
@dp.expect_or_drop("valid_neighborhood", "neighborhood != 'None'")
def get_raw_fire_department():
  return (
    spark.read.format('csv')
      .option('header', 'true')
      .option('multiline', 'true')
      .load('/databricks-datasets/timeseries/Fires/Fire_Department_Calls_for_Service.csv')
      .withColumnRenamed('Call Type', 'call_type')
      .withColumnRenamed('Received DtTm', 'received')
      .withColumnRenamed('Response DtTm', 'responded')
      .withColumnRenamed('Neighborhooods - Analysis Boundaries', 'neighborhood')
      .select('call_type', 'received', 'responded', 'neighborhood')
  )

Etapa 2: Definir a função de fábrica de fluxo

A generate_tables função de fábrica registra duas tabelas para cada tipo de chamada: uma tabela de chamadas filtrada e uma tabela de tempo de resposta classificada. Ambos são criados como funções internas, decoradas usando @dp.table.

all_tables = []

def generate_tables(call_table, response_table, filter):
  @dp.table(
    name=call_table,
    comment="top level tables by call type"
  )
  def create_call_table():
    return spark.sql("""
      SELECT
        unix_timestamp(received,'M/d/yyyy h:m:s a') as ts_received,
        unix_timestamp(responded,'M/d/yyyy h:m:s a') as ts_responded,
        neighborhood
      FROM raw_fire_department
      WHERE call_type = '{filter}'
    """.format(filter=filter))

  @dp.table(
    name=response_table,
    comment="top 10 neighborhoods with fastest response time"
  )
  def create_response_table():
    return spark.sql("""
      SELECT
        neighborhood,
        AVG((ts_received - ts_responded)) as response_time
      FROM {call_table}
      GROUP BY 1
      ORDER BY response_time
      LIMIT 10
    """.format(call_table=call_table))

  all_tables.append(response_table)

Etapa 3: Invocar a fábrica e definir a tabela de resumo

Chame a fábrica uma vez para cada tipo de chamada e defina uma tabela de resumo que uniu os resultados para localizar os bairros que aparecem com mais frequência em todas as categorias.

generate_tables("alarms_table", "alarms_response", "Alarms")
generate_tables("fire_table", "fire_response", "Structure Fire")
generate_tables("medical_table", "medical_response", "Medical Incident")

@dp.table(
  name="best_neighborhoods",
  comment="which neighbor appears in the best response time list the most"
)
def summary():
  target_tables = [dp.read(t) for t in all_tables]
  unioned = functools.reduce(lambda x, y: x.union(y), target_tables)
  return (
    unioned.groupBy(col("neighborhood"))
      .agg(count("*").alias("score"))
      .orderBy(desc("score"))
  )

Depois de executar esse pipeline, você criará um conjunto de tabelas semelhantes, como este grafo:

Grafo das tabelas geradas por este tutorial.

Conceitos principais

  • As funções internas são registradas lentamente: o @dp.table decorador não executa a função imediatamente. Ele registra a função com o tempo de execução do pipeline, que resolve o grafo completo de fluxo de dados antes do início da execução.
  • Parâmetros de captura de fechamentos: cada função interna fecha sobre os parâmetros passados para a fábrica (call_table, response_table, , filter), de modo que cada fluxo registrado usa seu próprio conjunto isolado de valores.
  • Listas de tabelas dinâmicas: usar uma lista como all_tables acompanhar nomes de tabela gerados programaticamente torna simples referenciá-los posteriormente (por exemplo, em uma união ou junção).

Recursos adicionais