Ingérer des données provenant de SQL Server

Découvrez comment ingérer des données de SQL Server dans Azure Databricks à l’aide de Lakeflow Connect.

Le connecteur SQL Server prend en charge les bases de données Azure SQL Database, Azure SQL Managed Instance et Amazon RDS SQL. Cela inclut SQL Server s’exécutant sur des machines virtuelles Azure et Amazon EC2. Le connecteur prend également en charge SQL Server localement à l’aide d’Azure ExpressRoute et de la mise en réseau AWS Direct Connect.

Exigences

  • Pour créer une passerelle d’ingestion et un pipeline d’ingestion, vous devez d’abord répondre aux exigences suivantes :

    • Votre espace de travail est activé pour Unity Catalog.

    • Le calcul serverless est activé pour votre espace de travail. Consultez les exigences en matière de calcul serverless.

    • Si vous envisagez de créer une connexion : vous avez les privilèges CREATE CONNECTION sur le metastore. Consultez Gérer les privilèges dans Unity Catalog.

      Si votre connecteur prend en charge la création de pipelines basés sur l’interface utilisateur, vous pouvez créer la connexion et le pipeline en même temps en effectuant les étapes de cette page. Toutefois, si vous utilisez la création de pipelines basée sur l’API, vous devez créer la connexion dans l’Explorateur de catalogues avant d’effectuer les étapes de cette page. Consultez Se connecter aux sources d’ingestion managées.

    • Si vous envisagez d’utiliser une connexion existante : vous disposez USE CONNECTION de privilèges ou ALL PRIVILEGES sur la connexion.

    • Vous disposez de USE CATALOG privilèges sur le catalogue cible.

    • Vous disposez des privilèges USE SCHEMA, CREATE TABLE, et CREATE VOLUME sur un schéma existant ou de privilèges CREATE SCHEMA sur le catalogue cible.

    • Vous avez accès à une instance SQL Server principale. Les fonctionnalités de suivi des modifications et de capture de données modifiées ne sont pas prises en charge sur les réplicas en lecture ou les instances secondaires.

    • Autorisations illimitées pour créer des clusters ou une stratégie personnalisée (API uniquement). Une stratégie personnalisée pour la passerelle doit répondre aux exigences suivantes :

      • Famille : Calcul des tâches

      • Remplacements de la famille de stratégies :

        {
          "cluster_type": {
            "type": "fixed",
            "value": "dlt"
          },
          "num_workers": {
            "type": "unlimited",
            "defaultValue": 1,
            "isOptional": true
          },
          "runtime_engine": {
            "type": "fixed",
            "value": "STANDARD",
            "hidden": true
          }
        }
        
      • Databricks recommande de spécifier les plus petits nœuds Worker possibles pour les passerelles d’ingestion, car ils n’ont pas d’impact sur les performances de la passerelle. La stratégie de calcul suivante permet à Azure Databricks de mettre à l’échelle la passerelle d’ingestion pour répondre aux besoins de votre charge de travail. La configuration minimale requise est de 8 cœurs pour permettre une extraction de données efficace et performante à partir de votre base de données source.

        {
          "driver_node_type_id": {
            "type": "fixed",
            "value": "Standard_E64d_v4"
          },
          "node_type_id": {
            "type": "fixed",
            "value": "Standard_F4s"
          }
        }
        

      Pour plus d’informations sur les stratégies de cluster, consultez Sélectionner une stratégie de calcul.

  • Pour ingérer à partir de SQL Server, vous devez d’abord effectuer les étapes de configuration de Microsoft SQL Server pour l’ingestion dans Azure Databricks.

Créer une passerelle et un pipeline d’ingestion

