Générer des modules WASM pour les flux de données

La fonctionnalité de traitement des données WebAssembly (WASM) personnalisée dans Opérations Azure IoT permet le traitement des données de télémétrie en temps réel au sein de votre cluster Opérations Azure IoT. En déployant des modules WASM personnalisés, vous pouvez définir et exécuter des transformations de données dans le cadre de votre graphe de flux de données, du connecteur HTTP/REST ou du connecteur MQTT.

Cet article explique comment utiliser l’extension Opérations Azure IoT Data Flow VS Code, l’interface CLI dataflow-dev ou les outils standard installés dans votre environnement pour développer et tester vos modules WASM localement avant de les déployer sur votre cluster Opérations Azure IoT. Vous allez apprendre à :

  • Exécutez une application de graphe localement en exécutant un graphe prédéfini avec des exemples de données pour comprendre le flux de travail de base.
  • Créez des modules WASM personnalisés en créant de nouveaux opérateurs dans Python et Rust avec des fonctionnalités de mappage et de filtre.

Utilisez l’extension VS Code pour une boucle de développement interne lorsque vous créez activement des opérateurs et des graphiques, par exemple : écrire du code, générer, examiner des erreurs, déboguer, apporter des modifications, mettre à jour le graphe et publier.

Utilisez l’interface en ligne de commande (CLI) pour les workflows pour la qualité des graphes axés sur CI/CD, par exemple : compiler du code existant, exécuter le graphe, tester les résultats par rapport à des résultats de référence et surveiller l'évolution de la qualité au fil du temps.

Utilisez les outils standard installés dans votre environnement pour les scénarios où vous souhaitez plus de contrôle sur le processus de génération et de test, ou quand vous devez intégrer d’autres outils de développement et flux de travail.

Cet article explique comment générer et tester vos modules WASM localement. Pour les utiliser dans Opérations Azure IoT flux de données et les connecteurs, vous devez les déployer sur votre cluster Opérations Azure IoT et les référencer dans votre configuration de graphe ou de connecteur :

Pour des scénarios plus avancés, consultez Créer des graphiques WASM avec état avec le magasin d’états, Utiliser le registre de schémas avec des modules WASM,des modules WASM de débogage et des modules WASM de test.

L’extension et l’outil CLI sont pris en charge sur les plateformes suivantes :

  • Linux
  • Sous-système Windows pour Linux (WSL)
  • Windows (veillez à utiliser un interpréteur de commandes tel que PowerShell ou l'Invite de commandes pour exécuter les commandes d'extension sur Windows)

Pour en savoir plus sur les graphiques et WASM dans Opérations Azure IoT, consultez :

Prerequisites

Environnement de développement :

Images Docker :

docker pull mcr.microsoft.com/azureiotoperations/processor-app:1.1.5
docker tag mcr.microsoft.com/azureiotoperations/processor-app:1.1.5 host-app

docker pull mcr.microsoft.com/azureiotoperations/devx-runtime:0.1.8
docker tag mcr.microsoft.com/azureiotoperations/devx-runtime:0.1.8 devx

docker pull mcr.microsoft.com/azureiotoperations/statestore-cli:0.0.2
docker tag mcr.microsoft.com/azureiotoperations/statestore-cli:0.0.2 statestore-cli

docker pull eclipse-mosquitto

Exécuter une application de graphe localement

Cet exemple utilise un exemple d’espace de travail qui contient toutes les ressources nécessaires pour générer et exécuter une application graphe localement à l’aide de l’extension VS Code.

Ouvrir l’exemple d’espace de travail dans VS Code

Clonez le référentiel Explore IoT Operations si ce n'est déjà fait.

Ouvrez le dossier samples/wasm dans Visual Studio Code en sélectionnant File > Ouvrir le dossier et en accédant au dossier samples/wasm.

Générer les opérateurs

Appuyez sur Ctrl+Shift+P pour ouvrir la palette de commandes et rechercher Opérations Azure IoT : Générer tous les opérateurs. Sélectionnez la mise en production en tant que mode de génération.

Cette commande génère tous les opérateurs de l’espace de travail et crée des .wasm fichiers dans le operators dossier. Vous utilisez les fichiers .wasm pour exécuter l’application graphe localement.

