次の方法で共有


Azure Data Factory の Spark アクティビティを使用してクラウド内のデータを変換する

適用対象: Azure Data Factory Azure Synapse Analytics

ヒント

Data Factory in Microsoft Fabric は、よりシンプルなアーキテクチャ、組み込みの AI、および新機能を備えた次世代のAzure Data Factoryです。 データ統合を初めて使用する場合は、Fabric Data Factory から始めます。 既存の ADF ワークロードをFabricにアップグレードして、データ サイエンス、リアルタイム分析、レポートの新機能にアクセスできます。

このチュートリアルでは、Azure PowerShellを使用して、Spark アクティビティとオンデマンド HDInsight のリンクされたサービスを使用してデータを変換する Data Factory パイプラインを作成します。 このチュートリアルでは、以下の手順を実行します。

  • データ ファクトリを作成します。
  • リンクされたサービスを作成してデプロイします。
  • パイプラインを作成してデプロイします。
  • パイプラインの実行を開始します。
  • パイプラインの実行を監視します。

Azure サブスクリプションをお持ちでない場合は、開始する前に free アカウントを作成してください。

前提条件

注釈

Azure Az PowerShell モジュールを使用してAzureを操作することをお勧めします。 作業を開始するには、「Install Azure PowerShellを参照してください。 Az PowerShell モジュールに移行する方法については、「AzRM から Azを参照してください。

  • Azure Storage アカウント。 Python スクリプトと入力ファイルを作成し、Azure ストレージにアップロードします。 Spark プログラムからの出力は、このストレージ アカウントに格納されます。 オンデマンドの Spark クラスターでは、同じストレージ アカウントがプライマリ ストレージとして使用されます。
  • Azure PowerShellAzure PowerShellの手順に従います。

Blob Storage アカウントPythonスクリプトをアップロードする

  1. 次の内容を含む WordCount_Spark.py という名前のPython ファイルを作成します。

    import sys
    from operator import add
    
    from pyspark.sql import SparkSession
    
    def main():
        spark = SparkSession\
            .builder\
            .appName("PythonWordCount")\
            .getOrCreate()
    
        lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0])
        counts = lines.flatMap(lambda x: x.split(' ')) \
            .map(lambda x: (x, 1)) \
            .reduceByKey(add)
        counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/outputfiles/wordcount")
    
        spark.stop()
    
    if __name__ == "__main__":
        main()
    
  2. <storageAccountName> をAzure Storage アカウントの名前に置き換えます。 その後、ファイルを保存します。

  3. Azure Blob Storageで、adftutorial という名前のコンテナーが存在しない場合は作成します。

  4. spark という名前のフォルダーを作成します。

  5. spark フォルダーの下に、script という名前のサブフォルダーを作成します。

  6. WordCount_Spark.py ファイルを script サブフォルダーにアップロードします。

入力ファイルをアップロードする

  1. minecraftstory.txt という名前のファイルを作成し、任意のテキストを入力しておきます。 このテキストの単語数が Spark プログラムによってカウントされます。
  2. inputfiles フォルダーに、spark という名前のサブフォルダーを作成します。
  3. minecraftstory.txtinputfiles サブフォルダーにアップロードします。

著者によってリンクされたサービス

このセクションでは、次の 2 つのリンクされたサービスを作成します。

  • Azure Storage アカウントをデータファクトリに接続する Azure Storage のリンクされたサービス。 このストレージは、オンデマンドの HDInsight クラスターによって使用されます。 ここには、実行される Spark スクリプトも含まれています。
  • オンデマンドの HDInsight のリンクされたサービス。 Azure Data Factoryは、HDInsight クラスターを自動的に作成し、Spark プログラムを実行してから、事前構成された時間アイドル状態の後に HDInsight クラスターを削除します。

リンクされたサービスAzure Storage

任意のエディターを使用して JSON ファイルを作成し、Azure Storageリンクされたサービスの次の JSON 定義をコピーし、ファイルを MyStorageLinkedService.jsonとして保存します。

{
    "name": "MyStorageLinkedService",
    "properties": {
      "type": "AzureStorage",
      "typeProperties": {
        "connectionString": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>"
      }
    }
}

<storageAccountName> および <storageAccountKey> を、Azure Storage アカウントの名前とキーで更新します。

オンデマンドの HDInsight 連携サービス

任意のエディターを使用して JSON ファイルを作成し、Azure HDInsightリンクされたサービスの次の JSON 定義をコピーし、ファイルを MyOnDemandSparkLinkedService.json として保存します。

