Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
APLICA-SE A:
Azure Data Factory
Azure Synapse Analytics
Dica
Data Factory no Microsoft Fabric é a próxima geração de Azure Data Factory, com uma arquitetura mais simples, IA interna e novos recursos. Se você não estiver familiarizado com a integração de dados, comece com Fabric Data Factory. As cargas de trabalho existentes do ADF podem ser atualizadas para Fabric para acessar novos recursos em ciência de dados, análise em tempo real e relatórios.
Neste tutorial, você pode usar o Azure PowerShell para criar um pipeline do Data Factory que transforma dados usando a Atividade Spark e um serviço vinculado HDInsight sob demanda. Neste tutorial, você realizará os seguintes procedimentos:
- Criar uma fábrica de dados.
- Criar e implantar serviços vinculados.
- Criar e implantar um pipeline.
- Iniciar uma execução de pipeline.
- Monitorar a execução de pipeline.
Se você não tiver uma assinatura Azure, crie uma conta free antes de começar.
Pré-requisitos
Observação
Recomendamos que você use o módulo Azure Az PowerShell para interagir com Azure. Para começar, consulte Instalar Azure PowerShell. Para saber como migrar para o módulo do Az PowerShell, consulte Migrate Azure PowerShell do AzureRM para o Az.
- Conta de Armazenamento do Azure. Crie um script Python e um arquivo de entrada e carregue-os no armazenamento Azure. A saída do programa Spark é armazenada nessa conta de armazenamento. O cluster do Spark sob demanda usa a mesma conta de armazenamento que o respectivo armazenamento primário.
- Azure PowerShell. Siga as instruções em Como instalar e configurar Azure PowerShell.
Carregar Python script em sua conta Armazenamento de Blobs
Crie um arquivo de Python chamado WordCount_Spark.py com o seguinte conteúdo:
import sys from operator import add from pyspark.sql import SparkSession def main(): spark = SparkSession\ .builder\ .appName("PythonWordCount")\ .getOrCreate() lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0]) counts = lines.flatMap(lambda x: x.split(' ')) \ .map(lambda x: (x, 1)) \ .reduceByKey(add) counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/outputfiles/wordcount") spark.stop() if __name__ == "__main__": main()Substitua <storageAccountName> pelo nome da sua conta Armazenamento do Azure. Em seguida, salve o arquivo.
Em seu Armazenamento de Blobs do Azure, crie um contêiner chamado adftutorial se ele não existir.
Crie uma pasta chamada spark.
Criar uma subpasta chamada script na pasta spark.
Carregue o arquivo WordCount_Spark.py na subpasta script.
Carregue o arquivo de entrada
- Crie um arquivo chamado minecraftstory.txt com um pouco de texto. O programa Spark conta o número de palavras no texto.
- Criar uma subpasta chamada
inputfilesna pastaspark. - Carregue o
minecraftstory.txtna subpastainputfiles.
Criar serviços vinculados
Você cria dois serviços vinculados nesta seção:
- Um serviço vinculado do Armazenamento do Azure que conecta uma conta do Armazenamento do Azure ao Data Factory. Esse armazenamento é usado pelo cluster HDInsight sob demanda. Ele também contém o script Spark a ser executado.
- Um serviço vinculado do HDInsight on-demand. Azure Data Factory cria automaticamente um cluster HDInsight, executa o programa Spark e exclui o cluster HDInsight depois que ele fica ocioso por um tempo pré-configurado.
Serviço vinculado de armazenamento do Azure
Crie um arquivo JSON usando seu editor preferencial, copie a seguinte definição JSON de um serviço vinculado Armazenamento do Azure e salve o arquivo como MyStorageLinkedService.json.
{
"name": "MyStorageLinkedService",
"properties": {
"type": "AzureStorage",
"typeProperties": {
"connectionString": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>"
}
}
}
Atualize o <storageAccountName> e <storageAccountKey> com o nome e a chave de sua conta Armazenamento do Azure.
Serviço associado do HDInsight sob demanda
Crie um arquivo JSON usando seu editor preferencial, copie a seguinte definição JSON de um serviço vinculado Azure HDInsight e salve o arquivo como MyOnDemandSparkLinkedService.json.
{
"name": "MyOnDemandSparkLinkedService",
"properties": {
"type": "HDInsightOnDemand",
"typeProperties": {
"clusterSize": 2,
"clusterType": "spark",
"timeToLive": "00:15:00",
"hostSubscriptionId": "<subscriptionID> ",
"servicePrincipalId": "<servicePrincipalID>",
"servicePrincipalKey": {
"value": "<servicePrincipalKey>",
"type": "SecureString"
},
"tenant": "<tenant ID>",
"clusterResourceGroup": "<resourceGroupofHDICluster>",
"version": "3.6",
"osType": "Linux",
"clusterNamePrefix":"ADFSparkSample",
"linkedServiceName": {
"referenceName": "MyStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
}
Atualize os valores para as propriedades a seguir na definição de serviço vinculado:
- hostSubscriptionId. Substitua <subscriptionID> pela ID da assinatura do Azure. O cluster HDInsight sob demanda é criado nessa assinatura.
- locatário. Substitua <tenantID> pela ID do locatário Azure.
- servicePrincipalId, servicePrincipalKey. Substitua <servicePrincipalID> e <servicePrincipalKey> pelo ID e chave do seu principal de serviço no Microsoft Entra ID. Essa entidade de serviço precisa ser um membro da função de Colaborador de assinatura ou o grupo de recursos em que o cluster é criado. Consulte Criar a entidade de serviço e o aplicativo do Microsoft Entra para obter detalhes. A ID da entidade de serviço é equivalente à ID do aplicativo e uma Chave de entidade de serviço é equivalente ao valor de um Segredo do cliente.
- clusterResourceGroup. Substitua <resourceGroupOfHDICluster> pelo nome do grupo de recursos no qual o cluster HDInsight precisa ser criado.
Observação
Azure HDInsight tem limitação no número total de núcleos que você pode usar em cada região Azure que ele dá suporte. Para o Serviço Vinculado do HDInsight sob Demanda, o cluster HDInsight será criado no mesmo local do Armazenamento do Azure usado como armazenamento primário. Verifique se você tem cotas de núcleo suficientes para que o cluster seja criado com êxito. Para obter mais informações, consulte Configurar clusters no HDInsight com Hadoop, Spark, Kafka e mais.
Criar um pipeline
Nesta etapa, você cria um novo pipeline com uma atividade Spark. A atividade usa a amostra de contagem de palavras. Baixe o conteúdo dessa localização, se você ainda não tiver feito isso.
Crie um arquivo JSON em seu editor preferido, copie a definição de JSON a seguir de uma definição de pipeline e salve-a como MySparkOnDemandPipeline.json.
{
"name": "MySparkOnDemandPipeline",
"properties": {
"activities": [
{
"name": "MySparkActivity",
"type": "HDInsightSpark",
"linkedServiceName": {
"referenceName": "MyOnDemandSparkLinkedService",
"type": "LinkedServiceReference"
},
"typeProperties": {
"rootPath": "adftutorial/spark",
"entryFilePath": "script/WordCount_Spark.py",
"getDebugInfo": "Failure",
"sparkJobLinkedService": {
"referenceName": "MyStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
]
}
}
Observe os seguintes pontos:
- rootPath aponta para a pasta Spark do contêiner ADFtutorial.
- entryFilePath aponta para o arquivo WordCount_Spark.py na subpasta script da pasta spark.
Criar uma fábrica de dados (data factory)
Você criou definições de serviço vinculado e de pipeline em arquivos JSON. Agora, criaremos um data factory e implantaremos o serviço vinculado e os arquivos JSON de pipeline usando cmdlets do PowerShell. Execute os seguintes comandos do PowerShell, um de cada vez:
Defina as variáveis uma a uma.
Nome do Grupo de Recursos
$resourceGroupName = "ADFTutorialResourceGroup"Nome do Data Factory. Ser globalmente exclusivo
$dataFactoryName = "MyDataFactory09102017"Nome do pipeline
$pipelineName = "MySparkOnDemandPipeline" # Name of the pipelineInicie o PowerShell. Mantenha Azure PowerShell aberto até o final deste início rápido. Se você fechar e reabrir, precisará executar os comandos novamente. Para obter uma lista de Azure regiões em que o Data Factory está disponível no momento, selecione as regiões que lhe interessam na página a seguir e expanda Analytics para localizar Data Factory: Produtos disponíveis por região. Os armazenamentos de dados (Armazenamento do Azure, Banco de Dados SQL do Azure etc.) e computação (HDInsight etc.) usados pelo data factory podem estar em outras regiões.
Execute o seguinte comando e insira o nome de usuário e a senha que você usa para entrar no portal do Azure:
Connect-AzAccountExecute o comando abaixo para exibir todas as assinaturas dessa conta:
Get-AzSubscriptionExecute o comando a seguir para selecionar a assinatura com a qual deseja trabalhar. Substitua SubscriptionId pela ID da assinatura do Azure:
Select-AzSubscription -SubscriptionId "<SubscriptionId>"Crie o grupo de recursos: ADFTutorialResourceGroup.
New-AzResourceGroup -Name $resourceGroupName -Location "East Us"Crie o data factory.
$df = Set-AzDataFactoryV2 -Location EastUS -Name $dataFactoryName -ResourceGroupName $resourceGroupNameExecute o comando a seguir para ver a saída:
$dfAlterne para a pasta na qual você criou arquivos JSON e execute o seguinte comando para implantar um serviço vinculado Armazenamento do Azure:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyStorageLinkedService" -File "MyStorageLinkedService.json"Execute o comando a seguir para implantar um serviço vinculado do Spark sob demanda:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyOnDemandSparkLinkedService" -File "MyOnDemandSparkLinkedService.json"Execute o seguinte comando para implantar um pipeline:
Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $pipelineName -File "MySparkOnDemandPipeline.json"
Iniciar e monitorar uma execução de pipeline
Iniciar uma execução de pipeline. Captura também a ID de execução do pipeline para monitoramento futuro.
$runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineNameExecute o script a seguir para verificar continuamente o status do pipeline de execução até que ele termine.
while ($True) { $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30) if(!$result) { Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow" } elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) { Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow" } else { Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow" $result break } ($result | Format-List | Out-String) Start-Sleep -Seconds 15 } Write-Host "Activity `Output` section:" -foregroundcolor "Yellow" $result.Output -join "`r`n" Write-Host "Activity `Error` section:" -foregroundcolor "Yellow" $result.Error -join "`r`n"Aqui está a saída da execução de exemplo:
Pipeline run status: In Progress ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : ActivityName : MySparkActivity PipelineRunId : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794 PipelineName : MySparkOnDemandPipeline Input : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService} Output : LinkedServiceName : ActivityRunStart : 9/20/2017 6:33:47 AM ActivityRunEnd : DurationInMs : Status : InProgress Error : … Pipeline ' MySparkOnDemandPipeline' run finished. Result: ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : MyDataFactory09102017 ActivityName : MySparkActivity PipelineRunId : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794 PipelineName : MySparkOnDemandPipeline Input : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService} Output : {clusterInUse, jobId, ExecutionProgress, effectiveIntegrationRuntime} LinkedServiceName : ActivityRunStart : 9/20/2017 6:33:47 AM ActivityRunEnd : 9/20/2017 6:46:30 AM DurationInMs : 763466 Status : Succeeded Error : {errorCode, message, failureType, target} Activity Output section: "clusterInUse": "https://ADFSparkSamplexxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.azurehdinsight.net/" "jobId": "0" "ExecutionProgress": "Succeeded" "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)" Activity Error section: "errorCode": "" "message": "" "failureType": "" "target": "MySparkActivity"Confirme que uma pasta denominada
outputfilesé criada na pastasparkdo contêiner adftutorial com a saída do programa Spark.
Conteúdo relacionado
O pipeline neste exemplo copia dados de um local para outro em um armazenamento de blobs do Azure. Você aprendeu a:
- Criar uma fábrica de dados.
- Criar e implantar serviços vinculados.
- Criar e implantar um pipeline.
- Iniciar uma execução de pipeline.
- Monitorar a execução de pipeline.
Avance para o próximo tutorial para saber como transformar dados executando o script do Hive em um cluster de Azure HDInsight que está em uma rede virtual.