Charger des données dans des pipelines

Vous pouvez charger des données à partir de n’importe quelle source de données prise en charge par Apache Spark sur Azure Databricks à l’aide de pipelines. Vous pouvez définir des jeux de données (tables et vues) dans des pipelines déclaratifs Spark Lakeflow sur n’importe quelle requête qui retourne un DataFrame Spark, y compris la diffusion en continu de DataFrames et Pandas pour les DataFrames Spark. Pour les tâches d’ingestion de données, Databricks recommande d’utiliser des tables de streaming pour la plupart des cas d’usage. Les tables de flux sont utiles pour ingérer des données à partir du stockage d’objets cloud à l’aide d’Auto Loader ou des bus de messages comme Kafka.

Toutes les sources de données ne prennent pas en charge SQL pour l’ingestion. Toutefois, vous pouvez combiner sql et Python sources dans le même pipeline pour utiliser Python si nécessaire. Pour plus d’informations sur l’utilisation des bibliothèques non empaquetées dans les pipelines déclaratifs Spark Lakeflow par défaut, consultez Gérer les dépendances Python pour les pipelines. Pour obtenir des informations générales sur l’ingestion dans Azure Databricks, consultez les connecteurs Standard dans Lakeflow Connect.

Les exemples suivants illustrent certains modèles de chargement de données courants.

Charger à partir d’une table existante

Chargez des données à partir de n’importe quelle table existante dans Azure Databricks. Vous pouvez transformer les données à l’aide d’une requête ou charger la table pour un traitement supplémentaire dans votre pipeline.

Python

@dp.table(
  comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
  return (
    spark.read.table("baby_names_prepared")
      .filter(expr("Year_Of_Birth == 2021"))
      .groupBy("First_Name")
      .agg(sum("Count").alias("Total_Count"))
      .sort(desc("Total_Count"))
  )

SQL

CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
  First_Name,
  SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC

Charger des fichiers à partir du stockage d’objets cloud

Databricks recommande d’utiliser le chargeur automatique dans les pipelines pour la plupart des tâches d’ingestion de données à partir du stockage d’objets cloud ou à partir de fichiers dans un volume de catalogue Unity. Le chargeur automatique et les pipelines sont conçus pour charger de manière incrémentielle et idempotente des données toujours croissantes à mesure qu’elles arrivent dans le stockage cloud. Voir Qu’est-ce que le chargeur automatique ? et charger des données à partir du stockage d’objets.

L’exemple suivant lit les données du stockage cloud à l’aide du chargeur automatique.

Python

@dp.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json")
  )

SQL

CREATE OR REFRESH STREAMING TABLE sales
  AS SELECT *
  FROM STREAM read_files(
    'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
    format => "json"
  );

Les exemples suivants utilisent le chargeur automatique pour créer des jeux de données à partir de fichiers CSV dans un volume de catalogue Unity.

Python

@dp.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/Volumes/my_catalog/retail_org/customers/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
  "/Volumes/my_catalog/retail_org/customers/",
  format => "csv"
)

Note

  • Si vous utilisez Auto Loader avec les notifications de fichiers, et si vous exécutez une actualisation complète pour votre pipeline ou votre table de streaming, vous devez nettoyer manuellement vos ressources. Vous pouvez utiliser CloudFilesResourceManager dans un notebook pour effectuer le nettoyage.
  • Pour charger des fichiers avec le chargeur automatique dans un pipeline compatible avec le catalogue Unity, vous devez utiliser des emplacements externes. Pour en savoir plus sur l’utilisation du catalogue Unity avec des pipelines, consultez Utiliser le catalogue Unity avec des pipelines.

S’authentifier auprès du stockage cloud

Le chargeur automatique utilise des emplacements externes du catalogue Unity pour s’authentifier auprès du stockage cloud. Vous devez configurer un emplacement externe pour le chemin d’accès de stockage à partir duquel vous souhaitez lire et accorder le READ FILES privilège à l’utilisateur en cours d’exécution.

