適用対象:
Azure Data Factory
Azure Synapse Analytics
ヒント
Data Factory in Microsoft Fabric は、よりシンプルなアーキテクチャ、組み込みの AI、および新機能を備えた次世代のAzure Data Factoryです。 データ統合を初めて使用する場合は、Fabric Data Factory から始めます。 既存の ADF ワークロードをFabricにアップグレードして、データ サイエンス、リアルタイム分析、レポートの新機能にアクセスできます。
このチュートリアルでは、Azure PowerShellを使用して、Spark アクティビティとオンデマンド HDInsight のリンクされたサービスを使用してデータを変換する Data Factory パイプラインを作成します。 このチュートリアルでは、以下の手順を実行します。
- データ ファクトリを作成します。
- リンクされたサービスを作成してデプロイします。
- パイプラインを作成してデプロイします。
- パイプラインの実行を開始します。
- パイプラインの実行を監視します。
Azure サブスクリプションをお持ちでない場合は、開始する前に free アカウントを作成してください。
前提条件
注釈
Azure Az PowerShell モジュールを使用してAzureを操作することをお勧めします。 作業を開始するには、「Install Azure PowerShellを参照してください。 Az PowerShell モジュールに移行する方法については、「
- Azure Storage アカウント。 Python スクリプトと入力ファイルを作成し、Azure ストレージにアップロードします。 Spark プログラムからの出力は、このストレージ アカウントに格納されます。 オンデマンドの Spark クラスターでは、同じストレージ アカウントがプライマリ ストレージとして使用されます。
-
Azure PowerShell。
Azure PowerShell の手順に従います。
Blob Storage アカウントPythonスクリプトをアップロードする
次の内容を含む WordCount_Spark.py という名前のPython ファイルを作成します。
import sys from operator import add from pyspark.sql import SparkSession def main(): spark = SparkSession\ .builder\ .appName("PythonWordCount")\ .getOrCreate() lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0]) counts = lines.flatMap(lambda x: x.split(' ')) \ .map(lambda x: (x, 1)) \ .reduceByKey(add) counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/outputfiles/wordcount") spark.stop() if __name__ == "__main__": main()<storageAccountName> をAzure Storage アカウントの名前に置き換えます。 その後、ファイルを保存します。
Azure Blob Storageで、adftutorial という名前のコンテナーが存在しない場合は作成します。
spark という名前のフォルダーを作成します。
spark フォルダーの下に、script という名前のサブフォルダーを作成します。
WordCount_Spark.py ファイルを script サブフォルダーにアップロードします。
入力ファイルをアップロードする
- minecraftstory.txt という名前のファイルを作成し、任意のテキストを入力しておきます。 このテキストの単語数が Spark プログラムによってカウントされます。
-
inputfilesフォルダーに、sparkという名前のサブフォルダーを作成します。 -
minecraftstory.txtをinputfilesサブフォルダーにアップロードします。
著者によってリンクされたサービス
このセクションでは、次の 2 つのリンクされたサービスを作成します。
- Azure Storage アカウントをデータファクトリに接続する Azure Storage のリンクされたサービス。 このストレージは、オンデマンドの HDInsight クラスターによって使用されます。 ここには、実行される Spark スクリプトも含まれています。
- オンデマンドの HDInsight のリンクされたサービス。 Azure Data Factoryは、HDInsight クラスターを自動的に作成し、Spark プログラムを実行してから、事前構成された時間アイドル状態の後に HDInsight クラスターを削除します。
リンクされたサービスAzure Storage
任意のエディターを使用して JSON ファイルを作成し、Azure Storageリンクされたサービスの次の JSON 定義をコピーし、ファイルを MyStorageLinkedService.jsonとして保存します。
{
"name": "MyStorageLinkedService",
"properties": {
"type": "AzureStorage",
"typeProperties": {
"connectionString": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>"
}
}
}
<storageAccountName> および <storageAccountKey> を、Azure Storage アカウントの名前とキーで更新します。
オンデマンドの HDInsight 連携サービス
任意のエディターを使用して JSON ファイルを作成し、Azure HDInsightリンクされたサービスの次の JSON 定義をコピーし、ファイルを MyOnDemandSparkLinkedService.json として保存します。
{
"name": "MyOnDemandSparkLinkedService",
"properties": {
"type": "HDInsightOnDemand",
"typeProperties": {
"clusterSize": 2,
"clusterType": "spark",
"timeToLive": "00:15:00",
"hostSubscriptionId": "<subscriptionID> ",
"servicePrincipalId": "<servicePrincipalID>",
"servicePrincipalKey": {
"value": "<servicePrincipalKey>",
"type": "SecureString"
},
"tenant": "<tenant ID>",
"clusterResourceGroup": "<resourceGroupofHDICluster>",
"version": "3.6",
"osType": "Linux",
"clusterNamePrefix":"ADFSparkSample",
"linkedServiceName": {
"referenceName": "MyStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
}
リンクされたサービスの定義で、以下のプロパティの値を更新します。
- hostSubscriptionId。 <subscriptionID> を、Azure サブスクリプションの ID に置き換えます。 オンデマンドの HDInsight クラスターは、このサブスクリプション内に作成されます。
- テナント。 <tenantID> をAzure テナントの ID に置き換えます。
- servicePrincipalId、servicePrincipalKey。 <servicePrincipalID> および <servicePrincipalKey> を、Microsoft Entra IDのサービス プリンシパルの ID とキーに置き換えます。 このサービス プリンシパルは、サブスクリプションまたはクラスターが作成されるリソース グループの共同作成者ロールのメンバーである必要があります。 詳細については、アプリケーションとサービス プリンシパル Microsoft Entraの作成を参照してください。 [サービス プリンシパル ID] は "アプリケーション ID" に、 [サービス プリンシパル キー] は "クライアント シークレット" の値に相当します。
- clusterResourceGroup。 <resourceGroupOfHDICluster> は、HDInsight クラスターの作成先であるリソース グループの名前に置き換えてください。
注釈
Azure HDInsightでは、サポートされている各Azureリージョンで使用できるコアの合計数に制限があります。 オンデマンド HDInsight のリンクされたサービスの場合、HDInsight クラスターは、プライマリ ストレージとして使用されるAzure Storageの同じ場所に作成されます。 クラスターを正常に作成するための十分なコア クォータがあることを確認します。 詳細については、「Hadoop、Spark、Kafka などの HDInsight クラスターをセットアップする」を参照してください。
パイプラインを作成する
この手順では、Spark アクティビティがある新しいパイプラインを作成します。 アクティビティでは word count サンプルが使用されます。 まだコンテンツをダウンロードしていない場合は、この場所からダウンロードします。
任意のエディターで JSON ファイルを作成し、パイプライン定義から次の JSON 定義をコピーして、MySparkOnDemandPipeline.json として保存します。
{
"name": "MySparkOnDemandPipeline",
"properties": {
"activities": [
{
"name": "MySparkActivity",
"type": "HDInsightSpark",
"linkedServiceName": {
"referenceName": "MyOnDemandSparkLinkedService",
"type": "LinkedServiceReference"
},
"typeProperties": {
"rootPath": "adftutorial/spark",
"entryFilePath": "script/WordCount_Spark.py",
"getDebugInfo": "Failure",
"sparkJobLinkedService": {
"referenceName": "MyStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
]
}
}
以下の点に注意してください。
- rootPath は、adftutorial コンテナーの spark フォルダーを指します。
- entryFilePath は、spark フォルダーの script サブフォルダーの WordCount_Spark.py ファイルを指します。
Data Factory の作成
JSON ファイルで、リンクされたサービスとパイプライン定義を作成しました。 次に、PowerShell コマンドレットを使用して、データ ファクトリを作成し、リンクされたサービスとパイプライン JSON ファイルをデプロイしましょう。 以下の PowerShell コマンドを 1 つずつ実行します。
変数を 1 つずつ設定します。
リソース グループ名
$resourceGroupName = "ADFTutorialResourceGroup"データ ファクトリ名。 グローバルに一意であること
$dataFactoryName = "MyDataFactory09102017"パイプライン名
$pipelineName = "MySparkOnDemandPipeline" # Name of the pipelinePowerShellを起動します。 このクイック スタートが終わるまで、Azure PowerShellを開いたままにしておいてください。 Azure PowerShell を閉じて再度開いた場合は、これらのコマンドをもう一度実行する必要があります。 Data Factory が現在使用できるAzureリージョンの一覧については、次のページで目的のリージョンを選択し、Analytics を展開して、Data Factory: Products available by region> を探します。 データ ファクトリで使用されるデータ ストア (Azure Storage、Azure SQL Databaseなど) とコンピューティング (HDInsight など) は、他のリージョンに存在できます。
次のコマンドを実行し、Azure ポータルへのサインインに使用するユーザー名とパスワードを入力します。
Connect-AzAccount次のコマンドを実行して、このアカウントのすべてのサブスクリプションを表示します。
Get-AzSubscription次のコマンドを実行して、使用するサブスクリプションを選択します。 SubscriptionId をAzure サブスクリプションの ID に置き換えます。
Select-AzSubscription -SubscriptionId "<SubscriptionId>"リソース グループを作成します。ADFTutorialResourceGroup。
New-AzResourceGroup -Name $resourceGroupName -Location "East Us"データ ファクトリを作成します。
$df = Set-AzDataFactoryV2 -Location EastUS -Name $dataFactoryName -ResourceGroupName $resourceGroupName次のコマンドを実行して、出力を表示します。
$dfJSON ファイルを作成したフォルダーに切り替え、次のコマンドを実行して、Azure Storageリンクされたサービスをデプロイします。
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyStorageLinkedService" -File "MyStorageLinkedService.json"次のコマンドを実行し、オンデマンドの Spark のリンクされたサービスをデプロイします。
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyOnDemandSparkLinkedService" -File "MyOnDemandSparkLinkedService.json"次のコマンドを実行し、パイプラインをデプロイします。
Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $pipelineName -File "MySparkOnDemandPipeline.json"
パイプラインの実行を開始して監視する
パイプラインの実行を開始します。 将来の監視のために、パイプライン実行IDも取得されます。
$runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName次のスクリプトを実行し、パイプラインの実行の状態を、完了するまで継続的にチェックします。
while ($True) { $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30) if(!$result) { Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow" } elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) { Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow" } else { Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow" $result break } ($result | Format-List | Out-String) Start-Sleep -Seconds 15 } Write-Host "Activity `Output` section:" -foregroundcolor "Yellow" $result.Output -join "`r`n" Write-Host "Activity `Error` section:" -foregroundcolor "Yellow" $result.Error -join "`r`n"サンプル実行の出力結果を次に示します。
Pipeline run status: In Progress ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : ActivityName : MySparkActivity PipelineRunId : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794 PipelineName : MySparkOnDemandPipeline Input : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService} Output : LinkedServiceName : ActivityRunStart : 9/20/2017 6:33:47 AM ActivityRunEnd : DurationInMs : Status : InProgress Error : … Pipeline ' MySparkOnDemandPipeline' run finished. Result: ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : MyDataFactory09102017 ActivityName : MySparkActivity PipelineRunId : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794 PipelineName : MySparkOnDemandPipeline Input : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService} Output : {clusterInUse, jobId, ExecutionProgress, effectiveIntegrationRuntime} LinkedServiceName : ActivityRunStart : 9/20/2017 6:33:47 AM ActivityRunEnd : 9/20/2017 6:46:30 AM DurationInMs : 763466 Status : Succeeded Error : {errorCode, message, failureType, target} Activity Output section: "clusterInUse": "https://ADFSparkSamplexxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.azurehdinsight.net/" "jobId": "0" "ExecutionProgress": "Succeeded" "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)" Activity Error section: "errorCode": "" "message": "" "failureType": "" "target": "MySparkActivity"adftutorial コンテナーの
outputfilesフォルダーにsparkというフォルダーが作成され、Spark プログラムの出力が含まれていることを確認します。
関連するコンテンツ
このサンプルのパイプラインは、Azure BLOB ストレージ内のある場所から別の場所にデータをコピーします。 以下の方法を学習しました。
- データ ファクトリを作成します。
- リンクされたサービスを作成してデプロイします。
- パイプラインを作成してデプロイします。
- パイプラインの実行を開始します。
- パイプラインの実行を監視します。
次のチュートリアルに進み、仮想ネットワーク内のAzure HDInsight クラスターで Hive スクリプトを実行してデータを変換する方法を学習します。
Tutorial: Azure Virtual Network で Hive を使用してデータを変換します。