VS Code で Akri コネクタを構築する

この記事では、 Azure IoT Operations Akri Connectors プレビュー VS Code 拡張機能を使用して、カスタム Akri コネクタをビルド、検証、デバッグ、発行する方法について説明します。

この拡張機能では、次のプラットフォームがサポートされています。

  • Linux
  • Linux 用 Microsoft Windows サブシステム (WSL)
  • ウィンドウズ

この拡張機能を使用すると、次のプログラミング言語を使用してコネクタを作成できます。

  • .NET
  • Rust

[前提条件]

開発環境:

Docker の構成:

  • 拡張機能で使用されるイメージは、拡張機能を使用する前に、ローカルでプルしてタグ付けする必要があります。

    docker pull mcr.microsoft.com/azureiotoperations/devx-runtime:0.1.8
    docker tag mcr.microsoft.com/azureiotoperations/devx-runtime:0.1.8 devx-runtime
    
  • 拡張機能が起動するすべてのコンテナーは、ネットワーク分離のために aio_akri_network という名前のカスタム ネットワーク上で実行するように構成されます。

    docker network create aio_akri_network
    
  • DevX コンテナーは、カスタム ボリューム akri_devx_docker_volume を使用してクラスター構成を格納します。

    docker volume rm akri_devx_docker_volume # delete the volume created from any previous release
    docker volume create akri_devx_docker_volume
    

Azure IoT Operations インスタンスでコネクタをデプロイして使用するには、次も必要です。

  • Azure IoT Operations のインスタンス。
  • Azure Container Registry などのコンテナー レジストリにアクセスして、コネクタ イメージを発行します。
  • コネクタ イメージをプルするように Azure IoT Operations インスタンスで構成されたコンテナー レジストリ エンドポイント。 詳細については、「 レジストリ エンドポイントの構成」を参照してください。

Akri コネクタを作成して検証する

この例では、C# 言語を使用して HTTP/REST コネクタを作成し、Docker イメージをビルドしてから、VS Code 拡張機能を使用してコネクタ アプリケーションを実行します。

  1. Ctrl+Shift+Pを押してコマンド パレットを開き、Azure IoT Operations Akri Connectors: Create a New Akri Connector コマンドを検索します。 my-connectorsという名前の新しいフォルダーを作成して選択し、言語として C# を選択し、rest_connectorなどのコネクタの名前を入力し、コネクタの種類として PollingTelemetryConnector を選択します。

  2. 拡張機能は、前の手順で選択したコネクタ名を使用して、新しいワークスペースを作成します。 ワークスペースには、C# 言語で記述されたポーリング テレメトリ コネクタのスキャフォールディングが含まれています。

次の手順では、 MyConnector という名前の .NET プロジェクトを作成していることを前提としています。

Important

次のコード例は例示のみを目的としており、運用環境で使用するためのものではありません。 運用コネクタでは、堅牢なエラー処理と再試行ロジックを実装し、資産への接続に使用される資格情報が安全に保存および使用されるようにする必要があります。 運用品質コネクタは、SDK リポジトリの Akri オペレーターおよびコネクタ コントラクト ドキュメントで説明されているコントラクトを実装する必要があります。

サーモスタットの状態を表すには、ワークスペース内 フォルダーに ThermostatStatus.cs という名前のファイルを作成し、次の内容を含めます。 このファイルは、REST エンドポイントからの JSON 応答をモデル化します。

using System.Text.Json.Serialization;

namespace MyConnector
{
    internal class ThermostatStatus
    {
        [JsonPropertyName("desiredTemperature")]
        public double? DesiredTemperature { get; set; }

        [JsonPropertyName("currentTemperature")]
        public double? CurrentTemperature { get; set; }
    }
}    

データ ポイントの構成設定を追加するには、ワークスペースの フォルダーに MyConnector という名前のファイルを作成し、次の内容を含めます。 このファイルは、操作エクスペリエンス UI でユーザーが指定するデータ ポイントの構成設定をモデル化します。

