Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
APLICA-SE A:
Azure Data Factory
Azure Synapse Analytics
Gorjeta
Data Factory em Microsoft Fabric é a próxima geração de Azure Data Factory, com uma arquitetura mais simples, IA incorporada e novas funcionalidades. Se és novo na integração de dados, começa pelo Fabric Data Factory. As cargas de trabalho existentes do ADF podem atualizar para o Fabric para aceder a novas capacidades em ciência de dados, análise em tempo real e relatórios.
Neste tutorial, utiliza o Azure PowerShell para criar um pipeline de Data Factory que transforma dados usando o Spark Activity e um serviço ligado ao HDInsight sob demanda. Vai executar os seguintes passos neste tutorial:
- Criar uma fábrica de dados.
- Autorizar e implementar serviços integrados.
- Criar e implementar um pipeline.
- Iniciar uma execução de pipeline.
- Monitorizar a execução do pipeline.
Se não tiver uma subscrição Azure, crie uma conta free antes de começar.
Pré-requisitos
Nota
Recomendamos que utilize o módulo PowerShell do Azure Az para interagir com o Azure. Para começar, consulte Install Azure PowerShell. Para saber como migrar para o módulo Az PowerShell, veja Migrar Azure PowerShell do AzureRM para o Az.
- conta de Armazenamento do Azure. Cria um script Python e um ficheiro de entrada, e carrega-os para o armazenamento da Azure. A saída do programa Spark é armazenada nesta conta de armazenamento. O cluster do Spark a pedido utiliza a mesma conta de armazenamento como o respetivo armazenamento primário.
- Azure PowerShell. Siga as instruções em Como instalar e configurar Azure PowerShell.
Carregue o script Python para a sua conta Armazenamento de Blobs
Crie um ficheiro Python chamado WordCount_Spark.py com o seguinte conteúdo:
import sys from operator import add from pyspark.sql import SparkSession def main(): spark = SparkSession\ .builder\ .appName("PythonWordCount")\ .getOrCreate() lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0]) counts = lines.flatMap(lambda x: x.split(' ')) \ .map(lambda x: (x, 1)) \ .reduceByKey(add) counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/outputfiles/wordcount") spark.stop() if __name__ == "__main__": main()Substitua <storageAccountName> pelo nome da sua conta Armazenamento do Azure. Em seguida, guarde o ficheiro.
No seu armazenamento de blobs do Azure, crie um contentor denominado adftutorial se não existir.
Crie uma pasta com o nome spark.
Crie uma subpasta com o nome script na pasta spark.
Carregue o ficheiro WordCount_Spark.py para a subpasta script.
Carregue o ficheiro de entrada
- Crie um ficheiro com o nome minecraftstory.txt com algum texto. O programa Spark conta o número de palavras neste texto.
- Crie uma subpasta com o nome
inputfilesna pastaspark. - Carregue o ficheiro
minecraftstory.txtpara a subpastainputfiles.
Criar serviços ligados
Nesta secção, vai criar dois Serviços Ligados:
- Um Serviço Ligado do Armazenamento do Azure que liga uma conta Armazenamento do Azure à fábrica de dados. Este armazenamento é utilizado pelo cluster do HDInsight a pedido. Também contém o script Spark que vai ser executado.
- Um Serviço Ligado do HDInsight a Pedido. O Azure Data Factory cria automaticamente um cluster HDInsight, executa o programa Spark e depois elimina o cluster HDInsight depois de estar inativo durante um tempo pré-configurado.
Serviço vinculado de Armazenamento Azure
Crie um ficheiro JSON usando o seu editor preferido, copie a seguinte definição JSON de um serviço ligado Armazenamento do Azure e depois guarde o ficheiro como MyStorageLinkedService.json.
{
"name": "MyStorageLinkedService",
"properties": {
"type": "AzureStorage",
"typeProperties": {
"connectionString": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>"
}
}
}
Atualize o <storageAccountName> e <storageAccountKey> com o nome e a chave da sua conta Armazenamento do Azure.
Serviço on-demand vinculado ao HDInsight
Crie um ficheiro JSON usando o seu editor preferido, copie a seguinte definição JSON de um serviço ligado Azure HDInsight e guarde o ficheiro como MyOnDemandSparkLinkedService.json.
{
"name": "MyOnDemandSparkLinkedService",
"properties": {
"type": "HDInsightOnDemand",
"typeProperties": {
"clusterSize": 2,
"clusterType": "spark",
"timeToLive": "00:15:00",
"hostSubscriptionId": "<subscriptionID> ",
"servicePrincipalId": "<servicePrincipalID>",
"servicePrincipalKey": {
"value": "<servicePrincipalKey>",
"type": "SecureString"
},
"tenant": "<tenant ID>",
"clusterResourceGroup": "<resourceGroupofHDICluster>",
"version": "3.6",
"osType": "Linux",
"clusterNamePrefix":"ADFSparkSample",
"linkedServiceName": {
"referenceName": "MyStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
}
Atualize os valores para as seguintes propriedades na definição de serviço ligado:
- hostSubscriptionId. Substitua <subscriptionID> pelo ID da sua subscrição Azure. O cluster do HDInsight a pedido é criado nesta subscrição.
- tenant. Substitua <tenantID> pelo ID do seu inquilino Azure.
- IdPrincipalServiço, ChavePrincipalServiço. Substitua <servicePrincipalID> e <servicePrincipalKey> por ID e chave do seu principal de serviço no Microsoft Entra ID. Este principal de serviço tem de ser membro da função de Contribuinte da subscrição ou do Grupo de recursos no qual o cluster é criado. Consulte create Microsoft Entra application and service principal para mais detalhes. A ID da entidade de serviço é equivalente à ID do aplicativo e uma chave da entidade de serviço é equivalente ao valor de um segredo do cliente.
- clusterResourceGroup. Substitua <resourceGroupOfHDICluster> pelo nome do grupo de recursos no qual o cluster do HDInsight tem de ser criado.
Nota
O Azure HDInsight tem limitações quanto ao número total de núcleos que podes usar em cada região do Azure que suporta. Para o Serviço Ligado HDInsight On-Demand, o cluster HDInsight será criado na mesma localização do Armazenamento do Azure usado como armazenamento principal. ** Certifique-se de que tem um número suficiente de quotas de CPU para o cluster ser criado com sucesso. Para obter mais informações, veja Configurar clusters no HDInsight com o Hadoop, Spark, Kafka e muito mais.
Criar um pipeline
Neste passo, vai criar um novo pipeline com uma atividade do Spark. A atividade utiliza o exemplo de contagem de palavras. Transfira os conteúdos a partir desta localização, caso ainda não o tenha feito.
Crie um ficheiro JSON no seu editor preferencial, copie a seguinte definição JSON de uma definição de pipeline e guarde-o como MySparkOnDemandPipeline.json.
{
"name": "MySparkOnDemandPipeline",
"properties": {
"activities": [
{
"name": "MySparkActivity",
"type": "HDInsightSpark",
"linkedServiceName": {
"referenceName": "MyOnDemandSparkLinkedService",
"type": "LinkedServiceReference"
},
"typeProperties": {
"rootPath": "adftutorial/spark",
"entryFilePath": "script/WordCount_Spark.py",
"getDebugInfo": "Failure",
"sparkJobLinkedService": {
"referenceName": "MyStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
]
}
}
Tenha em conta os seguintes pontos:
- rootPath aponta para a pasta spark do contentor adftutorial.
- entryFilePath aponta para o ficheiro WordCount_Spark.py na subpasta script da pasta spark.
Criar uma fábrica de dados
Criou definições de serviço vinculado e de pipeline em ficheiros JSON. Agora, vamos criar uma fábrica de dados e implantar os arquivos JSON de serviço e pipeline vinculados usando cmdlets do PowerShell. Execute os seguintes comandos do PowerShell um a um:
Defina as variáveis uma a uma.
Nome do Grupo de Recursos
$resourceGroupName = "ADFTutorialResourceGroup"Nome do Data Factory. Tem de ser globalmente exclusivo
$dataFactoryName = "MyDataFactory09102017"Nome do Pipeline
$pipelineName = "MySparkOnDemandPipeline" # Name of the pipelineInicie o PowerShell. Mantenha o Azure PowerShell aberto até ao final deste início rápido. Se fechar e reabrir, terá de executar os comandos novamente. Para uma lista das regiões do Azure em que o Data Factory está disponível atualmente, selecione as regiões que lhe interessam na página seguinte e depois expanda Analytics para localizar Data Factory: Produtos disponíveis por região. Os armazenamentos de dados (Armazenamento do Azure, Base de Dados SQL do Azure, etc.) e os computadores (HDInsight, etc.) usados pela data factory podem estar noutras regiões.
Execute o seguinte comando e introduza o nome de utilizador e a palavra-passe que utiliza para iniciar sessão no portal Azure:
Connect-AzAccountExecute o comando seguinte para ver todas as subscrições desta conta:
Get-AzSubscriptionExecute o comando seguinte para selecionar a subscrição com a qual pretende trabalhar. Substitua SubscriptionId pelo ID da sua subscrição Azure:
Select-AzSubscription -SubscriptionId "<SubscriptionId>"Crie o grupo de recursos: ADFTutorialResourceGroup.
New-AzResourceGroup -Name $resourceGroupName -Location "East Us"Crie a fábrica de dados.
$df = Set-AzDataFactoryV2 -Location EastUS -Name $dataFactoryName -ResourceGroupName $resourceGroupNameExecute o seguinte comando para ver a saída:
$dfMude para a pasta onde criou ficheiros JSON e execute o seguinte comando para implementar um serviço ligado ao Armazenamento do Azure:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyStorageLinkedService" -File "MyStorageLinkedService.json"Execute o seguinte comando para implementar um serviço conectado do Spark sob demanda:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyOnDemandSparkLinkedService" -File "MyOnDemandSparkLinkedService.json"Execute o seguinte comando para implementar um pipeline:
Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $pipelineName -File "MySparkOnDemandPipeline.json"
Iniciar e monitorizar uma execução de pipeline
Iniciar uma execução de pipeline. Também captura o ID de execução do pipeline para monitorização futura.
$runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineNameExecute o script seguinte para verificar continuamente o estado de execução do pipeline até terminar.
while ($True) { $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30) if(!$result) { Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow" } elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) { Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow" } else { Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow" $result break } ($result | Format-List | Out-String) Start-Sleep -Seconds 15 } Write-Host "Activity `Output` section:" -foregroundcolor "Yellow" $result.Output -join "`r`n" Write-Host "Activity `Error` section:" -foregroundcolor "Yellow" $result.Error -join "`r`n"Eis o resultado da execução de exemplo:
Pipeline run status: In Progress ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : ActivityName : MySparkActivity PipelineRunId : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794 PipelineName : MySparkOnDemandPipeline Input : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService} Output : LinkedServiceName : ActivityRunStart : 9/20/2017 6:33:47 AM ActivityRunEnd : DurationInMs : Status : InProgress Error : … Pipeline ' MySparkOnDemandPipeline' run finished. Result: ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : MyDataFactory09102017 ActivityName : MySparkActivity PipelineRunId : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794 PipelineName : MySparkOnDemandPipeline Input : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService} Output : {clusterInUse, jobId, ExecutionProgress, effectiveIntegrationRuntime} LinkedServiceName : ActivityRunStart : 9/20/2017 6:33:47 AM ActivityRunEnd : 9/20/2017 6:46:30 AM DurationInMs : 763466 Status : Succeeded Error : {errorCode, message, failureType, target} Activity Output section: "clusterInUse": "https://ADFSparkSamplexxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.azurehdinsight.net/" "jobId": "0" "ExecutionProgress": "Succeeded" "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)" Activity Error section: "errorCode": "" "message": "" "failureType": "" "target": "MySparkActivity"Confirme que uma pasta com o nome
outputfilesé criada na pastasparkdo contentor adftutorial com a saída do programa spark.
Conteúdos relacionados
O pipeline neste exemplo copia dados de um local para outro num armazenamento de blob do Azure. Aprendeu a:
- Criar uma fábrica de dados.
- Autorizar e implementar serviços integrados.
- Criar e implementar um pipeline.
- Iniciar uma execução de pipeline.
- Monitorizar a execução do pipeline.
Avance para o próximo tutorial para aprender a transformar dados executando um script Hive num cluster Azure HDInsight que esteja numa rede virtual.