WASM-modules bouwen voor gegevensstromen

Met de aangepaste functie voor gegevensverwerking van WebAssembly (WASM) in Azure IoT-bewerkingen kunt u realtime telemetriegegevens verwerken binnen uw Azure IoT-bewerkingen-cluster. Door aangepaste WASM-modules te implementeren, kunt u gegevenstransformaties definiëren en uitvoeren als onderdeel van uw gegevensstroomgrafiek, de HTTP/REST-connector of de MQTT-connector.

In dit artikel wordt beschreven hoe u de extensie Azure IoT-bewerkingen Gegevensstroom VS Code, de dataflow-dev CLI of standaardhulpprogramma's gebruikt die in uw omgeving zijn geïnstalleerd om uw WASM-modules lokaal te ontwikkelen en te testen voordat u ze in uw Azure IoT-bewerkingen-cluster implementeert. U leert het volgende:

  • Voer een grafiektoepassing lokaal uit door een vooraf gemaakte grafiek met voorbeeldgegevens uit te voeren om inzicht te hebben in de basiswerkstroom.
  • Maak aangepaste WASM-modules door nieuwe operators te bouwen in Python en Rust met kaart- en filterfunctionaliteit.

Gebruik de VS Code-extensie voor een interne ontwikkelingslus wanneer u actief operators en grafieken maakt, bijvoorbeeld: code schrijven, bouwen, fouten controleren, fouten opsporen, wijzigingen aanbrengen, de grafiek bijwerken en publiceren.

Gebruik de dataflow-dev CLI voor CI/CD-werkstromen voor grafiekkwaliteit, bijvoorbeeld: bestaande code bouwen, de grafiek uitvoeren, uitvoer testen op bekende goede resultaten en de kwaliteit in de loop van de tijd bewaken.

Gebruik standaardhulpprogramma's die in uw omgeving zijn geïnstalleerd voor scenario's waarin u meer controle wilt over het build- en testproces, of wanneer u moet integreren met andere ontwikkelhulpprogramma's en werkstromen.

In dit artikel wordt beschreven hoe u uw WASM-modules lokaal bouwt en test. Als u ze wilt gebruiken in Azure IoT-bewerkingen gegevensstromen en connectors, moet u ze implementeren in uw Azure IoT-bewerkingen-cluster en ernaar verwijzen in de configuratie van uw grafiek of connector:

Zie voor meer geavanceerde scenario's Stateful WASM-grafieken maken met de statusopslag, Gebruik het schemaregister met WASM-modules, WASM-modules debuggen en WASM-modules testen.

De extensie en het CLI-hulpprogramma worden ondersteund op de volgende platforms:

  • Linux
  • Windows-subsysteem voor Linux (WSL)
  • Windows (zorg ervoor dat u een Windows-shell zoals PowerShell of opdrachtprompt gebruikt wanneer u de extensieopdrachten uitvoert op Windows)

Zie voor meer informatie over grafieken en WASM in Azure IoT-bewerkingen:

Vereiste voorwaarden

Ontwikkelomgeving:

Docker-installatiekopieën:

docker pull mcr.microsoft.com/azureiotoperations/processor-app:1.1.5
docker tag mcr.microsoft.com/azureiotoperations/processor-app:1.1.5 host-app

docker pull mcr.microsoft.com/azureiotoperations/devx-runtime:0.1.8
docker tag mcr.microsoft.com/azureiotoperations/devx-runtime:0.1.8 devx

docker pull mcr.microsoft.com/azureiotoperations/statestore-cli:0.0.2
docker tag mcr.microsoft.com/azureiotoperations/statestore-cli:0.0.2 statestore-cli

docker pull eclipse-mosquitto

Een grafiektoepassing lokaal uitvoeren

In dit voorbeeld wordt een voorbeeldwerkruimte gebruikt die alle benodigde resources bevat om een grafiektoepassing lokaal te bouwen en uit te voeren met behulp van de VS Code-extensie.

De voorbeeldwerkruimte openen in VS Code

Kloon de opslagplaats Explore IoT Operations als u dat nog niet hebt gedaan.

Open de map samples/wasm in Visual Studio Code door Bestand > Map Openen te selecteren en naar de map samples/wasm te navigeren.

De operators bouwen

Druk op Ctrl+Shift+P om het opdrachtenpalet te openen en te zoeken naar Azure IoT-bewerkingen: Alle operators bouwen. Selecteer release als de buildmodus.

