Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Cette page montre comment utiliser foreachBatch avec Structured Streaming pour écrire la sortie d’une requête de diffusion en continu dans des sources de données qui n’ont pas de récepteur de streaming existant.
Le modèle de code streamingDF.writeStream.foreachBatch(...) vous permet d’appliquer des fonctions par lot aux données de sortie de chaque micro-lot de la requête en continu. Les fonctions utilisées avec foreachBatch acceptent deux paramètres :
- Un DataFrame contenant les données de sortie d’un micro-lot.
- L’ID unique du micro-lot.
Vous devez utiliser foreachBatch pour les opérations de fusion Delta Lake dans Structured Streaming. Veuillez consulter la section Fusion à partir de requêtes en continu avec foreachBatch.
Appliquer d’autres opérations DataFrame
De nombreuses opérations DataFrame et DataSet ne sont pas prises en charge dans les DataFrames de streaming, Spark ne prend pas en charge la génération de plans incrémentiels dans ces cas. En utilisant foreachBatch(), vous pouvez appliquer certaines de ces opérations sur chaque sortie de micro-batch. Par exemple, vous pouvez utiliser foreachBatch() et l’opération SQL MERGE INTO pour écrire le résultat d’agrégations en continu dans une table Delta en mode mise à jour. Pour plus d’informations, consultez MERGE INTO.
Importante
-
foreachBatch()ne fournit que des garanties d'écriture au moins une fois. Mais vous pouvez utiliser lebatchIdfourni à la fonction pour dédupliquer la sortie et obtenir une garantie une seule fois. Dans les deux cas, vous devrez définir vous-même la sémantique de bout en bout. -
foreachBatch()ne fonctionne pas avec le mode de traitement continu car il repose fondamentalement sur l'exécution par micro-batch d'une requête de streaming. Si vous écrivez des données en mode continu, utilisezforeach()à la place. - Lorsque vous utilisez
foreachBatchavec un opérateur avec état, il est important de consommer entièrement chaque lot avant la fin du traitement. Veuillez consulter la section Consommer entièrement chaque DataFrame de lot
Gérer des DataFrames vides
foreachBatch() peut recevoir un DataFrame vide, et votre code doit gérer ce scénario. Sinon, votre requête peut échouer.
Par exemple, lorsque Delta Lake est la source de diffusion en continu, ces scénarios peuvent passer un DataFrame vide à foreachBatch():
-
OPTIMIZEsans fichier à traiter : lorsqu’uneOPTIMIZEopération s’exécute sur la table source Delta Lake, mais qu’il n’existe aucun fichier à traiter, Structured Streaming écrit une entrée de journal offset pour incrémenter la version de la table. Cela produit un micro-lot vide sur le récepteur, même si aucun fichier n’est lu. - Taille de fichier au niveau du plan physique : si le prédicat pushdown ou la taille de fichier élimine tous les enregistrements au niveau du plan physique, le résultat est une validation vide sur le récepteur.
Le code utilisateur doit gérer des DataFrames vides pour permettre une opération appropriée. Consultez les exemples ci-dessous :
Python
def process_batch(output_df, batch_id):
# Process valid DataFrames only
if not output_df.isEmpty():
# business logic
pass
streamingDF.writeStream.foreachBatch(process_batch).start()
Scala
.foreachBatch(
(outputDf: DataFrame, bid: Long) => {
// Process valid DataFrames only
if (!outputDf.isEmpty) {
// business logic
}
}
).start()
Changements de comportement pour foreachBatch dans Databricks Runtime 14.0
Dans Databricks Runtime 14.0 et versions ultérieures sur le calcul configuré avec le mode d’accès standard, les modifications de comportement suivantes s’appliquent :
- Les commandes
print()écrivent la sortie dans les journaux du driver. - Vous ne pouvez pas accéder au sous-module
dbutils.widgetsà l’intérieur de la fonction. - Tous les fichiers, modules ou objets référencés dans la fonction doivent être sérialisables et disponibles sur Spark.
Réutiliser les sources de données de lot existantes
Grâce à foreachBatch(), vous pouvez utiliser les writers de données batch existants pour des récepteurs de données qui ne sont peut-être pas pris en charge par Structured Streaming. Voici quelques exemples :
De nombreuses autres sources de données par lots peuvent être utilisées à partir de foreachBatch(). Consultez Se connecter aux sources de données et aux services externes.
Écrire dans plusieurs emplacements
Si vous devez écrire le résultat d’une requête en continu vers plusieurs emplacements, Databricks recommande d’utiliser plusieurs writers Structured Streaming pour une meilleure parallélisation et un meilleur débit.
L’utilisation de foreachBatch pour écrire vers plusieurs récepteurs sérialise l’exécution des écritures en continu, ce qui peut augmenter la latence de chaque micro-lot.
Si vous utilisez foreachBatch pour écrire dans plusieurs tables Delta, consultez Utiliser foreachBatch pour les écritures de tables idempotentes.
Consommer entièrement chaque DataFrame de lot
Lorsque vous utilisez des opérateurs d'état (par exemple, en utilisant dropDuplicatesWithinWatermark), chaque itération de lot doit consommer l’intégralité du DataFrame ou redémarrer la requête. Si vous n’utilisez pas l’intégralité du DataFrame, la requête de streaming échoue avec le lot suivant.
Cela peut se produire dans plusieurs cas. Les exemples suivants montrent comment corriger les requêtes qui ne consomment pas correctement un DataFrame.
Utilisation intentionnelle d’un sous-ensemble du lot
Si vous vous souciez uniquement d’un sous-ensemble du lot, vous pouvez avoir du code tel que le suivant.
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def partial_func(batch_df, batch_id):
batch_df.show(2)
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
Dans ce cas, le batch_df.show(2) seul gère les deux premiers éléments du lot, ce qui est attendu, mais s’il y a plus d’éléments, ils doivent être consommés. Le code suivant utilise le DataFrame complet.
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
# function to do nothing with a row
def do_nothing(row):
pass
def partial_func(batch_df, batch_id):
batch_df.show(2)
batch_df.foreach(do_nothing) # silently consume the rest of the batch
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
Ici, la do_nothing fonction ignore silencieusement le reste du DataFrame.
Gestion d’une erreur dans un lot
Pour la gestion des erreurs dans foreachBatch, Databricks recommande d’autoriser la requête de diffusion en continu à échouer rapidement et à la place s’appuyer sur la couche d’orchestration, telle que Lakeflow Jobs ou Apache Airflow, pour gérer la logique de nouvelle tentative. Cela est beaucoup plus sûr que la création de boucles de nouvelle tentative complexes dans votre code, où la perte de données peut se produire.
Voici des consignes basées sur votre objectif de rédaction :
| Cible | Exemples | Conseils |
|---|---|---|
| Opérations de DataFrame | Tables Delta Lake | Vous devez utiliser les options d’écriture txnAppId et txnVersion, en liant txnVersion à batchId, afin de garantir l'idempotence et protéger l'intégrité des données lors des nouvelles tentatives. Ne pas intercepter et réessayer les exceptions localement. Au lieu de cela, Databricks vous recommande d’autoriser les erreurs à propager afin que les métriques Spark restent précises, que les données ne soient pas dupliquées et que l’orchestrateur puisse réessayer correctement le lot complet. |
| Code personnalisé et destinations externes |
.collect(), bases de données OLTP, files d’attente de messages, API |
Implémentez votre propre idempotence. Vous devez supposer que n’importe quelle opération peut et sera retentée par lots. Si le batchId reste le même, le résultat de votre opération doit rester le même. Vous pouvez réessayer des erreurs purement temporaires, comme des délais d'expiration de connexion brefs, mais prenez soin d'éviter les écritures partielles ou en double si la nouvelle tentative échoue. L’approche la plus sûre consiste à laisser les erreurs se propager et permettre à l’orchestrateur de réessayer l’intégralité du lot. |
Voici quelques exemples de types d’exceptions et de recommandations pour savoir comment les gérer dans foreachBatch:
| Type d'exception | Exemples | Action recommandée |
|---|---|---|
| Erreurs de puits transitoires |
SQLTransientConnectionException, HTTP 429, délais d’expiration |
Catch: réessayer ou envoyer à une file morte |
| Violations de contraintes de doublon ou de clé lorsque le récepteur est idempotent | SQLIntegrityConstraintViolationException |
Catch : journaliser et supprimer |
| Erreurs réessayables personnalisées | Exceptions de socket encapsulées, erreurs de base de données répétables | Catch : incrémenter les métriques et autoriser la continuation contrôlée |
| Erreurs de logique ou de schéma |
NullPointerException, AttributeError, incompatibilité de schéma |
Propager : laisser Spark échouer la requête |
| Erreurs de récepteur non retenables ou bogues logiques non résolus |
ValueError, PermissionError |
Propager : laisser Spark échouer la requête |
| Échecs critiques |
OutOfMemoryError, état endommagé, violations de l’intégrité des données |
Propager : laisser Spark échouer la requête |
Exemples de code : gestion des exceptions
Les exemples suivants déclenchent intentionnellement une erreur dans foreach pour montrer différentes approches pour gérer l’erreur :
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def foreach_func(row):
# handle the row, but in this case, for the sample, will just raise an error:
raise Exception('error')
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
Le code ci-dessus gère et supprime silencieusement l’erreur et peut ne pas consommer le reste du lot. Il existe deux options pour gérer cette situation.
Tout d’abord, vous pouvez relancer l’erreur, qui la transmet à votre couche d’orchestration pour réessayer le lot. Cela peut résoudre l’erreur, s’il s’agit d’un problème temporaire ou le déclencher pour que votre équipe des opérations tente de corriger manuellement. Pour ce faire, modifiez le partial_func code pour qu’il ressemble à ceci :
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
raise e # re-raise the issue
Deuxièmement, si vous souhaitez intercepter l’exception et ignorer le reste du lot, vous pouvez modifier le code pour utiliser la do_nothing fonction pour ignorer silencieusement le reste du lot.
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def foreach_func(row):
# handle the row, but in this case, for the sample, will just raise an error:
raise Exception('error')
# function to do nothing with a row
def do_nothing(row):
pass
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
batch_df.foreach(do_nothing) # silently consume the remainder of the batch
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()