適用対象:
Azure Data Factory
Azure Synapse Analytics
ヒント
Data Factory in Microsoft Fabric は、よりシンプルなアーキテクチャ、組み込みの AI、および新機能を備えた次世代のAzure Data Factoryです。 データ統合を初めて使用する場合は、Fabric Data Factory から始めます。 既存の ADF ワークロードをFabricにアップグレードして、データ サイエンス、リアルタイム分析、レポートの新機能にアクセスできます。
Azure Data Factoryまたは Synapse パイプラインで使用できるアクティビティには 2 種類あります。
- サポートされるソース データ ストアとシンク データ ストアの間でデータを移動するデータ移動アクティビティ。
- データ変換処理アクティビティは、Azure HDInsight や Azure Batch などのコンピューティング サービスを使用してデータを変換します。
サービスでサポートされていないデータ ストアとの間でデータを移動する場合や、サービスでサポートされていない方法でデータを変換または処理する場合は、独自のデータ移動ロジックまたはデータ変換ロジックでカスタム アクティビティを作成して、パイプラインでそのアクティビティを使用できます。 カスタム アクティビティは、仮想マシンの Azure Batch プールでカスタマイズされたコード ロジックを実行します。
注意
Azure Az PowerShell モジュールを使用してAzureを操作することをお勧めします。 作業を開始するには、「Install Azure PowerShellを参照してください。 Az PowerShell モジュールに移行する方法については、「
Azure Batch サービスを初めて使用する場合は、次の記事を参照してください。
- Azure Batch サービスの概要については、Azure Batchの基本を参照してください。
- New-AzBatchAccount コマンドレットを使用して、Azure Batch アカウント (または) Azure ポータルを作成し、Azure ポータルを使用してAzure Batch アカウントを作成します。 コマンドレットの使用方法の詳細については、「Azure Batch アカウントを管理するための PowerShell の使用に関する記事を参照してください。
- New-AzBatchPool コマンドレットを使用してAzure Batch プールを作成します。
重要
新しいAzure Batch プールを作成するときは、'CloudServiceConfiguration' ではなく、'VirtualMachineConfiguration' を使用する必要があります。
UI を使用してパイプラインにカスタム アクティビティを追加する
パイプラインでカスタム アクティビティを使用するには、次の手順を実行します:
パイプラインの [アクティビティ] ペイン内で Custom を検索し、カスタム アクティビティをパイプライン キャンバスにドラッグします。
まだ選択されていない場合は、キャンバスで新しいカスタム アクティビティを選択します。
Azure Batch タブを選択して、カスタム アクティビティを実行する新しいAzure Batchリンクされたサービスを選択または作成します。
Settings タブを選択し、Azure Batchで実行するコマンドとオプションの詳細を指定します。
リンクされたサービスAzure Batch
次の JSON は、リンクされたサービスAzure Batchサンプルを定義します。 詳細については、サポートされるコンピューティング環境に関する記事を参照してください。
{
"name": "AzureBatchLinkedService",
"properties": {
"type": "AzureBatch",
"typeProperties": {
"accountName": "batchaccount",
"accessKey": {
"type": "SecureString",
"value": "access key"
},
"batchUri": "https://batchaccount.region.batch.azure.com",
"poolName": "poolname",
"linkedServiceName": {
"referenceName": "StorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
}
リンクされたサービス Azure Batch の詳細については、Compute のリンクされたサービスに関する記事を参照してください。
カスタム アクティビティ
次の JSON スニペットでは、単純なカスタム アクティビティを使用するパイプラインを定義しています。 アクティビティ定義には、Azure Batchリンクされたサービスへの参照があります。
{
"name": "MyCustomActivityPipeline",
"properties": {
"description": "Custom activity sample",
"activities": [{
"type": "Custom",
"name": "MyCustomActivity",
"linkedServiceName": {
"referenceName": "AzureBatchLinkedService",
"type": "LinkedServiceReference"
},
"typeProperties": {
"command": "helloworld.exe",
"folderPath": "customactv2/helloworld",
"resourceLinkedService": {
"referenceName": "StorageLinkedService",
"type": "LinkedServiceReference"
}
}
}]
}
}
このサンプルでは、helloworld.exe は resourceLinkedService で使用されるAzure Storage アカウントの customactv2/helloworld フォルダーに格納されているカスタム アプリケーションです。 カスタム アクティビティは、このカスタム アプリケーションを送信して、Azure Batchで実行します。 このコマンドは、Azure Batch プール ノードのターゲット オペレーティング システムで実行できる任意の優先アプリケーションに置き換えることができます。
次の表は、このアクティビティに固有のプロパティの名前と説明です。
| プロパティ | 内容 | 必須 |
|---|---|---|
| 名前 | パイプラインのアクティビティの名前。 | はい |
| 説明 | アクティビティの動作を説明するテキスト。 | いいえ |
| 型 | カスタム アクティビティの場合、アクティビティの種類は Custom です。 | はい |
| linkedServiceName | Azure Batchへのリンクされたサービス。 このリンクされたサービスの詳細については、計算のリンクされたサービスに関する記事をご覧ください。 | はい |
| コマンド | 実行されるカスタム アプリケーションのコマンド。 Azure Batch プール ノードで既にアプリケーションを使用できる場合は、resourceLinkedService と folderPath をスキップできます。 たとえば、コマンドを cmd /c dir に指定できます。これは、Windows Batch プール ノードでネイティブにサポートされます。 |
はい |
| リソースリンクサービス | Azure Storageにリンクされたサービスで、カスタムアプリケーションが格納されているストレージアカウントに接続します。 | なし * |
| フォルダパス | カスタム アプリケーションとそのすべての依存関係のフォルダーのパス。 依存関係がサブフォルダー (つまり、folderPath の階層フォルダー構造に格納されている場合は>ファイルがAzure Batchにコピーされるときに、フォルダー構造がフラット化されます。 つまり、すべてのファイルは、サブフォルダーを使用せず、1 つのフォルダーにコピーされます。 この問題を回避するには、ファイルを圧縮し、圧縮されたファイルをコピーした後、目的の場所でカスタム コードと共に解凍することを検討してください。 |
なし * |
| 参照オブジェクト | 既存のリンクされたサービスとデータセットの配列。 カスタム コードがサービスのリソースを参照できるように、参照されているリンク サービスとデータセットが JSON 形式でカスタム アプリケーションに渡されます。 | いいえ |
| 拡張プロパティ | カスタム コードが追加のプロパティを参照できるように、JSON 形式でカスタム アプリケーションに渡すことができるユーザー定義プロパティ。 | いいえ |
| retentionTimeInDays | カスタム アクティビティ用に送信するファイルのリテンション期間。 既定値は 30 日です。 | いいえ |
* プロパティ resourceLinkedService と folderPath は、両方とも指定するか、両方とも省略する必要があります。
注意
カスタム アクティビティでリンクされたサービスを referenceObjects として渡す場合は、セキュリティで保護された文字列を含まないため、Azure Key Vault が有効化されたリンク サービスを渡し、シークレット名を使用して資格情報をコードから直接 Key Vault から取得することをお勧めします。 AKV 対応のリンクされたサービスを参照し、Key Vaultから資格情報を取得し、コード内のストレージにアクセスする here の例を見つけることができます。
注意
現在、カスタム アクティビティの resourceLinkedService では、Azure Blob Storage のみがサポートされており、既定で作成される唯一のリンクされたサービスであり、ADLS Gen2 などの他のコネクタを選択するオプションはありません。
カスタムアクティビティのアクセス許可
カスタム アクティビティは、Azure Batch自動ユーザー アカウントを タスク スコープ (既定の自動ユーザー指定) で非管理者アクセスに設定します。 自動ユーザー アカウントのアクセス許可レベルを変更することはできません。 詳細については、「Batch のユーザー アカウントでタスクを実行する」の「自動ユーザー アカウント」を参照してください。
コマンドの実行
カスタム アクティビティを使用してコマンドを直接実行できます。 次の例では、ターゲット Azure Batch プール ノードで "echo hello world" コマンドを実行し、出力を stdout に出力します。
{
"name": "MyCustomActivity",
"properties": {
"description": "Custom activity sample",
"activities": [{
"type": "Custom",
"name": "MyCustomActivity",
"linkedServiceName": {
"referenceName": "AzureBatchLinkedService",
"type": "LinkedServiceReference"
},
"typeProperties": {
"command": "cmd /c echo hello world"
}
}]
}
}
オブジェクトとプロパティを渡す
次のサンプルは、referenceObjects と extendedProperties を使用して、オブジェクトとユーザー定義プロパティをサービスからカスタム アプリケーションに渡す方法を示しています。
{
"name": "MyCustomActivityPipeline",
"properties": {
"description": "Custom activity sample",
"activities": [{
"type": "Custom",
"name": "MyCustomActivity",
"linkedServiceName": {
"referenceName": "AzureBatchLinkedService",
"type": "LinkedServiceReference"
},
"typeProperties": {
"command": "SampleApp.exe",
"folderPath": "customactv2/SampleApp",
"resourceLinkedService": {
"referenceName": "StorageLinkedService",
"type": "LinkedServiceReference"
},
"referenceObjects": {
"linkedServices": [{
"referenceName": "AzureBatchLinkedService",
"type": "LinkedServiceReference"
}]
},
"extendedProperties": {
"connectionString": {
"type": "SecureString",
"value": "aSampleSecureString"
},
"PropertyBagPropertyName1": "PropertyBagValue1",
"propertyBagPropertyName2": "PropertyBagValue2",
"dateTime1": "2015-04-12T12:13:14Z"
}
}
}]
}
}
アクティビティを実行すると、referenceObjects と extendedProperties が、SampleApp.exe の同じ実行フォルダーに配置されている次のファイルに格納されます。
activity.jsonextendedProperties とカスタム アクティビティのプロパティが格納されます。
linkedServices.jsonreferenceObjects プロパティで定義されているリンクされたサービスの配列が格納されます。
datasets.jsonreferenceObjects プロパティで定義されているデータセットの配列が格納されます。
次のサンプル コードは、SampleApp.exe が JSON ファイルの必要な情報にアクセスする方法を示しています。
using Newtonsoft.Json;
using System;
using System.IO;
namespace SampleApp
{
class Program
{
static void Main(string[] args)
{
//From Extend Properties
dynamic activity = JsonConvert.DeserializeObject(File.ReadAllText("activity.json"));
Console.WriteLine(activity.typeProperties.extendedProperties.connectionString.value);
// From LinkedServices
dynamic linkedServices = JsonConvert.DeserializeObject(File.ReadAllText("linkedServices.json"));
Console.WriteLine(linkedServices[0].properties.typeProperties.accountName);
}
}
}
実行の出力を取得する
次の PowerShell コマンドを使用して、パイプラインの実行を開始できます。
$runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName
パイプラインが実行されているときは、次のコマンドを使用して実行出力を確認できます。
while ($True) {
$result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30)
if(!$result) {
Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow"
}
elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) {
Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow"
}
else {
Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow"
$result
break
}
($result | Format-List | Out-String)
Start-Sleep -Seconds 15
}
Write-Host "Activity `Output` section:" -foregroundcolor "Yellow"
$result.Output -join "`r`n"
Write-Host "Activity `Error` section:" -foregroundcolor "Yellow"
$result.Error -join "`r`n"
カスタム アプリケーションの stdout および stderr は、タスクの GUID を使用して Azure Batch リンクされたサービスを作成するときに定義した、Azure Storage リンクされたサービス内の adfjobs コンテナーに保存されます。 次のスニペットに示すように、アクティビティの実行の出力から詳細なパスを取得できます。
Pipeline ' MyCustomActivity' run finished. Result:
ResourceGroupName : resourcegroupname
DataFactoryName : datafactoryname
ActivityName : MyCustomActivity
PipelineRunId : xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
PipelineName : MyCustomActivity
Input : {command}
Output : {exitcode, outputs, effectiveIntegrationRuntime}
LinkedServiceName :
ActivityRunStart : 10/5/2017 3:33:06 PM
ActivityRunEnd : 10/5/2017 3:33:28 PM
DurationInMs : 21203
Status : Succeeded
Error : {errorCode, message, failureType, target}
Activity Output section:
"exitcode": 0
"outputs": [
"https://<container>.blob.core.windows.net/adfjobs/<GUID>/output/stdout.txt",
"https://<container>.blob.core.windows.net/adfjobs/<GUID>/output/stderr.txt"
]
"effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)"
Activity Error section:
"errorCode": ""
"message": ""
"failureType": ""
"target": "MyCustomActivity"
ダウンストリームのアクティビティで stdout.txt の内容を使用する場合は、式 "@activity('MyCustomActivity').output.outputs[0]" で stdout.txt ファイルへのパスを取得できます。
重要
- activity.json、linkedServices.json、datasets.json は、Batch タスクのランタイム フォルダーに格納されます。 この例では、activity.json、linkedServices.json、datasets.json は、
https://adfv2storage.blob.core.windows.net/adfjobs/<GUID>/runtime/パスに格納されています。 必要に応じて、それらを個別にクリーンアップする必要があります。 - Self-Hosted Integration Runtime を使用するリンクされたサービスの場合、キーやパスワードなどの機密情報は Self-Hosted Integration Runtime によって暗号化され、顧客が定義したプライベート ネットワーク環境に資格情報が保持されます。 この方法でカスタムアプリケーションコードから参照すると、一部の機密フィールドが欠落している可能性があります。 必要に応じて、リンクされたサービスの参照を使用するのではなく、extendedProperties で SecureString を使用してください。
別のアクティビティに出力を渡す
カスタム アクティビティ内のコードからサービスにカスタム値を送信して戻すことができます。 そのためには、アプリケーションからの outputs.json にそれを書き込みます。 サービスでは、outputs.json の内容がコピーされて、customOutput プロパティの値としてアクティビティの出力に追加されます。 (サイズの制限は 2 MBです。)ダウンストリーム アクティビティで outputs.json の内容を使用する場合は、式 @activity('<MyCustomActivity>').output.customOutput を使用して値を取得できます。
SecureString 出力の取得
SecureString 型として指定された機密プロパティ値 (この記事の一部のサンプルに含まれています) は、ユーザー インターフェイスの [監視] タブでマスクされます。 ただし、実際のパイプライン実行では、SecureString プロパティは activity.json ファイル内でプレーン テキストの JSON としてシリアル化されます。 次に例を示します。
"extendedProperties": {
"connectionString": {
"type": "SecureString",
"value": "aSampleSecureString"
}
}
このシリアル化は、真にセキュアなものではなく、セキュリティの確保を意図したものではありません。 [監視] タブで値をマスクすることを、サービスに示すことを意図しています。
カスタム アクティビティから SecureString 型のプロパティにアクセスするには、.EXE と同じフォルダーに配置されている activity.json ファイルを読み取り、JSON を逆シリアル化した後、JSON プロパティにアクセスします (extendedProperties => [propertyName] => value)。
Azure Batchの自動スケーリング
autoscale 機能を使用してAzure Batch プールを作成することもできます。 たとえば、0 個の専用 VM を含むAzure バッチ プールと、保留中のタスクの数に基づく自動スケール式を作成できます。
このサンプル式は、次の動作を実現します。プールが最初に作成された場合、1 つの VM から開始されます。 $PendingTasks メトリックにより、実行中 + アクティブ (キューに登録済み) 状態のタスク数が定義されます。 この数式により、最後の 180 秒間の保留タスク平均数が判明し、TargetDedicated が適宜設定されます。 それにより、TargetDedicated が 25 台の仮想マシンを超えることはありません。 そのため、新しいタスクが送信されると、プールが自動的に増加します。タスクが完了すると、VM は 1 台ずつ解放され、自動スケールにより VM が減らされます。 startingNumberOfVMs と maxNumberofVMs はニーズに合わせて調整できます。
自動スケールの数式:
startingNumberOfVMs = 1;
maxNumberofVMs = 25;
pendingTaskSamplePercent = $PendingTasks.GetSamplePercent(180 * TimeInterval_Second);
pendingTaskSamples = pendingTaskSamplePercent < 70 ? startingNumberOfVMs : avg($PendingTasks.GetSample(180 * TimeInterval_Second));
$TargetDedicated=min(maxNumberofVMs,pendingTaskSamples);
詳細については、「Azure Batch プール内のコンピューティング ノードの自動スケールを参照してください。
プールで既定の autoScaleEvaluationIntervalを使用する場合、Batch サービスがカスタム アクティビティを実行する前に VM を準備するのに 15 ~ 30 分かかることがあります。 プールが異なる autoScaleEvaluationInterval を使用する場合、Batch サービスは autoScaleEvaluationInterval + 10 分を要することがあります。
関連するコンテンツ
別の手段でデータを変換する方法を説明している次の記事を参照してください。