Met deze opdracht worden alle operators in de werkruimte gebouwd en worden .wasm bestanden in de operators map gemaakt. U gebruikt de .wasm bestanden om de grafiektoepassing lokaal uit te voeren.

De grafiektoepassing lokaal uitvoeren

Als u de lokale uitvoeringsomgeving wilt starten, drukt u op Ctrl+Shift+P om het opdrachtenpalet te openen en te zoeken naar Azure IoT-bewerkingen: Ontwikkelomgeving starten. Selecteer release als de uitvoeringsmodus.

Wanneer de lokale uitvoeringsomgeving wordt uitgevoerd, drukt u op Ctrl+Shift+P om het opdrachtenpalet te openen en te zoeken naar Azure IoT-bewerkingen: Application Graph uitvoeren. Selecteer release als de uitvoeringsmodus. Met deze opdracht wordt de grafiektoepassing lokaal uitgevoerd met behulp van de lokale uitvoeringsomgeving met het graph.dataflow.yaml bestand in de werkruimte.

Het leest ook van hostapp.env.list om de omgevingsvariabele TK_CONFIGURATION_PARAMETERS in te stellen voor configuratieparameters van de gegevensstroomoperator.

Wanneer u wordt gevraagd om invoergegevens, selecteert u de data-and-images map in de werkruimte. Deze map bevat de invoergegevensbestanden voor de grafiektoepassing, inclusief temperatuur- en vochtigheidsgegevens en enkele afbeeldingen voor de momentopnamemodule.

Wacht totdat u een VS Code-melding ziet dat de logboeken gereed zijn: Log files for the run can be found at ...\wasm\data-and-images\output\logs.

De uitvoer bevindt zich in de output map onder de data-and-images map. U kunt de output map in de werkruimte openen om de uitvoerbestanden weer te geven. Het .txt bestand met de datum en tijd voor bestandsnaam bevat de verwerkte gegevens en het ziet er als volgt uit:

{"tst":"2025-09-19T04:19:13.530381+0000","topic":"sensors","qos":0,"retain":0,"payloadlen":312,"properties":{"payload-format-indicator":1,"message-expiry-interval":10,"correlation-data":"...","user-properties":{"__ts":"001758255553528:00000:...","__protVer":"1.0","__srcId":"mqtt-source"},"content-type":"application/json"},"payload":{"temperature":[{"count":2,"max":653.888888888889,"min":204.44444444444449,"average":429.16666666666669,"last":204.44444444444449,"unit":"C","overtemp":true}],"humidity":[{"count":3,"max":85,"min":45,"average":69.666666666666671,"last":79}],"object":[{"result":"notebook, notebook computer; sliding door"}]}}

In de uitvoer ziet u dat de grafiektoepassing de invoergegevens heeft verwerkt en de uitvoer heeft gegenereerd. De uitvoer bevat temperatuur- en vochtigheidsgegevens en de objecten die in de afbeeldingen zijn gedetecteerd.

Een nieuwe grafiek maken met aangepaste WASM-modules

In dit scenario ziet u hoe u een nieuwe grafiektoepassing maakt met aangepaste WASM-modules. De grafiektoepassing bestaat uit twee operators: een map operator die temperatuurwaarden van Fahrenheit converteert naar Celsius en een filter operator die berichten filtert met temperatuurwaarden boven 500°C.

Gebruik momenteel geen afbreekstreepjes (-) of onderstrepingstekens (_) in operatornamen. De VS Code-extensie dwingt deze vereiste af, maar als u handmatig modules maakt of hernoemt, veroorzaakt dit problemen. Gebruik eenvoudige alfanumerieke namen voor modules zoals filter, map, stateenrichof schemafilter.

In plaats van een bestaande voorbeeldwerkruimte te gebruiken, maakt u een volledig nieuwe werkruimte. Met dit proces leert u hoe u een nieuwe grafiektoepassing maakt en de operators in Python en Rust programmeert.

Een nieuw grafiektoepassingsproject maken in Python

Druk op Ctrl+Shift+P om het opdrachtenpalet van VS Code te openen en te zoeken naar Azure IoT-bewerkingen: Toepassing maken:

  1. Selecteer voor de map een map waarin u het project wilt maken. U kunt een nieuwe map voor dit project maken.
  2. Voer de my-graph naam in.
  3. Selecteer Python als taal.
  4. Selecteer Kaart als het type.
  5. Voer de map naam in.

