Partilhar via


Carregar dados incrementalmente do Base de Dados SQL do Azure para o Armazenamento de Blobs do Azure usando informação de acompanhamento de alterações usando PowerShell

APLICA-SE A: Azure Data Factory Azure Synapse Analytics

Gorjeta

Data Factory em Microsoft Fabric é a próxima geração de Azure Data Factory, com uma arquitetura mais simples, IA incorporada e novas funcionalidades. Se és novo na integração de dados, começa pelo Fabric Data Factory. As cargas de trabalho existentes do ADF podem atualizar para o Fabric para aceder a novas capacidades em ciência de dados, análise em tempo real e relatórios.

Neste tutorial, cria uma fábrica de dados do Azure com um pipeline que carrega dados delta com base na informação de rastreamento de alterações na base de dados de origem no Base de Dados SQL do Azure para um armazenamento de blobs do Azure.

Vai executar os seguintes passos neste tutorial:

  • Preparar o arquivo de dados de origem
  • Criar uma fábrica de dados.
  • Criar serviços ligados.
  • Crie conjuntos de dados de origem, destino e de registo de alterações.
  • Criar, executar e monitorizar o pipeline de cópia completa
  • Adicionar ou atualizar os dados na tabela de origem
  • Criar, executar e monitorizar o fluxo da cópia incremental

Nota

Recomendamos que utilize o módulo PowerShell do Azure Az para interagir com o Azure. Para começar, consulte Install Azure PowerShell. Para saber como migrar para o módulo Az PowerShell, veja Migrar Azure PowerShell do AzureRM para o Az.

Descrição geral

Uma solução de integração de dados, que carrega dados incrementalmente após os carregamentos de dados iniciais é um cenário bastante utilizado. Em alguns casos, os dados alterados durante um período no seu arquivo de dados de origem podem ser facilmente segmentados (por exemplo, LastModifyTime, CreationTime). Em alguns casos, não há nenhuma forma explícita para identificar os dados delta da última vez que processou os dados. A tecnologia de Acompanhamento de Alterações suportada por repositórios de dados como o Base de Dados SQL do Azure e o SQL Server pode ser usada para identificar os dados delta. Este tutorial descreve como usar o Azure Data Factory com tecnologia SQL Change Tracking para carregar incrementalmente dados delta do Base de Dados SQL do Azure para o Armazenamento de Blobs do Azure. Para informações mais concretas sobre a tecnologia SQL Change Tracking, consulte Monitorização de alterações em SQL Server.

Fluxo de trabalho ponto a ponto

Eis os passos de fluxo de trabalho ponto-a-ponto normais para carregar dados incrementalmente com recurso à tecnologia de Controlo de Alterações.

Nota

Tanto o Base de Dados SQL do Azure como o SQL Server suportam a tecnologia Change Tracking. Este tutorial utiliza o Base de Dados SQL do Azure como armazenamento de dados de origem. Também podes usar uma instância do SQL Server.

  1. Carregamento inicial de dados históricos (executado uma vez):
    1. Ative a tecnologia de Rastreamento de Alterações na base de dados de origem no Base de Dados SQL do Azure.
    2. Obtenha o valor inicial de SYS_CHANGE_VERSION no banco de dados como a linha de base para capturar dados alterados.
    3. Carregue os dados completos da base de dados de origem para um armazenamento de blob no Azure.
  2. Carregamento incremental de dados delta em um cronograma (executado periodicamente após o carregamento inicial de dados):
    1. Obter os valores SYS_CHANGE_VERSION novos e antigos.
    2. Carregue os dados delta associando as chaves primárias das linhas alteradas (entre dois valores SYS_CHANGE_VERSION) de change_tracking_tables aos dados na tabela de origem, e em seguida, transfira os dados delta para o destino.
    3. Atualize o SYS_CHANGE_VERSION para o carregamento delta da próxima vez.

Solução de alto nível

