適用対象:
Azure Data Factory
Azure Synapse Analytics
ヒント
Data Factory in Microsoft Fabric は、よりシンプルなアーキテクチャ、組み込みの AI、および新機能を備えた次世代のAzure Data Factoryです。 データ統合を初めて使用する場合は、Fabric Data Factory から始めます。 既存の ADF ワークロードをFabricにアップグレードして、データ サイエンス、リアルタイム分析、レポートの新機能にアクセスできます。
テンプレートを使用して、数億個のファイルで構成されるペタバイト単位のデータを Amazon S3 からAzure Data Lake Storage Gen2に移行します。
注意
AWS S3 からAzureに小さなデータボリュームをコピーする場合 (たとえば、10 TB 未満)、Azure Data Factoryデータのコピー ツールを使用する方が効率的で簡単です。 この記事で説明するテンプレートは、必要以上のものになってしまいます。
ソリューション テンプレートについて
データのパーティションは、10 TB を超えるデータを移行する場合に特に推奨されます。 データをパーティション分割するには、"プレフィックス" 設定を利用し、Amazon S3 上のフォルダーとファイルを名前でフィルター処理します。その後、各 ADF コピー ジョブでパーティションを 1 つずつコピーできます。 複数の ADF コピー ジョブを同時に実行して、スループットを向上させることができます。
通常、データの移行には、1 回限りの履歴データ移行と、AWS S3 から Azure への変更の定期的な同期が必要です。 以下に 2 つのテンプレートがあります。1 つのテンプレートは 1 回限りの履歴データ移行を対象とし、もう 1 つのテンプレートでは AWS S3 から Azure への変更の同期について説明します。
テンプレートで Amazon S3 から Azure Data Lake Storage Gen2 に履歴データを移行するには
このテンプレート (
このテンプレートには、5 つのアクティビティが含まれています。
- Lookup は、外部コントロール テーブルからAzure Data Lake Storage Gen2にコピーされていないパーティションを取得します。 テーブル名は s3_partition_control_table で、テーブルからデータを読み込むクエリは "SELECT PartitionPrefix FROM s3_partition_control_table WHERE SuccessOrFailure = 0" です。
- ForEach では、Lookup アクティビティからパーティション一覧を取得し、各パーティションを TriggerCopy アクティビティに対して反復処理します。 複数の ADF コピー ジョブを同時に実行するように batchCount を設定できます。 このテンプレートでは、2 が設定されています。
- ExecutePipeline では、CopyFolderPartitionFromS3 パイプラインを実行します。 別のパイプラインを作成して各コピー ジョブでパーティションをコピーする理由は、失敗したコピー ジョブを再実行して、その特定のパーティションを AWS S3 から再度読み込むことが簡単になるためです。 他のパーティションを読み込む他のすべてのコピー ジョブには影響しません。
- Copy は AWS S3 から Azure Data Lake Storage Gen2 に各パーティションをコピーします。
- SqlServerStoredProcedure では、制御テーブル内の各パーティションのコピーの状態が更新されます。
このテンプレートには、次の 2 つのパラメーターが含まれています。
- AWS_S3_bucketName は、データの移行元の AWS S3 上にあるバケット名です。 AWS S3 上の複数のバケットからデータを移行する場合は、外部制御テーブルに 1 つ以上の列を追加して、パーティションごとにバケット名を格納することができます。また、その各列からデータを取得するように、パイプラインを更新できます。
- Azure_Storage_fileSystem は、データの移行先Azure Data Lake Storage Gen2の fileSystem 名です。
テンプレートが変更されたファイルを Amazon S3 から Azure Data Lake Storage Gen2 にのみコピーする場合
このテンプレート (
このテンプレートには、次の 7 つのアクティビティが含まれています。
- Lookup では、パーティションを外部制御テーブルから取得します。 テーブル名は s3_partition_delta_control_table で、テーブルからデータを読み込むクエリは "select distinct PartitionPrefix from s3_partition_delta_control_table" です。
- ForEach では、パーティション一覧を Lookup アクティビティから取得し、各パーティションが TriggerDeltaCopy アクティビティに対して反復処理します。 複数の ADF コピー ジョブを同時に実行するように batchCount を設定できます。 このテンプレートでは、2 が設定されています。
- ExecutePipeline では、DeltaCopyFolderPartitionFromS3 パイプラインを実行します。 別のパイプラインを作成して各コピー ジョブでパーティションをコピーする理由は、失敗したコピー ジョブを再実行して、その特定のパーティションを AWS S3 から再度読み込むことが簡単になるためです。 他のパーティションを読み込む他のすべてのコピー ジョブには影響しません。
- Lookup では、最後のコピー ジョブの実行日時が外部制御テーブルから取得されます。そのため、新しいファイルまたは更新されたファイルは、LastModifiedTime を通じて特定できます。 テーブル名は s3_partition_delta_control_table で、テーブルからデータを読み込むクエリは "select max(JobRunTime) as LastModifiedTime from s3_partition_delta_control_table where PartitionPrefix = '@{pipeline().parameters.prefixStr}' and SuccessOrFailure = 1" です。
- Copy は、AWS S3 から Azure Data Lake Storage Gen2 に各パーティションの新規または変更されたファイルのみをコピーします。 modifiedDatetimeStart のプロパティは、最後のコピー ジョブの実行日時に設定されます。 modifiedDatetimeEnd のプロパティは、現在のコピー ジョブの実行日時に設定されます。 時間には UTC タイム ゾーンが適用されることに注意してください。
- SqlServerStoredProcedure では、制御テーブル内の各パーティションのコピーの状態と、コピーが成功した場合の実行日時が更新されます。 SuccessOrFailure の列は、1 に設定されます。
- SqlServerStoredProcedure では、制御テーブル内の各パーティションのコピーの状態と、コピーが失敗した場合の実行日時が更新されます。 SuccessOrFailure の列は、0 に設定されます。
このテンプレートには、次の 2 つのパラメーターが含まれています。
- AWS_S3_bucketName は、データの移行元の AWS S3 上にあるバケット名です。 AWS S3 上の複数のバケットからデータを移行する場合は、外部制御テーブルに 1 つ以上の列を追加して、パーティションごとにバケット名を格納することができます。また、その各列からデータを取得するように、パイプラインを更新できます。
- Azure_Storage_fileSystem は、データの移行先Azure Data Lake Storage Gen2の fileSystem 名です。
この 2 つのソリューション テンプレートの使用方法
テンプレートで Amazon S3 から Azure Data Lake Storage Gen2 に履歴データを移行するには
AZURE SQL DATABASEに制御テーブルを作成して、AWS S3 のパーティションリストを格納します。
注意
テーブル名は、s3_partition_control_table です。 制御テーブルのスキーマは PartitionPrefix と SuccessOrFailure です。PartitionPrefix は、名前で Amazon S3 内のフォルダーとファイルをフィルター処理するための S3 のプレフィックス設定であり、SuccessOrFailure は各パーティションのコピーの状態です。0 は、このパーティションがAzureにコピーされていないことを意味し、1 は、このパーティションが正常にAzureにコピーされたことを意味します。 制御テーブルには 5 つのパーティションが定義されており、各パーティションのコピーの既定の状態は 0 です。
CREATE TABLE [dbo].[s3_partition_control_table]( [PartitionPrefix] [varchar](255) NULL, [SuccessOrFailure] [bit] NULL ) INSERT INTO s3_partition_control_table (PartitionPrefix, SuccessOrFailure) VALUES ('a', 0), ('b', 0), ('c', 0), ('d', 0), ('e', 0);コントロール テーブルの同じAzure SQL Databaseにストアド プロシージャを作成します。
注意
ストアド プロシージャの名前は、sp_update_partition_success です。 これは、ADF パイプラインの SqlServerStoredProcedure アクティビティによって呼び出されます。
CREATE PROCEDURE [dbo].[sp_update_partition_success] @PartPrefix varchar(255) AS BEGIN UPDATE s3_partition_control_table SET [SuccessOrFailure] = 1 WHERE [PartitionPrefix] = @PartPrefix END GOAWS S3 の履歴データを Azure Data Lake Storage Gen2 テンプレートに移動します。 外部制御テーブルへの接続を入力します。AWS S3 はデータ ソース ストアとして、Azure Data Lake Storage Gen2は宛先ストアとして入力します。 外部制御テーブルとストアド プロシージャは、同じ接続を参照していることに注意してください。
[このテンプレートを使用] を選択します。
次の例に示すように、2 つのパイプラインと 3 つのデータセットが作成されたことがわかります。
"BulkCopyFromS3" パイプラインに移動し、 [デバッグ] を選択し、 [パラメーター] を入力します。 次に、 [Finish]\(完了\) を選択します。
次の例のような結果が表示されます。
テンプレートが変更されたファイルを Amazon S3 から Azure Data Lake Storage Gen2 にのみコピーする場合
AZURE SQL DATABASEに制御テーブルを作成して、AWS S3 のパーティションリストを格納します。
注意
テーブル名は、s3_partition_delta_control_table です。 制御テーブルのスキーマは PartitionPrefix、JobRunTime、SuccessOrFailure です。PartitionPrefix は、名前で Amazon S3 のフォルダーとファイルをフィルター処理するための S3 のプレフィックス設定です。JobRunTime はコピー ジョブの実行時の日時値、SuccessOrFailure は各パーティションのコピーの状態です。0 は、このパーティションがAzureにコピーされていないことを意味し、1 は、このパーティションが正常にAzureにコピーされたことを意味します。 制御テーブルでは、5 つのパーティションが定義されています。 JobRunTime の既定値は、1 回限りの履歴データの移行が開始される時刻にすることができます。 ADF コピー アクティビティでは、その時刻以降に変更された AWS S3 上のファイルがコピーされます。 各パーティションのコピーの既定の状態は 1 です。
CREATE TABLE [dbo].[s3_partition_delta_control_table]( [PartitionPrefix] [varchar](255) NULL, [JobRunTime] [datetime] NULL, [SuccessOrFailure] [bit] NULL ) INSERT INTO s3_partition_delta_control_table (PartitionPrefix, JobRunTime, SuccessOrFailure) VALUES ('a','1/1/2019 12:00:00 AM',1), ('b','1/1/2019 12:00:00 AM',1), ('c','1/1/2019 12:00:00 AM',1), ('d','1/1/2019 12:00:00 AM',1), ('e','1/1/2019 12:00:00 AM',1);コントロール テーブルの同じAzure SQL Databaseにストアド プロシージャを作成します。
注意
このストアド プロシージャの名前は、sp_insert_partition_JobRunTime_success です。 これは、ADF パイプラインの SqlServerStoredProcedure アクティビティによって呼び出されます。
CREATE PROCEDURE [dbo].[sp_insert_partition_JobRunTime_success] @PartPrefix varchar(255), @JobRunTime datetime, @SuccessOrFailure bit AS BEGIN INSERT INTO s3_partition_delta_control_table (PartitionPrefix, JobRunTime, SuccessOrFailure) VALUES (@PartPrefix,@JobRunTime,@SuccessOrFailure) END GOCopy delta data from AWS S3 to Azure Data Lake Storage Gen2 (AWS S3 から Azure Data Lake Storage Gen2 への差分データのコピー) テンプレートに移動します。 外部制御テーブルへの接続を入力します。AWS S3 はデータ ソース ストアとして、Azure Data Lake Storage Gen2は宛先ストアとして入力します。 外部制御テーブルとストアド プロシージャは、同じ接続を参照していることに注意してください。
[このテンプレートを使用] を選択します。
次の例に示すように、2 つのパイプラインと 3 つのデータセットが作成されたことがわかります。
"DeltaCopyFromS3" パイプラインに移動し、 [デバッグ] を選択し、 [パラメーター] を入力します。 次に、 [Finish]\(完了\) を選択します。
次の例のような結果が表示されます。
クエリ "select * from s3_partition_delta_control_table" による制御テーブルからの結果を確認することもできます。この出力は、次の例のようになります。