次の方法で共有


PowerShell を使用して Azure SQL Database から Azure Blob Storage にデータを増分読み込みする

適用対象: Azure Data Factory Azure Synapse Analytics

ヒント

Data Factory in Microsoft Fabric は、よりシンプルなアーキテクチャ、組み込みの AI、および新機能を備えた次世代のAzure Data Factoryです。 データ統合を初めて使用する場合は、Fabric Data Factory から始めます。 既存の ADF ワークロードをFabricにアップグレードして、データ サイエンス、リアルタイム分析、レポートの新機能にアクセスできます。

このチュートリアルでは、Azure Data Factory を使用して、Azure SQL Database のテーブルからデルタデータを読み込み、Azure Blob Storage にロードするパイプラインを作成します。

このチュートリアルでは、以下の手順を実行します。

  • 基準値を格納するためのデータ ストアを準備します。
  • データ ファクトリを作成します。
  • リンクされたサービスを作成します。
  • ソース データセット、シンク データセット、および基準値データセットを作成します。
  • パイプラインを作成します。
  • パイプラインを実行します。
  • パイプラインの実行を監視します。

概要

ソリューションの概略図を次に示します。

インクリメンタルデータ読み込み

このソリューションを作成するための重要な手順を次に示します。

  1. 基準値列を選択する。 ソース データ ストアのいずれか 1 つの列を選択します。実行ごとに新しいレコードまたは更新されたレコードを切り分ける目的で使用されます。 通常、行が作成または更新されると、この選択された列のデータ(例えば、last_modify_time や ID)は増加し続けます。 この列の最大値が基準値として使用されます。

  2. 基準値を格納するためのデータ ストアを準備する
    このチュートリアルでは、SQL データベースに基準値を格納します。

  3. 次のワークフローを含んだパイプラインを作成する

    このソリューションのパイプラインには、次のアクティビティがあります。

    • 2 つのルックアップ アクティビティを作成する。 1 つ目のルックアップ アクティビティを使用して、最終のウォーターマーク値を取得します。 2 つ目のルックアップ アクティビティは、新しい基準値を取得するために使用します。 これらの基準値は、コピー アクティビティに渡されます。
    • 基準値列の値が古い基準値より大きく、新しい基準値以下の値を持つ行をソース データ ストアからコピーするCopy アクティビティを作成します。 その差分データが、ソース データ ストアから新しいファイルとして BLOB ストレージにコピーされます。
    • ストアド プロシージャ― アクティビティを作成する。これはパイプラインの次回実行に備えて基準値を更新するためのアクティビティです。

Azure サブスクリプションをお持ちでない場合は、開始する前に free アカウントを作成してください。

前提条件

注釈

Azure Az PowerShell モジュールを使用してAzureを操作することをお勧めします。 作業を開始するには、「Install Azure PowerShellを参照してください。 Az PowerShell モジュールに移行する方法については、「AzRM から Azを参照してください。

  • Azure SQL Database。 ソース データ ストアとして使うデータベースです。 Azure SQL Databaseにデータベースがない場合は、Azure SQL Database でデータベースを作成するの手順を参照してください。
  • Azure Storage。 あなたは BLOB ストレージをシンク データ ストアとして使用します。 Azure ストレージ アカウントがない場合の作成手順については、「ストレージ アカウントの作成」を参照してください。 adftutorial という名前のコンテナーを作成します。
  • Azure PowerShellの指示に従ってAzure PowerShellをインストールして構成します。

SQL データベースにデータ ソース テーブルを作成する

  1. SQL Server Management Studioを開きます。 サーバー エクスプローラーで目的のデータベースを右クリックし、 [新しいクエリ] を選択します。

  2. SQL データベースに対して次の SQL コマンドを実行し、data_source_table という名前のテーブルをデータ ソース ストアとして作成します。

    create table data_source_table
    (
        PersonID int,
        Name varchar(255),
        LastModifytime datetime
    );
    
    INSERT INTO data_source_table
    (PersonID, Name, LastModifytime)
    VALUES
    (1, 'aaaa','9/1/2017 12:56:00 AM'),
    (2, 'bbbb','9/2/2017 5:23:00 AM'),
    (3, 'cccc','9/3/2017 2:36:00 AM'),
    (4, 'dddd','9/4/2017 3:21:00 AM'),
    (5, 'eeee','9/5/2017 8:06:00 AM');
    

    このチュートリアルでは、LastModifytime を基準値列として使用します。 データ ソース ストアに格納されているデータを次の表に示します。

    PersonID | Name | LastModifytime
    -------- | ---- | --------------
    1 | aaaa | 2017-09-01 00:56:00.000
    2 | bbbb | 2017-09-02 05:23:00.000
    3 | cccc | 2017-09-03 02:36:00.000
    4 | dddd | 2017-09-04 03:21:00.000
    5 | eeee | 2017-09-05 08:06:00.000
    