Exécuter l’application graphe localement

Pour démarrer l’environnement d’exécution local, appuyez sur Ctrl+Shift+P pour ouvrir la palette de commandes et rechercher Opérations Azure IoT : Démarrer l’environnement de développement. Sélectionnez la mise en production comme mode d’exécution.

Lorsque l’environnement d’exécution local est en cours d’exécution, appuyez sur Ctrl+Shift+P pour ouvrir la palette de commandes et rechercher Opérations Azure IoT : Exécuter Application Graph. Sélectionnez la mise en production comme mode d’exécution. Cette commande exécute localement l’application graphe à l’aide de l’environnement d’exécution local avec le fichier graph.dataflow.yaml de l’espace de travail.

Il lit également à partir de hostapp.env.list pour définir la variable TK_CONFIGURATION_PARAMETERS d’environnement pour les paramètres de configuration de l’opérateur de flux de données.

Lorsque vous êtes invité à entrer des données d’entrée, sélectionnez le data-and-images dossier dans l’espace de travail. Ce dossier contient les fichiers de données d’entrée de l’application graphique, y compris les données de température et d’humidité, ainsi que certaines images du module d’instantané.

Attendez de voir une notification VS Code indiquant que les journaux sont prêts : Log files for the run can be found at ...\wasm\data-and-images\output\logs.

La sortie se trouve dans le output dossier sous le data-and-images dossier. Vous pouvez ouvrir le output dossier dans l’espace de travail pour afficher les fichiers de sortie. Le .txt fichier avec la date et l’heure du nom de fichier contient les données traitées, et il ressemble à l’exemple suivant :

{"tst":"2025-09-19T04:19:13.530381+0000","topic":"sensors","qos":0,"retain":0,"payloadlen":312,"properties":{"payload-format-indicator":1,"message-expiry-interval":10,"correlation-data":"...","user-properties":{"__ts":"001758255553528:00000:...","__protVer":"1.0","__srcId":"mqtt-source"},"content-type":"application/json"},"payload":{"temperature":[{"count":2,"max":653.888888888889,"min":204.44444444444449,"average":429.16666666666669,"last":204.44444444444449,"unit":"C","overtemp":true}],"humidity":[{"count":3,"max":85,"min":45,"average":69.666666666666671,"last":79}],"object":[{"result":"notebook, notebook computer; sliding door"}]}}

La sortie indique que l’application de graphique a traité les données d’entrée et généré la sortie. La sortie inclut les données de température et d’humidité, ainsi que les objets détectés dans les images.

Créer un graphe avec des modules WASM personnalisés

Ce scénario vous montre comment créer une application de graphe avec des modules WASM personnalisés. L’application de graphe se compose de deux opérateurs : un map opérateur qui convertit les valeurs de température de Fahrenheit en Celsius et un filter opérateur qui filtre les messages avec des valeurs de température supérieures à 500 °C.

Actuellement, n’utilisez pas de traits d’union (-) ou de traits de soulignement (_) dans les noms d’opérateurs. L’extension VS Code applique cette exigence, mais si vous créez ou renommez manuellement des modules, cela provoque des problèmes. Utilisez des noms alphanumériques simples pour les modules tels que filter, , mapstateenrichou schemafilter.

Au lieu d’utiliser un exemple d’espace de travail existant, vous créez un espace de travail à partir de zéro. Ce processus vous permet de découvrir comment créer une application de graphe et programmer les opérateurs dans Python et Rust.

Créer un projet d’application graphe dans Python

Appuyez sur Ctrl+Shift+P pour ouvrir la palette de commandes VS Code et rechercher Opérations Azure IoT : Créer une application :

  1. Pour le dossier, sélectionnez un dossier dans lequel vous souhaitez créer le projet. Vous pouvez créer un dossier pour ce projet.
  2. Entrez my-graph comme nom.
  3. Sélectionnez Python comme langue.
  4. Sélectionnez Mapper comme type.
  5. Entrez map comme nom.