using System.Text.Json.Serialization;

namespace MyConnector
{
    public class DataPointConfiguration
    {
        [JsonPropertyName("HttpRequestMethod")]
        public string? HttpRequestMethod { get; set; }
    }
} 

指定されたSampleDatasetAsync クラスにDatasetSampler メソッドを実装します。 このメソッドは、パラメーターとして Dataset を受け取ります。 Datasetには、コネクタが処理するデータ ポイントが含まれています。

  1. VS Code ワークスペースでファイル MyConnector/DatasetSampler.cs を開きます。

  2. エンドポイント データを処理するために必要なデータを渡すには、 DatasetSampler クラスにコンストラクターを追加します。 このクラスは、 HttpClientEndpointProfileCredentials を使用して、資産エンドポイントに接続して認証します。

    private readonly HttpClient _httpClient;
    private readonly string _assetName;
    private readonly EndpointCredentials? _credentials;
    
    private readonly static JsonSerializerOptions _jsonSerializerOptions = new()
    {
        AllowTrailingCommas = true,
    };
    
    public DatasetSampler(HttpClient httpClient, string assetName, EndpointCredentials? credentials)
    {
        _httpClient = httpClient;
        _assetName = assetName;
        _credentials = credentials;
    }
    
    public ValueTask DisposeAsync()
    {
        _httpClient.Dispose();
        return ValueTask.CompletedTask;
    }
    
  3. GetSamplingIntervalAsync メソッドを変更して、3 秒のサンプリング間隔を返します。

    public Task<TimeSpan> GetSamplingIntervalAsync(AssetDataset dataset, CancellationToken cancellationToken = default)
    {
        return Task.FromResult(TimeSpan.FromSeconds(3));
    }
    

    わかりやすくするために、この例では固定サンプリング間隔を使用します。 運用コネクタでは、コネクタ メタデータを使用して、ユーザーが操作エクスペリエンス UI で設定できるサンプリング間隔プロパティを定義することで、サンプリング間隔を構成できます。

  4. 既存の SampleDatasetAsync メソッドを次のアウトラインに置き換えます。

    public async Task<byte[]> SampleDatasetAsync(AssetDataset dataset, CancellationToken cancellationToken = default)
    {
        int retryCount = 0;
        while (true)
        {
            try
            {
                // TODO: Implement your dataset sampling logic here.
            }
            catch (Exception ex)
            {
                if (++retryCount >= 3)
                {
                    throw new InvalidOperationException($"Failed to sample dataset with name {dataset.Name} in asset with name {_assetName}. Error: {ex.Message}", ex);
                }
                await Task.Delay(1000, cancellationToken);
            }
        }
    }
    
  5. try メソッドの SampleDatasetAsync ブロックで、次のコードを追加して、DataPointから各DataSetを取得し、データ ソース パスを抽出します。 これらのパスは、REST エンドポイントからデータをフェッチするために使用される URL の一部です。 currentTemperaturedesiredTemperatureのデータ ポイントは、以前に ThermostatStatus クラスでモデル化されています。 HTTP 要求メソッドは、 DataPointConfiguration クラスでモデル化されたデータ ポイント構成から抽出されます。

    AssetDatasetDataPointSchemaElement httpServerDesiredTemperatureDataPoint = dataset.DataPoints!.Where(x => x.Name!.Equals("desiredTemperature"))!.First();
    HttpMethod httpServerDesiredTemperatureHttpMethod = HttpMethod.Parse(JsonSerializer.Deserialize<DataPointConfiguration>(httpServerDesiredTemperatureDataPoint.DataPointConfiguration!, _jsonSerializerOptions)!.HttpRequestMethod);
    string httpServerDesiredTemperatureRequestPath = httpServerDesiredTemperatureDataPoint.DataSource!;
    
    AssetDatasetDataPointSchemaElement httpServerCurrentTemperatureDataPoint = dataset.DataPoints!.Where(x => x.Name!.Equals("currentTemperature"))!.First();
    HttpMethod httpServerCurrentTemperatureHttpMethod = HttpMethod.Parse(JsonSerializer.Deserialize<DataPointConfiguration>(httpServerCurrentTemperatureDataPoint.DataPointConfiguration!, _jsonSerializerOptions)!.HttpRequestMethod);
    string httpServerCurrentTemperatureRequestPath = httpServerCurrentTemperatureDataPoint.DataSource!;
    

    わかりやすくするために、この例では、データ ポイント構成から使用する HTTP メソッドを取得する方法のみを示します。 このサンプルでは、HTTP 要求を行うときにこの値は使用されません。

  6. 同じ方法で、認証されたエンドポイントが使用されている場合は、指定された資格情報を使用して認証を設定します。

    if (_credentials != null && _credentials.Username != null && _credentials.Password != null)
    {
        // Note that this sample uses username + password for authenticating the connection to the asset. In general,
        // x509 authentication should be used instead (if available) as it is more secure.
        string httpServerUsername = _credentials.Username;
        string httpServerPassword = _credentials.Password;
        var byteArray = Encoding.ASCII.GetBytes($"{httpServerUsername}:{httpServerPassword}");
        _httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic", Convert.ToBase64String(byteArray));
    }
    

    このコードは、資格情報を抽出し、承認ヘッダーに追加します。 DatasetSamplerは、ユーザー名とパスワードの資格情報を使用して基本認証を実装します。

  7. 次に、エンドポイントに HTTP 要求を行うコードを追加し、応答を逆シリアル化し、 CurrentTemperature プロパティと DesiredTemperature プロパティの両方を抽出して、 ThermostatStatus オブジェクトに配置します。

    var currentTemperatureHttpResponse = await _httpClient.GetAsync(httpServerCurrentTemperatureRequestPath);
    var desiredTemperatureHttpResponse = await _httpClient.GetAsync(httpServerDesiredTemperatureRequestPath);
    
    if (currentTemperatureHttpResponse.StatusCode == System.Net.HttpStatusCode.Unauthorized
        || desiredTemperatureHttpResponse.StatusCode == System.Net.HttpStatusCode.Unauthorized)
    {
        throw new Exception("Failed to authorize request to HTTP server. Check credentials configured in rest-server-device-definition.yaml.");
    }
    
    currentTemperatureHttpResponse.EnsureSuccessStatusCode();
    desiredTemperatureHttpResponse.EnsureSuccessStatusCode();
    
    ThermostatStatus thermostatStatus = new()
    {
        CurrentTemperature = (JsonSerializer.Deserialize<ThermostatStatus>(await currentTemperatureHttpResponse.Content.ReadAsStreamAsync(), _jsonSerializerOptions)!).CurrentTemperature,
        DesiredTemperature = (JsonSerializer.Deserialize<ThermostatStatus>(await desiredTemperatureHttpResponse.Content.ReadAsStreamAsync(), _jsonSerializerOptions)!).DesiredTemperature
    };
    
  8. 次に、状態を JSON にシリアル化し、エンドポイントに応答を返します。 この例では、HTTP 応答ペイロードは予想されるメッセージ スキーマと既に一致しているため、変換は必要ありません。

    // The HTTP response payload matches the expected message schema, so return it as-is
    return Encoding.UTF8.GetBytes(JsonSerializer.Serialize(thermostatStatus));
    

    ヒント

    必要に応じて、コネクタはスキーマ レジストリにスキーマを登録して、他の Azure IoT 操作がメッセージの形式を理解できるようにします。

  9. 最後に、必要な型をインポートします。

    using Azure.Iot.Operations.Connector.Files;
    using System.Net.Http.Headers;
    using System.Text;
    using System.Text.Json;
    