高基準値の格納用としてもう 1 つテーブルを SQL データベースに作成する

  1. SQL データベースに対して次の SQL コマンドを実行し、基準値の格納先として watermarktable という名前のテーブルを作成します。

    create table watermarktable
    (
    
    TableName varchar(255),
    WatermarkValue datetime,
    );
    
  2. ソース データ ストアのテーブル名と組み合わせて高基準値の既定値を設定します。 このチュートリアルでは、テーブル名は data_source_table です。

    INSERT INTO watermarktable
    VALUES ('data_source_table','1/1/2010 12:00:00 AM')    
    
  3. watermarktable テーブル内のデータを確認します。

    Select * from watermarktable
    

    出力:

    TableName  | WatermarkValue
    ----------  | --------------
    data_source_table | 2010-01-01 00:00:00.000
    

SQL データベースにストアド プロシージャを作成する

次のコマンドを実行して、SQL データベースにストアド プロシージャを作成します。

CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS

BEGIN

UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName

END

Data Factory の作成

  1. 後で PowerShell コマンドで使用できるように、リソース グループ名の変数を定義します。 次のコマンド テキストを PowerShell にコピーし、Azure リソース グループの名前を二重引用符で囲んで指定し、コマンドを実行します。 たとえば "adfrg" です。

    $resourceGroupName = "ADFTutorialResourceGroup";
    

    リソース グループが既に存在する場合は、上書きされないようにすることができます。 $resourceGroupName 変数に別の値を割り当てて、コマンドをもう一度実行します。

  2. データ ファクトリの場所の変数を定義します。

    $location = "East US"
    
  3. Azure リソース グループを作成するには、次のコマンドを実行します。

    New-AzResourceGroup $resourceGroupName $location
    

    リソース グループが既に存在する場合は、上書きされないようにすることができます。 $resourceGroupName 変数に別の値を割り当てて、コマンドをもう一度実行します。

  4. データ ファクトリ名の変数を定義します。

    重要

    データ ファクトリ名は、グローバルに一意になるように更新してください。 たとえば、ADFTutorialFactorySP1127 とします。

    $dataFactoryName = "ADFIncCopyTutorialFactory";
    
  5. データ ファクトリを作成するには、次の Set-AzDataFactoryV2 コマンドレットを実行します。

    Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location "East US" -Name $dataFactoryName
    

以下の点に注意してください。

  • データ ファクトリの名前はグローバルに一意にする必要があります。 次のエラーが発生した場合は、名前を変更してからもう一度実行してください。

    The specified Data Factory name 'ADFv2QuickStartDataFactory' is already in use. Data Factory names must be globally unique.
    
  • Data Factory インスタンスを作成するには、Azureへのサインインに使用するユーザー アカウントが、共同作成者ロールまたは所有者ロールのメンバーであるか、Azure サブスクリプションの管理者である必要があります。

  • Data Factory が現在使用できるAzureリージョンの一覧については、次のページで目的のリージョンを選択し、Analytics を展開して、Data Factory: Products available by region> を探します。 データ ファクトリで使用されるデータ ストア (ストレージ、SQL Database、Azure SQL Managed Instanceなど) とコンピューティング (Azure HDInsightなど) は、他のリージョンに置くことができます。

リンクされたサービスを作成します

データ ストアおよびコンピューティング サービスをデータ ファクトリにリンクするには、リンクされたサービスをデータ ファクトリに作成します。 このセクションでは、ストレージ アカウントと SQL Database に対するリンクされたサービスを作成します。

