Modèles courants de chargement de données

Auto Loader simplifie un certain nombre de tâches courantes d’ingestion de données. Cet aide-mémoire fournit des exemples de plusieurs modèles populaires.

Ingérer des données à partir du stockage d’objets cloud en tant que variante

Le chargeur automatique peut charger toutes les données des sources de fichiers prises en charge en tant que colonne unique VARIANT dans une table cible. Étant donné qu’il VARIANT est flexible face aux modifications de schéma et de type et maintient la sensibilité de la casse et NULL les valeurs présentes dans la source de données, ce modèle est robuste face à la plupart des scénarios d’ingestion. Pour plus d’informations, consultez Ingestion de données à partir du stockage d’objets cloud en tant que variante.

Filtrage des répertoires ou des fichiers à l’aide de modèles Glob

Les modèles Glob peuvent être utilisés pour filtrer les répertoires et les fichiers lorsqu’ils sont fournis dans le chemin d’accès.

Motif Descriptif
? Correspond à n'importe quel caractère unique
* Correspond à zéro ou plusieurs caractères
[abc] Correspond à un seul caractère du jeu de caractères {a,b,c}.
[a-z] Correspond à un seul caractère de la plage de caractères {a…z}.
[^a] Correspond à un seul caractère qui ne fait pas partie du jeu ou de la plage de caractères {a}. Notez que le caractère ^ doit se trouver immédiatement à droite du crochet ouvrant.
{ab,cd} Correspond à une chaîne du jeu de chaînes {ab, cd}.
{ab,c{de, fh}} Correspond à une chaîne du jeu de chaînes {ab, cde, cfh}.

Utilisez le path pour fournir des modèles de préfixe, par exemple :

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", <format>) \
  .schema(schema) \
  .load("/Volumes/catalog_name/schema_name/volume_name/*/files")

Langage de programmation Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", <format>)
  .schema(schema)
  .load("/Volumes/catalog_name/schema_name/volume_name/*/files")

Vous devez utiliser l’option pathGlobFilter pour fournir explicitement des modèles de suffixe. Le path ne fournit qu’un filtre de préfixe. Par exemple, si vous souhaitez analyser uniquement png des fichiers dans un répertoire qui contient des fichiers avec différents suffixes, vous pouvez effectuer les opérations suivantes :

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .option("pathGlobfilter", "*.png") \
  .load("/Volumes/catalog_name/schema_name/volume_name/path")

Langage de programmation Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .option("pathGlobfilter", "*.png")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")

Note

Le comportement d’utilisation des caractères génériques (globbing) par défaut d’Auto Loader est différent de celui des autres sources de fichiers Spark. Ajoutez .option("cloudFiles.useStrictGlobber", "true") à votre lecture pour utiliser le globbing qui correspond au comportement Spark par défaut sur les sources de fichiers. Pour plus d’informations sur le globbing, consultez le tableau suivant :

Motif Chemins d'accès au fichier Globber par défaut Globber strict
/a/b /a/b/c/file.txt Oui Oui
/a/b /a/b_dir/c/file.txt Non Non
/a/b /a/b.txt Non Non
/a/b/ /a/b.txt Non Non
/a/*/c/ /a/b/c/file.txt Oui Oui
/a/*/c/ /a/b/c/d/file.txt Oui Oui
/a/*/c/ /a/b/x/y/c/file.txt Oui Non
/a/*/c /a/b/c_file.txt Oui Non
/a/*/c/ /a/b/c_file.txt Oui Non
/a/*/c/ /a/*/cookie/file.txt Oui Non
/a/b* /a/b.txt Oui Oui
/a/b* /a/b/file.txt Oui Oui
/a/{0.txt,1.txt} /a/0.txt Oui Oui
/a/*/{0.txt,1.txt} /a/0.txt Non Non
/a/b/[cde-h]/i/ /a/b/c/i/file.txt Oui Oui

Activer Easy ETL