{
    "name": "MyOnDemandSparkLinkedService",
    "properties": {
      "type": "HDInsightOnDemand",
      "typeProperties": {
        "clusterSize": 2,
        "clusterType": "spark",
        "timeToLive": "00:15:00",
        "hostSubscriptionId": "<subscriptionID> ",
        "servicePrincipalId": "<servicePrincipalID>",
        "servicePrincipalKey": {
          "value": "<servicePrincipalKey>",
          "type": "SecureString"
        },
        "tenant": "<tenant ID>",
        "clusterResourceGroup": "<resourceGroupofHDICluster>",
        "version": "3.6",
        "osType": "Linux",
        "clusterNamePrefix":"ADFSparkSample",
        "linkedServiceName": {
          "referenceName": "MyStorageLinkedService",
          "type": "LinkedServiceReference"
        }
      }
    }
}

リンクされたサービスの定義で、以下のプロパティの値を更新します。

  • hostSubscriptionId。 <subscriptionID> を、Azure サブスクリプションの ID に置き換えます。 オンデマンドの HDInsight クラスターは、このサブスクリプション内に作成されます。
  • テナント。 <tenantID> をAzure テナントの ID に置き換えます。
  • servicePrincipalIdservicePrincipalKey。 <servicePrincipalID> および <servicePrincipalKey> を、Microsoft Entra IDのサービス プリンシパルの ID とキーに置き換えます。 このサービス プリンシパルは、サブスクリプションまたはクラスターが作成されるリソース グループの共同作成者ロールのメンバーである必要があります。 詳細については、アプリケーションとサービス プリンシパル Microsoft Entraの作成を参照してください。 [サービス プリンシパル ID] は "アプリケーション ID" に、 [サービス プリンシパル キー] は "クライアント シークレット" の値に相当します。
  • clusterResourceGroup。 <resourceGroupOfHDICluster> は、HDInsight クラスターの作成先であるリソース グループの名前に置き換えてください。

注釈

Azure HDInsightでは、サポートされている各Azureリージョンで使用できるコアの合計数に制限があります。 オンデマンド HDInsight のリンクされたサービスの場合、HDInsight クラスターは、プライマリ ストレージとして使用されるAzure Storageの同じ場所に作成されます。 クラスターを正常に作成するための十分なコア クォータがあることを確認します。 詳細については、「Hadoop、Spark、Kafka などの HDInsight クラスターをセットアップする」を参照してください。

パイプラインを作成する

この手順では、Spark アクティビティがある新しいパイプラインを作成します。 アクティビティでは word count サンプルが使用されます。 まだコンテンツをダウンロードしていない場合は、この場所からダウンロードします。

任意のエディターで JSON ファイルを作成し、パイプライン定義から次の JSON 定義をコピーして、MySparkOnDemandPipeline.json として保存します。

{
  "name": "MySparkOnDemandPipeline",
  "properties": {
    "activities": [
      {
        "name": "MySparkActivity",
        "type": "HDInsightSpark",
        "linkedServiceName": {
            "referenceName": "MyOnDemandSparkLinkedService",
            "type": "LinkedServiceReference"
        },
        "typeProperties": {
          "rootPath": "adftutorial/spark",
          "entryFilePath": "script/WordCount_Spark.py",
          "getDebugInfo": "Failure",
          "sparkJobLinkedService": {
            "referenceName": "MyStorageLinkedService",
            "type": "LinkedServiceReference"
          }
        }
      }
    ]
  }
}

以下の点に注意してください。

  • rootPath は、adftutorial コンテナーの spark フォルダーを指します。
  • entryFilePath は、spark フォルダーの script サブフォルダーの WordCount_Spark.py ファイルを指します。

Data Factory の作成