ストレージのリンクされたサービスを作成するには

  1. 次の内容を記述した AzureStorageLinkedService.json という名前の JSON ファイルを C:\ADF フォルダーに作成します (ADF フォルダーが存在しない場合は作成してください)。<accountName><accountKey> を、使用するストレージ アカウントの名前とキーに置き換えてからファイルを保存してください。

    {
        "name": "AzureStorageLinkedService",
        "properties": {
            "type": "AzureStorage",
            "typeProperties": {
                "connectionString": "DefaultEndpointsProtocol=https;AccountName=<accountName>;AccountKey=<accountKey>"
            }
        }
    }
    
  2. PowerShell で ADF フォルダーに切り替えます。

  3. Set-AzDataFactoryV2LinkedService コマンドレットを実行して、リンクされたサービス AzureStorageLinkedService を作成します。 次の例では、ResourceGroupName パラメーターと DataFactoryName パラメーターの値を渡しています。

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureStorageLinkedService" -File ".\AzureStorageLinkedService.json"
    

    出力例を次に示します。

    LinkedServiceName : AzureStorageLinkedService
    ResourceGroupName : <resourceGroupName>
    DataFactoryName   : <dataFactoryName>
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureStorageLinkedService
    

SQL Database のリンクされたサービスを作成する

  1. 次の内容を記述した AzureSQLDatabaseLinkedService.json という名前の JSON ファイルを C:\ADF フォルダーに作成します (ADF フォルダーが存在しない場合は作成してください)。ファイルを保存する前に、<your-server-name> と <your-database-name> をサーバーとデータベースの名前に置き換えます。 また、データ ファクトリのマネージド IDへのアクセスを許可するように Azure SQL Serverを構成する必要もあります。

    {
    "name": "AzureSqlDatabaseLinkedService",
    "properties": {
            "type": "AzureSqlDatabase",
            "typeProperties": {
                "connectionString": "Server=tcp:<your-server-name>.database.windows.net,1433;Database=<your-database-name>;"
            },
            "authenticationType": "ManagedIdentity",
            "annotations": []
        }
    }
    
  2. PowerShell で ADF フォルダーに切り替えます。

  3. Set-AzDataFactoryV2LinkedService コマンドレットを実行して、リンクされたサービス AzureSQLDatabaseLinkedService を作成します。

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSQLDatabaseLinkedService" -File ".\AzureSQLDatabaseLinkedService.json"
    

    出力例を次に示します。

    LinkedServiceName : AzureSQLDatabaseLinkedService
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlDatabaseLinkedService
    ProvisioningState :
    

データセットを作成する

この手順では、ソース データとシンク データを表すデータセットを作成します。

ソース データセットを作成する

  1. 以下の内容を記述した SourceDataset.json という名前の JSON ファイルを同じフォルダー内に作成します。

    {
        "name": "SourceDataset",
        "properties": {
            "type": "AzureSqlTable",
            "typeProperties": {
                "tableName": "data_source_table"
            },
            "linkedServiceName": {
                "referenceName": "AzureSQLDatabaseLinkedService",
                "type": "LinkedServiceReference"
            }
        }
    }
    
    

    このチュートリアルでは、data_source_table というテーブル名を使用します。 別の名前のテーブルを使用する場合は、テーブル名を置き換えてください。

  2. Set-AzDataFactoryV2Dataset コマンドレットを実行して、データセット SourceDataset を作成します。

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SourceDataset" -File ".\SourceDataset.json"
    

    このコマンドレットの出力例を次に示します。

    DatasetName       : SourceDataset
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
    