Neste tutorial, vai criar dois pipelines que realizam as seguintes duas operações:

  1. Initial load: cria-se um pipeline com uma atividade de cópia que copia todos os dados do armazenamento de dados de origem (Base de Dados SQL do Azure) para o armazenamento de dados de destino (Armazenamento de Blobs do Azure).

    Carregamento completo de dados

  2. Carga incremental: cria um pipeline com as seguintes atividades e execute-o periodicamente.

    1. Cria duas atividades de procura para obter o SYS_CHANGE_VERSION antigo e novo do Base de Dados SQL do Azure e passá-los para a atividade de cópia.
    2. Crie uma atividade de cópia uma para transferir os dados inseridos/atualizados/eliminados entre os dois valores do SYS_CHANGE_VERSION do Base de Dados SQL do Azure para o Armazenamento de Blobs do Azure.
    3. Crie uma atividade de procedimento armazenado para atualizar o valor de SYS_CHANGE_VERSION para a próxima execução de pipeline.

    Diagrama de fluxo de carga incremental

Se não tiver uma subscrição Azure, crie uma conta free antes de começar.

Pré-requisitos

  • Azure PowerShell. Instale os módulos Azure PowerShell mais recentes seguindo as instruções em Como instalar e configurar Azure PowerShell.
  • Base de Dados SQL do Azure. A base de dados é utilizada como o arquivo de dados de origem. Se não tiver uma base de dados no Base de Dados SQL do Azure, consulte o artigo Criar uma base de dados no Base de Dados SQL do Azure para os passos para criar uma.
  • conta de Armazenamento do Azure. O armazenamento de blobs é utilizado como repositório de dados de sink. Se não tiver uma conta de armazenamento Azure, consulte o artigo Criar uma conta de armazenamento para os passos para criar uma. Crie um contentor com o nome adftutorial.

Criar uma tabela de fonte de dados em seu banco de dados

  1. Inicia SQL Server Management Studio e liga-te à base de dados SQL.

  2. No Explorador de Servidores, clique com botão direito do rato em base de dados e escolha Nova Consulta.

  3. Execute o seguinte comando SQL em seu banco de dados para criar uma tabela nomeada data_source_table como armazenamento da fonte de dados.

    create table data_source_table
    (
        PersonID int NOT NULL,
        Name varchar(255),
        Age int
        PRIMARY KEY (PersonID)
    );
    
    INSERT INTO data_source_table
        (PersonID, Name, Age)
    VALUES
        (1, 'aaaa', 21),
        (2, 'bbbb', 24),
        (3, 'cccc', 20),
        (4, 'dddd', 26),
        (5, 'eeee', 22);
    
    
  4. Ative o mecanismo de Controlo de Alterações na sua base de dados e na tabela de origem (data_source_table) ao executar a seguinte consulta SQL:

    Nota

    • Substitua <o nome do seu banco de dados> pelo nome do seu banco de dados que contém a data_source_table.
    • Os dados alterados são mantidos por dois dias no exemplo atual. Se carregar os dados alterados a cada três ou mais dias, alguns dados alterados não são incluídos. Precisa ou de alterar o valor de CHANGE_RETENTION para um número maior. Em alternativa, certifique-se de que o período para carregar os dados alterados esteja dentro de dois dias. Para obter mais informações, consulte Ativar o registo de alterações para uma base de dados
    ALTER DATABASE <your database name>
    SET CHANGE_TRACKING = ON  
    (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON)  
    
    ALTER TABLE data_source_table
    ENABLE CHANGE_TRACKING  
    WITH (TRACK_COLUMNS_UPDATED = ON)
    
  5. Criar uma nova tabela e armazenar ChangeTracking_version com um valor predefinido ao executar a seguinte consulta:

    create table table_store_ChangeTracking_version
    (
        TableName varchar(255),
        SYS_CHANGE_VERSION BIGINT,
    );
    
    DECLARE @ChangeTracking_version BIGINT
    SET @ChangeTracking_version = CHANGE_TRACKING_CURRENT_VERSION();  
    
    INSERT INTO table_store_ChangeTracking_version
    VALUES ('data_source_table', @ChangeTracking_version)
    

    Nota

    Se os dados não estiverem alterados após ativar o controlo de alterações da Base de Dados SQL, o valor da versão do controlo de alterações é 0.

  6. Execute a consulta a seguir para criar um procedimento armazenado em seu banco de dados. O pipeline invoca este procedimento armazenado para atualizar a versão de controlo de alterações na tabela que criou no passo anterior.

    CREATE PROCEDURE Update_ChangeTracking_Version @CurrentTrackingVersion BIGINT, @TableName varchar(50)
    AS
    
    BEGIN
    
    UPDATE table_store_ChangeTracking_version
    SET [SYS_CHANGE_VERSION] = @CurrentTrackingVersion
    WHERE [TableName] = @TableName
    
    END    
    