U hebt nu een nieuwe VS Code-werkruimte met de basisprojectstructuur en startersbestanden. De startersbestanden bevatten het graph.dataflow.yaml bestand en de broncode van de kaartoperatorsjabloon.

Belangrijk

Als u een Python-module in een geïmplementeerd Azure IoT-bewerkingen-exemplaar wilt gebruiken, moet u het exemplaar implementeren met het brokergeheugenprofiel ingesteld op Medium of High. Als u het geheugenprofiel instelt op Low of Tiny, kan het exemplaar de Python-module niet ophalen.

Voeg Python code toe voor de module kaartoperator

Open het operators/map/map.py bestand en vervang de inhoud door de volgende code om een binnenkomende temperatuurwaarde van Fahrenheit te converteren naar Celsius:

import json
from map_impl import exports
from map_impl import imports
from map_impl.imports import types

class Map(exports.Map):
    def init(self, configuration) -> bool:
        imports.logger.log(imports.logger.Level.INFO, "module4/map", "Init invoked")
        return True

    def process(self, message: types.DataModel) -> types.DataModel:
        # TODO: implement custom logic for map operator
        imports.logger.log(imports.logger.Level.INFO, "module4/map", "processing from python")

        # Ensure the input is of the expected type
        if not isinstance(message, types.DataModel_Message):
            raise ValueError("Unexpected input type: Expected DataModel_Message")

        # Extract and decode the payload
        payload_variant = message.value.payload
        if isinstance(payload_variant, types.BufferOrBytes_Buffer):
            # It's a Buffer handle - read from host
            imports.logger.log(imports.logger.Level.INFO, "module4/map", "Reading payload from Buffer")
            payload = payload_variant.value.read()
        elif isinstance(payload_variant, types.BufferOrBytes_Bytes):
            # It's already bytes
            imports.logger.log(imports.logger.Level.INFO, "module4/map", "Reading payload from Bytes")
            payload = payload_variant.value
        else:
            raise ValueError("Unexpected payload type")

        decoded = payload.decode("utf-8")

        # Parse the JSON data
        json_data = json.loads(decoded)

        # Check and update the temperature value
        if "temperature" in json_data and "value" in json_data["temperature"]:
            temp_f = json_data["temperature"]["value"]
            if isinstance(temp_f, int):
                # Convert Fahrenheit to Celsius
                temp_c = round((temp_f - 32) * 5.0 / 9.0)

                # Update the JSON data
                json_data["temperature"]["value"] = temp_c
                json_data["temperature"]["unit"] = "C"

                # Serialize the updated JSON back to bytes
                updated_payload = json.dumps(json_data).encode("utf-8")

                # Update the message payload
                message.value.payload = types.BufferOrBytes_Bytes(value=updated_payload)

        return message

Zorg ervoor dat Docker wordt uitgevoerd. Druk vervolgens op Ctrl+Shift+P om het opdrachtenpalet te openen en te zoeken naar Azure IoT-bewerkingen: Alle operators bouwen. Maak een releasemodule .

Het buildproces plaatst het map.wasm bestand voor de map operator in de operators/map/bin/release map.

Rust-code toevoegen voor de filteroperatormodule

Maak een nieuwe operator door op Ctrl+Shift+P te drukken om het opdrachtenpalet te openen en te zoeken naar Azure IoT-bewerkingen: Operator maken:

  1. Selecteer Rust als taal.
  2. Selecteer Filter als operatortype.
  3. Voer de filter naam in.

Open het operators/filter/src/lib.rs bestand en vervang de inhoud door de volgende code om waarden te filteren waarbij de temperatuur hoger is dan 500°C:

mod filter_operator {
    use wasm_graph_sdk::macros::filter_operator;
    use serde_json::Value;

    fn filter_init(_configuration: ModuleConfiguration) -> bool {
        // Add code here to process the module init properties and module schemas from the configuration

        true
    }