シンク データセットを作成する

  1. 以下の内容を記述した SinkDataset.json という名前の JSON ファイルを同じフォルダー内に作成します。

    {
        "name": "SinkDataset",
        "properties": {
            "type": "AzureBlob",
            "typeProperties": {
                "folderPath": "adftutorial/incrementalcopy",
                "fileName": "@CONCAT('Incremental-', pipeline().RunId, '.txt')",
                "format": {
                    "type": "TextFormat"
                }
            },
            "linkedServiceName": {
                "referenceName": "AzureStorageLinkedService",
                "type": "LinkedServiceReference"
            }
        }
    }   
    

    重要

    このスニペットは、BLOB ストレージに adftutorial という名前の BLOB コンテナーがあることを前提としています。 このコンテナーが存在しない場合は作成するか、既存のコンテナーの名前を設定してください。 出力フォルダー incrementalcopy は、コンテナーに存在しない場合は自動的に作成されます。 このチュートリアルでは、@CONCAT('Incremental-', pipeline().RunId, '.txt') という式を使ってファイル名が動的に生成されます。

  2. Set-AzDataFactoryV2Dataset コマンドレットを実行して、データセット SinkDataset を作成します。

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SinkDataset" -File ".\SinkDataset.json"
    

    このコマンドレットの出力例を次に示します。

    DatasetName       : SinkDataset
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureBlobDataset    
    

基準値用のデータセットを作成する

この手順では、高基準値を格納するためのデータセットを作成します。

  1. 以下の内容を記述した WatermarkDataset.json という名前の JSON ファイルを同じフォルダー内に作成します。

    {
        "name": " WatermarkDataset ",
        "properties": {
            "type": "AzureSqlTable",
            "typeProperties": {
                "tableName": "watermarktable"
            },
            "linkedServiceName": {
                "referenceName": "AzureSQLDatabaseLinkedService",
                "type": "LinkedServiceReference"
            }
        }
    }    
    
  2. Set-AzDataFactoryV2Dataset コマンドレットを実行して、データセット WatermarkDataset を作成します。

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "WatermarkDataset" -File ".\WatermarkDataset.json"
    

    このコマンドレットの出力例を次に示します。

    DatasetName       : WatermarkDataset
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset    
    

パイプラインを作成する