コードの最終バージョンは DatasetSampler のようになります。

CreateDatasetSampler クラスに DatasetSamplerProvider メソッドを実装します。 このクラスは DataSetSampler オブジェクトを作成し、必要に応じてアプリケーションに挿入します。

  1. VS Code ワークスペースで MyConnector/DatasetSamplerProvider.cs ファイルを開きます。

  2. CreateDatasetSampler メソッドで、データセット名がDatasetSamplerされている場合は、endpointCredentialsと共にthermostat_statusを返します。

    if (dataset.Name.Equals("thermostat_status"))
    {
        if (device.Endpoints != null
            && device.Endpoints.Inbound != null
            && device.Endpoints.Inbound.TryGetValue(inboundEndpointName, out var inboundEndpoint))
        {
            var httpClient = new HttpClient()
            {
                BaseAddress = new Uri(inboundEndpoint.Address),
            };
    
            return new DatasetSampler(httpClient, assetName, endpointCredentials);
        }
    }
    
    throw new InvalidOperationException($"Unrecognized dataset with name {dataset.Name} on asset with name {assetName}");
    

    わかりやすくするために、この例では、データセット名が常に thermostat_statusされていることを前提としています。 運用コネクタでは、複数のデータセットを処理する追加のロジックを実装できます。

コードの最終バージョンは DatasetSamplerProvider のようになります。

