次の方法で共有


Amazon S3 から Azure Data Lake Storage Gen2 にデータを移行する

適用対象: Azure Data Factory Azure Synapse Analytics

ヒント

Data Factory in Microsoft Fabric は、よりシンプルなアーキテクチャ、組み込みの AI、および新機能を備えた次世代のAzure Data Factoryです。 データ統合を初めて使用する場合は、Fabric Data Factory から始めます。 既存の ADF ワークロードをFabricにアップグレードして、データ サイエンス、リアルタイム分析、レポートの新機能にアクセスできます。

テンプレートを使用して、数億個のファイルで構成されるペタバイト単位のデータを Amazon S3 からAzure Data Lake Storage Gen2に移行します。

注意

AWS S3 からAzureに小さなデータボリュームをコピーする場合 (たとえば、10 TB 未満)、Azure Data Factoryデータのコピー ツールを使用する方が効率的で簡単です。 この記事で説明するテンプレートは、必要以上のものになってしまいます。

ソリューション テンプレートについて

データのパーティションは、10 TB を超えるデータを移行する場合に特に推奨されます。 データをパーティション分割するには、"プレフィックス" 設定を利用し、Amazon S3 上のフォルダーとファイルを名前でフィルター処理します。その後、各 ADF コピー ジョブでパーティションを 1 つずつコピーできます。 複数の ADF コピー ジョブを同時に実行して、スループットを向上させることができます。

通常、データの移行には、1 回限りの履歴データ移行と、AWS S3 から Azure への変更の定期的な同期が必要です。 以下に 2 つのテンプレートがあります。1 つのテンプレートは 1 回限りの履歴データ移行を対象とし、もう 1 つのテンプレートでは AWS S3 から Azure への変更の同期について説明します。

テンプレートで Amazon S3 から Azure Data Lake Storage Gen2 に履歴データを移行するには

このテンプレート (テンプレート名: AWS S3 から Azure Data Lake Storage Gen2) は、Azure SQL Databaseの外部制御テーブルにパーティション リストを作成していることを前提としています。 そのため、Lookup アクティビティを使用して外部制御テーブルからパーティション一覧を取得し、各パーティションを反復処理して、各 ADF コピー ジョブでパーティションを 1 つずつコピーするようにします。 コピー ジョブは、完了すると、Stored Procedure アクティビティを使用して、制御テーブル内の各パーティションのコピーの状態を更新します。

このテンプレートには、5 つのアクティビティが含まれています。

  • Lookup は、外部コントロール テーブルからAzure Data Lake Storage Gen2にコピーされていないパーティションを取得します。 テーブル名は s3_partition_control_table で、テーブルからデータを読み込むクエリは "SELECT PartitionPrefix FROM s3_partition_control_table WHERE SuccessOrFailure = 0" です。
  • ForEach では、Lookup アクティビティからパーティション一覧を取得し、各パーティションを TriggerCopy アクティビティに対して反復処理します。 複数の ADF コピー ジョブを同時に実行するように batchCount を設定できます。 このテンプレートでは、2 が設定されています。
  • ExecutePipeline では、CopyFolderPartitionFromS3 パイプラインを実行します。 別のパイプラインを作成して各コピー ジョブでパーティションをコピーする理由は、失敗したコピー ジョブを再実行して、その特定のパーティションを AWS S3 から再度読み込むことが簡単になるためです。 他のパーティションを読み込む他のすべてのコピー ジョブには影響しません。
  • Copy は AWS S3 から Azure Data Lake Storage Gen2 に各パーティションをコピーします。
  • SqlServerStoredProcedure では、制御テーブル内の各パーティションのコピーの状態が更新されます。

このテンプレートには、次の 2 つのパラメーターが含まれています。

  • AWS_S3_bucketName は、データの移行元の AWS S3 上にあるバケット名です。 AWS S3 上の複数のバケットからデータを移行する場合は、外部制御テーブルに 1 つ以上の列を追加して、パーティションごとにバケット名を格納することができます。また、その各列からデータを取得するように、パイプラインを更新できます。
  • Azure_Storage_fileSystem は、データの移行先Azure Data Lake Storage Gen2の fileSystem 名です。

テンプレートが変更されたファイルを Amazon S3 から Azure Data Lake Storage Gen2 にのみコピーする場合

このテンプレート (テンプレート名: AWS S3 から Azure Data Lake Storage Gen2) は、各ファイルの LastModifiedTime を使用して、AWS S3 から Azure にのみ新規または更新されたファイルをコピーします。 ファイルまたはフォルダーが既に AWS S3 上のファイルまたはフォルダーの名前の一部であるタイムスライス情報 (例: /yyyy/mm/dd/file.csv) によって時間でパーティション分割されている場合は、このチュートリアルに移動して、新しいファイルを増分読み込みするための高パフォーマンスなアプローチを採用することができます。 このテンプレートは、Azure SQL Databaseの外部制御テーブルにパーティション リストを記述していることを前提としています。 そのため、Lookup アクティビティを使用して外部制御テーブルからパーティション一覧を取得し、各パーティションを反復処理して、各 ADF コピー ジョブでパーティションを 1 つずつコピーするようにします。 各コピー ジョブは、AWS S3 からのファイルのコピーを開始すると、LastModifiedTime プロパティに基づいて、新規または更新済みのファイルのみを識別してコピーします。 コピー ジョブは、完了すると、Stored Procedure アクティビティを使用して、制御テーブル内の各パーティションのコピーの状態を更新します。