    #[filter_operator(init = "filter_init")]
    fn filter(input: DataModel) -> Result<bool, Error> {

        // Extract payload from input to process
        let payload = match input {
            DataModel::Message(Message {
                payload: BufferOrBytes::Buffer(buffer),
                ..
            }) => buffer.read(),
            DataModel::Message(Message {
                payload: BufferOrBytes::Bytes(bytes),
                ..
            }) => bytes,
            _ => return Err(Error { message: "Unexpected input type".to_string() }),
        };

        // ... perform filtering logic here and return boolean
        if let Ok(payload_str) = std::str::from_utf8(&payload) {
            if let Ok(json) = serde_json::from_str::<Value>(payload_str) {
                if let Some(temp_c) = json["temperature"]["value"].as_i64() {
                    // Return true if temperature is above 500°C
                    return Ok(temp_c > 500);
                }
            }
        }

        Ok(false)
   }
}

Open het operators/filter/Cargo.toml bestand en voeg de volgende afhankelijkheden toe:

[dependencies]
# ...
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"

Zorg ervoor dat Docker wordt uitgevoerd. Druk vervolgens op Ctrl+Shift+P om het opdrachtenpalet te openen en te zoeken naar Azure IoT-bewerkingen: Alle operators bouwen. Maak een releasemodule .

Het buildproces plaatst het filter.wasm bestand voor de filter operator in de operators/filter/bin/release map.

De grafiektoepassing lokaal uitvoeren met voorbeeldgegevens

Open het graph.dataflow.yaml bestand en vervang de inhoud door de volgende code:

metadata:
    $schema: "https://www.schemastore.org/aio-wasm-graph-config-1.0.0.json"
    name: "Temperature Monitoring"
    description: "A graph that converts temperature from Fahrenheit to Celsius, if temperature is above 500°C, then sends the processed data to the sink."
    version: "1.0.0"
    vendor: "Microsoft"
moduleRequirements:
    apiVersion: "1.1.0"
    runtimeVersion: "1.1.0"
operations:
  - operationType: source
    name: source
  - operationType: map
    name: map
    module: map
  - operationType: filter
    name: filter
    module: filter
  - operationType: sink
    name: sink
connections:
  - from:
      name: source
    to:
      name: map
  - from:
      name: map
    to:
      name: filter
  - from:
      name: filter
    to:
      name: sink

Kopieer de data map met de voorbeeldgegevens uit de gekloonde opslagplaats explore-iot-operations\samples\wasm\data met voorbeelden naar de huidige werkruimte. De data map bevat drie JSON-bestanden met voorbeeldtemperatuurgegevens voor invoer.

Als u de lokale uitvoeringsomgeving eerder hebt gestopt, drukt u op Ctrl+Shift+P om het opdrachtenpalet te openen en te zoeken naar Azure IoT-bewerkingen: Ontwikkelomgeving starten. Selecteer release als de uitvoeringsmodus.

Druk op Ctrl+Shift+P om het opdrachtenpalet te openen en zoek naar Azure IoT-bewerkingen: Application Graph uitvoeren:

  1. Selecteer het graph.dataflow.yaml grafiekbestand.
  2. Selecteer release als de uitvoeringsmodus.
  3. Selecteer de data map die u naar de werkruimte hebt gekopieerd.

De DevX-container wordt gestart om de grafiek uit te voeren. Het verwerkte resultaat wordt opgeslagen in de data/output map. Het tekstbestand in de output map bevat de verwerkte gegevens met de temperatuur die is geconverteerd naar Celsius en gefilterd op basis van de drempelwaarde.

Zie Deploy WASM-modules en gegevensstroomgrafieken voor meer informatie over het implementeren van uw aangepaste WASM-modules en -grafieken in uw Azure IoT-bewerkingen-exemplaar.

Wanneer u uw WASM-modules hebt gebouwd en uw grafiektoepassing hebt geconfigureerd, kunt u deze implementeren in uw Azure IoT-bewerkingen-cluster en ernaar verwijzen in de configuratie van uw graaf of connector:

Troubleshoot

Buildfouten

Fout Oorzaak Repareren
error[E0463]: can't find crate for std Ontbrekend WASM-doel Voer rustup target add wasm32-wasip2 uit
error: no matching package found voor wasm_graph_sdk Ontbrekend vrachtregister Voeg het [registries] blok toe aan .cargo/config.toml zoals wordt weergegeven in Een grafiektoepassing lokaal uitvoeren
componentize-py KAN WIT-bestanden niet vinden Schemapad verkeerd Gebruik -d de vlag met het volledige pad naar de schemamap. Alle .wit bestanden moeten aanwezig zijn omdat ze naar elkaar verwijzen.
componentize-py versieverschil Bindingen die zijn gegenereerd met een andere versie Verwijder de gegenereerde bindingsdirectory en regenereer met dezelfde componentize-py versie
wasm-tools onderdeelcontrole mislukt Verkeerde doel of ontbrekende onderdeeladapter Zorg ervoor dat u wasm32-wasip2 gebruikt (niet wasm32-wasi of wasm32-unknown-unknown)