Pour ingérer à partir de Azure Data Lake Storage, configurez un emplacement externe soutenu par des informations d’identification de stockage qui référencent un conteneur de stockage. Pour plus d’informations, consultez Se connecter au stockage d’objets cloud à l’aide du catalogue Unity.

Charger des données à partir d’un bus de messages

Vous pouvez configurer des pipelines pour ingérer des données à partir de bus de messages. Databricks recommande d’utiliser des tables de diffusion en continu avec une exécution continue et une mise à l’échelle automatique améliorée pour fournir l’ingestion la plus efficace pour le chargement à faible latence à partir de bus de messages. Pour plus d’informations, consultez Optimiser l’utilisation du cluster des pipelines déclaratifs Spark Lakeflow avec mise à l’échelle automatique.

Par exemple, le code suivant configure une table de diffusion en continu pour ingérer des données à partir de Kafka à l’aide de la fonction read_kafka .

Python

from pyspark import pipelines as dp

@dp.table
def kafka_raw():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka_server:9092")
      .option("subscribe", "topic1")
      .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_raw AS
  SELECT *
  FROM STREAM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'topic1'
  );

Pour ingérer depuis d’autres sources de bus de messages, consultez :

Load des données à partir de Azure Event Hubs

Azure Event Hubs est un service de streaming de données qui fournit une interface compatible Apache Kafka. Vous pouvez utiliser le connecteur Kafka pour le streaming structuré, inclus dans le runtime des pipelines déclaratifs Spark Lakeflow, afin de charger des messages à partir d’Azure Event Hubs. Pour en savoir plus sur le chargement et le traitement des messages à partir d’Azure Event Hubs, consultez Utiliser Azure Event Hubs comme source de données de pipeline.

Charger des données à partir de systèmes externes

Lakeflow Spark Declarative Pipelines prend en charge le chargement de données à partir de n’importe quelle source de données prise en charge par Azure Databricks. Consultez Se connecter aux sources de données et aux services externes. Vous pouvez également charger des données externes à l’aide de Lakehouse Federation pour les sources de données prises en charge. Étant donné que Lakehouse Federation nécessite Databricks Runtime 13.3 LTS ou version ultérieure, pour utiliser Lakehouse Federation, configurez votre pipeline pour utiliser le canal d’aperçu.

Certaines sources de données n’ont pas de prise en charge SQL équivalente. Si vous ne pouvez pas utiliser Lakehouse Federation avec l’une de ces sources de données, vous pouvez utiliser Python pour ingérer des données à partir de la source. Vous pouvez ajouter des fichiers sources Python et SQL au même pipeline. L’exemple suivant déclare une vue matérialisée pour accéder à l’état actuel des données dans une table PostgreSQL distante.

import dp

@dp.table
def postgres_raw():
  return (
    spark.read
      .format("postgresql")
      .option("dbtable", table_name)
      .option("host", database_host_url)
      .option("port", 5432)
      .option("database", database_name)
      .option("user", username)
      .option("password", password)
      .load()
  )

Charger des jeux de données petits ou statiques à partir du stockage d’objets cloud

Vous pouvez charger des jeux de données petits ou statiques à l’aide de la syntaxe de chargement Apache Spark. Lakeflow Spark Declarative Pipelines prend en charge tous les formats de fichier pris en charge par Apache Spark sur Azure Databricks. Pour obtenir une liste complète, consultez les options de format de données.

Les exemples suivants illustrent le chargement de JSON pour créer une table.

Python

@dp.table
def clickstream_raw():
  return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))

SQL

CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM read_files(
  "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
)

Note

La fonction SQL read_files est commune à tous les environnements SQL sur Azure Databricks. Il s’agit du modèle recommandé pour l’accès direct aux fichiers à l’aide de SQL dans les pipelines. Pour plus d’informations, consultez Options.

Charger des données à partir d’une source de données personnalisée Python

Les sources de données personnalisées Python vous permettent de charger des données dans des formats personnalisés. Vous pouvez écrire du code pour lire et écrire dans une source de données externe spécifique ou utiliser votre code de Python existant pour lire des données à partir de vos propres systèmes internes. Pour plus d’informations sur le développement de sources de données Python, consultez les sources de données personnalisées PySpark.