このチュートリアルでは、2 つのルックアップ アクティビティ、1 つのCopy アクティビティ、1 つの StoredProcedure アクティビティが 1 つのパイプラインにチェーンされたパイプラインを作成します。

  1. 以下の内容を記述した IncrementalCopyPipeline.json という名前の JSON ファイルを同じフォルダー内に作成します。

    {
        "name": "IncrementalCopyPipeline",
        "properties": {
            "activities": [
                {
                    "name": "LookupOldWaterMarkActivity",
                    "type": "Lookup",
                    "typeProperties": {
                        "source": {
                        "type": "SqlSource",
                        "sqlReaderQuery": "select * from watermarktable"
                        },
    
                        "dataset": {
                        "referenceName": "WatermarkDataset",
                        "type": "DatasetReference"
                        }
                    }
                },
                {
                    "name": "LookupNewWaterMarkActivity",
                    "type": "Lookup",
                    "typeProperties": {
                        "source": {
                            "type": "SqlSource",
                            "sqlReaderQuery": "select MAX(LastModifytime) as NewWatermarkvalue from data_source_table"
                        },
    
                        "dataset": {
                        "referenceName": "SourceDataset",
                        "type": "DatasetReference"
                        }
                    }
                },
    
                {
                    "name": "IncrementalCopyActivity",
                    "type": "Copy",
                    "typeProperties": {
                        "source": {
                            "type": "SqlSource",
                            "sqlReaderQuery": "select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'"
                        },
                        "sink": {
                            "type": "BlobSink"
                        }
                    },
                    "dependsOn": [
                        {
                            "activity": "LookupNewWaterMarkActivity",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        },
                        {
                            "activity": "LookupOldWaterMarkActivity",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
    
                    "inputs": [
                        {
                            "referenceName": "SourceDataset",
                            "type": "DatasetReference"
                        }
                    ],
                    "outputs": [
                        {
                            "referenceName": "SinkDataset",
                            "type": "DatasetReference"
                        }
                    ]
                },
    
                {
                    "name": "StoredProceduretoWriteWatermarkActivity",
                    "type": "SqlServerStoredProcedure",
                    "typeProperties": {
    
                        "storedProcedureName": "usp_write_watermark",
                        "storedProcedureParameters": {
                            "LastModifiedtime": {"value": "@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}", "type": "datetime" },
                            "TableName":  { "value":"@{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}", "type":"String"}
                        }
                    },
    
                    "linkedServiceName": {
                        "referenceName": "AzureSQLDatabaseLinkedService",
                        "type": "LinkedServiceReference"
                    },
    
                    "dependsOn": [
                        {
                            "activity": "IncrementalCopyActivity",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ]
                }
            ]
    
        }
    }
    
  2. Set-AzDataFactoryV2Pipeline コマンドレットを実行して、パイプライン IncrementalCopyPipeline を作成します。

    Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "IncrementalCopyPipeline" -File ".\IncrementalCopyPipeline.json"
    

    出力例を次に示します。

     PipelineName      : IncrementalCopyPipeline
     ResourceGroupName : ADF
     DataFactoryName   : incrementalloadingADF
     Activities        : {LookupOldWaterMarkActivity, LookupNewWaterMarkActivity, IncrementalCopyActivity, StoredProceduretoWriteWatermarkActivity}
     Parameters        :
    

パイプラインを実行する

  1. Invoke-AzDataFactoryV2Pipeline コマンドレットを使って IncrementalCopyPipeline パイプラインを実行します。 プレースホルダーを実際のリソースグループとデータファクトリ名に置き換えてください。

    $RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroupName $resourceGroupName -dataFactoryName $dataFactoryName
    
  2. Get-AzDataFactoryV2ActivityRun コマンドレットを実行し、すべてのアクティビティが正常に実行されている状態になるまでパイプラインの状態をチェックします。 RunStartedAfter パラメーターと RunStartedBefore パラメーターのプレースホルダーを、適切な時刻に置き換えてください。 このチュートリアルでは、 -RunStartedAfter "2017/09/14"-RunStartedBefore "2017/09/15" を使用します。

    Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $RunId -RunStartedAfter "<start time>" -RunStartedBefore "<end time>"
    

    出力例を次に示します。

    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : LookupNewWaterMarkActivity
    PipelineRunId     : d4bf3ce2-5d60-43f3-9318-923155f61037
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, dataset}
    Output            : {NewWatermarkvalue}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 7:42:42 AM
    ActivityRunEnd    : 9/14/2017 7:42:50 AM
    DurationInMs      : 7777
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : LookupOldWaterMarkActivity
    PipelineRunId     : d4bf3ce2-5d60-43f3-9318-923155f61037
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, dataset}
    Output            : {TableName, WatermarkValue}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 7:42:42 AM
    ActivityRunEnd    : 9/14/2017 7:43:07 AM
    DurationInMs      : 25437
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : IncrementalCopyActivity
    PipelineRunId     : d4bf3ce2-5d60-43f3-9318-923155f61037
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, sink}
    Output            : {dataRead, dataWritten, rowsCopied, copyDuration...}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 7:43:10 AM
    ActivityRunEnd    : 9/14/2017 7:43:29 AM
    DurationInMs      : 19769
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : StoredProceduretoWriteWatermarkActivity
    PipelineRunId     : d4bf3ce2-5d60-43f3-9318-923155f61037
    PipelineName      : IncrementalCopyPipeline
    Input             : {storedProcedureName, storedProcedureParameters}
    Output            : {}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 7:43:32 AM
    ActivityRunEnd    : 9/14/2017 7:43:47 AM
    DurationInMs      : 14467
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    

結果の確認

  1. BLOB ストレージ (シンク ストア) で、SinkDataset で定義したファイルにデータがコピーされたことを確認します。 このチュートリアルでは、Incremental- d4bf3ce2-5d60-43f3-9318-923155f61037.txt がそのファイル名に該当します。 このファイルを開くと、SQL データベース内のデータと同じレコードがあることを確認できます。

    1,aaaa,2017-09-01 00:56:00.0000000
    2,bbbb,2017-09-02 05:23:00.0000000
    3,cccc,2017-09-03 02:36:00.0000000
    4,dddd,2017-09-04 03:21:00.0000000
    5,eeee,2017-09-05 08:06:00.0000000
    
  2. watermarktable の最新の値をチェックします。 基準値が更新されたことを確認できます。

    Select * from watermarktable
    

    出力例を次に示します。

    TableName ウォーターマーク値
    データソーステーブル 2017-09-05 8:06:00.000