Un moyen simple de transférer vos données dans Delta Lake sans en perdre consiste à utiliser le modèle suivant et à activer l'inférence de schéma avec Auto Loader. Databricks recommande d'exécuter le code suivant dans une tâche Azure Databricks pour qu'il redémarre automatiquement votre stream lorsque le schéma de vos données source change. Par défaut, le schéma est déduit en tant que types de chaîne. Toute erreur d'analyse (il ne devrait pas y en avoir si tout reste sous forme de chaîne) ira dans _rescued_data et toute nouvelle colonne fera échouer le stream et fera évoluer le schéma.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>") \
  .load("/Volumes/catalog_name/schema_name/volume_name/source_data") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target>")

Langage de programmation Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
  .load("/Volumes/catalog_name/schema_name/volume_name/source_data")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target>")

Empêcher la perte de données dans les données bien structurées

Lorsque vous connaissez votre schéma mais que vous souhaitez capturer des données inattendues, Databricks recommande d’utiliser le rescuedDataColumn.

Python

spark.readStream.format("cloudFiles") \
  .schema(expected_schema) \
  .option("cloudFiles.format", "json") \
  # will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue") \
  .load("/Volumes/catalog_name/schema_name/volume_name/source_data") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target>")

Langage de programmation Scala

spark.readStream.format("cloudFiles")
  .schema(expected_schema)
  .option("cloudFiles.format", "json")
  // will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load("/Volumes/catalog_name/schema_name/volume_name/source_data")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target>")

Si vous souhaitez que votre flux cesse de traiter si un nouveau champ est introduit qui ne correspond pas à votre schéma, vous pouvez ajouter :

.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")

Activer des pipelines de données semi-structurés flexibles

Lorsque vous recevez des données d’un fournisseur qui introduit de nouvelles colonnes aux informations qu’ils fournissent, vous ne savez peut-être pas exactement quand ils le font, ou vous n’avez peut-être pas la bande passante pour mettre à jour votre pipeline de données. Vous pouvez désormais tirer parti de l’évolution du schéma pour redémarrer le stream et laisser Auto Loader mettre à jour automatiquement le schéma déduit. Vous pouvez également tirer parti schemaHints de certains des champs « sans schéma » fournis par le fournisseur.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT") \
  .load("/Volumes/catalog_name/schema_name/volume_name/api/requests") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target>")

Langage de programmation Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT")
  .load("/Volumes/catalog_name/schema_name/volume_name/api/requests")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target>")

Transformer des données JSON imbriquées

Étant donné que le chargeur automatique déduit les colonnes JSON de niveau supérieur en tant que chaînes, vous pouvez vous retrouver avec des objets JSON imbriqués qui nécessitent d’autres transformations. Vous pouvez utiliser des API d’accès aux données semi-structurées pour transformer davantage de contenu JSON complexe.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .load("/Volumes/catalog_name/schema_name/volume_name/nested_json") \
  .selectExpr(
    "*",
    "tags:page.name",    # extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"     # extracts {"tags":{"eventType":...}}
  )

Langage de programmation Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("/Volumes/catalog_name/schema_name/volume_name/nested_json")
  .selectExpr(
    "*",
    "tags:page.name",     // extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int",  // extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"      // extracts {"tags":{"eventType":...}}
  )

Inférer des données JSON imbriquées

Quand vous avez des données imbriquées, vous pouvez utiliser l’option cloudFiles.inferColumnTypes pour inférer la structure imbriquée de vos données et d’autres types de colonnes.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .option("cloudFiles.inferColumnTypes", "true") \
  .load("/Volumes/catalog_name/schema_name/volume_name/nested_json")

Langage de programmation Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .option("cloudFiles.inferColumnTypes", "true")
  .load("/Volumes/catalog_name/schema_name/volume_name/nested_json")

Charger des fichiers CSV sans en-têtes

L’exemple suivant montre comment charger des fichiers CSV sans en-têtes à l’aide du chargeur automatique. Permet rescuedDataColumn de capturer les données qui ne correspondent pas au schéma fourni.

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("rescuedDataColumn", "_rescued_data") \ # ensure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Langage de programmation Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

Appliquer un schéma sur des fichiers CSV avec des en-têtes

L’exemple suivant montre comment appliquer un schéma sur des fichiers CSV qui incluent des en-têtes. Permet rescuedDataColumn de capturer les données qui ne correspondent pas au schéma fourni.

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("header", "true") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Langage de programmation Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("header", "true")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

Ingérer des données image ou binaires dans Delta Lake pour ML