Vous disposez maintenant d’un nouvel espace de travail VS Code avec la structure de projet de base et les fichiers de démarrage. Les fichiers de démarrage incluent le fichier graph.dataflow.yaml et le code source du modèle d’opérateur de mappage.

Important

Pour utiliser un module Python dans une instance de Opérations Azure IoT déployée, vous devez déployer l’instance avec le profil de mémoire broker défini sur Medium ou High. Si vous définissez le profil de mémoire sur Low ou Tiny, l'instance ne peut pas extraire le module Python.

Ajouter Python code pour le module d’opérateur de carte

Ouvrez le fichier et remplacez le operators/map/map.py contenu par le code suivant pour convertir une valeur de température entrante de Fahrenheit en Celsius :

import json
from map_impl import exports
from map_impl import imports
from map_impl.imports import types

class Map(exports.Map):
    def init(self, configuration) -> bool:
        imports.logger.log(imports.logger.Level.INFO, "module4/map", "Init invoked")
        return True

    def process(self, message: types.DataModel) -> types.DataModel:
        # TODO: implement custom logic for map operator
        imports.logger.log(imports.logger.Level.INFO, "module4/map", "processing from python")

        # Ensure the input is of the expected type
        if not isinstance(message, types.DataModel_Message):
            raise ValueError("Unexpected input type: Expected DataModel_Message")

        # Extract and decode the payload
        payload_variant = message.value.payload
        if isinstance(payload_variant, types.BufferOrBytes_Buffer):
            # It's a Buffer handle - read from host
            imports.logger.log(imports.logger.Level.INFO, "module4/map", "Reading payload from Buffer")
            payload = payload_variant.value.read()
        elif isinstance(payload_variant, types.BufferOrBytes_Bytes):
            # It's already bytes
            imports.logger.log(imports.logger.Level.INFO, "module4/map", "Reading payload from Bytes")
            payload = payload_variant.value
        else:
            raise ValueError("Unexpected payload type")

        decoded = payload.decode("utf-8")

        # Parse the JSON data
        json_data = json.loads(decoded)

        # Check and update the temperature value
        if "temperature" in json_data and "value" in json_data["temperature"]:
            temp_f = json_data["temperature"]["value"]
            if isinstance(temp_f, int):
                # Convert Fahrenheit to Celsius
                temp_c = round((temp_f - 32) * 5.0 / 9.0)

                # Update the JSON data
                json_data["temperature"]["value"] = temp_c
                json_data["temperature"]["unit"] = "C"

                # Serialize the updated JSON back to bytes
                updated_payload = json.dumps(json_data).encode("utf-8")

                # Update the message payload
                message.value.payload = types.BufferOrBytes_Bytes(value=updated_payload)

        return message

Vérifiez que Docker est en cours d’exécution. Ensuite, appuyez sur Ctrl+Shift+P pour ouvrir la palette de commandes et rechercher Opérations Azure IoT : Générer tous les opérateurs. Créez un module de mise en production .

Le processus de génération place le map.wasm fichier de l’opérateur map dans le operators/map/bin/release dossier.

Ajouter du code Rust pour le module d’opérateur de filtre

Créez un opérateur en appuyant sur Ctrl+Shift+P pour ouvrir la palette de commandes et rechercher Opérations Azure IoT : Créer un opérateur :

  1. Sélectionnez Rust comme langue.
  2. Sélectionnez Filtrer comme type d’opérateur.
  3. Entrez filter comme nom.

Ouvrez le operators/filter/src/lib.rs fichier et remplacez le contenu par le code suivant pour filtrer les valeurs où la température est supérieure à 500 °C :

mod filter_operator {
    use wasm_graph_sdk::macros::filter_operator;
    use serde_json::Value;

    fn filter_init(_configuration: ModuleConfiguration) -> bool {
        // Add code here to process the module init properties and module schemas from the configuration

        true
    }

