Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Le streaming structuré est un moteur de traitement de flux évolutif à tolérance de panne basé sur Spark. Il traite un flux de données en direct comme une table à laquelle de nouvelles lignes sont ajoutées en continu. Structured Streaming prend en charge les sources de fichiers intégrées telles que CSV, JSON, ORC et Parquet, ainsi que les services de messagerie tels que Kafka et Azure Event Hubs.
Cet article traite de la configuration d’une source de diffusion en continu telle qu’Azure Event Hubs, l’ingestion de données de streaming dans une table Lakehouse Delta, l’optimisation des performances d’écriture avec le partitionnement et le traitement par lots d’événements et l’exécution de travaux de streaming de manière fiable en production.
Configurer une source de diffusion en continu
Pour diffuser des données dans un lakehouse, commencez par configurer une connexion à votre source de diffusion en continu. Azure Event Hubs est un choix courant. Utilisez le connecteur Azure Event Hubs pour Apache Spark pour connecter votre application Spark à Azure Event Hubs.
Une configuration Event Hubs de base nécessite le nom de l’espace de noms Event Hubs, le nom du hub, le nom de la clé d’accès partagé et le groupe de consommateurs.
Un groupe de consommateurs est une vue de l’ensemble d’un Event Hub. Les groupes de consommateurs permettent à plusieurs applications consommatrices d’avoir chacun une vue distincte du flux d’événements et de lire le flux indépendamment à leur rythme et avec leurs propres décalages.
Les partitions dans Event Hubs vous permettent de traiter de grands volumes d’événements en parallèle. Un processeur unique a une capacité limitée pour la gestion des événements par seconde, tandis que plusieurs processeurs peuvent fonctionner en parallèle entre les partitions.
Si trop de partitions sont utilisées avec un faible taux d’ingestion, les lecteurs de partition traitent avec une petite partie des données, provoquant un traitement non optimal. Le nombre idéal de partitions dépend du taux de traitement souhaité. Lorsque vous augmentez le nombre d’unités de débit dans votre espace de noms, vous souhaiterez peut-être des partitions supplémentaires pour permettre aux lecteurs simultanés d’atteindre leur débit maximal.
Testez le meilleur nombre de partitions pour votre scénario de débit. Les scénarios avec un débit élevé utilisent généralement 32 partitions ou plus.
Table Delta en tant que récepteur de streaming
Delta Lake est une couche de stockage open source qui fournit des transactions ACID (atomicité, cohérence, isolation et durabilité) en plus du stockage data lake. Dans Fabric Data Engineering, Delta Lake prend en charge les upserts, le compactage des données, le voyage dans le temps, l’évolution du schéma et le stockage en format ouvert.
Avec delta comme format de sortie dans writeStream, les données de streaming circulent directement dans une table Delta. L’exemple suivant lit à partir d’Event Hubs, analyse le corps du message et écrit dans une table Delta :
import pyspark.sql.functions as f
from pyspark.sql.types import *
df = (
spark.readStream
.format("eventhubs")
.options(**ehConf)
.load()
)
Schema = StructType([
StructField("<column_name_01>", StringType(), False),
StructField("<column_name_02>", StringType(), False),
StructField("<column_name_03>", DoubleType(), True),
StructField("<column_name_04>", LongType(), True),
StructField("<column_name_05>", LongType(), True),
])
rawData = (
df
.withColumn("bodyAsString", f.col("body").cast("string"))
.select(f.from_json("bodyAsString", Schema).alias("events"))
.select("events.*")
.writeStream
.format("delta")
.option("checkpointLocation", "Files/checkpoint")
.outputMode("append")
.toTable("deltaeventstable")
)
Dans le code, format("delta") définit Delta comme format de sortie, outputMode("append") écrit uniquement de nouvelles lignes dans la table et toTable("deltaeventstable") conserve les données diffusées en continu dans une table Delta managée.
Optimiser les performances de diffusion en continu
Une fois que l’ingestion de streaming de base fonctionne, vous pouvez améliorer le débit et l’organisation des fichiers avec les techniques d’optimisation décrites dans les sections suivantes.
Partitionner les données pour les écritures
Pour optimiser le débit, partitionnez efficacement vos données. Le partitionnement améliore le débit d’écriture et les performances des requêtes en aval. Vous pouvez partitionner des données en mémoire, sur disque ou les deux.
Sur disque : permet partitionBy() d’organiser les données en sous-répertoires en fonction des valeurs de colonne. Choisissez des colonnes avec une bonne cardinalité qui produisent des fichiers de taille optimale. Évitez les colonnes qui créent trop de partitions minuscules ou trop peu de grandes.
En mémoire — utilisez repartition() ou coalesce() pour distribuer des données entre les nœuds de travail avant d’écrire :
-
repartition()augmente ou diminue les partitions avec un remaniement complet, en équilibrant les données uniformément. -
coalesce()diminue uniquement les partitions, ce qui réduit le déplacement des données.
La combinaison des deux approches fonctionne bien pour les scénarios à débit élevé. L’exemple suivant fractionne les données en 48 partitions en mémoire (correspondant aux cœurs de processeur disponibles), puis partitionne sur le disque par deux colonnes :
rawData = (
df
.withColumn("bodyAsString", f.col("body").cast("string"))
.select(f.from_json("bodyAsString", Schema).alias("events"))
.select("events.*")
.repartition(48)
.writeStream
.format("delta")
.option("checkpointLocation", "Files/checkpoint")
.outputMode("append")
.partitionBy("<column_name_01>", "<column_name_02>")
.toTable("deltaeventstable")
)
Utiliser l’écriture optimisée
En guise d’alternative au partitionnement manuel, l’écriture optimisée fusionne ou fractionne les partitions avant l’écriture, maximisant le débit du disque sans appels manuels repartition() ou coalesce(). Activez-le avec une configuration Spark :
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", True)
Une fois l’écriture optimisée activée, vous pouvez supprimer repartition() ou coalesce() à partir de votre code et laisser Spark gérer le dimensionnement de partition. Vous pouvez toujours utiliser partitionBy() pour l’organisation au niveau du disque.
Événements batch avec déclencheurs
Pour optimiser davantage les performances d’écriture, regrouper les événements par lots avant de les écrire sur le disque. Par défaut, Spark traite chaque microbatch dès que le précédent se termine. La définition d’un intervalle d'activation accumule les données sur une période et les écrit dans moins d’opérations mais plus grandes. Les lots plus volumineux produisent des fichiers Delta plus volumineux et réduisent la surcharge des petits fichiers.
L’exemple suivant traite les événements dans des intervalles d’une minute :
rawData = (
df
.withColumn("bodyAsString", f.col("body").cast("string"))
.select(f.from_json("bodyAsString", Schema).alias("events"))
.select("events.*")
.writeStream
.format("delta")
.option("checkpointLocation", "Files/checkpoint")
.outputMode("append")
.partitionBy("<column_name_01>", "<column_name_02>")
.trigger(processingTime="1 minute")
.toTable("deltaeventstable")
)
Analysez le volume des données entrantes et choisissez un intervalle de traitement qui produit des fichiers Parquet bien dimensionnés dans la table Delta.
Exécuter des travaux de streaming en production
Les notebooks Spark constituent un outil efficace pour développer et tester la logique de streaming. Toutefois, pour les charges de travail de production qui doivent s’exécuter en continu, utilisez plutôt des définitions de travaux Spark. Les définitions de travaux Spark sont des tâches non interactives orientées code qui s’exécutent sur un cluster Spark et offrent une plus grande robustesse et une meilleure disponibilité.
L’infrastructure exécutant un travail de streaming peut rencontrer des problèmes qui arrêtent le travail, tels que les défaillances matérielles ou la mise à jour corrective de l’infrastructure. Une stratégie de nouvelle tentative redémarre automatiquement le travail lorsqu’il s’arrête de façon inattendue. Configurez la stratégie de nouvelle tentative sur une définition de travail Spark pour spécifier le nombre de fois où redémarrer le travail (jusqu’à une nouvelle tentative infinie) et l’intervalle de temps entre les nouvelles tentatives. Avec une stratégie de nouvelle tentative activée, votre travail de streaming continue de s’exécuter jusqu’à ce que vous l’arrêtiez explicitement.
Le hub de supervision Fabric comprend un onglet de streaming structuré avec des métriques, notamment le taux d’entrée, le taux de traitement, les lignes d’entrée, la durée de traitement par lot et la durée de l’opération.