データ ソース ストアにデータを挿入して差分データの読み込みを検証する

  1. SQL データベース (データ ソース ストア) に新しいデータを挿入します。

    INSERT INTO data_source_table
    VALUES (6, 'newdata','9/6/2017 2:23:00 AM')
    
    INSERT INTO data_source_table
    VALUES (7, 'newdata','9/7/2017 9:01:00 AM')
    

    SQL データベース内の更新されたデータは次のとおりです。

    PersonID | Name | LastModifytime
    -------- | ---- | --------------
    1 | aaaa | 2017-09-01 00:56:00.000
    2 | bbbb | 2017-09-02 05:23:00.000
    3 | cccc | 2017-09-03 02:36:00.000
    4 | dddd | 2017-09-04 03:21:00.000
    5 | eeee | 2017-09-05 08:06:00.000
    6 | newdata | 2017-09-06 02:23:00.000
    7 | newdata | 2017-09-07 09:01:00.000
    
  2. Invoke-AzDataFactoryV2Pipeline コマンドレットを使って IncrementalCopyPipeline パイプラインを再実行します。 プレースホルダーを実際のリソースグループとデータファクトリ名に置き換えてください。

    $RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroupName $resourceGroupName -dataFactoryName $dataFactoryName
    
  3. Get-AzDataFactoryV2ActivityRun コマンドレットを実行し、すべてのアクティビティが正常に実行されている状態になるまでパイプラインの状態をチェックします。 RunStartedAfter パラメーターと RunStartedBefore パラメーターのプレースホルダーを、適切な時刻に置き換えてください。 このチュートリアルでは、 -RunStartedAfter "2017/09/14"-RunStartedBefore "2017/09/15" を使用します。

    Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $RunId -RunStartedAfter "<start time>" -RunStartedBefore "<end time>"
    

    出力例を次に示します。

    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : LookupNewWaterMarkActivity
    PipelineRunId     : 2fc90ab8-d42c-4583-aa64-755dba9925d7
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, dataset}
    Output            : {NewWatermarkvalue}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 8:52:26 AM
    ActivityRunEnd    : 9/14/2017 8:52:58 AM
    DurationInMs      : 31758
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : LookupOldWaterMarkActivity
    PipelineRunId     : 2fc90ab8-d42c-4583-aa64-755dba9925d7
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, dataset}
    Output            : {TableName, WatermarkValue}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 8:52:26 AM
    ActivityRunEnd    : 9/14/2017 8:52:52 AM
    DurationInMs      : 25497
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : IncrementalCopyActivity
    PipelineRunId     : 2fc90ab8-d42c-4583-aa64-755dba9925d7
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, sink}
    Output            : {dataRead, dataWritten, rowsCopied, copyDuration...}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 8:53:00 AM
    ActivityRunEnd    : 9/14/2017 8:53:20 AM
    DurationInMs      : 20194
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : StoredProceduretoWriteWatermarkActivity
    PipelineRunId     : 2fc90ab8-d42c-4583-aa64-755dba9925d7
    PipelineName      : IncrementalCopyPipeline
    Input             : {storedProcedureName, storedProcedureParameters}
    Output            : {}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 8:53:23 AM
    ActivityRunEnd    : 9/14/2017 8:53:41 AM
    DurationInMs      : 18502
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    
  4. BLOB ストレージで、別のファイルが作成されたことを確認します。 このチュートリアルでは、Incremental-2fc90ab8-d42c-4583-aa64-755dba9925d7.txt が新しいファイルの名前になります。 このファイルを開くと、2 行のレコードが確認できます。

  5. watermarktable の最新の値をチェックします。 基準値が再び更新されたことを確認できます。

    Select * from watermarktable
    

    サンプル出力:

    TableName ウォーターマーク値
    データソーステーブル 2017-09-07 09:01:00.000

このチュートリアルでは、以下の手順を実行しました。

  • 基準値を格納するためのデータ ストアを準備します。
  • データ ファクトリを作成します。
  • リンクされたサービスを作成します。
  • ソース データセット、シンク データセット、および基準値データセットを作成します。
  • パイプラインを作成します。
  • パイプラインを実行します。
  • パイプラインの実行を監視します。

このチュートリアルでは、パイプラインによって、Azure SQL Database内の 1 つのテーブルから BLOB ストレージにデータがコピーされました。 次のチュートリアルに進み、SQL Server データベース内の複数のテーブルから SQL Database にデータをコピーする方法について説明します。