Une fois les données stockées dans Delta Lake, vous pouvez exécuter l’inférence distribuée sur les données. Consultez Effectuer une inférence distribuée à l'aide de pandas UDF.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .load("/Volumes/catalog_name/schema_name/volume_name/images") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target>")

Langage de programmation Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .load("/Volumes/catalog_name/schema_name/volume_name/images")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target>")

Syntaxe du chargeur automatique pour les pipelines déclaratifs Spark Lakeflow

Les pipelines déclaratifs Spark Lakeflow fournissent une syntaxe Python légèrement modifiée pour Auto Loader et ajoutent la prise en charge SQL pour Auto Loader. Les exemples suivants utilisent le chargeur automatique pour créer des jeux de données à partir de fichiers JSON à l’aide de l’exemple de jeu de données de réservation de voyage Wanderbricks :

Python

@dp.table
def booking_updates():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .option("multiLine", "true")
      .load("/Volumes/my_catalog/my_schema/my_volume/wanderbricks/booking_updates")
  )

@dp.table
def reviews():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .option("multiLine", "true")
      .load("/Volumes/my_catalog/my_schema/my_volume/wanderbricks/reviews")
  )

SQL

CREATE OR REFRESH STREAMING TABLE booking_updates
AS SELECT * FROM STREAM read_files(
  "/Volumes/my_catalog/my_schema/my_volume/wanderbricks/booking_updates",
  format => "json",
  multiLine => true
)

CREATE OR REFRESH STREAMING TABLE reviews
AS SELECT * FROM STREAM read_files(
  "/Volumes/my_catalog/my_schema/my_volume/wanderbricks/reviews",
  format => "json",
  multiLine => true
)

Vous pouvez utiliser les options de format prises en charge pour le chargeur automatique. Les options pour read_files sont des paires clé-valeur. Pour plus d’informations sur les formats et options pris en charge, consultez Options.

CREATE OR REFRESH STREAMING TABLE my_table
AS SELECT *
  FROM STREAM read_files(
    "/Volumes/my_volume/path/to/files/*",
    option-key => option-value,
    ...
  )

L’exemple suivant lit les fichiers JSON multilignes avec l’inférence de type de colonne activée :

CREATE OR REFRESH STREAMING TABLE booking_updates
AS SELECT * FROM STREAM read_files(
  "/Volumes/my_catalog/my_schema/my_volume/wanderbricks/booking_updates",
  format => "json",
  multiLine => true,
  inferColumnTypes => true
)

Vous pouvez utiliser l’élément schema pour spécifier le format manuellement. Vous devez spécifier cet élément schema pour les formats qui ne prennent pas en charge l’inférence de schéma :

Python

@dp.table
def booking_updates_raw():
  return (
    spark.readStream.format("cloudFiles")
      .schema("booking_id LONG, booking_update_id LONG, user_id LONG, property_id LONG, status STRING, guests_count INT, total_amount DOUBLE, check_in DATE, check_out DATE, created_at TIMESTAMP, updated_at TIMESTAMP")
      .option("cloudFiles.format", "json")
      .option("multiLine", "true")
      .load("/Volumes/my_catalog/my_schema/my_volume/wanderbricks/booking_updates")
  )

SQL

CREATE OR REFRESH STREAMING TABLE booking_updates_raw
AS SELECT *
FROM STREAM read_files(
  "/Volumes/my_catalog/my_schema/my_volume/wanderbricks/booking_updates",
  format => "json",
  multiLine => true,
  schema => "booking_id LONG, booking_update_id LONG, user_id LONG, property_id LONG, status STRING, guests_count INT, total_amount DOUBLE, check_in DATE, check_out DATE, created_at TIMESTAMP, updated_at TIMESTAMP"
)

Note

Les pipelines déclaratifs Spark Lakeflow configurent et gèrent automatiquement les répertoires de schéma et de point de contrôle lors de l’utilisation du chargeur automatique pour lire des fichiers. Toutefois, si vous configurez manuellement l’un de ces répertoires, l’actualisation complète n’affecte pas le contenu des répertoires configurés. Databricks recommande d’utiliser les répertoires configurés automatiquement pour éviter des effets secondaires inattendus lors du traitement.

Étapes suivantes