Interface utilisateur Databricks

  1. Dans la barre latérale de l’espace de travail Azure Databricks, cliquez sur Ingestion de données.

  2. Dans la page Ajouter des données , sous Connecteurs Databricks, cliquez sur SQL Server.

  3. Dans la page Connexion de l’Assistant Ingestion, sélectionnez la connexion qui stocke les informations d’identification d’accès SQL Server à partir de Configurer Microsoft SQL Server pour l’ingestion dans Azure Databricks. Si vous avez le CREATE CONNECTION privilège sur le metastore, vous pouvez cliquer sur l’icône Plus. Créez une connexion pour créer une connexion avec les détails de l’authentification dans SQL Server.

  4. Cliquez sur Suivant.

  5. Dans la page de configuration de l’ingestion , entrez un nom unique pour le pipeline d’ingestion. Ce pipeline déplace les données de l’emplacement intermédiaire vers la destination.

  6. Sélectionnez un catalogue et un schéma pour écrire des journaux d’événements. Le journal des événements contient les journaux d’audit, les contrôles de qualité des données, l’avancement du pipeline et les erreurs. Si vous disposez des privilèges et sur le catalogue, vous pouvez cliquer sur l’icône Plus dans le menu déroulant pour créer un nouveau schéma.

  7. (Facultatif) Définissez l’actualisation complète automatique pour toutes les tables sur Activé. Lorsque l’actualisation automatique est activée, le pipeline tente automatiquement de résoudre les problèmes tels que les événements de nettoyage des journaux et certains types d’évolution du schéma en actualisant entièrement la table affectée. Si le suivi de l’historique est activé, une actualisation complète efface cet historique.

  8. Entrez un nom unique pour la passerelle d’ingestion. La passerelle est un pipeline qui extrait les modifications de la source et les prépare à être chargées par le pipeline d’ingestion.

  9. Sélectionnez un catalogue et un schéma pour l’emplacement de mise en scène. Un volume est créé à cet emplacement pour stocker temporairement les données extraites. Si vous disposez des privilèges et sur le catalogue, vous pouvez cliquer sur l’icône Plus dans le menu déroulant pour créer un nouveau schéma.

  10. Cliquez sur Créer un pipeline et continuez.

  11. Dans la page Source , sélectionnez les tables à ingérer. Si vous sélectionnez des tables spécifiques, vous pouvez configurer les paramètres de table :

    a. (Facultatif) Sous l’onglet Paramètres , spécifiez un nom de destination pour chaque table ingérée. Cela est utile pour différencier les tables de destination lorsque vous ingérez un objet dans le même schéma plusieurs fois. Voir Nom d’une table de destination.

    a. (Facultatif) Modifiez le paramètre de suivi de l’historique par défaut. Consultez Activer le suivi de l’historique (type SCD 2).

  12. Cliquez sur Suivant, puis sur Enregistrer et continuer.

  13. Dans la page Destination , sélectionnez un catalogue et un schéma dans lequel charger des données. Si vous disposez des privilèges et sur le catalogue, vous pouvez cliquer sur l’icône Plus dans le menu déroulant pour créer un nouveau schéma.

  14. Cliquez sur Enregistrer et continuer.

  15. Dans la page de configuration de la base de données , cliquez sur Valider pour confirmer que votre source est correctement configurée pour l’ingestion d’Azure Databricks. Toutes les configurations manquantes sont retournées. Pour connaître les étapes à résoudre, cliquez sur Terminer la configuration. Cliquez ensuite sur Suivant. Vous pouvez également cliquer sur Ignorer la validation.

  16. (Facultatif) Dans la page Planifications et notifications , cliquez sur l’icône Plus. Créez une planification. Définissez la fréquence pour actualiser les tables de destination.

  17. (Facultatif) Cliquez sur l’icône Plus. Ajoutez une notification pour définir des notifications par e-mail pour la réussite ou l’échec de l’opération de pipeline, puis cliquez sur Enregistrer et exécuter le pipeline.

Paquets d'Automatisation déclarative

Avant d’ingérer à l’aide des Bundles d'Automation Déclarative, vous devez avoir accès à une connexion existante. Pour obtenir des instructions, consultez Se connecter aux sources d’ingestion managées.

Le catalogue et le schéma intermédiaires peuvent être identiques au catalogue et au schéma de destination. Le catalogue de préparation ne peut pas être un catalogue étranger. Spécifiez l’emplacement intermédiaire dans la section gateway_definition de votre fichier YAML d'ensemble du pipeline.

La passerelle d’ingestion extrait l’instantané et modifie les données de la base de données source et les stocke dans le volume intermédiaire du catalogue Unity. Vous devez exécuter la passerelle en tant que pipeline continu. Cela permet de prendre en charge les stratégies de rétention des journaux de modification dont vous disposez sur la base de données source.

Le pipeline d’ingestion applique les données de l’instantané et des modifications provenant du volume de gestion intermédiaire dans des tables de diffusion en continu de destination.

Les bundles peuvent contenir des définitions YAML de travaux et de tâches, sont gérés à l’aide de l’interface CLI Databricks et peuvent être partagés et exécutés dans différents espaces de travail cibles (tels que le développement, la préproduction et la production). Pour plus d’informations, consultez Qu’est-ce que les offres groupées Automation déclaratives ?.

  1. Créez un bundle à l’aide de l’interface CLI Databricks :

    databricks bundle init
    
  2. Ajoutez deux nouveaux fichiers de ressources à l’offre groupée :

    • Fichier de définition de pipeline (par exemple, resources/sqlserver_pipeline.yml). Consultez pipeline.ingestion_definition et des exemples.
    • Fichier de définition de travail qui contrôle la fréquence d’ingestion de données (par exemple). resources/sqlserver_job.yml
  3. Déployez le pipeline à l’aide de l’interface CLI Databricks :

    databricks bundle deploy
    