Azure PowerShell

Instale os módulos Azure PowerShell mais recentes seguindo as instruções em Como instalar e configurar Azure PowerShell.

Criar uma fábrica de dados

  1. Defina uma variável para o nome do grupo de recursos que vai utilizar nos comandos do PowerShell mais tarde. Copie o texto do comando seguinte para o PowerShell, especifique um nome para o grupo de recursos Azure entre aspas duplas e depois execute o comando. Por exemplo: "adfrg".

    $resourceGroupName = "ADFTutorialResourceGroup";
    

    Se o grupo de recursos já existir, talvez não queira sobrescrevê-lo. Atribua outro valor à variável $resourceGroupName e execute novamente o comando

  2. Defina uma variável para a localização da fábrica de dados:

    $location = "East US"
    
  3. Para criar o grupo de recursos do Azure, execute o seguinte comando:

    New-AzResourceGroup $resourceGroupName $location
    

    Se o grupo de recursos já existir, talvez não queira sobrescrevê-lo. Atribua outro valor à variável $resourceGroupName e execute novamente o comando.

  4. Defina uma variável para o nome da fábrica de dados.

    Importante

    Atualize o nome da fábrica de dados para que seja globalmente exclusivo.

    $dataFactoryName = "IncCopyChgTrackingDF";
    
  5. Para criar o data factory, execute o seguinte cmdlet Set-AzDataFactoryV2 :

    Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location $location -Name $dataFactoryName
    

Tenha em conta os seguintes pontos:

  • O nome da fábrica de dados do Azure deve ser globalmente único. Se receber o erro seguinte, altere o nome e tente novamente.

    The specified Data Factory name 'ADFIncCopyChangeTrackingTestFactory' is already in use. Data Factory names must be globally unique.
    
  • Para criar instâncias Data Factory, a conta de utilizador que usa para iniciar sessão no Azure deve ser membro de contribuidor ou proprietário, ou um administrador da subscrição do Azure.

  • Para uma lista das regiões do Azure em que o Data Factory está disponível atualmente, selecione as regiões que lhe interessam na página seguinte e depois expanda Analytics para localizar Data Factory: Produtos disponíveis por região. Os armazenamentos de dados (Armazenamento do Azure, Base de Dados SQL do Azure, etc.) e os computadores (HDInsight, etc.) usados pela data factory podem estar noutras regiões.

Criar serviços ligados

Os serviços ligados são criados numa fábrica de dados para ligar os seus arquivos de dados e serviços de computação a essa fábrica de dados. Nesta secção, cria serviços ligados à sua conta Armazenamento do Azure e à sua base de dados no Base de Dados SQL do Azure.

Criar serviço ligado ao Armazenamento do Azure.

Neste passo, liga a sua Conta Armazenamento do Azure à fábrica de dados.

  1. Crie um ficheiro JSON com o nome AzureStorageLinkedService.json na pasta C:\ADFTutorials\IncCopyChangeTrackingTutorial com o seguinte conteúdo: (crie a pasta, caso ainda não exista). Substitua <accountName>, <accountKey> pelo nome e chave da sua conta de armazenamento Azure antes de guardar o ficheiro.

    {
        "name": "AzureStorageLinkedService",
        "properties": {
            "type": "AzureStorage",
            "typeProperties": {
                "connectionString": "DefaultEndpointsProtocol=https;AccountName=<accountName>;AccountKey=<accountKey>"
            }
        }
    }
    
  2. Em Azure PowerShell, mude para a pasta C:\ADFTutorials\IncCopyChangeTrackingTutorial.

  3. Execute o cmdlet Set-AzDataFactoryV2LinkedService para criar o serviço vinculado: AzureStorageLinkedService. No exemplo seguinte, vai transmitir os valores para os parâmetros ResourceGroupName e DataFactoryName.

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureStorageLinkedService" -File ".\AzureStorageLinkedService.json"
    

    Segue-se o resultado do exemplo:

    LinkedServiceName : AzureStorageLinkedService
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : IncCopyChgTrackingDF
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureStorageLinkedService
    

