適用対象:
Azure Data Factory
Azure Synapse Analytics
ヒント
Data Factory in Microsoft Fabric は、よりシンプルなアーキテクチャ、組み込みの AI、および新機能を備えた次世代のAzure Data Factoryです。 データ統合を初めて使用する場合は、Fabric Data Factory から始めます。 既存の ADF ワークロードをFabricにアップグレードして、データ サイエンス、リアルタイム分析、レポートの新機能にアクセスできます。
このチュートリアルでは、Azure Blob StorageからAzure SQL Databaseにデータをコピーする Data Factory パイプラインを作成します。 このチュートリアルの構成パターンは、ファイルベースのデータ ストアからリレーショナル データ ストアへのコピーに適用されます。 ソースおよびシンクとしてサポートされているデータ ストアの一覧については、「サポートされるデータ ストアと形式」を参照してください。
このチュートリアルでは、次の手順を実行します。
- データ ファクトリを作成します。
- リンクサービスとしてAzure StorageとAzure SQL Databaseを作成します。
- Azure Blob データセットとAzure SQL Database データセットを作成します。
- コピーアクティビティを含むパイプラインを作成します。
- パイプラインの実行を開始します。
- パイプラインとアクティビティの実行を監視します。
このチュートリアルでは、.NET SDK を使用します。 他のメカニズムを使用してAzure Data Factoryを操作できます。Quickstarts のサンプルを参照してください。
Azure サブスクリプションをお持ちでない場合は、開始する前に、free Azure アカウントを作成します。
前提条件
- Azure Storage アカウント。 BLOB ストレージをソース データ ストアとして使用します。 Azure ストレージ アカウントがない場合は、「汎用ストレージ アカウントを作成する」を参照>。
- Azure SQL Database。 データベースをシンク データ ストアとして使用します。 Azure SQL Databaseにデータベースがない場合は、Azure SQL Database でのデータベース作成を参照してください。
- Visual Studio。 この記事のチュートリアルでは、Visual Studio 2019 を使用します。
- Azure SDK for .NET。
- Microsoft Entra application。 Microsoft Entra アプリケーションがない場合は、「 Microsoft Entra アプリケーションの作成」セクションポータルを使用してMicrosoft Entra アプリケーションを作成する方法を参照してください。 以降の手順で使用するために、次の値をコピーします。アプリケーション (クライアント) ID、認証キー、およびディレクトリ (テナント) ID。 同じ記事の手順に従って、アプリケーションを [共同作成者] ロールに割り当てます。
BLOB と SQL テーブルを作成する
次に、ソース BLOB とシンク SQL テーブルを作成して、Azure BLOB とAzure SQL Databaseをチュートリアル用に準備します。
ソース BLOB を作成する
最初に、コンテナーを作成し、そこに入力テキスト ファイルをアップロードして、ソース BLOB を作成します。
メモ帳を開きます。 次のテキストをコピーし、inputEmp.txt というファイル名でローカルに保存します。
John|Doe Jane|DoeAzure Storage Explorer などのツールを使用して、adfv2tutorial コンテナーを作成し、inputEmp.txt ファイルをコンテナーにアップロードします。
シンク SQL テーブルを作成する
次に、シンク SQL テーブルを作成します。
次の SQL スクリプトを使用して、Azure SQL Databaseに dbo.emp テーブルを作成します。
CREATE TABLE dbo.emp ( ID int IDENTITY(1,1) NOT NULL, FirstName varchar(50), LastName varchar(50) ) GO CREATE CLUSTERED INDEX IX_emp_ID ON dbo.emp (ID);サービスAzure SQL Database へのアクセスを許可します。 Data Factory サービスが SQL Database にデータを書き込むことができるように、サーバー内の Azure サービスへのアクセスを許可していることを確認します。 この設定を確認して有効にするには、次の手順を実行します。
Azure ポータルに移動して、SQL Server を管理します。 [SQL サーバー] を探して選択します。
ご自身のサーバーを選択します。
SQL サーバー メニューの [セキュリティ] の見出しの下にある [ファイアウォールと仮想ネットワーク] を選択します。
ファイアウォールと仮想ネットワーク ページの このサーバーにアクセスするためのAzure サービスとリソースで、ON を選択します。
Visual Studio プロジェクトを作成する
Visual Studioを使用して、C# .NET コンソール アプリケーションを作成します。
- Visual Studioを開きます。
- [開始] ウィンドウで、[新しいプロジェクトの作成] を選択します。
- 新しいプロジェクトの作成 ウィンドウで、プロジェクトの種類の一覧から Console App (.NET Framework) の C# バージョンを選択します。 [次へ]を選択します。
- [新しいプロジェクトの構成] ウィンドウで、[プロジェクト名] に「ADFv2Tutorial」と入力します。 [場所] では、プロジェクトを保存するディレクトリを参照または作成します。 [作成] を選択します。 新しいプロジェクトがVisual Studio IDEに表示されます。
NuGet パッケージのインストール
次に、NuGet パッケージ マネージャーを使用して、必要なライブラリ パッケージをインストールします。
メニュー バーで、Tools>NuGet パッケージ マネージャー>パッケージ マネージャー Console を選択します。
パッケージ マネージャー コンソール ウィンドウで、次のコマンドを実行してパッケージをインストールします。 Azure Data Factory NuGet パッケージの詳細については、「Microsoft.Azure」を参照してください。Management.DataFactory。
Install-Package Microsoft.Azure.Management.DataFactory Install-Package Microsoft.Azure.Management.ResourceManager -PreRelease Install-Package Microsoft.IdentityModel.Clients.ActiveDirectory
データ ファクトリ クライアントを作成する
データ ファクトリ クライアントを作成するには、次の手順に従います。
Program.cs を開き、既存の
usingステートメントを次のコードに置き換えて、名前空間への参照を追加します。using System; using System.Collections.Generic; using System.Linq; using Microsoft.Rest; using Microsoft.Rest.Serialization; using Microsoft.Azure.Management.ResourceManager; using Microsoft.Azure.Management.DataFactory; using Microsoft.Azure.Management.DataFactory.Models; using Microsoft.IdentityModel.Clients.ActiveDirectory;変数を設定する次のコードを、
Mainメソッドに追加します。 14 のプレースホルダーを実際の値に置き換えます。Data Factory が現在使用できるAzureリージョンの一覧については、「リージョン別の利用可能な製品を参照してください。 [製品] ドロップダウン リストの下で、[参照]>[分析]>[Data Factory] の順に選択します。 次に、[リージョン] ドロップダウン リストで、目的のリージョンを選択します。 選択したリージョンでの Data Factory 製品の利用可否の状態が、グリッドに表示されます。
Note
Data Factory で使用されるデータ ストア (Azure StorageやAzure SQL Databaseなど) とコンピューティング (HDInsight など) は、Data Factory 用に選択したリージョン以外のリージョンに配置できます。
// Set variables string tenantID = "<your tenant ID>"; string applicationId = "<your application ID>"; string authenticationKey = "<your authentication key for the application>"; string subscriptionId = "<your subscription ID to create the factory>"; string resourceGroup = "<your resource group to create the factory>"; string region = "<location to create the data factory in, such as East US>"; string dataFactoryName = "<name of data factory to create (must be globally unique)>"; // Specify the source Azure Blob information string storageAccount = "<your storage account name to copy data>"; string storageKey = "<your storage account key>"; string inputBlobPath = "adfv2tutorial/"; string inputBlobName = "inputEmp.txt"; // Specify the sink Azure SQL Database information string azureSqlConnString = "Server=tcp:<your server name>.database.windows.net,1433;" + "Database=<your database name>;" + "User ID=<your username>@<your server name>;" + "Password=<your password>;" + "Trusted_Connection=False;Encrypt=True;Connection Timeout=30"; string azureSqlTableName = "dbo.emp"; string storageLinkedServiceName = "AzureStorageLinkedService"; string sqlDbLinkedServiceName = "AzureSqlDbLinkedService"; string blobDatasetName = "BlobDataset"; string sqlDatasetName = "SqlDataset"; string pipelineName = "Adfv2TutorialBlobToSqlCopy";Mainクラスのインスタンスを作成する次のコードを、DataFactoryManagementClientメソッドに追加します。 このオブジェクトを使用して、データ ファクトリ、リンクされたサービス、データセット、パイプラインを作成します。 また、このオブジェクトを使用して、パイプラインの実行の詳細を監視します。// Authenticate and create a data factory management client var context = new AuthenticationContext("https://login.windows.net/" + tenantID); ClientCredential cc = new ClientCredential(applicationId, authenticationKey); AuthenticationResult result = context.AcquireTokenAsync( "https://management.azure.com/", cc ).Result; ServiceClientCredentials cred = new TokenCredentials(result.AccessToken); var client = new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionId };
Data Factory の作成
"Main" を作成する次のコードを、 メソッドに追加します。
// Create a data factory
Console.WriteLine("Creating a data factory " + dataFactoryName + "...");
Factory dataFactory = new Factory
{
Location = region,
Identity = new FactoryIdentity()
};
client.Factories.CreateOrUpdate(resourceGroup, dataFactoryName, dataFactory);
Console.WriteLine(
SafeJsonConvert.SerializeObject(dataFactory, client.SerializationSettings)
);
while (
client.Factories.Get(
resourceGroup, dataFactoryName
).ProvisioningState == "PendingCreation"
)
{
System.Threading.Thread.Sleep(1000);
}
リンクされたサービスを作成します
このチュートリアルでは、ソース用とシンク用に、それぞれリンクされた 2 つのサービスを作成します。
Azure Storageリンクされたサービスを作成する
次のコードをMain メソッドに追加して、Azure Storageリンクされたサービスを作成します。 サポートされているプロパティと詳細については、「Azure BLOB のリンクされたサービス のプロパティ」を参照してください。
// Create an Azure Storage linked service
Console.WriteLine("Creating linked service " + storageLinkedServiceName + "...");
LinkedServiceResource storageLinkedService = new LinkedServiceResource(
new AzureStorageLinkedService
{
ConnectionString = new SecureString(
"DefaultEndpointsProtocol=https;AccountName=" + storageAccount +
";AccountKey=" + storageKey
)
}
);
client.LinkedServices.CreateOrUpdate(
resourceGroup, dataFactoryName, storageLinkedServiceName, storageLinkedService
);
Console.WriteLine(
SafeJsonConvert.SerializeObject(storageLinkedService, client.SerializationSettings)
);
Azure SQL Databaseリンクされたサービスを作成する
Main メソッドに次のコードを追加して、Azure SQL Databaseリンクされたサービスを作成します。 サポートされるプロパティと詳細については、「Azure SQL Databaseのリンクされたサービス プロパティを参照してください。
// Create an Azure SQL Database linked service
Console.WriteLine("Creating linked service " + sqlDbLinkedServiceName + "...");
LinkedServiceResource sqlDbLinkedService = new LinkedServiceResource(
new AzureSqlDatabaseLinkedService
{
ConnectionString = new SecureString(azureSqlConnString)
}
);
client.LinkedServices.CreateOrUpdate(
resourceGroup, dataFactoryName, sqlDbLinkedServiceName, sqlDbLinkedService
);
Console.WriteLine(
SafeJsonConvert.SerializeObject(sqlDbLinkedService, client.SerializationSettings)
);
データセットを作成する
このセクションでは、2 つのデータセットを作成します。1 つはソース用、もう 1 つはシンク用です。
ソース Azure BLOB のデータセットを作成する
次のコードを Main メソッドに追加して、Azure BLOB データセットを作成します。 サポートされているプロパティと詳細については、「Azure BLOB データセットのプロパティを参照してください。
Azure BLOB 内のソース データを表すデータセットを定義します。 この BLOB データセットは、前の手順で作成したAzure Storageリンクされたサービスを参照し、次のことを説明します。
- コピー元の BLOB の場所:
FolderPathおよびFileName - 内容の解析方法を示す BLOB 形式:
TextFormatとその設定 (列区切り記号など)。 - 列名とデータ型を含むデータ構造。この例では、シンク SQL テーブルにマップされます
// Create an Azure Blob dataset
Console.WriteLine("Creating dataset " + blobDatasetName + "...");
DatasetResource blobDataset = new DatasetResource(
new AzureBlobDataset
{
LinkedServiceName = new LinkedServiceReference {
ReferenceName = storageLinkedServiceName
},
FolderPath = inputBlobPath,
FileName = inputBlobName,
Format = new TextFormat { ColumnDelimiter = "|" },
Structure = new List<DatasetDataElement>
{
new DatasetDataElement { Name = "FirstName", Type = "String" },
new DatasetDataElement { Name = "LastName", Type = "String" }
}
}
);
client.Datasets.CreateOrUpdate(
resourceGroup, dataFactoryName, blobDatasetName, blobDataset
);
Console.WriteLine(
SafeJsonConvert.SerializeObject(blobDataset, client.SerializationSettings)
);
シンク Azure SQL Database のためのデータセットを作成する
Main メソッドに次のコードを追加して、Azure SQL Database データセットを作成します。 サポートされているプロパティと詳細については、「Azure SQL Database データセットのプロパティを参照してください。
Azure SQL Databaseのシンク データを表すデータセットを定義します。 このデータセットは、前の手順で作成したAzure SQL Databaseリンクされたサービスを参照します。 また、コピーされたデータを保持する SQL テーブルも指定します。
// Create an Azure SQL Database dataset
Console.WriteLine("Creating dataset " + sqlDatasetName + "...");
DatasetResource sqlDataset = new DatasetResource(
new AzureSqlTableDataset
{
LinkedServiceName = new LinkedServiceReference
{
ReferenceName = sqlDbLinkedServiceName
},
TableName = azureSqlTableName
}
);
client.Datasets.CreateOrUpdate(
resourceGroup, dataFactoryName, sqlDatasetName, sqlDataset
);
Console.WriteLine(
SafeJsonConvert.SerializeObject(sqlDataset, client.SerializationSettings)
);
パイプラインを作成する
"Main" を作成する次のコードを、 メソッドに追加します。 このチュートリアルでは、このパイプラインに 1 つのアクティビティ (CopyActivity) が含まれています。これは、BLOB データセットをソースとして受け取り、SQL データセットをシンクとして受け取ります。 コピー アクティビティの詳細については、Azure Data Factory のコピー アクティビティを参照してください。
// Create a pipeline with copy activity
Console.WriteLine("Creating pipeline " + pipelineName + "...");
PipelineResource pipeline = new PipelineResource
{
Activities = new List<Activity>
{
new CopyActivity
{
Name = "CopyFromBlobToSQL",
Inputs = new List<DatasetReference>
{
new DatasetReference() { ReferenceName = blobDatasetName }
},
Outputs = new List<DatasetReference>
{
new DatasetReference { ReferenceName = sqlDatasetName }
},
Source = new BlobSource { },
Sink = new SqlSink { }
}
}
};
client.Pipelines.CreateOrUpdate(resourceGroup, dataFactoryName, pipelineName, pipeline);
Console.WriteLine(
SafeJsonConvert.SerializeObject(pipeline, client.SerializationSettings)
);
パイプラインの実行を作成する
"Main" 次のコードを、 メソッドに追加します。
// Create a pipeline run
Console.WriteLine("Creating pipeline run...");
CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(
resourceGroup, dataFactoryName, pipelineName
).Result.Body;
Console.WriteLine("Pipeline run ID: " + runResponse.RunId);
パイプラインの実行を監視する
次に、パイプラインの実行状態を確認するコードを挿入し、コピー アクティビティの実行に関する詳細を取得します。
データのコピーが完了するまでパイプラインの実行の状態を継続的にチェックする次のコードを、
Mainメソッドに追加します。// Monitor the pipeline run Console.WriteLine("Checking pipeline run status..."); PipelineRun pipelineRun; while (true) { pipelineRun = client.PipelineRuns.Get( resourceGroup, dataFactoryName, runResponse.RunId ); Console.WriteLine("Status: " + pipelineRun.Status); if (pipelineRun.Status == "InProgress") System.Threading.Thread.Sleep(15000); else break; }コピー アクティビティの実行の詳細 (読み取りおよび書き込みされたデータのサイズなど) を取得する次のコードを、
Mainメソッドに追加します。// Check the copy activity run details Console.WriteLine("Checking copy activity run details..."); RunFilterParameters filterParams = new RunFilterParameters( DateTime.UtcNow.AddMinutes(-10), DateTime.UtcNow.AddMinutes(10) ); ActivityRunsQueryResponse queryResponse = client.ActivityRuns.QueryByPipelineRun( resourceGroup, dataFactoryName, runResponse.RunId, filterParams ); if (pipelineRun.Status == "Succeeded") { Console.WriteLine(queryResponse.Value.First().Output); } else Console.WriteLine(queryResponse.Value.First().Error); Console.WriteLine("\nPress any key to exit..."); Console.ReadKey();
コードの実行
[ビルド]>[ソリューションのビルド] を選択して、アプリケーションをビルドします。 次に、[デバッグ]>[デバッグの開始] の順に選択してアプリケーションを起動し、パイプラインの実行を検証します。
コンソールに、データ ファクトリ、リンクされたサービス、データセット、パイプライン、およびパイプラインの実行の作成の進捗状況が表示されます。 その後、パイプラインの実行状態が確認されます。 コピー アクティビティの実行の詳細と、データの読み取り/書き込みのサイズが表示されるまで、待機します。 次に、SQL Server Management Studio (SSMS) やVisual Studioなどのツールを使用して、コピー先のAzure SQL Databaseに接続し、指定したコピー先テーブルにコピーされたデータが含まれているかどうかを確認できます。
サンプル出力
Creating a data factory AdfV2Tutorial...
{
"identity": {
"type": "SystemAssigned"
},
"location": "East US"
}
Creating linked service AzureStorageLinkedService...
{
"properties": {
"type": "AzureStorage",
"typeProperties": {
"connectionString": {
"type": "SecureString",
"value": "DefaultEndpointsProtocol=https;AccountName=<accountName>;AccountKey=<accountKey>"
}
}
}
}
Creating linked service AzureSqlDbLinkedService...
{
"properties": {
"type": "AzureSqlDatabase",
"typeProperties": {
"connectionString": {
"type": "SecureString",
"value": "Server=tcp:<servername>.database.windows.net,1433;Database=<databasename>;User ID=<username>@<servername>;Password=<password>;Trusted_Connection=False;Encrypt=True;Connection Timeout=30"
}
}
}
}
Creating dataset BlobDataset...
{
"properties": {
"type": "AzureBlob",
"typeProperties": {
"folderPath": "adfv2tutorial/",
"fileName": "inputEmp.txt",
"format": {
"type": "TextFormat",
"columnDelimiter": "|"
}
},
"structure": [
{
"name": "FirstName",
"type": "String"
},
{
"name": "LastName",
"type": "String"
}
],
"linkedServiceName": {
"type": "LinkedServiceReference",
"referenceName": "AzureStorageLinkedService"
}
}
}
Creating dataset SqlDataset...
{
"properties": {
"type": "AzureSqlTable",
"typeProperties": {
"tableName": "dbo.emp"
},
"linkedServiceName": {
"type": "LinkedServiceReference",
"referenceName": "AzureSqlDbLinkedService"
}
}
}
Creating pipeline Adfv2TutorialBlobToSqlCopy...
{
"properties": {
"activities": [
{
"type": "Copy",
"typeProperties": {
"source": {
"type": "BlobSource"
},
"sink": {
"type": "SqlSink"
}
},
"inputs": [
{
"type": "DatasetReference",
"referenceName": "BlobDataset"
}
],
"outputs": [
{
"type": "DatasetReference",
"referenceName": "SqlDataset"
}
],
"name": "CopyFromBlobToSQL"
}
]
}
}
Creating pipeline run...
Pipeline run ID: 1cd03653-88a0-4c90-aabc-ae12d843e252
Checking pipeline run status...
Status: InProgress
Status: InProgress
Status: Succeeded
Checking copy activity run details...
{
"dataRead": 18,
"dataWritten": 28,
"rowsCopied": 2,
"copyDuration": 2,
"throughput": 0.01,
"errors": [],
"effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)",
"usedDataIntegrationUnits": 2,
"billedDuration": 2
}
Press any key to exit...
関連コンテンツ
このサンプルのパイプラインは、Azure BLOB ストレージ内のある場所から別の場所にデータをコピーします。 以下の方法を学習しました。
- データ ファクトリを作成します。
- リンクサービスとしてAzure StorageとAzure SQL Databaseを作成します。
- Azure Blob データセットとAzure SQL Database データセットを作成します。
- コピー アクティビティが含まれているパイプラインを作成します。
- パイプラインの実行を開始します。
- パイプラインとアクティビティの実行を監視します。
次のチュートリアルに進んで、オンプレミスからクラウドにデータをコピーする方法について学習しましょう。