次に、プロジェクトをビルドして、エラーがないことを確認します。 VS Code コマンド Azure IoT Operations Akri Connectors: Build an Akri Connector を 使用し、 リリース モードを選択します。 このコマンドは、ビルドの進行状況を 出力 コンソールに表示し、ビルドが完了したときに通知します。 その後、 <connector_name> という名前の新しい Docker イメージを Docker Desktop でローカルに release タグ付きで確認できます。

新しいコネクタをローカルでテストするには、次の手順に従います。

  1. コネクタの接続先の REST サーバーとして機能するローカル エンドポイントを作成します。 前に複製した explore-iot-operation リポジトリで、次のコマンドを実行して、テスト用のローカル REST サーバーを構築します。

    cd samples/akri-vscode-extension/sample-rest-server
    docker build -t rest-server:latest .
    

    Docker Desktop でイメージを確認できます。

  2. 次のコマンドを実行して、ローカル コンテナーで REST サーバーを起動します。

    docker run -d --rm --network aio_akri_network --name restserver rest-server:latest
    
  3. Docker Desktop で実行されているコンテナーを確認できます。 REST サーバーは、http://restserver:3000で実行されているコンテナーのaio_akri_networkでアクセスできます。

  4. rest-server-device-definition.yaml リポジトリのローカル コピーの samples/akri-vscode-extension/rest-server-custom-resources フォルダーから VS Code のコネクタ ワークスペースの explore-iot-operations フォルダーにファイル をコピーします。 このデバイス リソースは、REST サーバーへのエンドポイント接続を定義します。

  5. rest-server-asset1-definition.yaml リポジトリのローカル コピーの samples/akri-vscode-extension/rest-server-custom-resources フォルダーから VS Code のコネクタ ワークスペースの explore-iot-operations フォルダーにファイル をコピーします。 この資産は、デバイスから mqtt/machine/asset1/status MQTT トピックに温度情報を発行します。

  6. rest-server-asset2-definition.yaml リポジトリのローカル コピーの samples/akri-vscode-extension/rest-server-custom-resources フォルダーから VS Code のコネクタ ワークスペースの explore-iot-operations フォルダーにファイル をコピーします。 この資産は、デバイスから状態ストアに温度情報を発行します。

  7. デバイスと資産リソースを使用してコネクタをテストするには、VS Code ワークスペースの [実行とデバッグ ] パネルに移動し、[ Akri コネクタの実行 ] 構成を選択します。 この構成では、事前起動タスクを実行するターミナルが起動され、 aio-broker コンテナーと、 <connector_name>_releaseという別のコンテナーで開発した REST コネクタが開始されます。 このプロセスには数分かかります。 VS Code のターミナル ウィンドウで REST コネクタを使用して、REST サーバーから MQ ブローカーへのテレメトリ データ フローを確認できます。 コンテナー ログは、Docker Desktop にも表示されます。

  8. デバッグ コマンド パネルの [停止 ] ボタンを使用すると、いつでも実行を停止できます。 このコマンドを実行すると、実行中のコンテナーがクリーンアップされ、 aio-broker および <connector_name>_releaseが削除されます。