Criar um serviço vinculado Base de Dados SQL do Azure.

Nesta etapa, você vincula seu banco de dados ao data factory.

  1. Crie um arquivo JSON chamado AzureSQLDatabaseLinkedService.json na pasta C:\ADFTutorials\IncCopyChangeTrackingTutorial com o seguinte conteúdo: Substitua <your-server-name> e <your-database-name> pelo nome do servidor e do banco de dados antes de salvar o arquivo. Deve também configurar o seu SQL Server Azure para conceder acesso à identidade gerida da sua fábrica de dados.

    {
    "name": "AzureSqlDatabaseLinkedService",
    "properties": {
            "type": "AzureSqlDatabase",
            "typeProperties": {
                "connectionString": "Server=tcp:<your-server-name>.database.windows.net,1433;Database=<your-database-name>;"
            },
            "authenticationType": "ManagedIdentity",
            "annotations": []
        }
    }
    
  2. Em Azure PowerShell, execute o cmdlet Set-AzDataFactoryV2LinkedService para criar o serviço ligado: AzureSQLDatabaseLinkedService.

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSQLDatabaseLinkedService" -File ".\AzureSQLDatabaseLinkedService.json"
    

    Segue-se o resultado do exemplo:

    LinkedServiceName : AzureSQLDatabaseLinkedService
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : IncCopyChgTrackingDF
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlDatabaseLinkedService
    

Criar conjuntos de dados

Neste passo, vai criar conjuntos de dados para representar a origem de dados e o destino dos dados. e o local para armazenar o SYS_CHANGE_VERSION.

Criar um conjunto de dados de origem

Neste passo, vai criar um conjunto de dados para representar os dados de origem.

  1. Crie um ficheiro JSON com o nome SourceDataset.json na mesma pasta com o seguinte conteúdo:

    {
        "name": "SourceDataset",
        "properties": {
            "type": "AzureSqlTable",
            "typeProperties": {
                "tableName": "data_source_table"
            },
            "linkedServiceName": {
                "referenceName": "AzureSQLDatabaseLinkedService",
                "type": "LinkedServiceReference"
            }
        }
    }   
    
  2. Execute o cmdlet Set-AzDataFactoryV2Dataset para criar o conjunto de dados: SourceDataset

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SourceDataset" -File ".\SourceDataset.json"
    

    Eis a saída de exemplo do cmdlet:

    DatasetName       : SourceDataset
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : IncCopyChgTrackingDF
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
    

Criar um conjunto de dados sink

Neste passo, cria um conjunto de dados para representar os dados que são copiados do arquivo de dados de origem.

  1. Crie um ficheiro JSON com o nome SinkDataset.json na mesma pasta com o seguinte conteúdo:

    {
        "name": "SinkDataset",
        "properties": {
            "type": "AzureBlob",
            "typeProperties": {
                "folderPath": "adftutorial/incchgtracking",
                "fileName": "@CONCAT('Incremental-', pipeline().RunId, '.txt')",
                "format": {
                    "type": "TextFormat"
                }
            },
            "linkedServiceName": {
                "referenceName": "AzureStorageLinkedService",
                "type": "LinkedServiceReference"
            }
        }
    }
    

    Crias o contentor adftutorial no teu Armazenamento de Blobs do Azure no âmbito dos pré-requisitos. Crie o contentor se ainda não existir ou defina-o com o nome de um contentor existente. Neste tutorial, o nome do arquivo de saída é gerado dinamicamente usando a expressão: @CONCAT('Incremental-', pipeline(). RunId, '.txt').

  2. Execute o cmdlet Set-AzDataFactoryV2Dataset para criar o conjunto de dados: SinkDataset

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SinkDataset" -File ".\SinkDataset.json"
    

    Eis a saída de exemplo do cmdlet:

    DatasetName       : SinkDataset
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : IncCopyChgTrackingDF
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureBlobDataset
    