このテンプレートには、次の 7 つのアクティビティが含まれています。

  • Lookup では、パーティションを外部制御テーブルから取得します。 テーブル名は s3_partition_delta_control_table で、テーブルからデータを読み込むクエリは "select distinct PartitionPrefix from s3_partition_delta_control_table" です。
  • ForEach では、パーティション一覧を Lookup アクティビティから取得し、各パーティションが TriggerDeltaCopy アクティビティに対して反復処理します。 複数の ADF コピー ジョブを同時に実行するように batchCount を設定できます。 このテンプレートでは、2 が設定されています。
  • ExecutePipeline では、DeltaCopyFolderPartitionFromS3 パイプラインを実行します。 別のパイプラインを作成して各コピー ジョブでパーティションをコピーする理由は、失敗したコピー ジョブを再実行して、その特定のパーティションを AWS S3 から再度読み込むことが簡単になるためです。 他のパーティションを読み込む他のすべてのコピー ジョブには影響しません。
  • Lookup では、最後のコピー ジョブの実行日時が外部制御テーブルから取得されます。そのため、新しいファイルまたは更新されたファイルは、LastModifiedTime を通じて特定できます。 テーブル名は s3_partition_delta_control_table で、テーブルからデータを読み込むクエリは "select max(JobRunTime) as LastModifiedTime from s3_partition_delta_control_table where PartitionPrefix = '@{pipeline().parameters.prefixStr}' and SuccessOrFailure = 1" です。
  • Copy は、AWS S3 から Azure Data Lake Storage Gen2 に各パーティションの新規または変更されたファイルのみをコピーします。 modifiedDatetimeStart のプロパティは、最後のコピー ジョブの実行日時に設定されます。 modifiedDatetimeEnd のプロパティは、現在のコピー ジョブの実行日時に設定されます。 時間には UTC タイム ゾーンが適用されることに注意してください。
  • SqlServerStoredProcedure では、制御テーブル内の各パーティションのコピーの状態と、コピーが成功した場合の実行日時が更新されます。 SuccessOrFailure の列は、1 に設定されます。
  • SqlServerStoredProcedure では、制御テーブル内の各パーティションのコピーの状態と、コピーが失敗した場合の実行日時が更新されます。 SuccessOrFailure の列は、0 に設定されます。

このテンプレートには、次の 2 つのパラメーターが含まれています。

  • AWS_S3_bucketName は、データの移行元の AWS S3 上にあるバケット名です。 AWS S3 上の複数のバケットからデータを移行する場合は、外部制御テーブルに 1 つ以上の列を追加して、パーティションごとにバケット名を格納することができます。また、その各列からデータを取得するように、パイプラインを更新できます。
  • Azure_Storage_fileSystem は、データの移行先Azure Data Lake Storage Gen2の fileSystem 名です。

この 2 つのソリューション テンプレートの使用方法

テンプレートで Amazon S3 から Azure Data Lake Storage Gen2 に履歴データを移行するには

  1. AZURE SQL DATABASEに制御テーブルを作成して、AWS S3 のパーティションリストを格納します。

    注意

    テーブル名は、s3_partition_control_table です。 制御テーブルのスキーマは PartitionPrefix と SuccessOrFailure です。PartitionPrefix は、名前で Amazon S3 内のフォルダーとファイルをフィルター処理するための S3 のプレフィックス設定であり、SuccessOrFailure は各パーティションのコピーの状態です。0 は、このパーティションがAzureにコピーされていないことを意味し、1 は、このパーティションが正常にAzureにコピーされたことを意味します。 制御テーブルには 5 つのパーティションが定義されており、各パーティションのコピーの既定の状態は 0 です。

    CREATE TABLE [dbo].[s3_partition_control_table](
        [PartitionPrefix] [varchar](255) NULL,
        [SuccessOrFailure] [bit] NULL
    )
    
    INSERT INTO s3_partition_control_table (PartitionPrefix, SuccessOrFailure)
    VALUES
    ('a', 0),
    ('b', 0),
    ('c', 0),
    ('d', 0),
    ('e', 0);
    
  2. コントロール テーブルの同じAzure SQL Databaseにストアド プロシージャを作成します。

    注意

    ストアド プロシージャの名前は、sp_update_partition_success です。 これは、ADF パイプラインの SqlServerStoredProcedure アクティビティによって呼び出されます。

    CREATE PROCEDURE [dbo].[sp_update_partition_success] @PartPrefix varchar(255)
    AS
    BEGIN
    
        UPDATE s3_partition_control_table
        SET [SuccessOrFailure] = 1 WHERE [PartitionPrefix] = @PartPrefix
    END
    GO
    
  3. AWS S3 の履歴データを Azure Data Lake Storage Gen2 テンプレートに移動します。 外部制御テーブルへの接続を入力します。AWS S3 はデータ ソース ストアとして、Azure Data Lake Storage Gen2は宛先ストアとして入力します。 外部制御テーブルとストアド プロシージャは、同じ接続を参照していることに注意してください。

     AWS S3 から Azure Data Lake Storage Gen2 テンプレートに履歴データを移行することを示すスクリーンショット。

  4. [このテンプレートを使用] を選択します。

    [このテンプレートを使用] ボタンが強調表示されているスクリーンショット。

  5. 次の例に示すように、2 つのパイプラインと 3 つのデータセットが作成されたことがわかります。

    テンプレートを使用して作成された 2 つのパイプラインと 3 つのデータセットを示すスクリーンショット。

  6. "BulkCopyFromS3" パイプラインに移動し、 [デバッグ] を選択し、 [パラメーター] を入力します。 次に、 [Finish]\(完了\) を選択します。

    [完了] を選択する前に、[デバッグ] を選択し、パラメーターを入力する場所を示すスクリーンショット。

  7. 次の例のような結果が表示されます。

    返された結果を示すスクリーンショット。