Akri コネクタをデバッグする

.NET ベースの Akri コネクタをデバッグするには、 C# VS Code 拡張機能がインストールされていることを確認します。 前に作成したのと同じ REST コネクタを使用します。

  1. デバッグ モードでコネクタをビルドするには、VS Code コマンド Azure IoT Operations Akri Connectors を使用します。Akri コネクタをビルドし、[デバッグ モード] を選択します。 このコマンドは、タグ <connector_name>を使用して debug というローカル Docker イメージを作成します。 Docker Desktop でイメージを確認できます。

  2. ブレークポイントを追加すると、ブレークポイントにヒットすると実行が停止します。 SampleDatasetAsyncDatasetSampler.cs メソッドの先頭にブレークポイントを追加してみてください。

  3. コネクタをデバッグするには、VS Code ワークスペースの [実行とデバッグ ] パネルに移動し、[ Akri コネクタのデバッグ ] 構成を選択します。 この構成では、事前起動タスクを実行するターミナルが起動され、 aio-broker コンテナーと、 <connector_name>_debugという別のコンテナーで開発した REST コネクタが開始されます。 このプロセスには数分かかります。 VS Code のターミナル ウィンドウで REST コネクタを使用して、REST サーバーから MQ ブローカーへのテレメトリ データ フローを確認できます。 コンテナー ログは、Docker Desktop にも表示されます。

  4. デバッグ コマンド パネルの [切断 ] ボタンを使用して、実行を終了します。

Akri VS Code 拡張機能は、タイムアウト期間が 3 分の実行/デバッグ シナリオで DevX コンテナーを起動します。 コンテナーがタイムアウト期間内に起動を完了しない場合、拡張機能はコンテナーを終了します。

構成の更新プログラムを適用する

コネクタの実行中に、ローカル ランタイム環境でデバイスと資産の構成を動的に更新できます。 この機能を使用すると、コネクタが構成の変更に応答することを確認できます。 次の VS Code 拡張機能コマンドを使用して、これらの変更を行います。

  • Azure IoT Operations Akri コネクタ: クラスターにデバイス YAML を適用する
  • Azure IoT Operations Akri コネクタ: クラスターに資産 YAML を適用する
  • Azure IoT Operations Akri コネクタ: クラスターからデバイス YAML を削除する
  • Azure IoT Operations Akri Connectors: クラスターから資産 YAML を削除する

コネクタの状態をキャプチャする

スキーマ レジストリの現在の状態をキャプチャするには、 Azure IoT Operations Akri Connectors: Capture Connector State VS Code 拡張機能コマンドを使用します。 このコマンドは、現在のタイムスタンプに基づく名前を持つフォルダーをワークスペース OUTPUT フォルダーに作成します。 作成されたフォルダーには、カスタム コネクタによって作成されたすべてのスキーマを含む、スキーマ レジストリの現在の状態のコピーが含まれます。

スキーマ レジストリの状態は、常に Output/ConnectorState フォルダーに表示されます。 このコマンドを使用すると、特定の時点でのスキーマ レジストリの状態をキャプチャできます。

コネクタ イメージを発行する