    #[filter_operator(init = "filter_init")]
    fn filter(input: DataModel) -> Result<bool, Error> {

        // Extract payload from input to process
        let payload = match input {
            DataModel::Message(Message {
                payload: BufferOrBytes::Buffer(buffer),
                ..
            }) => buffer.read(),
            DataModel::Message(Message {
                payload: BufferOrBytes::Bytes(bytes),
                ..
            }) => bytes,
            _ => return Err(Error { message: "Unexpected input type".to_string() }),
        };

        // ... perform filtering logic here and return boolean
        if let Ok(payload_str) = std::str::from_utf8(&payload) {
            if let Ok(json) = serde_json::from_str::<Value>(payload_str) {
                if let Some(temp_c) = json["temperature"]["value"].as_i64() {
                    // Return true if temperature is above 500°C
                    return Ok(temp_c > 500);
                }
            }
        }

        Ok(false)
   }
}

Ouvrez le operators/filter/Cargo.toml fichier et ajoutez les dépendances suivantes :

[dependencies]
# ...
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"

Vérifiez que Docker est en cours d’exécution. Ensuite, appuyez sur Ctrl+Shift+P pour ouvrir la palette de commandes et rechercher Opérations Azure IoT : Générer tous les opérateurs. Créez un module de mise en production .

Le processus de génération place le filter.wasm fichier de l’opérateur filter dans le operators/filter/bin/release dossier.

Exécuter l’application graphe localement avec des exemples de données

Ouvrez le fichier graph.dataflow.yaml et remplacez son contenu par le code suivant :

metadata:
    $schema: "https://www.schemastore.org/aio-wasm-graph-config-1.0.0.json"
    name: "Temperature Monitoring"
    description: "A graph that converts temperature from Fahrenheit to Celsius, if temperature is above 500°C, then sends the processed data to the sink."
    version: "1.0.0"
    vendor: "Microsoft"
moduleRequirements:
    apiVersion: "1.1.0"
    runtimeVersion: "1.1.0"
operations:
  - operationType: source
    name: source
  - operationType: map
    name: map
    module: map
  - operationType: filter
    name: filter
    module: filter
  - operationType: sink
    name: sink
connections:
  - from:
      name: source
    to:
      name: map
  - from:
      name: map
    to:
      name: filter
  - from:
      name: filter
    to:
      name: sink

Copiez le data dossier qui contient les exemples de données du référentiel explore-iot-operations\samples\wasm\data d’exemples clonés dans l’espace de travail actuel. Le data dossier contient trois fichiers JSON avec des exemples de données de température d’entrée.

Si vous avez précédemment arrêté l’environnement d’exécution local, appuyez sur Ctrl+Shift+P pour ouvrir la palette de commandes et rechercher Opérations Azure IoT : Démarrer l’environnement de développement. Sélectionnez la mise en production comme mode d’exécution.

Appuyez sur Ctrl+Shift+P pour ouvrir la palette de commandes et rechercher Opérations Azure IoT : Exécuter Application Graph :

  1. Sélectionnez le graph.dataflow.yaml fichier de graphique.
  2. Sélectionnez la mise en production comme mode d’exécution.
  3. Sélectionnez le data dossier que vous avez copié dans l’espace de travail.

Le conteneur DevX démarre pour exécuter le graphique. Le résultat traité est enregistré dans le data/output dossier. Le fichier texte du output dossier contient les données traitées avec la température convertie en Celsius et filtrée en fonction du seuil.

Pour savoir comment déployer vos modules ET graphiques WASM personnalisés sur votre instance de Opérations Azure IoT, consultez Déployer des modules WASM et des graphiques de flux de données.

Lorsque vos modules WASM sont créés et que votre application graphique est configurée, vous pouvez les déployer sur votre cluster Opérations Azure IoT et les référencer dans votre configuration de graphe ou de connecteur :

Résolution des problèmes

Erreurs de compilation

Error La cause Réparer
error[E0463]: can't find crate for std Cible WASM manquante Exécutez rustup target add wasm32-wasip2
error: no matching package found pour wasm_graph_sdk Registre de marchandises manquant Ajoutez le bloc [registries] à .cargo/config.toml comme indiqué dans Exécuter une application de graphe localement
componentize-py impossible de trouver des fichiers WIT Chemin d’accès du schéma incorrect Utilisez l’indicateur -d avec le chemin d’accès complet au répertoire du schéma. Tous les .wit fichiers doivent être présents, car ils font référence les uns aux autres.
componentize-py incompatibilité de version Liaisons générées avec une version différente Supprimer le répertoire des liaisons générées et régénérer avec la même componentize-py version
wasm-tools échec de la vérification des composants Cible incorrecte ou adaptateur de composant manquant Vérifiez que vous utilisez wasm32-wasip2 (non wasm32-wasi ou wasm32-unknown-unknown)