Crie um conjunto de dados de controlo de alterações

Neste passo, vai criar um conjunto de dados para armazenar a versão de controlo de alterações.

  1. Crie um ficheiro JSON com o nome ChangeTrackingDataset.json na mesma pasta com o seguinte conteúdo:

    {
        "name": " ChangeTrackingDataset",
        "properties": {
            "type": "AzureSqlTable",
            "typeProperties": {
                "tableName": "table_store_ChangeTracking_version"
            },
            "linkedServiceName": {
                "referenceName": "AzureSQLDatabaseLinkedService",
                "type": "LinkedServiceReference"
            }
        }
    }
    

    Crie a tabela table_store_ChangeTracking_version como parte dos pré-requisitos.

  2. Execute o cmdlet Set-AzDataFactoryV2Dataset para criar o conjunto de dados: ChangeTrackingDataset

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "ChangeTrackingDataset" -File ".\ChangeTrackingDataset.json"
    

    Eis a saída de exemplo do cmdlet:

    DatasetName       : ChangeTrackingDataset
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : IncCopyChgTrackingDF
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
    

Criar um pipeline para a cópia completa

Neste passo, cria-se um pipeline com uma atividade de cópia que copia todos os dados do armazenamento de dados de origem (Base de Dados SQL do Azure) para o armazenamento de dados de destino (Armazenamento de Blobs do Azure).

  1. Crie um ficheiro JSON: FullCopyPipeline.json na mesma pasta com o seguinte conteúdo:

    {
        "name": "FullCopyPipeline",
        "properties": {
            "activities": [{
                "name": "FullCopyActivity",
                "type": "Copy",
                "typeProperties": {
                    "source": {
                        "type": "SqlSource"
                    },
                    "sink": {
                        "type": "BlobSink"
                    }
                },
    
                "inputs": [{
                    "referenceName": "SourceDataset",
                    "type": "DatasetReference"
                }],
                "outputs": [{
                    "referenceName": "SinkDataset",
                    "type": "DatasetReference"
                }]
            }]
        }
    }
    
  2. Execute o cmdlet Set-AzDataFactoryV2Pipeline para criar o pipeline: FullCopyPipeline.

     Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "FullCopyPipeline" -File ".\FullCopyPipeline.json"
    

    Segue-se o resultado do exemplo:

     PipelineName      : FullCopyPipeline
     ResourceGroupName : ADFTutorialResourceGroup
     DataFactoryName   : IncCopyChgTrackingDF
     Activities        : {FullCopyActivity}
     Parameters        :
    

Execute o pipeline de cópia completa

Execute o pipeline: FullCopyPipeline utilizando o cmdlet Invoke-AzDataFactoryV2Pipeline.

Invoke-AzDataFactoryV2Pipeline -PipelineName "FullCopyPipeline" -ResourceGroup $resourceGroupName -dataFactoryName $dataFactoryName        

