Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Los patrones de concentración y difusión son comunes en la ingeniería de datos moderna para crear flujos de datos escalables y confiables. En esta página se describen los patrones y se muestra cómo implementarlos en canalizaciones declarativas de Spark de Lakeflow.
¿Qué son fan-in y fan-out?
Fan-in es un patrón arquitectónico en el que los datos de varios orígenes se ingieren y procesan dentro de una sola tubería.
Los orígenes pueden incluir:
- Secuencias de eventos en tiempo real (por ejemplo, Kafka y Kinesis)
- Almacenamiento en la nube (por ejemplo, S3, ADLS y Google Cloud Storage)
- Bases de datos relacionales (por ejemplo, PostgreSQL, MySQL y Snowflake)
- Dispositivos IoT (por ejemplo, sensores, registros y API)
Al consolidar diversos flujos de datos en una sola capa de procesamiento, el fan-in permite una coherente transformación, desduplicación y enriquecimiento de datos antes de que los datos se transfieran aguas abajo.
Fan-out adopta un enfoque de uno a varios, dirigiendo un flujo de datos procesado hacia varios destinos.
Los destinos pueden incluir:
- Tablas delta para almacenamiento estructurado
- Sistemas de alertas en tiempo real para la detección de anomalías
- Modelos de aprendizaje automático para el análisis predictivo
- Almacenamientos de datos para informes y análisis
- Colas de mensajes para la comunicación asincrónica y el procesamiento desacoplado
Este patrón garantiza que cada sistema de bajada reciba datos en el formato necesario, lo que permite a las organizaciones integrar datos de streaming en varias aplicaciones empresariales.
En la práctica, las canalizaciones suelen combinar ambos patrones. Por ejemplo:
- Una empresa recopila datos de actividad de usuario de varias aplicaciones, sitios web y dispositivos móviles (ventilador).
- Los datos procesados se almacenan en Delta Lake para su análisis histórico, mientras que las alertas en tiempo real se activan ante una actividad inusual (fan-out).
Implementar fan-in con flujos de añadido
Las canalizaciones de distribución ramificada combinan varios flujos de datos en un destino unificado. Tradicionalmente, esto requiere consultas de unión complejas y puntos de comprobación manuales. Los flujos de anexión simplifican esto al permitir que varios flujos de datos se alimenten directamente en una sola tabla de streaming sin uniones explícitas ni lógica compleja. Cada origen se administra de forma independiente, lo que permite la ingesta y las actualizaciones incrementales de datos.
Por ejemplo, use flujos de anexión para consolidar varios temas de Kafka o flujos de datos regionales en una tabla de destino unificada.
Python
from pyspark import pipelines as dp
dp.create_streaming_table("all_topics")
# Kafka stream from topic1
@dp.append_flow(target="all_topics")
def topic1():
return spark.readStream.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,...") \
.option("subscribe", "topic1") \
.load()
# Kafka stream from topic2
@dp.append_flow(target="all_topics")
def topic2():
return spark.readStream.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,...") \
.option("subscribe", "topic2") \
.load()
SQL
CREATE OR REFRESH STREAMING TABLE all_topics;
CREATE FLOW
topic1
AS INSERT INTO
all_topics BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');
CREATE FLOW
topic2
AS INSERT INTO
all_topics BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');
Implementación del ventilador
Las tuberías de ventilador distribuyen datos de un origen a varias salidas. Las canalizaciones declarativas de Spark de Lakeflow admiten tres enfoques en función del caso de uso.
Uso de bucles for para la lógica generalizada
Si la lógica de ETL es idéntica en varios destinos, use bucles de Python para generar dinámicamente varias tablas a través de bucles con parámetros. Esto evita la codificación repetitiva y simplifica el escalado de canalizaciones a través de la configuración.
Importante
Cada flujo o tabla generado procesa todo el conjunto de datos de origen de forma independiente. En el caso de los orígenes con límites de capacidad de lectura o rendimiento compartido, como Kafka, esto puede afectar significativamente al rendimiento. Evalúe cuidadosamente el enfoque para estos orígenes antes de usarlo.
regions = ["US", "EU", "APAC"]
for region in regions:
@dp.materialized_view(name=f"orders_{region.lower()}_filtered")
def filtered_orders(region_filter=region):
return spark.read.table("combined_orders").filter(f"region = '{region_filter}'")
Uso de flujos independientes para la lógica específica del destino
Cuando las transformaciones de ETL varían significativamente por destino, implemente flujos de datos independientes. Este enfoque tiene un control preciso y un rendimiento optimizado adaptado a cada caso de uso.
from pyspark import pipelines as dp
# Grouped output
@dp.materialized_view(name="orders_sink")
def region_orders():
df = spark.read.table("combined_orders").groupBy("region").count()
# Add additional logic here
return df
# BI materialized view
@dp.materialized_view(name="orders_bi_materialized")
def orders_bi():
return spark.read.table("combined_orders").select("order_id", "amount", "region")
# ML feature table
@dp.materialized_view(name="orders_ml_features")
def orders_ml():
return (
spark.read.table("combined_orders")
.withColumn("high_value_order", col("amount") > 1000)
.select("order_id", "high_value_order", "region")
)
Uso de ForEachBatch para el enrutamiento personalizado
Importante
foreach_batch_sink está disponible en versión preliminar pública a través del canal de versión preliminar de canalizaciones declarativas de Spark de Lakeflow. Consulte channel en Configuraciones de canalización.
foreach_batch_sink aplica lógica personalizada a cada micro-lote, lo que permite transformaciones complejas, combinaciones o el enrutamiento a varios destinos, incluidos aquellos que carecen de soporte de transmisión integrado, como los receptores JDBC.
Importante
Cada lote ejecuta varias operaciones de escritura de forma independiente. Los errores de una operación no revierten automáticamente las escrituras exitosas previas. Esto puede provocar datos parciales o incoherentes a través de objetivos, especialmente cuando se procesan fuentes compartidas como Kafka. Diseñe las canalizaciones con un control cuidadoso de errores y pruebas exhaustivas. Consulte Uso de ForEachBatch para escribir en receptores de datos arbitrarios en canalizaciones.
from pyspark import pipelines as dp
@dp.foreach_batch_sink(name="user_events_feb")
def user_events_handler(batch_df, batch_id):
# Write to Delta table
batch_df.write.format("delta").mode("append").saveAsTable("my_catalog.my_schema.my_delta_table")
# Write to JSON files
batch_df.write.format("json").mode("append").save("/Volumes/path/to/json_target")
@dp.append_flow(target="user_events_feb", name="user_events_flow")
def read_user_events():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/data/incoming/events")
)
Patrones ForEachBatch comunes
foreach_batch_sink admite varios patrones. Algunos patrones comunes incluyen:
Flujo único al receptor de varios destinos: Un único
append_flowlee de un origen de streaming y enruta los datos a unforeach_batch_sink. El sumidero controla la escritura en múltiples destinos (por ejemplo, Delta, JSON y sistemas externos). Esto es ideal para casos de uso sencillos de varias salidas con lógica de transformación compartida.Varios flujos a un receptor unificado: varios
append_floworígenes (por ejemplo, directorios diferentes, formatos, temas de Kafka o API externas) se combinan en un únicoforeach_batch_sink. Esto centraliza la lógica de transformación común, la administración de resultados y el control de errores. Dado que solo es necesario mantener un punto de control, este enfoque reduce significativamente la complejidad de la coordinación. Resulta especialmente útil al controlar colas de mensajes como Kafka o API externas.Un flujo a un sumidero (muchos pares independientes): cada
append_flowtiene unforeach_batch_sinkdedicado, estableciendo relaciones claras y aisladas entre orígenes individuales y sus destinos. Esto es ideal para canalizaciones con muchos flujos independientes que requieren lógica de procesamiento única, solución de problemas simplificada y control de errores aislados.
En la práctica, estos enfoques suelen complementarse entre sí. Por ejemplo, use bucles para generar múltiples flujos de adición dinámicamente para escenarios de fan-in a gran escala y, a continuación, redistribuir los resultados mediante bucles o foreach_batch_sink para fan-out.
procedimientos recomendados
- Los flujos de anexión requieren que los esquemas de origen se alineen con la tabla de streaming de destino para evitar errores de procesamiento. Use las expectativas del esquema de canalizaciones declarativas de Spark de Lakeflow para detectar y controlar las excepciones de forma proactiva, lo que garantiza la coherencia del esquema en toda la canalización.
- Mantenga la lógica de bucle for bien definida y sencilla.
- Asigne un nombre a cada flujo y tabla claramente para mantener la legibilidad.
- Supervise el uso de recursos para escalar de forma eficaz y evitar cuellos de botella de rendimiento.
- Al escribir en colas de mensajes, utilice un único
foreach_batch_sinkque consolide todos los flujos de entrada en una solaappend_flow. Esto simplifica el estado posterior y la administración de puntos de control.
Limitaciones
- Es posible que la interfaz de usuario de linaje de canalizaciones declarativas de Spark de Lakeflow no muestre métricas y metadatos a nivel de flujo para las nuevas fuentes de flujo de anexión.
- Expanda en lugar de reducir la lista de valores usados en un bucle for. Si se omite un conjunto de datos definido previamente en ejecuciones de canalización posteriores, se quita automáticamente del esquema de destino, lo que provoca una pérdida de datos no deseada.