テンプレートが変更されたファイルを Amazon S3 から Azure Data Lake Storage Gen2 にのみコピーする場合

  1. AZURE SQL DATABASEに制御テーブルを作成して、AWS S3 のパーティションリストを格納します。

    注意

    テーブル名は、s3_partition_delta_control_table です。 制御テーブルのスキーマは PartitionPrefix、JobRunTime、SuccessOrFailure です。PartitionPrefix は、名前で Amazon S3 のフォルダーとファイルをフィルター処理するための S3 のプレフィックス設定です。JobRunTime はコピー ジョブの実行時の日時値、SuccessOrFailure は各パーティションのコピーの状態です。0 は、このパーティションがAzureにコピーされていないことを意味し、1 は、このパーティションが正常にAzureにコピーされたことを意味します。 制御テーブルでは、5 つのパーティションが定義されています。 JobRunTime の既定値は、1 回限りの履歴データの移行が開始される時刻にすることができます。 ADF コピー アクティビティでは、その時刻以降に変更された AWS S3 上のファイルがコピーされます。 各パーティションのコピーの既定の状態は 1 です。

    CREATE TABLE [dbo].[s3_partition_delta_control_table](
        [PartitionPrefix] [varchar](255) NULL,
        [JobRunTime] [datetime] NULL,
        [SuccessOrFailure] [bit] NULL
        )
    
    INSERT INTO s3_partition_delta_control_table (PartitionPrefix, JobRunTime, SuccessOrFailure)
    VALUES
    ('a','1/1/2019 12:00:00 AM',1),
    ('b','1/1/2019 12:00:00 AM',1),
    ('c','1/1/2019 12:00:00 AM',1),
    ('d','1/1/2019 12:00:00 AM',1),
    ('e','1/1/2019 12:00:00 AM',1);
    
  2. コントロール テーブルの同じAzure SQL Databaseにストアド プロシージャを作成します。

    注意

    このストアド プロシージャの名前は、sp_insert_partition_JobRunTime_success です。 これは、ADF パイプラインの SqlServerStoredProcedure アクティビティによって呼び出されます。

    CREATE PROCEDURE [dbo].[sp_insert_partition_JobRunTime_success] @PartPrefix varchar(255), @JobRunTime datetime, @SuccessOrFailure bit
    AS
    BEGIN
        INSERT INTO s3_partition_delta_control_table (PartitionPrefix, JobRunTime, SuccessOrFailure)
        VALUES
            (@PartPrefix,@JobRunTime,@SuccessOrFailure)
    END
    GO
    
  3. Copy delta data from AWS S3 to Azure Data Lake Storage Gen2 (AWS S3 から Azure Data Lake Storage Gen2 への差分データのコピー) テンプレートに移動します。 外部制御テーブルへの接続を入力します。AWS S3 はデータ ソース ストアとして、Azure Data Lake Storage Gen2は宛先ストアとして入力します。 外部制御テーブルとストアド プロシージャは、同じ接続を参照していることに注意してください。

    新しい接続を作成する

  4. [このテンプレートを使用] を選択します。

    このテンプレートを使用

  5. 次の例に示すように、2 つのパイプラインと 3 つのデータセットが作成されたことがわかります。

    パイプラインのレビュー

  6. "DeltaCopyFromS3" パイプラインに移動し、 [デバッグ] を選択し、 [パラメーター] を入力します。 次に、 [Finish]\(完了\) を選択します。

    **[デバッグ]** をクリックします。

  7. 次の例のような結果が表示されます。

    結果を確認する

  8. クエリ "select * from s3_partition_delta_control_table" による制御テーブルからの結果を確認することもできます。この出力は、次の例のようになります。

    クエリの実行後に制御テーブルからの結果を示すスクリーンショット。