Monitorize o pipeline da cópia completa

  1. Inicie sessão no portal Azure .

  2. Clique em Todos os serviços, pesquise com a palavra-chave data factories e selecione Fábricas de dados.

    Menu de fábricas de dados

  3. Pesquise pela sua fábrica de dados na lista de fábricas de dados e selecione-a para iniciar a página da Fábrica de dados.

    Pesquisar na sua fábrica de dados

  4. Na página Fábrica de dados, clique no mosaico Monitorizar e Gerir.

    Mosaico Monitorizar e Gerir

  5. A Aplicação de Integração de Dados é iniciada num separador separado. Pode ver todas as execuções de pipeline e os respetivos status. Note que no seguinte exemplo, o estado da execução do pipeline é Com Êxito. Pode verificar os parâmetros transmitidos para o pipeline ao clicar na ligação na coluna Parâmetros. Se tiver ocorrido um erro, pode ver uma ligação na coluna Erro. Clique na ligação na coluna Ações.

    A captura de tela mostra a execução do pipeline para uma fábrica de dados.

  6. Ao clicar na ligação na coluna Ações, verá a seguinte página que mostra todas as execuções de atividade no pipeline.

    A captura de tela mostra execuções de atividades para uma fábrica de dados com o link Pipelines destacado.

  7. Para mudar de volta para a visualização Execuções do pipeline, clique em Pipelines conforme mostrado na imagem.

Rever os resultados

Vai ver um ficheiro chamado incremental-<GUID>.txt na pasta incchgtracking do contentor adftutorial.

Ficheiro de saída a partir da cópia completa

O ficheiro deve conter os dados da sua base de dados:

1,aaaa,21
2,bbbb,24
3,cccc,20
4,dddd,26
5,eeee,22

Adicione mais dados à tabela de origem

Execute a seguinte consulta em seu banco de dados para adicionar uma linha e atualizar uma linha.

INSERT INTO data_source_table
(PersonID, Name, Age)
VALUES
(6, 'new','50');


UPDATE data_source_table
SET [Age] = '10', [name]='update' where [PersonID] = 1

Criar um pipeline para a cópia do delta

