Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
APLICA-SE A:
Azure Data Factory
Azure Synapse Analytics
Gorjeta
Data Factory em Microsoft Fabric é a próxima geração de Azure Data Factory, com uma arquitetura mais simples, IA incorporada e novas funcionalidades. Se és novo na integração de dados, começa pelo Fabric Data Factory. As cargas de trabalho existentes do ADF podem atualizar para o Fabric para aceder a novas capacidades em ciência de dados, análise em tempo real e relatórios.
Neste tutorial, crias uma Azure Data Factory com um pipeline que carrega dados delta de várias tabelas numa base de dados SQL Server para o Base de Dados SQL do Azure.
Vai executar os seguintes passos neste tutorial:
- Prepare os arquivos de dados de origem e de destino.
- Criar uma fábrica de dados.
- Criar um tempo de execução de integração auto-hospedado.
- Instalar o integration runtime.
- Criar serviços ligados.
- Crie conjuntos de dados de origem, destino e de marca d'água.
- Criar, executar e monitorizar um pipeline.
- Reveja os resultados.
- Adicionou ou atualizou os dados nas tabelas de origem.
- Execute novamente e monitorize o pipeline.
- Reveja os resultados finais.
Descrição geral
Eis os passos importantes para criar esta solução:
Selecionar a coluna de limite de tamanho.
Selecione uma coluna para cada tabela na fonte de dados de origem, que permita identificar os registos novos ou atualizados em cada execução. Normalmente, os dados nesta coluna selecionada (por exemplo, last_modify_time ou ID) continuam a aumentar quando as linhas são criadas ou atualizadas. O valor máximo nesta coluna é utilizado como limite de tamanho.
Preparar um arquivo de dados para armazenar o valor de limite de tamanho.
Neste tutorial, irá armazenar o valor de marca-de-água numa base de dados SQL.
Criar um pipeline com as seguintes atividades:
Criar uma atividade ForEach que itera através de uma lista de nomes de tabelas de origem que é transmitida como um parâmetro para o pipeline. Para cada tabela de origem, invoca as seguintes atividades para efetuar o carregamento incremental para essa tabela.
Criar duas atividades de pesquisa. Utilize a primeira atividade de pesquisa para obter o último valor de marca d'água. Utilize a segunda atividade Lookup para obter o novo valor de marca d'água. Estes valores de marca de água são passados para a atividade de cópia.
Crie uma atividade de cópia que copie linhas do armazenamento de dados de origem onde o valor da coluna de marca de água seja maior que o valor antigo da marca de água e menor ou igual ao valor da nova marca de água. Depois, copia os dados delta do armazenamento de dados de origem para o armazenamento Azure Blob como um novo ficheiro.
Crie uma atividade StoredProcedure, que atualiza o valor de marca d'água do pipeline que vai ser executado da próxima vez.
Eis o diagrama de nível elevado da solução:
Se não tiver uma subscrição Azure, crie uma conta free antes de começar.
Pré-requisitos
- SQL Server. Neste tutorial, utilizas uma base de dados SQL Server como armazenamento de dados de origem.
- Base de Dados SQL do Azure. Usas uma base de dados no Base de Dados SQL do Azure como destino de dados. Se não tiveres uma base de dados SQL, vê Criar uma base de dados em Base de Dados SQL do Azure para os passos para criar uma.
Crie tabelas de origem na sua base de dados SQL Server
Abra SQL Server Management Studio (SSMS) ou Visual Studio Code e ligue-se à sua base de dados de SQL Server.
No Server Explorer (SSMS) ou no painel Connections (Visual Studio Code), clique com o botão direito na base de dados e escolha Nova Consulta.
Execute o seguinte comando SQL na base de dados para criar tabelas com o nome
customer_tableeproject_table:create table customer_table ( PersonID int, Name varchar(255), LastModifytime datetime ); create table project_table ( Project varchar(255), Creationtime datetime ); INSERT INTO customer_table (PersonID, Name, LastModifytime) VALUES (1, 'John','9/1/2017 12:56:00 AM'), (2, 'Mike','9/2/2017 5:23:00 AM'), (3, 'Alice','9/3/2017 2:36:00 AM'), (4, 'Andy','9/4/2017 3:21:00 AM'), (5, 'Anny','9/5/2017 8:06:00 AM'); INSERT INTO project_table (Project, Creationtime) VALUES ('project1','1/1/2015 0:00:00 AM'), ('project2','2/2/2016 1:23:00 AM'), ('project3','3/4/2017 5:16:00 AM');
Crie tabelas de destino na sua Base de Dados SQL do Azure
Abra SQL Server Management Studio (SSMS) ou Visual Studio Code e ligue-se à sua base de dados de SQL Server.
No Server Explorer (SSMS) ou no painel Connections (Visual Studio Code), clique com o botão direito na base de dados e escolha Nova Consulta.
Execute o seguinte comando SQL na base de dados para criar tabelas com o nome
customer_tableeproject_table:create table customer_table ( PersonID int, Name varchar(255), LastModifytime datetime ); create table project_table ( Project varchar(255), Creationtime datetime );
Crie outra tabela no Base de Dados SQL do Azure para armazenar o valor de ponto de referência elevado.
Execute o seguinte comando SQL na sua base de dados para criar uma tabela chamada
watermarktablepara guardar o valor da marca d'água:create table watermarktable ( TableName varchar(255), WatermarkValue datetime, );Inserir valores de marca d'água iniciais para ambas as tabelas de origem na tabela de marca d'água.
INSERT INTO watermarktable VALUES ('customer_table','1/1/2010 12:00:00 AM'), ('project_table','1/1/2010 12:00:00 AM');
Crie um procedimento armazenado na Base de Dados SQL do Azure
Execute o seguinte comando para criar um procedimento armazenado em seu banco de dados. Este procedimento armazenado atualiza o valor da marca d'água após cada execução de um pipeline.
CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS
BEGIN
UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName
END
Criar tipos de dados e procedimentos armazenados adicionais no Base de Dados SQL do Azure
Execute a consulta a seguir para criar dois procedimentos armazenados e dois tipos de dados em seu banco de dados. São utilizados para intercalar os dados das tabelas de origem nas tabelas de destino.
Para tornar o início da jornada mais fácil, usamos diretamente esses Procedimentos Armazenados, passando os dados delta por meio de uma variável de tabela e, em seguida, os integramos no repositório de destino. Tenha cautela, não é esperado que um "grande" número de linhas delta (mais de 100) seja armazenado na variável de tabela.
Se você precisar mesclar um grande número de linhas delta no repositório de destino, sugerimos que você use a atividade de cópia para copiar todos os dados delta em uma tabela temporária de "preparação" no repositório de destino primeiro e, em seguida, crie seu próprio procedimento armazenado sem usar a variável de tabela para mesclá-los da tabela de "preparação" para a tabela "final".
CREATE TYPE DataTypeforCustomerTable AS TABLE(
PersonID int,
Name varchar(255),
LastModifytime datetime
);
GO
CREATE PROCEDURE usp_upsert_customer_table @customer_table DataTypeforCustomerTable READONLY
AS
BEGIN
MERGE customer_table AS target
USING @customer_table AS source
ON (target.PersonID = source.PersonID)
WHEN MATCHED THEN
UPDATE SET Name = source.Name,LastModifytime = source.LastModifytime
WHEN NOT MATCHED THEN
INSERT (PersonID, Name, LastModifytime)
VALUES (source.PersonID, source.Name, source.LastModifytime);
END
GO
CREATE TYPE DataTypeforProjectTable AS TABLE(
Project varchar(255),
Creationtime datetime
);
GO
CREATE PROCEDURE usp_upsert_project_table @project_table DataTypeforProjectTable READONLY
AS
BEGIN
MERGE project_table AS target
USING @project_table AS source
ON (target.Project = source.Project)
WHEN MATCHED THEN
UPDATE SET Creationtime = source.Creationtime
WHEN NOT MATCHED THEN
INSERT (Project, Creationtime)
VALUES (source.Project, source.Creationtime);
END
Azure PowerShell
Instale os módulos Azure PowerShell mais recentes seguindo as instruções em Instale e configure Azure PowerShell.
Criar uma fábrica de dados
Defina uma variável para o nome do grupo de recursos que vai utilizar nos comandos do PowerShell mais tarde. Copie o texto do comando seguinte para o PowerShell, especifique um nome para o grupo de recursos Azure entre aspas duplas e depois execute o comando. Um exemplo é
"adfrg".$resourceGroupName = "ADFTutorialResourceGroup";Se o grupo de recursos já existir, é possível que não queira substituí-lo. Atribua outro valor à variável
$resourceGroupNamee execute novamente o comando.Defina uma variável para a localização da fábrica de dados.
$location = "East US"Para criar o grupo de recursos do Azure, execute o seguinte comando:
New-AzResourceGroup $resourceGroupName $locationSe o grupo de recursos já existir, é possível que não queira substituí-lo. Atribua outro valor à variável
$resourceGroupNamee execute novamente o comando.Defina uma variável para o nome da fábrica de dados.
Importante
Atualize o nome da fábrica de dados para que seja globalmente exclusivo. Um exemplo é ADFIncMultiCopyTutorialFactorySP1127.
$dataFactoryName = "ADFIncMultiCopyTutorialFactory";Para criar o data factory, execute o seguinte cmdlet Set-AzDataFactoryV2 :
Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location $location -Name $dataFactoryName
Tenha em conta os seguintes pontos:
O nome da fábrica de dados tem de ser globalmente exclusivo. Se receber o erro seguinte, altere o nome e tente novamente:
Set-AzDataFactoryV2 : HTTP Status Code: Conflict Error Code: DataFactoryNameInUse Error Message: The specified resource name 'ADFIncMultiCopyTutorialFactory' is already in use. Resource names must be globally unique.Para criar instâncias do Data Factory, a conta de utilizador que usa para iniciar sessão no Azure deve ser membro de funções de contribuinte ou proprietário, ou administrador da subscrição do Azure.
Para uma lista das regiões do Azure em que o Data Factory está disponível atualmente, selecione as regiões que lhe interessam na página seguinte e depois expanda Analytics para localizar Data Factory: Produtos disponíveis por região. Os armazenamentos de dados (Armazenamento do Azure, SQL Database, SQL Managed Instance, e assim por diante) e os computadores (Azure HDInsight, etc.) usados pela data factory podem estar noutras regiões.
Criar um tempo de execução de integração autogerido
Nesta secção, cria um runtime de integração auto-hospedado e associa-o a uma máquina local com a base de dados do SQL Server. O runtime de integração auto-hospedado é o componente que copia os dados do SQL Server na sua máquina para o Base de Dados SQL do Azure.
Crie uma variável para o nome do integration runtime. Utilize um nome exclusivo e tome nota do mesmo. Vai utilizá-lo mais tarde no tutorial.
$integrationRuntimeName = "ADFTutorialIR"Criar um tempo de execução de integração auto-hospedado.
Set-AzDataFactoryV2IntegrationRuntime -Name $integrationRuntimeName -Type SelfHosted -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupNameSegue-se o resultado do exemplo:
Name : <Integration Runtime name> Type : SelfHosted ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Description : Id : /subscriptions/<subscription ID>/resourceGroups/<ResourceGroupName>/providers/Microsoft.DataFactory/factories/<DataFactoryName>/integrationruntimes/ADFTutorialIRPara obter o estado do integration runtime criado, execute o comando seguinte. Confirme que o valor da propriedade Estado está definido como NeedRegistration.
Get-AzDataFactoryV2IntegrationRuntime -name $integrationRuntimeName -ResourceGroupName $resourceGroupName -DataFactoryName $dataFactoryName -StatusSegue-se o resultado do exemplo:
State : NeedRegistration Version : CreateTime : 9/24/2019 6:00:00 AM AutoUpdate : On ScheduledUpdateDate : UpdateDelayOffset : LocalTimeZoneOffset : InternalChannelEncryption : Capabilities : {} ServiceUrls : {eu.frontend.clouddatahub.net} Nodes : {} Links : {} Name : ADFTutorialIR Type : SelfHosted ResourceGroupName : <ResourceGroup name> DataFactoryName : <DataFactory name> Description : Id : /subscriptions/<subscription ID>/resourceGroups/<ResourceGroup name>/providers/Microsoft.DataFactory/factories/<DataFactory name>/integrationruntimes/<Integration Runtime name>Para recuperar as chaves de autenticação usadas para registar o runtime de integração auto-hospedado com o serviço Azure Data Factory na cloud, execute o seguinte comando:
Get-AzDataFactoryV2IntegrationRuntimeKey -Name $integrationRuntimeName -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName | ConvertTo-JsonSegue-se o resultado do exemplo:
{ "AuthKey1": "IR@0000000000-0000-0000-0000-000000000000@xy0@xy@xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx=", "AuthKey2": "IR@0000000000-0000-0000-0000-000000000000@xy0@xy@yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy=" }Copie uma das chaves (exclua as aspas) utilizadas para registar o runtime de integração autoalojado que instalar no computador nas etapas seguintes.
Instalar a ferramenta de integração do runtime
Se já tiver o integration runtime no seu computador, desinstale-o utilizando Adicionar ou Remover Programas.
Download o runtime de integração auto-hospedado numa máquina Windows local. Inicie a instalação.
Na página Bem-vindo à Configuração do Microsoft Integration Runtime, selecione Seguinte.
Na página Contrato de Licença do Utilizador Final, aceite os termos e o contrato de licença e selecione Seguinte.
Na página Pasta de Destino, selecione Seguinte.
Na página Pronto para instalar Microsoft Integration Runtime, selecione Instalar.
Na página Configuração do Microsoft Integration Runtime Concluída, selecione Terminar.
Na página Registar Integration Runtime (Auto-hospedado), cole a chave que guardou na secção anterior e selecione Registar.
Na página New Integration Runtime (Self-hosted), selecione Terminar.
Quando o runtime de integração autogerido for registado com êxito, irá ver a seguinte mensagem:
Na página Registar o Integration Runtime (Auto-hospedado), selecione Iniciar o Gestor de Configuração.
Quando o nó estiver conectado ao serviço de nuvem, verá a página seguinte:
Agora, teste a conectividade à sua base de dados SQL Server.
a. Na página Gestor de Configuração, vá ao separador Diagnostics.
b. Selecione SqlServer para o tipo de origem de dados.
c. Introduza o nome do servidor.
d. Introduza o nome da base de dados.
e. Selecione o modo de autenticação.
f. Introduza o nome de utilizador.
g. Introduza a palavra-passe associada ao nome de utilizador.
h. Selecione Test para confirmar que o tempo de execução da integração pode ligar-se a SQL Server. Se a ligação for bem-sucedida, verá uma marca de verificação verde. Se a ligação não for bem-sucedida, verá uma mensagem de erro. Corrija quaisquer problemas e assegure que o runtime de integração pode ligar-se ao SQL Server.
Nota
Tome nota dos valores para o tipo de autenticação, servidor, base de dados, utilizador e palavra-passe. Vai utilizá-los mais tarde no tutorial.
Criar serviços ligados
Os serviços ligados são criados numa fábrica de dados para ligar os seus arquivos de dados e serviços de computação a essa fábrica de dados. Nesta secção, cria serviços ligados à sua base de dados SQL Server e à sua base de dados no Base de Dados SQL do Azure.
Criar o serviço ligado ao SQL Server
Neste passo, liga a sua base de dados SQL Server à fábrica de dados.
Crie um arquivo JSON chamado SqlServerLinkedService.json na pasta C:\ADFTutorials\IncCopyMultiTableTutorial (crie as pastas locais se elas ainda não existirem) com o seguinte conteúdo. Selecione a secção certa com base na autenticação que usa para se ligar ao SQL Server.
Importante
Selecione a secção certa com base na autenticação que usa para se ligar ao SQL Server.
Se utilizar a autenticação do SQL, copie a seguinte definição JSON:
{ "name":"SqlServerLinkedService", "properties":{ "annotations":[ ], "type":"SqlServer", "typeProperties":{ "connectionString":"integrated security=False;data source=<servername>;initial catalog=<database name>;user id=<username>;Password=<password>" }, "connectVia":{ "referenceName":"<integration runtime name>", "type":"IntegrationRuntimeReference" } } }Se usar Windows authentication, copie a seguinte definição JSON:
{ "name":"SqlServerLinkedService", "properties":{ "annotations":[ ], "type":"SqlServer", "typeProperties":{ "connectionString":"integrated security=True;data source=<servername>;initial catalog=<database name>", "userName":"<username> or <domain>\\<username>", "password":{ "type":"SecureString", "value":"<password>" } }, "connectVia":{ "referenceName":"<integration runtime name>", "type":"IntegrationRuntimeReference" } } }Importante
- Selecione a secção certa com base na autenticação que usa para se ligar ao SQL Server.
- Substitua <o nome> do tempo de execução de integração pelo nome do seu tempo de execução de integração.
- Substitua <nomedo do servidor>, <nome da base de dados>, <nome de utilizador> e <password> pelos valores da sua base de dados de SQL Server antes de guardar o ficheiro.
- Se precisar de utilizar um caráter de barra (
\) no nome da sua conta de utilizador ou no nome do seu servidor, utilize o caráter de escape (\). Um exemplo émydomain\\myuser.
No PowerShell, executa o seguinte cmdlet para ir para a pasta C:\ADFTutorials\IncCopyMultiTableTutorial.
Set-Location 'C:\ADFTutorials\IncCopyMultiTableTutorial'Execute o cmdlet Set-AzDataFactoryV2LinkedService para criar o serviço vinculado AzureStorageLinkedService. No exemplo seguinte, vai transmitir os valores para os parâmetros ResourceGroupName e DataFactoryName:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SqlServerLinkedService" -File ".\SqlServerLinkedService.json"Segue-se o resultado do exemplo:
LinkedServiceName : SqlServerLinkedService ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Properties : Microsoft.Azure.Management.DataFactory.Models.SqlServerLinkedService
Criar o serviço vinculado do Banco de dados SQL
Crie um arquivo JSON chamado AzureSQLDatabaseLinkedService.json na pasta C:\ADFTutorials\IncCopyMultiTableTutorial com o seguinte conteúdo. (Crie a pasta ADF se ainda não existir.) Substitua <nome do servidor>, <nome da base de dados>, <nome de utilizador> e <password> pelo nome da sua base de dados SQL Server, nome da base de dados, nome de utilizador e palavra-passe antes de guardar o ficheiro.
{ "name":"AzureSQLDatabaseLinkedService", "properties":{ "annotations":[ ], "type":"AzureSqlDatabase", "typeProperties":{ "connectionString":"integrated security=False;encrypt=True;connection timeout=30;data source=<servername>.database.windows.net;initial catalog=<database name>;user id=<user name>;Password=<password>;" } } }No PowerShell, execute o cmdlet Set-AzDataFactoryV2LinkedService para criar o serviço vinculado AzureSQLDatabaseLinkedService.
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSQLDatabaseLinkedService" -File ".\AzureSQLDatabaseLinkedService.json"Segue-se o resultado do exemplo:
LinkedServiceName : AzureSQLDatabaseLinkedService ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Properties : Microsoft.Azure.Management.DataFactory.Models.AzureSqlDatabaseLinkedService
Criar conjuntos de dados
Nesta etapa, vai criar conjuntos de dados para representar a origem de dados, o destino de dados e o local para armazenar a marca d'água.
Criar um conjunto de dados de origem
Crie um ficheiro JSON com o nome SourceDataset.json na mesma pasta com o seguinte conteúdo:
{ "name":"SourceDataset", "properties":{ "linkedServiceName":{ "referenceName":"SqlServerLinkedService", "type":"LinkedServiceReference" }, "annotations":[ ], "type":"SqlServerTable", "schema":[ ] } }A atividade Copy no pipeline utiliza uma consulta SQL para carregar os dados em vez de carregar toda a tabela.
Execute o cmdlet Set-AzDataFactoryV2Dataset para criar o conjunto de dados SourceDataset.
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SourceDataset" -File ".\SourceDataset.json"Eis a saída de exemplo do cmdlet:
DatasetName : SourceDataset ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Structure : Properties : Microsoft.Azure.Management.DataFactory.Models.SqlServerTableDataset
Criar um conjunto de dados de destino
Crie um arquivo JSON chamado SinkDataset.json na mesma pasta com o seguinte conteúdo. O elemento tableName é definido pelo pipeline dinamicamente durante a execução. A atividade ForEach no pipeline itera através de uma lista de nomes de tabelas e transmite o nome da tabela para este conjunto de dados em cada iteração.
{ "name":"SinkDataset", "properties":{ "linkedServiceName":{ "referenceName":"AzureSQLDatabaseLinkedService", "type":"LinkedServiceReference" }, "parameters":{ "SinkTableName":{ "type":"String" } }, "annotations":[ ], "type":"AzureSqlTable", "typeProperties":{ "tableName":{ "value":"@dataset().SinkTableName", "type":"Expression" } } } }Execute o cmdlet Set-AzDataFactoryV2Dataset para criar o conjunto de dados SinkDataset.
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SinkDataset" -File ".\SinkDataset.json"Eis a saída de exemplo do cmdlet:
DatasetName : SinkDataset ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Structure : Properties : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
Criar um conjunto de dados para uma marca d'água
Neste passo, vai criar um conjunto de dados para armazenar um valor de limite superior de tamanho.
Crie um arquivo JSON chamado WatermarkDataset.json na mesma pasta com o seguinte conteúdo:
{ "name": " WatermarkDataset ", "properties": { "type": "AzureSqlTable", "typeProperties": { "tableName": "watermarktable" }, "linkedServiceName": { "referenceName": "AzureSQLDatabaseLinkedService", "type": "LinkedServiceReference" } } }Execute o cmdlet Set-AzDataFactoryV2Dataset para criar o conjunto de dados WatermarkDataset.
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "WatermarkDataset" -File ".\WatermarkDataset.json"Eis a saída de exemplo do cmdlet:
DatasetName : WatermarkDataset ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Structure : Properties : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
Criar um pipeline
O pipeline aceita uma lista de nomes de tabela como parâmetro. A atividade ForEach itera através da lista de nomes de tabela e executa as seguintes operações:
Use uma "atividade Lookup" para recuperar o valor da marca d'água antiga (o valor inicial ou o que foi usado na última iteração).
Use a atividade de Pesquisa para recuperar o novo valor da marca d'água (o valor máximo da coluna marca d'água na tabela de origem).
Use a atividade de cópia para transferir dados entre os dois valores de marca d'água da base de dados de origem para a base de dados de destino.
Utilize a atividade StoredProcedure com o objetivo de atualizar o valor da marca d'água antiga a ser usado na primeira etapa da próxima iteração.
Criar o pipeline
Crie um arquivo JSON chamado IncrementalCopyPipeline.json na mesma pasta com o seguinte conteúdo:
{ "name":"IncrementalCopyPipeline", "properties":{ "activities":[ { "name":"IterateSQLTables", "type":"ForEach", "dependsOn":[ ], "userProperties":[ ], "typeProperties":{ "items":{ "value":"@pipeline().parameters.tableList", "type":"Expression" }, "isSequential":false, "activities":[ { "name":"LookupOldWaterMarkActivity", "type":"Lookup", "dependsOn":[ ], "policy":{ "timeout":"7.00:00:00", "retry":0, "retryIntervalInSeconds":30, "secureOutput":false, "secureInput":false }, "userProperties":[ ], "typeProperties":{ "source":{ "type":"AzureSqlSource", "sqlReaderQuery":{ "value":"select * from watermarktable where TableName = '@{item().TABLE_NAME}'", "type":"Expression" } }, "dataset":{ "referenceName":"WatermarkDataset", "type":"DatasetReference" } } }, { "name":"LookupNewWaterMarkActivity", "type":"Lookup", "dependsOn":[ ], "policy":{ "timeout":"7.00:00:00", "retry":0, "retryIntervalInSeconds":30, "secureOutput":false, "secureInput":false }, "userProperties":[ ], "typeProperties":{ "source":{ "type":"SqlServerSource", "sqlReaderQuery":{ "value":"select MAX(@{item().WaterMark_Column}) as NewWatermarkvalue from @{item().TABLE_NAME}", "type":"Expression" } }, "dataset":{ "referenceName":"SourceDataset", "type":"DatasetReference" }, "firstRowOnly":true } }, { "name":"IncrementalCopyActivity", "type":"Copy", "dependsOn":[ { "activity":"LookupOldWaterMarkActivity", "dependencyConditions":[ "Succeeded" ] }, { "activity":"LookupNewWaterMarkActivity", "dependencyConditions":[ "Succeeded" ] } ], "policy":{ "timeout":"7.00:00:00", "retry":0, "retryIntervalInSeconds":30, "secureOutput":false, "secureInput":false }, "userProperties":[ ], "typeProperties":{ "source":{ "type":"SqlServerSource", "sqlReaderQuery":{ "value":"select * from @{item().TABLE_NAME} where @{item().WaterMark_Column} > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and @{item().WaterMark_Column} <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'", "type":"Expression" } }, "sink":{ "type":"AzureSqlSink", "sqlWriterStoredProcedureName":{ "value":"@{item().StoredProcedureNameForMergeOperation}", "type":"Expression" }, "sqlWriterTableType":{ "value":"@{item().TableType}", "type":"Expression" }, "storedProcedureTableTypeParameterName":{ "value":"@{item().TABLE_NAME}", "type":"Expression" }, "disableMetricsCollection":false }, "enableStaging":false }, "inputs":[ { "referenceName":"SourceDataset", "type":"DatasetReference" } ], "outputs":[ { "referenceName":"SinkDataset", "type":"DatasetReference", "parameters":{ "SinkTableName":{ "value":"@{item().TABLE_NAME}", "type":"Expression" } } } ] }, { "name":"StoredProceduretoWriteWatermarkActivity", "type":"SqlServerStoredProcedure", "dependsOn":[ { "activity":"IncrementalCopyActivity", "dependencyConditions":[ "Succeeded" ] } ], "policy":{ "timeout":"7.00:00:00", "retry":0, "retryIntervalInSeconds":30, "secureOutput":false, "secureInput":false }, "userProperties":[ ], "typeProperties":{ "storedProcedureName":"[dbo].[usp_write_watermark]", "storedProcedureParameters":{ "LastModifiedtime":{ "value":{ "value":"@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}", "type":"Expression" }, "type":"DateTime" }, "TableName":{ "value":{ "value":"@{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}", "type":"Expression" }, "type":"String" } } }, "linkedServiceName":{ "referenceName":"AzureSQLDatabaseLinkedService", "type":"LinkedServiceReference" } } ] } } ], "parameters":{ "tableList":{ "type":"array" } }, "annotations":[ ] } }Execute o cmdlet Set-AzDataFactoryV2Pipeline para criar o pipeline IncrementalCopyPipeline.
Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "IncrementalCopyPipeline" -File ".\IncrementalCopyPipeline.json"Segue-se o resultado do exemplo:
PipelineName : IncrementalCopyPipeline ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Activities : {IterateSQLTables} Parameters : {[tableList, Microsoft.Azure.Management.DataFactory.Models.ParameterSpecification]}
Executar o pipeline
Crie um arquivo de parâmetro chamado Parameters.json na mesma pasta com o seguinte conteúdo:
{ "tableList": [ { "TABLE_NAME": "customer_table", "WaterMark_Column": "LastModifytime", "TableType": "DataTypeforCustomerTable", "StoredProcedureNameForMergeOperation": "usp_upsert_customer_table" }, { "TABLE_NAME": "project_table", "WaterMark_Column": "Creationtime", "TableType": "DataTypeforProjectTable", "StoredProcedureNameForMergeOperation": "usp_upsert_project_table" } ] }Execute o processo IncrementalCopyPipeline usando o cmdlet Invoke-AzDataFactoryV2Pipeline. Substitua os marcadores de posição pelos nomes do seu grupo de recursos e da sua fábrica de dados.
$RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroup $resourceGroupName -dataFactoryName $dataFactoryName -ParameterFile ".\Parameters.json"
Monitorizar o fluxo de trabalho
Inicie sessão no portal Azure.
Clique em Todos os serviços, pesquise com a palavra-chave Fábricas de dados e selecione Fábricas de dados.
Pesquise pela sua fábrica de dados na lista de fábricas de dados e selecione-a para abrir a página Fábrica de dados.
Na página Data factory, selecione Open na peça Open Azure Data Factory Studio para iniciar Azure Data Factory num separador separado.
Na página inicial Azure Data Factory, selecione Monitor do lado esquerdo.
Pode ver todas as execuções de pipelines e os respetivos estados. Note que no seguinte exemplo, o estado da execução do pipeline é Com Êxito. Para verificar os parâmetros transmitidos para o pipeline, selecione a ligação na coluna Parâmetros. Se tiver ocorrido um erro, pode ver uma ligação na coluna Erro.
Ao selecionar o link na coluna Ações , você verá todas as atividades executadas para o pipeline.
Para voltar ao modo de exibição Execuções de Pipeline, selecione Todas as Execuções de Pipeline.
Rever os resultados
No SQL Server Management Studio, execute as seguintes consultas na base de dados SQL de destino para verificar se os dados foram copiados das tabelas de origem para as tabelas de destino:
Consulta
select * from customer_table
Saída
===========================================
PersonID Name LastModifytime
===========================================
1 John 2017-09-01 00:56:00.000
2 Mike 2017-09-02 05:23:00.000
3 Alice 2017-09-03 02:36:00.000
4 Andy 2017-09-04 03:21:00.000
5 Anny 2017-09-05 08:06:00.000
Consulta
select * from project_table
Saída
===================================
Project Creationtime
===================================
project1 2015-01-01 00:00:00.000
project2 2016-02-02 01:23:00.000
project3 2017-03-04 05:16:00.000
Consulta
select * from watermarktable
Saída
======================================
TableName WatermarkValue
======================================
customer_table 2017-09-05 08:06:00.000
project_table 2017-03-04 05:16:00.000
Tenha em atenção que os valores de limite de tamanho de ambas as tabelas foram atualizados.
Adicione mais dados às tabelas de origem
Execute a seguinte consulta na base de dados de origem do SQL Server para atualizar uma linha existente na tabela de clientes. Insira uma linha nova em project_table.
UPDATE customer_table
SET [LastModifytime] = '2017-09-08T00:00:00Z', [name]='NewName' where [PersonID] = 3
INSERT INTO project_table
(Project, Creationtime)
VALUES
('NewProject','10/1/2017 0:00:00 AM');
Volte a executar o pipeline
Agora, execute novamente o pipeline executando o seguinte comando do PowerShell:
$RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroup $resourceGroupname -dataFactoryName $dataFactoryName -ParameterFile ".\Parameters.json"Siga as instruções na secção Monitorizar o pipeline para monitorizar as execuções do pipeline. Quando o status do pipeline estiver Em andamento, você verá outro link de ação em Ações para cancelar a execução do pipeline.
Selecione Atualizar para atualizar a lista até a execução do pipeline ter êxito.
Opcionalmente, selecione a ligação Ver Execuções de Atividades, em Ações, para ver todas a execuções de atividades associadas a esta execução de pipeline.
Rever os resultados finais
No SQL Server Management Studio, execute as seguintes consultas na base de dados de destino para verificar se os dados atualizados/novos foram copiados das tabelas de origem para as tabelas de destino.
Consulta
select * from customer_table
Saída
===========================================
PersonID Name LastModifytime
===========================================
1 John 2017-09-01 00:56:00.000
2 Mike 2017-09-02 05:23:00.000
3 NewName 2017-09-08 00:00:00.000
4 Andy 2017-09-04 03:21:00.000
5 Anny 2017-09-05 08:06:00.000
Repare nos valores novos de Name e LastModifytime para PersonID relativamente ao número 3.
Consulta
select * from project_table
Saída
===================================
Project Creationtime
===================================
project1 2015-01-01 00:00:00.000
project2 2016-02-02 01:23:00.000
project3 2017-03-04 05:16:00.000
NewProject 2017-10-01 00:00:00.000
Repare que a entrada NewProject foi adicionada a project_table.
Consulta
select * from watermarktable
Saída
======================================
TableName WatermarkValue
======================================
customer_table 2017-09-08 00:00:00.000
project_table 2017-10-01 00:00:00.000
Tenha em atenção que os valores de limite de tamanho de ambas as tabelas foram atualizados.
Conteúdos relacionados
Neste tutorial, executou os passos seguintes:
- Prepare os arquivos de dados de origem e de destino.
- Criar uma fábrica de dados.
- Crie um tempo de execução de integração auto-hospedado (IR).
- Instalar o integration runtime.
- Criar serviços ligados.
- Crie conjuntos de dados de origem, destino e de marca d'água.
- Criar, executar e monitorizar um pipeline.
- Reveja os resultados.
- Adicionou ou atualizou os dados nas tabelas de origem.
- Execute novamente e monitorize o pipeline.
- Reveja os resultados finais.
Avance para o seguinte tutorial para aprender a transformar dados usando um cluster Spark no Azure: