Gegevens opnemen uit SQL Server

Meer informatie over het opnemen van gegevens van SQL Server in Azure Databricks met behulp van Lakeflow Connect.

De SQL Server-connector ondersteunt Azure SQL Database-, Azure SQL Managed Instance- en Amazon RDS SQL-databases. Dit omvat SQL Server die wordt uitgevoerd op virtuele Azure-machines (VM's) en Amazon EC2. De connector ondersteunt ook on-premises SQL Server met behulp van Azure ExpressRoute en AWS Direct Connect-netwerken.

Requirements

  • Als u een opnamegateway en een opnamepijplijn wilt maken, moet u eerst aan de volgende vereisten voldoen:

    • Uw werkruimte is ingeschakeld voor Unity Catalog.

    • Serverloze rekenkracht is ingeschakeld voor uw werkruimte. Zie serverloze rekenvereisten.

    • Als u een verbinding wilt maken: u hebt CREATE CONNECTION bevoegdheden voor de metastore. Zie Beheer van bevoegdheden in Unity Catalog.

      Als uw connector het ontwerpen van pijplijnen op basis van de gebruikersinterface ondersteunt, kunt u de verbinding en de pijplijn tegelijkertijd maken door de stappen op deze pagina uit te voeren. Als u echter op API gebaseerde pijplijncreatie gebruikt, moet u de verbinding maken in Catalog Explorer voordat u de stappen op deze pagina voltooit. Zie Verbinding maken met beheerde opnamebronnen.

    • Als u van plan bent een bestaande verbinding te gebruiken: u hebt USE CONNECTION bevoegdheden of ALL PRIVILEGES voor de verbinding.

    • U hebt USE CATALOG rechten voor de doelcatalogus.

    • U hebt USE SCHEMA, CREATE TABLEen CREATE VOLUME bevoegdheden voor een bestaand schema of CREATE SCHEMA bevoegdheden voor de doelcatalogus.

    • U hebt toegang tot een primair SQL Server-exemplaar. Functies voor het bijhouden en vastleggen van wijzigingen worden niet ondersteund op leesreplika's of secundaire instanties.

    • Onbeperkte machtigingen voor het maken van clusters of een aangepast beleid (alleen API). Een aangepast beleid voor de gateway moet voldoen aan de volgende vereisten:

      • Gezin: Taakberekening

      • Uitzonderingen van beleidsfamilies

        {
          "cluster_type": {
            "type": "fixed",
            "value": "dlt"
          },
          "num_workers": {
            "type": "unlimited",
            "defaultValue": 1,
            "isOptional": true
          },
          "runtime_engine": {
            "type": "fixed",
            "value": "STANDARD",
            "hidden": true
          }
        }
        
      • Databricks raadt aan om de kleinste mogelijke werkknooppunten voor opnamegateways op te geven, omdat deze geen invloed hebben op de gatewayprestaties. Met het volgende rekenbeleid kan Azure Databricks de opnamegateway schalen om te voldoen aan de behoeften van uw workload. De minimale vereiste is 8 kernen om efficiënte en goed presterende gegevensextractie uit uw brondatabase mogelijk te maken.

        {
          "driver_node_type_id": {
            "type": "fixed",
            "value": "Standard_E64d_v4"
          },
          "node_type_id": {
            "type": "fixed",
            "value": "Standard_F4s"
          }
        }
        

      Zie Een rekenbeleid selecteren voor meer informatie over clusterbeleid.

  • Als u wilt opnemen vanuit SQL Server, moet u eerst de stappen in Microsoft SQL Server configureren voor opname in Azure Databricks uitvoeren.

Een gateway en een opnamepijplijn maken

Databricks-gebruikersinterface

  1. Klik in de zijbalk van de Azure Databricks-werkruimte op Gegevensopname.

  2. Klik op de pagina Gegevens toevoegen onder Databricks-connectors op SQL Server.

  3. Selecteer op de pagina Verbinding van de wizard voor gegevensopname de verbinding die SQL Server-toegangsreferenties opslaat uit Configureren van Microsoft SQL Server voor opname in Azure Databricks. Als u de CREATE CONNECTION bevoegdheid hebt voor de metastore, kunt u op het pluspictogram klikken. Maak verbinding om een nieuwe verbinding te maken met de verificatiedetails in SQL Server.

  4. Klik op Volgende.

  5. Voer op de pagina Invoerconfiguratie een unieke naam in voor de invoerpijplijn. Met deze pijplijn worden gegevens van de faseringslocatie naar het doel verplaatst.

  6. Selecteer een catalogus en een schema om gebeurtenislogboeken naar te schrijven. Het gebeurtenislogboek bevat auditlogboeken, controles van gegevenskwaliteit, voortgang van pijplijn en fouten. Als u de bevoegdheden USE CATALOG en CREATE SCHEMA op de catalogus hebt, kunt u op het pluspictogram klikken. Schema maken in de vervolgkeuzelijst om een nieuw schema te maken.

  7. (Optioneel) Automatisch volledig vernieuwen instellen voor alle tabellen op Aan. Wanneer automatisch vernieuwen is ingeschakeld, probeert de pijplijn automatisch problemen op te lossen, zoals gebeurtenissen voor het opschonen van logboeken en bepaalde typen schemaontwikkeling door de betrokken tabel volledig te vernieuwen. Als het bijhouden van geschiedenis is ingeschakeld, wordt die geschiedenis gewist door een volledige vernieuwing.

  8. Voer een unieke naam in voor de opnamegateway. De gateway is een pijplijn die wijzigingen uit de bron extraheert en klaarmaakt voor de ingestiepijplijn om te laden.

  9. Selecteer een catalogus en een schema voor de faseringslocatie. Op deze locatie wordt een volume gemaakt om geëxtraheerde gegevens te organiseren. Als u de bevoegdheden USE CATALOG en CREATE SCHEMA op de catalogus hebt, kunt u op het pluspictogram klikken. Schema maken in de vervolgkeuzelijst om een nieuw schema te maken.

  10. Klik op Pijplijn maken en ga door.

  11. Selecteer op de pagina Bron de tabellen die u wilt opnemen. Als u specifieke tabellen selecteert, kunt u tabelinstellingen configureren:

    a. (Optioneel) Geef op het tabblad Instellingen een doelnaam op voor elke opgenomen tabel. Dit is handig om onderscheid te maken tussen doeltabellen wanneer u een object meerdere keren in hetzelfde schema opneemt. Zie Plaats een naam voor een doeltabel.

    a. (Optioneel) Wijzig de standaardinstelling voor het bijhouden van geschiedenis . Zie Geschiedenis bijhouden inschakelen (SCD-type 2).

  12. Klik op Volgende en klik vervolgens op Opslaan en doorgaan.

  13. Selecteer op de doelpagina een catalogus en een schema om gegevens in te laden. Als u de bevoegdheden USE CATALOG en CREATE SCHEMA op de catalogus hebt, kunt u op het pluspictogram klikken. Schema maken in de vervolgkeuzelijst om een nieuw schema te maken.

  14. Klik op Opslaan en doorgaan.

  15. Klik op de pagina Database-instelling op Valideren om te bevestigen dat uw bron juist is geconfigureerd voor opname van Azure Databricks. Ontbrekende configuraties worden geretourneerd. Klik op Configuratie voltooien voor stappen die u wilt oplossen. Klik vervolgens op Volgende. U kunt ook op Validatie overslaan klikken.

  16. (Optioneel) Klik op de pagina Planningen en meldingen op Het pluspictogram. Maak een planning. Stel de frequentie in om de doeltabellen te vernieuwen.

  17. (Optioneel) Klik op pluspictogram. Voeg een melding toe om e-mailmeldingen in te stellen voor geslaagde of mislukte pijplijnen en klik vervolgens op Opslaan en pijplijn uitvoeren.

Declaratieve automatiseringsbundels

Voordat u gegevens importeert met behulp van Declaratieve Automatiseringsbundels, moet u toegang hebben tot een bestaande verbinding. Voor instructies, zie Verbinding maken met beheerde opnamebronnen.

De faseringscatalogus en het schema kunnen hetzelfde zijn als de doelcatalogus en het schema. De faseringscatalogus kan geen buitenlandse catalogus zijn. Geef de faseringslocatie op in de gateway_definition sectie van het YAML-bestand voor de bundelpijplijn.

De opnamegateway extraheert momentopnamen en wijzigingsgegevens uit de brondatabase en slaat deze op in het tussenvolume van de Unity Catalog. Je moet de gateway uitvoeren als een continue pipeline. Dit helpt bij het instellen van bewaarbeleid voor wijzigingenlogboeken dat u in de brondatabase hebt.

De opnamepijplijn past de momentopname toe en brengt wijzigingsgegevens van het faseringsvolume aan in streamingdoeltabellen.

Bundels kunnen YAML-definities van taken en taken bevatten, worden beheerd met behulp van de Databricks CLI en kunnen worden gedeeld en uitgevoerd in verschillende doelwerkruimten (zoals ontwikkeling, fasering en productie). Zie Wat zijn declaratieve Automation-bundels? voor meer informatie.

  1. Maak een bundel met behulp van de Databricks CLI:

    databricks bundle init
    
  2. Voeg twee nieuwe resourcebestanden toe aan de bundel:

    • Een pijplijndefinitiebestand (bijvoorbeeld resources/sqlserver_pipeline.yml). Zie pipeline.ingestion_definition en voorbeelden.
    • Een taakdefinitiebestand dat de frequentie van gegevensopname bepaalt (bijvoorbeeld resources/sqlserver_job.yml).
  3. Implementeer de pijplijn met behulp van de Databricks CLI:

    databricks bundle deploy
    

Databricks Notebook

Werk de Configuration cel in het volgende notebook bij met de bronverbinding, doelcatalogus, doelschema en tabellen die moeten worden opgenomen uit de bron.

Notitieblok ophalen

Terraform

U kunt Terraform gebruiken om SQL Server-opnamepijplijnen te implementeren en beheren. Voor een volledig voorbeeldframework, waaronder Terraform-configuraties voor het maken van gateways en opnamepijplijnen, raadpleegt u de opslagplaats met voorbeelden van Lakeflow Connect Terraform op GitHub.

Geslaagde gegevensopname verifiëren

In de lijstweergave op de pagina met pijplijndetails ziet u het aantal records dat wordt verwerkt wanneer gegevens worden opgenomen. Deze nummers worden automatisch vernieuwd.

Replicatie verifiëren

De Upserted records kolommen en Deleted records kolommen worden niet standaard weergegeven. U kunt deze inschakelen door op het pictogram voor configuratie van kolommen te klikken en ze te selecteren.

Examples

Gebruik deze voorbeelden om uw pijplijn te configureren.

Pijplijnconfiguratie

Declaratieve automatiseringsbundels

Het volgende pijplijndefinitiebestand:

variables:
  # Common variables used multiple places in the DAB definition.
  gateway_name:
    default: sqlserver-gateway
  dest_catalog:
    default: main
  dest_schema:
    default: ingest-destination-schema

resources:
  pipelines:
    gateway:
      name: ${var.gateway_name}
      gateway_definition:
        connection_name: <sqlserver-connection>
        gateway_storage_catalog: main
        gateway_storage_schema: ${var.dest_schema}
        gateway_storage_name: ${var.gateway_name}
      catalog: ${var.dest_catalog}
      schema: ${var.dest_schema}

    pipeline_sqlserver:
      name: sqlserver-ingestion-pipeline
      ingestion_definition:
        ingestion_gateway_id: ${resources.pipelines.gateway.id}
        objects:
          # Modify this with your tables!
          - table:
              # Ingest the table test.ingestion_demo_lineitem to dest_catalog.dest_schema.ingestion_demo_line_item.
              source_catalog: test
              source_schema: ingestion_demo
              source_table: lineitem
              destination_catalog: ${var.dest_catalog}
              destination_schema: ${var.dest_schema}
          - schema:
              # Ingest all tables in the test.ingestion_whole_schema schema to dest_catalog.dest_schema. The destination
              # table name will be the same as it is on the source.
              source_catalog: test
              source_schema: ingestion_whole_schema
              destination_catalog: ${var.dest_catalog}
              destination_schema: ${var.dest_schema}
      catalog: ${var.dest_catalog}
      schema: ${var.dest_schema}

Databricks Notebook

Hier volgt een voorbeeldsectie Configuration van een pijplijnspecificatie:

# The name of the UC connection with the credentials to access the source database
connection_name = "my_connection"

# The name of the UC catalog and schema to store the replicated tables
target_catalog_name = "main"
target_schema_name = "lakeflow_sqlserver_connector_cdc"

# The name of the UC catalog and schema to store the staging volume with intermediate
# CDC and snapshot data. Use the destination catalog/schema by default.
stg_catalog_name = target_catalog_name
stg_schema_name = target_schema_name

# The name of the Gateway pipeline to create
gateway_pipeline_name = "cdc_gateway"

# The name of the Ingestion pipeline to create
ingestion_pipeline_name = "cdc_ingestion"

# Construct the full list of tables to replicate.
# IMPORTANT: The letter case of catalog, schema, and table names must match exactly
# the case used in the source database system tables.
tables_to_replicate = replicate_full_db_schema("MY_DB", ["MY_DB_SCHEMA"])
# Append tables from additional schemas as needed:
#  + replicate_tables_from_db_schema("MY_DB", "MY_SCHEMA_2", ["table3", "table4"])

Taakdefinitiebestand bundelen

Hier volgt een voorbeeld van een taakdefinitiebestand voor gebruik met declaratieve Automation-bundels. De taak wordt elke dag uitgevoerd, precies één dag vanaf de laatste uitvoering.

resources:
  jobs:
    sqlserver_dab_job:
      name: sqlserver_dab_job

      trigger:
        periodic:
          interval: 1
          unit: DAYS

      email_notifications:
        on_failure:
          - <email-address>

      tasks:
        - task_key: refresh_pipeline
          pipeline_task:
            pipeline_id: ${resources.pipelines.pipeline_sqlserver.id}

Algemene patronen

Zie Algemene patronen voor beheerde opnamepijplijnen voor geavanceerde pijplijnconfiguraties.

Volgende stappen 

Start, plan en stel waarschuwingen in voor uw pijplijn. Zie Algemene onderhoudstaken voor pijplijnen.

Aanvullende bronnen