Erreurs d’exécution

Symptôme La cause Réparer
L’opérateur se bloque avec la trace d’appels WASM Paramètres de configuration manquants ou non valides Ajoutez une analyse défensive dans init avec les valeurs par défaut. Consultez les paramètres de configuration du module.
init renvoie false, le flux de données ne démarre pas Échec de la validation de la configuration Vérifiez les journaux de flux de données pour les messages d’erreur. Vérifiez que moduleConfigurations les noms correspondent à votre code.
Le module se charge, mais ne produit aucune sortie process renvoyant des erreurs ou le filtre supprimant tout Ajoutez la journalisation dans process pour suivre le flux des données.
Unexpected input type Le module a reçu une variante incorrecte data-model Ajoutez une vérification de type au début de process, et gérez les variantes inattendues.
Le module fonctionne seul, mais se bloque dans un graphique complexe Configuration manquante lors de la réutilisation entre les nœuds Chaque nœud de graphe a besoin de sa propre moduleConfigurations entrée.

Pièges courants

  • Oubli de --artifact-type sur l’envoi ORAS. Sans cela, l’interface utilisateur de l’expérience des opérations n’affiche pas correctement votre module.
  • name Non correspondant dans moduleConfigurations. Le nom doit être <module>/<operator> (par exemple, module-temperature/filter), correspondant à la section operations de la définition du graphique.
  • Utilisation wasm32-wasi au lieu de wasm32-wasip2. Opérations Azure IoT nécessite la cible WASI Preview 2.
  • Python : travailler en dehors du référentiel d’exemples sans copier le répertoire de schéma. Tous les .wit fichiers doivent être colocalisés car ils se font référence mutuellement.

Problèmes connus

  • Valeurs booléennes dans YAML : les valeurs booléennes doivent être entre guillemets en tant que chaînes pour éviter les erreurs de validation. Par exemple, utilisez "True" et "False" au lieu de true et false.

    Exemple d’erreur lors de l’utilisation de booléens sans guillemets :

    * spec.connections[2].from.arm: Invalid value: "boolean": spec.connections[2].from.arm in body must be of type string: "boolean"
    * spec.connections[2].from.arm: Unsupported value: false: supported values: "False", "True"
    
  • configuration requise du module Python : pour utiliser des modules Python, Opérations Azure IoT doit être déployé avec le répartiteur MQTT configuré pour utiliser le profil mémoire Moyen ou Élevé. Les modules Python ne peuvent pas être chargés lorsque le profil de mémoire est défini sur Low ou Tiny.

  • Minutage du déploiement de module : l’extraction et l’application de modules WASM peuvent prendre un certain temps, généralement environ une minute, en fonction des conditions réseau et de la taille du module.

  • Détails de l’erreur de build : lorsqu'une build échoue, le message d’erreur dans la notification contextuelle peut ne pas fournir suffisamment de détails. Vérifiez la sortie du terminal pour obtenir des informations d’erreur plus spécifiques.

  • Windows compatibilité : sur Windows, la première fois que vous exécutez une application de graphique, vous pouvez rencontrer une erreur « Échec de la commande avec le code de sortie 1 ». Si cette erreur se produit, réessayez l’opération et qu’elle doit fonctionner correctement.

  • Stabilité de l’application hôte : l’environnement d’exécution local peut parfois cesser de fonctionner et nécessiter un redémarrage pour récupérer.

  • Limitations du débogage à distance : Actuellement, vous ne pouvez pas déboguer à distance les modules WASM s'exécutant dans Azure Linux 3.0 en raison de versions LLDB incompatibles.

Procédures de récupération

Réinitialisation de l’extension VS Code : si l’extension VS Code se comporte de façon inattendue, essayez de la désinstaller et de la réinstaller, puis redémarrez VS Code.