Notebook Databricks

Mettez à jour la cellule Configuration dans le notebook suivant avec la connexion à la source, le catalogue cible, le schéma cible et les tables à ingérer depuis la source.

Obtenir un ordinateur portable

Terraform

Vous pouvez utiliser Terraform pour déployer et gérer des pipelines d’ingestion SQL Server. Pour obtenir un exemple complet d’infrastructure, notamment les configurations Terraform pour la création de passerelles et de pipelines d’ingestion, consultez le référentiel d’exemples Terraform Lakeflow Connect sur GitHub.

Vérifier la réussite de l’ingestion des données

L'affichage de liste sur la page de détails du pipeline montre le nombre d'enregistrements traités à mesure que les données sont ingérées. Ces nombres sont actualisés automatiquement.

Vérifier la réplication

Par défaut, les colonnes Upserted records et Deleted records ne sont pas affichées. Vous pouvez les activer en cliquant sur le bouton de configuration des colonnes Icône de configuration des colonnes et en les sélectionnant.

Exemples

Utilisez ces exemples pour configurer votre pipeline.

Configuration du pipeline

Paquets d'Automatisation déclarative

Fichier de définition de pipeline suivant :

variables:
  # Common variables used multiple places in the DAB definition.
  gateway_name:
    default: sqlserver-gateway
  dest_catalog:
    default: main
  dest_schema:
    default: ingest-destination-schema

resources:
  pipelines:
    gateway:
      name: ${var.gateway_name}
      gateway_definition:
        connection_name: <sqlserver-connection>
        gateway_storage_catalog: main
        gateway_storage_schema: ${var.dest_schema}
        gateway_storage_name: ${var.gateway_name}
      catalog: ${var.dest_catalog}
      schema: ${var.dest_schema}

    pipeline_sqlserver:
      name: sqlserver-ingestion-pipeline
      ingestion_definition:
        ingestion_gateway_id: ${resources.pipelines.gateway.id}
        objects:
          # Modify this with your tables!
          - table:
              # Ingest the table test.ingestion_demo_lineitem to dest_catalog.dest_schema.ingestion_demo_line_item.
              source_catalog: test
              source_schema: ingestion_demo
              source_table: lineitem
              destination_catalog: ${var.dest_catalog}
              destination_schema: ${var.dest_schema}
          - schema:
              # Ingest all tables in the test.ingestion_whole_schema schema to dest_catalog.dest_schema. The destination
              # table name will be the same as it is on the source.
              source_catalog: test
              source_schema: ingestion_whole_schema
              destination_catalog: ${var.dest_catalog}
              destination_schema: ${var.dest_schema}
      catalog: ${var.dest_catalog}
      schema: ${var.dest_schema}

Notebook Databricks

Voici un exemple Configuration de section d’une spécification de pipeline :

# The name of the UC connection with the credentials to access the source database
connection_name = "my_connection"

# The name of the UC catalog and schema to store the replicated tables
target_catalog_name = "main"
target_schema_name = "lakeflow_sqlserver_connector_cdc"

# The name of the UC catalog and schema to store the staging volume with intermediate
# CDC and snapshot data. Use the destination catalog/schema by default.
stg_catalog_name = target_catalog_name
stg_schema_name = target_schema_name

# The name of the Gateway pipeline to create
gateway_pipeline_name = "cdc_gateway"

# The name of the Ingestion pipeline to create
ingestion_pipeline_name = "cdc_ingestion"

# Construct the full list of tables to replicate.
# IMPORTANT: The letter case of catalog, schema, and table names must match exactly
# the case used in the source database system tables.
tables_to_replicate = replicate_full_db_schema("MY_DB", ["MY_DB_SCHEMA"])
# Append tables from additional schemas as needed:
#  + replicate_tables_from_db_schema("MY_DB", "MY_SCHEMA_2", ["table3", "table4"])

Fichier de définition de tâche groupée

Voici un exemple de fichier de définition de tâches à utiliser avec les Declarative Automation Bundles. Le travail s’exécute tous les jours, exactement un jour à partir de la dernière exécution.

resources:
  jobs:
    sqlserver_dab_job:
      name: sqlserver_dab_job

      trigger:
        periodic:
          interval: 1
          unit: DAYS

      email_notifications:
        on_failure:
          - <email-address>

      tasks:
        - task_key: refresh_pipeline
          pipeline_task:
            pipeline_id: ${resources.pipelines.pipeline_sqlserver.id}

Modèles courants

Pour connaître les configurations de pipeline avancées, consultez Modèles courants pour les pipelines d’ingestion managés.

Étapes suivantes

Démarrez, planifiez et définissez des alertes sur votre pipeline. Consultez les tâches courantes de maintenance de pipeline.

Ressources supplémentaires