L’exemple suivant enregistre une source de données personnalisée avec le nom my_custom_datasource de format et la lit à partir de celle-ci dans les modes de traitement par lots et de diffusion en continu.

from pyspark import pipelines as dp

# Assume `my_custom_datasource` is a custom Python custom data
# source that supports both batch and streaming reads, and has
# been registered using `spark.dataSource.register`.

# This creates a materialized view
@dp.table(name = "read_from_batch")
def read_from_batch():
    return spark.read.format("my_custom_datasource").load()

# This creates a streaming table
@dp.table(name = "read_from_streaming")
def read_from_streaming():
    return spark.readStream.format("my_custom_datasource").load()

Configurer une table de streaming pour ignorer les modifications dans une table de streaming source

Par défaut, les tables de streaming nécessitent des sources en ajout uniquement. Si votre table de diffusion en continu source nécessite des mises à jour ou des suppressions — par exemple, pour le traitement du « droit à l'oubli » dans le cadre du RGPD — utilisez l'indicateur skipChangeCommits pour ignorer ces modifications. Cet indicateur fonctionne uniquement avec la fonction spark.readStream et ne peut pas être utilisé lorsque la table de streaming source est la cible d’une fonction create_auto_cdc_flow(). Pour plus d’informations, consultez Gérer les modifications apportées aux tables Delta sources.

@dp.table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("A")

Accéder de manière sécurisée aux identifiants de stockage avec des secrets dans un pipeline

Vous pouvez utiliser des secrets Azure Databricks pour stocker des informations d’identification telles que des clés d’accès ou des mots de passe. Pour configurer le secret dans votre pipeline, utilisez une propriété Spark dans la configuration de cluster des paramètres de pipeline. Consultez Configurer le calcul classique pour les pipelines.

L’exemple suivant utilise un secret pour stocker une clé d’accès requise pour lire les données d’entrée à partir d’un compte de stockage Azure Data Lake Storage à l’aide du chargeur automatique. Vous pouvez utiliser cette même méthode pour configurer tout secret requis par votre pipeline, par exemple, des clés AWS pour accéder à S3 ou le mot de passe pour un metastore Apache Hive.

Pour en savoir plus sur l’utilisation d’Azure Data Lake Storage, consultez Se connecter à Azure Data Lake Storage et au Stockage Blob.

Note

Vous devez ajouter le préfixe spark.hadoop. à la clé de configuration spark_conf qui définit la valeur du secret.

{
  "id": "43246596-a63f-11ec-b909-0242ac120002",
  "storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path>",
  "clusters": [
    {
      "spark_conf": {
        "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
      },
      "autoscale": {
        "min_workers": 1,
        "max_workers": 5,
        "mode": "ENHANCED"
      }
    }
  ],
  "development": true,
  "continuous": false,
  "libraries": [
    {
      "notebook": {
        "path": "/Users/user@databricks.com/:re[LDP] Notebooks/:re[LDP] quickstart"
      }
    }
  ],
  "name": ":re[LDP] quickstart using ADLS2"
}

Dans cet exemple de code, remplacez les valeurs suivantes.

Espace réservé Remplacer par
<container-name> Nom du conteneur de compte de stockage Azure.
<storage-account-name> Nom du compte de stockage ADLS.
<path> Chemin d’accès aux données et métadonnées de sortie du pipeline.
<scope-name> Nom du scope de secret Azure Databricks.
<secret-name> Nom de la clé contenant la clé d'accès au compte de stockage Azure.
from pyspark import pipelines as dp

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dp.create_table(
  comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(json_path)
  )

Dans cet exemple de code, remplacez les valeurs suivantes.

Espace réservé Remplacer par
<container-name> Nom du conteneur de compte de stockage Azure qui stocke les données d’entrée.
<storage-account-name> Nom du compte de stockage ADLS.
<path-to-input-dataset> Chemin d’accès au jeu de données d’entrée.