Azure IoT Operations Akri コネクタ: Akri Connector Image または Metadata コマンドを使用して、コネクタ イメージを Microsoft Azure Container Registry (ACR) レジストリに発行します。 このコマンドでは、Microsoft Azure CLI と oras コマンドを使用します。 ACR レジストリに発行するには、Azure サブスクリプション ID と ACR レジストリ名が必要です。

オーサー コネクタ メタデータ設定の構成

Akri Connector の作成コマンドから作成した VS Code ワークスペースを使用して、connector-metadata.jsonに準拠する ファイルを作成します。 このファイルは、コネクタ ワークスペース内の任意の場所に配置できます。 この拡張機能は、 connector-metadata.json ファイルを使用して静的検証機能を提供し、必要なプロパティがない場合は、 PROBLEMS パネルに警告を表示します。

メタデータ成果物を発行する

ACR レジストリにメタデータ フォルダーを発行するには、 Azure IoT Operations Akri コネクタの [Publish Akri Connector Image or Metadata ] コマンドを使用します。 このコマンドでは、Azure CLI と oras コマンドを使用します。 ACR レジストリに発行するには、Azure サブスクリプション ID と ACR レジストリ名が必要です。 現在、拡張機能では、connector-metadata.json という名前のファイルが期待されており、オプションで additionalConfig.json も存在することが想定されています。それらはプッシュする任意のフォルダーに存在している必要があります。

既知の問題

  • 現在、 Delete/Apply Asset/Device YAML VS Code コマンドに起因する構成の更新は、Linux カーネルでの CIFS 実装の制限により、Windows では機能しません。 ホスト上のマウントされたフォルダー内のファイル変更イベントは、Docker for Windows によってコンテナーに反映されません。

  • VS Code コマンドを使用してクラスターから資産またはデバイスを削除すると、.NET コネクタは現在、次の 404 エラーをスローします。

    Unhandled exception. Azure.Iot.Operations.Protocol.Retry.RetryExpiredException: Retry expired while attempting the operation. Last known exception is the inner exception.
    ---> Azure.Iot.Operations.Services.AssetAndDeviceRegistry.Models.AkriServiceErrorException: ApiError: assets.namespaces.deviceregistry.microsoft.com "my-rest-thermostat-asset2" not found: NotFound (ErrorResponse { status: "Failure", message: "assets.namespaces.deviceregistry.microsoft.com \"my-rest-thermostat-asset2\" not found", reason: "NotFound", code: 404 })
    at Azure.Iot.Operations.Services.AssetAndDeviceRegistry.AdrServiceClient.<>c__DisplayClass19_0.<<SetNotificationPreferenceForAssetUpdatesAsync>b__0>d.MoveNext()
    --- End of stack trace from previous location ---
    at Azure.Iot.Operations.Services.AssetAndDeviceRegistry.AdrServiceClient.RunWithRetryAsync[TResult](Func`1 taskFunc, CancellationToken cancellationToken)
    --- End of inner exception stack trace ---
    at Azure.Iot.Operations.Services.AssetAndDeviceRegistry.AdrServiceClient.RunWithRetryAsync[TResult](Func`1 taskFunc, CancellationToken cancellationToken)
    at Azure.Iot.Operations.Services.AssetAndDeviceRegistry.AdrServiceClient.SetNotificationPreferenceForAssetUpdatesAsync(String deviceName, String inboundEndpointName, String assetName, NotificationPreference notificationPreference, Nullable`1 commandTimeout, CancellationToken cancellationToken)
    at Azure.Iot.Operations.Connector.AdrClientWrapper.AssetFileChanged(Object sender, AssetFileChangedEventArgs e)
    at System.Threading.Tasks.Task.<>c.<ThrowAsync>b__128_1(Object state)
    at System.Threading.ThreadPoolWorkQueue.Dispatch()
    at System.Threading.PortableThreadPool.WorkerThread.WorkerThreadStart()
    
  • 現時点では、Docker Desktop をインストールせずに WSL からコンテナーとして DevX イメージを起動すると、コンテナーが永久にハングします。