JSON ファイルで、リンクされたサービスとパイプライン定義を作成しました。 次に、PowerShell コマンドレットを使用して、データ ファクトリを作成し、リンクされたサービスとパイプライン JSON ファイルをデプロイしましょう。 以下の PowerShell コマンドを 1 つずつ実行します。

  1. 変数を 1 つずつ設定します。

    リソース グループ名

    $resourceGroupName = "ADFTutorialResourceGroup" 
    

    データ ファクトリ名。 グローバルに一意であること

    $dataFactoryName = "MyDataFactory09102017"
    

    パイプライン名

    $pipelineName = "MySparkOnDemandPipeline" # Name of the pipeline
    
  2. PowerShellを起動します。 このクイック スタートが終わるまで、Azure PowerShellを開いたままにしておいてください。 Azure PowerShell を閉じて再度開いた場合は、これらのコマンドをもう一度実行する必要があります。 Data Factory が現在使用できるAzureリージョンの一覧については、次のページで目的のリージョンを選択し、Analytics を展開して、Data Factory: Products available by region> を探します。 データ ファクトリで使用されるデータ ストア (Azure Storage、Azure SQL Databaseなど) とコンピューティング (HDInsight など) は、他のリージョンに存在できます。

    次のコマンドを実行し、Azure ポータルへのサインインに使用するユーザー名とパスワードを入力します。

    Connect-AzAccount
    

    次のコマンドを実行して、このアカウントのすべてのサブスクリプションを表示します。

    Get-AzSubscription
    

    次のコマンドを実行して、使用するサブスクリプションを選択します。 SubscriptionId をAzure サブスクリプションの ID に置き換えます。

    Select-AzSubscription -SubscriptionId "<SubscriptionId>"    
    
  3. リソース グループを作成します。ADFTutorialResourceGroup。

    New-AzResourceGroup -Name $resourceGroupName -Location "East Us" 
    
  4. データ ファクトリを作成します。

     $df = Set-AzDataFactoryV2 -Location EastUS -Name $dataFactoryName -ResourceGroupName $resourceGroupName
    

    次のコマンドを実行して、出力を表示します。

    $df
    
  5. JSON ファイルを作成したフォルダーに切り替え、次のコマンドを実行して、Azure Storageリンクされたサービスをデプロイします。

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyStorageLinkedService" -File "MyStorageLinkedService.json"
    
  6. 次のコマンドを実行し、オンデマンドの Spark のリンクされたサービスをデプロイします。

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyOnDemandSparkLinkedService" -File "MyOnDemandSparkLinkedService.json"
    
  7. 次のコマンドを実行し、パイプラインをデプロイします。

    Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $pipelineName -File "MySparkOnDemandPipeline.json"
    

パイプラインの実行を開始して監視する

  1. パイプラインの実行を開始します。 将来の監視のために、パイプライン実行IDも取得されます。

    $runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName  
    
  2. 次のスクリプトを実行し、パイプラインの実行の状態を、完了するまで継続的にチェックします。

    while ($True) {
        $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30)
    
        if(!$result) {
            Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow"
        }
        elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) {
            Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow"
        }
        else {
            Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow"
            $result
            break
        }
        ($result | Format-List | Out-String)
        Start-Sleep -Seconds 15
    }
    
    Write-Host "Activity `Output` section:" -foregroundcolor "Yellow"
    $result.Output -join "`r`n"
    
    Write-Host "Activity `Error` section:" -foregroundcolor "Yellow"
    $result.Error -join "`r`n" 
    
  3. サンプル実行の出力結果を次に示します。

    Pipeline run status: In Progress
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : 
    ActivityName      : MySparkActivity
    PipelineRunId     : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794
    PipelineName      : MySparkOnDemandPipeline
    Input             : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService}
    Output            : 
    LinkedServiceName : 
    ActivityRunStart  : 9/20/2017 6:33:47 AM
    ActivityRunEnd    : 
    DurationInMs      : 
    Status            : InProgress
    Error             :
    …
    
    Pipeline ' MySparkOnDemandPipeline' run finished. Result:
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : MyDataFactory09102017
    ActivityName      : MySparkActivity
    PipelineRunId     : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794
    PipelineName      : MySparkOnDemandPipeline
    Input             : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService}
    Output            : {clusterInUse, jobId, ExecutionProgress, effectiveIntegrationRuntime}
    LinkedServiceName : 
    ActivityRunStart  : 9/20/2017 6:33:47 AM
    ActivityRunEnd    : 9/20/2017 6:46:30 AM
    DurationInMs      : 763466
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    Activity Output section:
    "clusterInUse": "https://ADFSparkSamplexxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.azurehdinsight.net/"
    "jobId": "0"
    "ExecutionProgress": "Succeeded"
    "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)"
    Activity Error section:
    "errorCode": ""
    "message": ""
    "failureType": ""
    "target": "MySparkActivity"
    
  4. adftutorial コンテナーの outputfiles フォルダーに spark というフォルダーが作成され、Spark プログラムの出力が含まれていることを確認します。

このサンプルのパイプラインは、Azure BLOB ストレージ内のある場所から別の場所にデータをコピーします。 以下の方法を学習しました。

  • データ ファクトリを作成します。
  • リンクされたサービスを作成してデプロイします。
  • パイプラインを作成してデプロイします。
  • パイプラインの実行を開始します。
  • パイプラインの実行を監視します。

次のチュートリアルに進み、仮想ネットワーク内のAzure HDInsight クラスターで Hive スクリプトを実行してデータを変換する方法を学習します。

Tutorial: Azure Virtual Network で Hive を使用してデータを変換します。