Runtime fouten

Symptoom Oorzaak Repareren
Operator loopt vast met WASM-backtrace Ontbrekende of ongeldige configuratieparameters Voeg defensieve parsering toe init met standaardwaarden. Zie moduleconfiguratieparameters.
init retourneert false, de gegevensstroom wordt niet gestart Configuratievalidatie mislukt Controleer de gegevensstroomlogboeken op foutberichten. Controleer of moduleConfigurations de namen overeenkomen met uw code.
Module wordt geladen, maar produceert geen uitvoer process retourneert fouten of de filter laat alles vallen Voeg logboekregistratie toe process om de gegevensstroom te traceren.
Unexpected input type Module heeft een verkeerde data-model variant ontvangen Voeg een typecontrole toe aan het begin van process en verwerk onverwachte varianten.
Module werkt alleen, maar loopt vast in complexe grafiek Ontbrekende configuratie wanneer deze opnieuw wordt gebruikt op knooppunten Elk grafiekknooppunt heeft een eigen moduleConfigurations vermelding nodig.

Veelvoorkomende valkuilen

  • Vergeet niet de ORAS-push uit te voeren. Zonder deze functie wordt uw module niet correct weergegeven in de gebruikersinterface voor bewerkingen.
  • Komt niet overeen name in moduleConfigurations. De naam moet <module>/<operator> (bijvoorbeeld module-temperature/filter) overeenkomen met de sectie van operations de grafiekdefinitie.
  • Gebruiken wasm32-wasi in plaats van wasm32-wasip2. Azure IoT-bewerkingen vereist het WASI Preview 2-doel.
  • Python: werken buiten de voorbeeldrepository zonder het kopiëren van de schema-map. Alle .wit bestanden moeten zich naast elkaar bevinden omdat ze naar elkaar verwijzen.

Bekende problemen

  • Booleaanse waarden in YAML: Booleaanse waarden moeten worden geciteerd als tekenreeksen om validatiefouten te voorkomen. Gebruik bijvoorbeeld "True" en "False" in plaats van true en false.

    Voorbeeldfout bij het gebruik van niet-geciteerde booleaanse waarden:

    * spec.connections[2].from.arm: Invalid value: "boolean": spec.connections[2].from.arm in body must be of type string: "boolean"
    * spec.connections[2].from.arm: Unsupported value: false: supported values: "False", "True"
    
  • Python modulevereisten: als u Python modules wilt gebruiken, moet Azure IoT-bewerkingen worden geïmplementeerd met de MQTT-broker die is geconfigureerd voor het gebruik van het Medium of High geheugenprofiel. Python modules kunnen niet worden opgehaald wanneer het geheugenprofiel is ingesteld op Low of Tiny.

  • Tijdsinstellingen voor moduleimplementatie: het ophalen en toepassen van WASM-modules kan enige tijd duren, meestal ongeveer een minuut, afhankelijk van de netwerkomstandigheden en de modulegrootte.

  • Buildfoutdetails: wanneer een build mislukt, biedt het foutbericht in de pop-upmelding mogelijk onvoldoende details. Controleer de terminaluitvoer op specifiekere foutinformatie.

  • Windows compatibiliteit: Windows De eerste keer dat u een grafiektoepassing uitvoert, treedt er mogelijk een fout op:'Opdracht is mislukt met afsluitcode 1'. Als deze fout optreedt, voert u de bewerking opnieuw uit en werkt deze correct.

  • Stabiliteit van host-apps: de lokale uitvoeringsomgeving werkt mogelijk af en toe niet meer en moet opnieuw worden opgestart om te herstellen.

  • Opsporingsbeperkingen: Momenteel kunt u geen fouten opsporen in WASM-modules die worden uitgevoerd in Azure Linux 3.0 vanwege incompatibele LLDB-versies.

Herstelprocedures

VS Code-extensie opnieuw instellen: als de VS Code-extensie onverwacht werkt, probeert u de extensie te verwijderen en opnieuw te installeren, start u VS Code opnieuw.