Neste passo, cria um pipeline com as seguintes atividades e execute-o periodicamente. As atividades lookup obtêm as versões SYS_CHANGE_VERSION antigas e novas da Base de Dados SQL do Azure e passam-nas para a atividade de cópia. A atividade copy copia os dados inseridos/atualizados/eliminados entre os dois valores de SYS_CHANGE_VERSION de Base de Dados SQL do Azure para Armazenamento de Blobs do Azure. A atividade de procedimento armazenado atualiza o valor de SYS_CHANGE_VERSION para a próxima execução do pipeline.

  1. Crie um ficheiro JSON: IncrementalCopyPipeline.json na mesma pasta com o seguinte conteúdo:

    {
        "name": "IncrementalCopyPipeline",
        "properties": {
            "activities": [
                {
                    "name": "LookupLastChangeTrackingVersionActivity",
                    "type": "Lookup",
                    "typeProperties": {
                        "source": {
                            "type": "SqlSource",
                            "sqlReaderQuery": "select * from table_store_ChangeTracking_version"
                        },
                        "dataset": {
                            "referenceName": "ChangeTrackingDataset",
                            "type": "DatasetReference"
                        }
                    }
                },
                {
                    "name": "LookupCurrentChangeTrackingVersionActivity",
                    "type": "Lookup",
                    "typeProperties": {
                        "source": {
                            "type": "SqlSource",
                            "sqlReaderQuery": "SELECT CHANGE_TRACKING_CURRENT_VERSION() as CurrentChangeTrackingVersion"
                        },
                        "dataset": {
                            "referenceName": "SourceDataset",
                            "type": "DatasetReference"
                        }
                    }
                },
                {
                    "name": "IncrementalCopyActivity",
                    "type": "Copy",
                    "typeProperties": {
                        "source": {
                            "type": "SqlSource",
                            "sqlReaderQuery": "select data_source_table.PersonID,data_source_table.Name,data_source_table.Age, CT.SYS_CHANGE_VERSION, SYS_CHANGE_OPERATION from data_source_table RIGHT OUTER JOIN CHANGETABLE(CHANGES data_source_table, @{activity('LookupLastChangeTrackingVersionActivity').output.firstRow.SYS_CHANGE_VERSION}) as CT on data_source_table.PersonID = CT.PersonID where CT.SYS_CHANGE_VERSION <= @{activity('LookupCurrentChangeTrackingVersionActivity').output.firstRow.CurrentChangeTrackingVersion}"
                        },
                        "sink": {
                            "type": "BlobSink"
                        }
                    },
                    "dependsOn": [
                        {
                            "activity": "LookupLastChangeTrackingVersionActivity",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        },
                        {
                            "activity": "LookupCurrentChangeTrackingVersionActivity",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
                    "inputs": [
                        {
                            "referenceName": "SourceDataset",
                            "type": "DatasetReference"
                        }
                    ],
                    "outputs": [
                        {
                            "referenceName": "SinkDataset",
                            "type": "DatasetReference"
                        }
                    ]
                },
                {
                    "name": "StoredProceduretoUpdateChangeTrackingActivity",
                    "type": "SqlServerStoredProcedure",
                    "typeProperties": {
                        "storedProcedureName": "Update_ChangeTracking_Version",
                        "storedProcedureParameters": {
                            "CurrentTrackingVersion": {
                                "value": "@{activity('LookupCurrentChangeTrackingVersionActivity').output.firstRow.CurrentChangeTrackingVersion}",
                                "type": "INT64"
                            },
                            "TableName": {
                                "value": "@{activity('LookupLastChangeTrackingVersionActivity').output.firstRow.TableName}",
                                "type": "String"
                            }
                        }
                    },
                    "linkedServiceName": {
                        "referenceName": "AzureSQLDatabaseLinkedService",
                        "type": "LinkedServiceReference"
                    },
                    "dependsOn": [
                        {
                            "activity": "IncrementalCopyActivity",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ]
                }
            ]
        }
    }
    
  2. Execute o cmdlet Set-AzDataFactoryV2Pipeline para criar o pipeline: FullCopyPipeline.

     Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "IncrementalCopyPipeline" -File ".\IncrementalCopyPipeline.json"
    

    Segue-se o resultado do exemplo:

     PipelineName      : IncrementalCopyPipeline
     ResourceGroupName : ADFTutorialResourceGroup
     DataFactoryName   : IncCopyChgTrackingDF
     Activities        : {LookupLastChangeTrackingVersionActivity, LookupCurrentChangeTrackingVersionActivity, IncrementalCopyActivity, StoredProceduretoUpdateChangeTrackingActivity}
     Parameters        :
    

Executar o fluxo de trabalho de cópia incremental

Execute o pipeline: IncrementalCopyPipeline usando o cmdlet Invoke-AzDataFactoryV2Pipeline .

Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroup $resourceGroupName -dataFactoryName $dataFactoryName     

Monitorar o fluxo de trabalho da cópia incremental

  1. Na Aplicação de Integração de Dados, atualize a visualização das execuções de pipeline. Confirme que vê o IncrementalCopyPipeline na lista. Clique na ligação na coluna Ações.

    A captura de tela mostra que o pipeline é executado para uma fábrica de dados, incluindo seu pipeline.

  2. Ao clicar na ligação na coluna Ações, verá a seguinte página que mostra todas as execuções de atividade no pipeline.

    A captura de tela mostra as execuções do pipeline para uma fábrica de dados com várias marcadas como concluídas com sucesso.

  3. Para mudar de volta para a visualização Execuções do pipeline, clique em Pipelines conforme mostrado na imagem.

Rever os resultados

Vai ver um segundo ficheiro na pasta incchgtracking do contentor adftutorial.

Ficheiro de saída a partir da cópia incremental

O arquivo deve ter apenas os dados delta do seu banco de dados. O registo com U é a linha atualizada na base de dados e I é a linha adicionada.

1,update,10,2,U
6,new,50,1,I

As primeiras três colunas são dados alterados de data_source_table. As últimas duas colunas são os metadados da tabela do sistema de controlo de alterações. A quarta coluna é o SYS_CHANGE_VERSION para cada linha alterada. A quinta coluna é a operação: U = atualizar, I = inserir. Para obter detalhes sobre as informações do registo de alterações, consulte CHANGETABLE.

==================================================================
PersonID Name    Age    SYS_CHANGE_VERSION    SYS_CHANGE_OPERATION
==================================================================
1        update  10            2                                 U
6        new     50            1                                 I

Avance para o tutorial a seguir para saber como copiar arquivos novos e alterados somente com base em sua LastModifiedDate: