Ingerir dados do SQL Server

Aprenda como ingerir dados do SQL Server para Azure Databricks usando o Lakeflow Connect.

O conector do SQL Server oferece suporte ao Banco de Dados SQL do Azure, à Instância Gerenciada SQL do Azure e aos bancos de dados SQL do Amazon RDS. Isso inclui o SQL Server em execução em máquinas virtuais (VMs) do Azure e o Amazon EC2. O conector também oferece suporte ao SQL Server local usando a rede Azure ExpressRoute e AWS Direct Connect.

Requisitos

  • Para criar um gateway de ingestão e um pipeline de ingestão, deve primeiro cumprir os seguintes requisitos:

    • Seu espaço de trabalho está habilitado para o Catálogo Unity.

    • A computação sem servidor está habilitada para seu espaço de trabalho. Consulte Requisitos de computação sem servidor.

    • Se tencionas criar uma conexão: tens CREATE CONNECTION privilégios no metastore. Consulte Gerenciar privilégios no Catálogo Unity.

      Se o seu conector suportar a criação de pipeline baseada na interface do utilizador, poderá criar a conexão e o pipeline simultaneamente ao concluir as etapas nesta página. No entanto, se você usar a criação de pipeline baseada em API, deverá criar a conexão no Catalog Explorer antes de concluir as etapas nesta página. Consulte Conectar-se a fontes de ingestão gerenciadas.

    • Se você planeja usar uma conexão existente: você tem USE CONNECTION privilégios ou ALL PRIVILEGES na conexão.

    • No catálogo de destino, você tem privilégios de USE CATALOG.

    • Você tem USE SCHEMA, CREATE TABLE e CREATE VOLUME privilégios num esquema existente ou CREATE SCHEMA privilégios no catálogo de destino.

    • Você tem acesso a uma instância primária do SQL Server. Os recursos de registo de alterações e captura de dados de alterações não são suportados em réplicas de leitura ou instâncias secundárias.

    • Permissões irrestritas para criar clusters ou uma política personalizada (somente API). Uma política personalizada para o gateway deve atender aos seguintes requisitos:

      • Família: Computação de Trabalhos

      • A família de políticas substitui:

        {
          "cluster_type": {
            "type": "fixed",
            "value": "dlt"
          },
          "num_workers": {
            "type": "unlimited",
            "defaultValue": 1,
            "isOptional": true
          },
          "runtime_engine": {
            "type": "fixed",
            "value": "STANDARD",
            "hidden": true
          }
        }
        
      • O Databricks recomenda especificar os menores nós de trabalho possíveis para gateways de ingestão porque eles não afetam o desempenho do gateway. A política de computação a seguir permite que o Azure Databricks dimensione o gateway de ingestão para atender às necessidades de sua carga de trabalho. O requisito mínimo é de 8 núcleos para permitir a extração de dados eficiente e eficiente do seu banco de dados de origem.

        {
          "driver_node_type_id": {
            "type": "fixed",
            "value": "Standard_E64d_v4"
          },
          "node_type_id": {
            "type": "fixed",
            "value": "Standard_F4s"
          }
        }
        

      Para obter mais informações sobre políticas de cluster, consulte Selecionar uma política de computação.

  • Para ingerir a partir do SQL Server, deve primeiro completar os passos em Configurar o Microsoft SQL Server para a ingestão no Azure Databricks.

Criar um gateway e um pipeline de ingestão

Interface do usuário do Databricks

  1. Na barra lateral do espaço de trabalho do Azure Databricks, clique em Ingestão de Dados.

  2. Na página Adicionar dados , em Conectores Databricks, clique em SQL Server.

  3. Na página Ligação do assistente de ingestão, selecione a ligação que armazena as credenciais de acesso ao SQL Server de Configurar Microsoft SQL Server para ingestão no Azure Databricks. Se tiveres esse CREATE CONNECTION privilégio na metastore, podes clicar no ícone Plus. Crie uma ligação para criar uma nova ligação com os detalhes de autenticação no SQL Server.

  4. Clique em Next.

  5. Na página de Configuração de Ingestão , introduza um nome único para o pipeline de ingestão. Esta canal move dados do local intermediário para o destino.

  6. Selecione um catálogo e um esquema para escrever registos de eventos. O registo de eventos contém registos de auditoria, verificações de qualidade de dados, progresso do pipeline e erros. Se tiver USE CATALOG e CREATE SCHEMA privilégios no catálogo, pode clicar no ícone Mais. Criar esquema no menu suspenso para criar um novo esquema.

  7. (Opcional) Defina a atualização automática completa para todas as tabelas para Ligado. Quando a atualização automática está ativada, o pipeline tenta automaticamente corrigir problemas como eventos de limpeza de registos e certos tipos de evolução de esquemas, atualizando completamente a tabela afetada. Se o rastreamento do histórico estiver ativado, uma atualização completa apaga esse histórico.

  8. Introduza um nome exclusivo para a porta de entrada de ingestão. O gateway é um pipeline que extrai as alterações da fonte de dados e as prepara para que o pipeline de ingestão carregue.

  9. Selecione um catálogo e um esquema para o local de Staging. É criado um volume neste local para escalonar os dados extraídos. Se tiver USE CATALOG e CREATE SCHEMA privilégios no catálogo, pode clicar no ícone Mais. Criar esquema no menu suspenso para criar um novo esquema.

  10. Clique em Criar pipeline e continue.

  11. Na página Origem , selecione as tabelas a serem ingeridas. Se selecionares tabelas específicas, podes configurar as definições das tabelas:

    a. (Opcional) Na aba Definições, especifique um nome de destino para cada tabela ingerida. Isto é útil para diferenciar tabelas de destino quando ingere um objeto no mesmo esquema várias vezes. Veja Nomeie uma tabela de destinos.

    a. (Opcional) Altere a configuração padrão de rastreamento de histórico History tracking. Consulte Ativar rastreamento de histórico (SCD tipo 2).

  12. Clica em Próximo, depois em Guardar e continua.

  13. Na página de Destino , selecione um catálogo e um esquema para carregar dados. Se tiver USE CATALOG e CREATE SCHEMA privilégios no catálogo, pode clicar no ícone Mais. Criar esquema no menu suspenso para criar um novo esquema.

  14. Clique em Salvar e continuar.

  15. Na página de Configuração da Base de Dados, clique em Validar para confirmar se a sua fonte está devidamente configurada para a ingestão do Azure Databricks. Quaisquer configurações em falta são retornadas. Para os passos a resolver, clique em Completar configuração. Em seguida, clique em Avançar. Alternativamente, clique em Saltar validação.

  16. (Opcional) Na página de Horários e notificações , clique no ícone Plus. Crie um horário. Defina a frequência para atualizar as tabelas de destino.

  17. (Opcional) Clique no ícone Mais. Adicione uma notificação para definir notificações por email para o sucesso ou fracasso da operação do pipeline, depois clique em Guardar e executar pipeline.

Pacotes de Automação Declarativa

Antes de ingerir usando Pacotes de Automação Declarativa, deve ter acesso a uma ligação existente. Para obter instruções, consulte Conectar-se a fontes de ingestão gerenciadas.

O catálogo e o esquema de preparo podem ser os mesmos que o catálogo e o esquema de destino. O catálogo de preparo não pode ser um catálogo estrangeiro. Especifique a localização de staging na gateway_definition secção do ficheiro YAML do seu pipeline de bundle.

O gateway de ingestão extrai dados instantâneos e de alteração do banco de dados de origem e os armazena no volume de preparo do Catálogo Unity. Você deve executar o gateway como um pipeline contínuo. Isso ajuda a acomodar quaisquer políticas de retenção de log de alterações que você tenha no banco de dados de origem.

O pipeline de ingestão aplica o instantâneo e os dados de alteração do volume de staging em tabelas de streaming de destino.

Os pacotes podem conter definições YAML de trabalhos e tarefas, são gerenciados usando a CLI do Databricks e podem ser compartilhados e executados em diferentes espaços de trabalho de destino (como desenvolvimento, preparação e produção). Para mais informações, veja O que são os Pacotes de Automação Declarativa?.

  1. Crie um novo pacote usando a CLI do Databricks:

    databricks bundle init
    
  2. Adicione dois novos arquivos de recursos ao pacote:

    • Um ficheiro de definição de pipeline (por exemplo, resources/sqlserver_pipeline.yml). Veja pipeline.ingestion_definition e exemplos.
    • Um ficheiro de definição de funções que controla a frequência de ingestão de dados (por exemplo, resources/sqlserver_job.yml).
  3. Implante o pipeline usando a CLI do Databricks:

    databricks bundle deploy
    

Caderno de Notas do Databricks

Atualize a célula Configuration no bloco de notas seguinte com a conexão de origem, o catálogo de destino, o esquema de destino e as tabelas para carregar a partir da origem.

Obter notebook

Terraform

Pode usar o Terraform para implementar e gerir pipelines de ingestão do SQL Server. Para um quadro de exemplos completo, incluindo configurações do Terraform para criar gateways e canais de ingestão, consulte o repositório de exemplos do Lakeflow Connect Terraform no GitHub.

Verificar a ingestão bem-sucedida de dados

A exibição de lista na página de detalhes do pipeline mostra o número de registros processados à medida que os dados são ingeridos. Esses números são atualizados automaticamente.

Verificar a replicação

As Upserted records colunas e Deleted records não são mostradas por padrão. Você pode ativá-los clicando no botão do ícone de configuração de colunas e selecionando-os.

Exemplos

Use estes exemplos para configurar o seu pipeline.

Configuração do pipeline

Pacotes de Automação Declarativa

O seguinte ficheiro de definição de pipeline:

variables:
  # Common variables used multiple places in the DAB definition.
  gateway_name:
    default: sqlserver-gateway
  dest_catalog:
    default: main
  dest_schema:
    default: ingest-destination-schema

resources:
  pipelines:
    gateway:
      name: ${var.gateway_name}
      gateway_definition:
        connection_name: <sqlserver-connection>
        gateway_storage_catalog: main
        gateway_storage_schema: ${var.dest_schema}
        gateway_storage_name: ${var.gateway_name}
      catalog: ${var.dest_catalog}
      schema: ${var.dest_schema}

    pipeline_sqlserver:
      name: sqlserver-ingestion-pipeline
      ingestion_definition:
        ingestion_gateway_id: ${resources.pipelines.gateway.id}
        objects:
          # Modify this with your tables!
          - table:
              # Ingest the table test.ingestion_demo_lineitem to dest_catalog.dest_schema.ingestion_demo_line_item.
              source_catalog: test
              source_schema: ingestion_demo
              source_table: lineitem
              destination_catalog: ${var.dest_catalog}
              destination_schema: ${var.dest_schema}
          - schema:
              # Ingest all tables in the test.ingestion_whole_schema schema to dest_catalog.dest_schema. The destination
              # table name will be the same as it is on the source.
              source_catalog: test
              source_schema: ingestion_whole_schema
              destination_catalog: ${var.dest_catalog}
              destination_schema: ${var.dest_schema}
      catalog: ${var.dest_catalog}
      schema: ${var.dest_schema}

Caderno de Notas do Databricks

A seguir está uma secção de exemplo Configuration de uma especificação de pipeline:

# The name of the UC connection with the credentials to access the source database
connection_name = "my_connection"

# The name of the UC catalog and schema to store the replicated tables
target_catalog_name = "main"
target_schema_name = "lakeflow_sqlserver_connector_cdc"

# The name of the UC catalog and schema to store the staging volume with intermediate
# CDC and snapshot data. Use the destination catalog/schema by default.
stg_catalog_name = target_catalog_name
stg_schema_name = target_schema_name

# The name of the Gateway pipeline to create
gateway_pipeline_name = "cdc_gateway"

# The name of the Ingestion pipeline to create
ingestion_pipeline_name = "cdc_ingestion"

# Construct the full list of tables to replicate.
# IMPORTANT: The letter case of catalog, schema, and table names must match exactly
# the case used in the source database system tables.
tables_to_replicate = replicate_full_db_schema("MY_DB", ["MY_DB_SCHEMA"])
# Append tables from additional schemas as needed:
#  + replicate_tables_from_db_schema("MY_DB", "MY_SCHEMA_2", ["table3", "table4"])

Ficheiro de definição de tarefa em conjunto

Segue-se um exemplo de ficheiro de definição de trabalho para uso com Pacotes de Automação Declarativa. O trabalho executa-se todos os dias, exatamente um dia após a última execução.

resources:
  jobs:
    sqlserver_dab_job:
      name: sqlserver_dab_job

      trigger:
        periodic:
          interval: 1
          unit: DAYS

      email_notifications:
        on_failure:
          - <email-address>

      tasks:
        - task_key: refresh_pipeline
          pipeline_task:
            pipeline_id: ${resources.pipelines.pipeline_sqlserver.id}

Padrões comuns

Para configurações avançadas de pipelines, consulte os Padrões comuns para pipelines de ingestão geridos.

Passos seguintes

Inicia, agenda e configura alertas no seu pipeline. Ver Tarefas comuns de manutenção de oleodutos.

Recursos adicionais