Informatie over WEBAssembly-modules (WASM) en grafiekdefinities voor gegevensstroomgrafieken

Gegevensstroomgrafieken in Azure IoT-bewerkingen telemetriegegevens aan de rand verwerken door deze te routeren via een reeks operators, zoals kaarten, filters en vertakkingen. U verpakt uw aangepaste verwerkingslogica als WASM-modules (WebAssembly) en verbindt deze in een grafiekdefinitie, zodat u gegevens kunt transformeren, filteren en verrijken zonder volledige services te schrijven.

In dit artikel worden de operatortypen, het tijdige gegevensstroommodel, moduleconfiguratie, host-API's en het WIT-schema beschreven dat WASM-modules ondersteunt. Als u modules lokaal wilt bouwen, testen en fouten wilt opsporen met de VS Code-extensie of de dataflow-dev CLI, raadpleegt u WASM-modules bouwen voor gegevensstromen.

Operators en modules

Operators zijn de verwerkingseenheden in een gegevensstroomgrafiek. Elk type heeft een specifiek doel:

Operator Purpose Terugbrengtype
Kaart Elk gegevensitem transformeren (bijvoorbeeld temperatuureenheden converteren) DataModel
Filter Items doorgeven of verwijderen op basis van een voorwaarde bool
Branch Items naar twee verschillende paden routeren bool
Verzamelen Items aggregeren binnen tijdvensters DataModel
Samenvoegen Meerdere streams samenvoegen terwijl de volgorde behouden blijft N/A
vertraging Tijdstempels vooruitzetten om de timing te regelen N/A

Een module is het BINAIRE WASM-bestand dat een of meer operators implementeert. Een enkele temperature.wasm module kan bijvoorbeeld zowel een map operator (voor conversie) als een filter operator (voor drempelwaardecontrole) bieden.

Graph Definition → References Module → Provides Operator → Processes Data
     ↓                    ↓               ↓              ↓
"temperature:1.0.0" → temperature.wasm → map function → °F to °C

Met deze scheiding kunt u dezelfde module opnieuw gebruiken met verschillende grafiekconfiguraties, versiemodules onafhankelijk en gedrag wijzigen via configuratieparameters zonder opnieuw te bouwen.

Tijdig gegevensstroommodel

Gegevensstroomgrafen bouwen voort op het Timely-gegevensstroom rekenmodel van Microsoft Research's Naiad-project. Elk gegevensitem heeft een tijdstempel voor een hybride logische klok:

record hybrid-logical-clock {
    timestamp: timespec,  // Wall-clock time (secs + nanos)
    counter: u64,         // Logical ordering for same-time events
    node-id: string,      // Originating node
}

Dit geeft u deterministische verwerking (dezelfde invoer produceert altijd dezelfde uitvoer), exact één keer semantiek en gedistribueerde coördinatie tussen knooppunten. Zie de opslagplaats samples voor het volledige WIT-schema.

Zie WASM-modules bouwen met de VS Code-extensie voor meer informatie over het ontwikkelen van WASM-modules met de VS Code-extensie.

Schrijfoperators

Kaartoperator

Een kaartoperator transformeert elk gegevensitem en retourneert een gewijzigde kopie. In het snelstartvoorbeeld ziet u een basiskaart. Hier volgt een complexer voorbeeld waarin configuratieparameters worden gebruikt:

use std::sync::OnceLock;
use wasm_graph_sdk::logger::{self, Level};
use wasm_graph_sdk::macros::map_operator;

static OUTPUT_UNIT: OnceLock<String> = OnceLock::new();

fn unit_converter_init(configuration: ModuleConfiguration) -> bool {
    let unit = configuration.properties
        .iter()
        .find(|(k, _)| k == "output_unit")
        .map(|(_, v)| v.clone())
        .unwrap_or_else(|| "celsius".to_string());

    OUTPUT_UNIT.set(unit.clone()).unwrap();
    logger::log(Level::Info, "converter", &format!("Output unit: {unit}"));
    true
}

#[map_operator(init = "unit_converter_init")]
fn convert_temperature(input: DataModel) -> Result<DataModel, Error> {
    let DataModel::Message(mut msg) = input else {
        return Err(Error { message: "Expected Message variant".into() });
    };

    let payload = &msg.payload.read();
    let mut data: serde_json::Value = serde_json::from_slice(payload)
        .map_err(|e| Error { message: format!("JSON parse error: {e}") })?;

    if let Some(temp) = data["temperature"]["value"].as_f64() {
        let unit = OUTPUT_UNIT.get().map(|s| s.as_str()).unwrap_or("celsius");
        let converted = match unit {
            "kelvin" => (temp - 32.0) * 5.0 / 9.0 + 273.15,
            _ => (temp - 32.0) * 5.0 / 9.0, // celsius
        };
        data["temperature"]["value"] = serde_json::json!(converted);
        data["temperature"]["unit"] = serde_json::json!(unit);
        let out = serde_json::to_string(&data).unwrap();
        msg.payload = BufferOrBytes::Bytes(out.into_bytes());
    }

    Ok(DataModel::Message(msg))
}

Filteroperator

Een filter keert true terug om gegevens door te geven of false om het te verwijderen.

use std::sync::OnceLock;
use wasm_graph_sdk::macros::filter_operator;
use wasm_graph_sdk::logger::{self, Level};

const DEFAULT_LOWER: f64 = -40.0;
const DEFAULT_UPPER: f64 = 3422.0;

static LOWER_BOUND: OnceLock<f64> = OnceLock::new();
static UPPER_BOUND: OnceLock<f64> = OnceLock::new();

fn filter_init(configuration: ModuleConfiguration) -> bool {
    for (key, value) in &configuration.properties {
        match key.as_str() {
            "temperature_lower_bound" => {
                if let Ok(v) = value.parse::<f64>() { LOWER_BOUND.set(v).ok(); }
                else { logger::log(Level::Error, "filter", &format!("Invalid lower bound: {value}")); }
            }
            "temperature_upper_bound" => {
                if let Ok(v) = value.parse::<f64>() { UPPER_BOUND.set(v).ok(); }
                else { logger::log(Level::Error, "filter", &format!("Invalid upper bound: {value}")); }
            }
            _ => {}
        }
    }
    true
}

#[filter_operator(init = "filter_init")]
fn filter_temperature(input: DataModel) -> Result<bool, Error> {
    let lower = LOWER_BOUND.get().copied().unwrap_or(DEFAULT_LOWER);
    let upper = UPPER_BOUND.get().copied().unwrap_or(DEFAULT_UPPER);

    let DataModel::Message(msg) = &input else { return Ok(true); };
    let payload = &msg.payload.read();
    let data: serde_json::Value = serde_json::from_slice(payload)
        .map_err(|e| Error { message: format!("JSON parse error: {e}") })?;

    if let Some(temp) = data.get("temperature").and_then(|t| t.get("value")).and_then(|v| v.as_f64()) {
        Ok(temp >= lower && temp <= upper)
    } else {
        Ok(true) // Pass through non-temperature messages
    }
}

Vertakkingsoperator

Een vertakking routeert gegevens naar twee paden. Geef false voor de eerste arm terug, true voor de tweede.

use wasm_graph_sdk::macros::branch_operator;

fn branch_init(_configuration: ModuleConfiguration) -> bool { true }

#[branch_operator(init = "branch_init")]
fn branch_by_type(_timestamp: HybridLogicalClock, input: DataModel) -> Result<bool, Error> {
    let DataModel::Message(msg) = &input else { return Ok(true); };
    let payload = &msg.payload.read();
    let data: serde_json::Value = serde_json::from_slice(payload)
        .map_err(|e| Error { message: format!("JSON parse error: {e}") })?;

    // false = first arm (temperature data), true = second arm (everything else)
    Ok(data.get("temperature").is_none())
}

Moduleconfiguratieparameters

Uw operators kunnen runtimeconfiguratieparameters ontvangen via de init functie. Hiermee kunt u het gedrag aanpassen zonder de module opnieuw op te bouwen.

De init functie ontvangt een ModuleConfiguration struct:

record module-configuration {
    properties: list<tuple<string, string>>,   // Key-value pairs from graph definition
    module-schemas: list<module-schema>        // Schema definitions if configured
}

De init functie wordt eenmaal aangeroepen wanneer de module wordt geladen. Ga terug true om de verwerking te starten of false om een configuratiefout te signaleren. Als initfalse retourneert, zal de operator geen gegevens verwerken en zal de gegevensstroom een fout loggen.

Belangrijk

Als uw operator afhankelijk is van configuratieparameters (bijvoorbeeld filtergrenzen of drempelwaarden), moet u altijd het geval verwerken waarin deze niet worden opgegeven. Gebruik verstandige standaardwaarden of retourneer false van init. Roep unwrap() niet aan en raak niet in paniek bij ontbrekende parameters, omdat de operator hierdoor tijdens uitvoering vastloopt zonder duidelijke foutmelding.

U definieert de parameters in de sectie van moduleConfigurations de grafiekdefinitie:

moduleConfigurations:
  - name: module-temperature/filter
    parameters:
      temperature_lower_bound:
        name: temperature_lower_bound
        description: "Minimum valid temperature in Celsius"
      temperature_upper_bound:
        name: temperature_upper_bound
        description: "Maximum valid temperature in Celsius"

Het name veld moet overeenkomen met de operatornaam in de grafieksectie operations . Zie WebAssembly-grafiekdefinities configureren voor meer informatie over grafiekdefinitiestructuur.

Modulegrootte en -prestaties

WASM-modules worden uitgevoerd in een sandboxomgeving met beperkte resources. Houd rekening met deze richtlijnen:

  • Minimaliseer afhankelijkheden. Gebruik voor Rust default-features = false aan serde en serde_json om de binaire grootte te verminderen. Vermijd het trekken aan grote kratten.
  • Modulegrootte is belangrijk. Kleinere modules laden sneller en gebruiken minder geheugen. Een typische temperatuurconverter is ~2 MB (Rust-release) of ~5 MB (Python). Release-builds gebruiken voor productie.
  • Voorkom blokkeringsbewerkingen. De process functie moet snel worden voltooid. Zware berekening vertraagt de volledige gegevensstroompijplijn.
  • Gebruik wasm-tools om te bekijken. Voer deze opdracht uit wasm-tools component wit your-module.wasm om te controleren of de module de verwachte interfaces exporteert voordat u naar een register pusht.

Versiebeheer en CI/CD

Gebruik semantische versiebeheer voor uw modules en grafiekdefinities. De gegevensstroomgrafiek verwijst naar artefacten op naam en tag (bijvoorbeeld temperature:1.0.0), zodat u modules kunt bijwerken zonder grafiekdefinities te wijzigen door een nieuwe versie met dezelfde tag te pushen.

Voor geautomatiseerde builds ziet een typische pijplijn er als volgt uit:

  1. Bouw de WASM-module (gebruik de Docker-opbouwfunctie voor consistentie).
  2. Voer deze opdracht uit wasm-tools component wit om geëxporteerde interfaces te controleren.
  3. Voer eenheidstests uit op basis van uw kernlogica. Zie Test WASM-modules voor meer informatie.
  4. Push naar uw register met ORAS en taggen met de buildversie.
  5. (Optioneel) Werk de artefactreferentie van de grafiekdefinitie bij en voer de push uit.

In de gegevensstroomgrafiek worden automatisch nieuwe moduleversies opgehaald die naar dezelfde tag worden gepusht zonder dat een nieuwe implementatie is vereist. Zie Een module bijwerken in een actieve grafiek.

Host-API's

Uw WASM-modules kunnen host-API's gebruiken voor statusbeheer, logboekregistratie en metrische gegevens.

Staatopslag

Gegevens behouden tussen process aanroepen met behulp van het gedistribueerde statusarchief:

use wasm_graph_sdk::state_store;

// Set value (fire-and-forget; state_store returns StateStoreError, not types::Error)
let options = state_store::SetOptions {
    conditions: state_store::SetConditions::Unconditional,
    expires: None,
};
let _ = state_store::set(key.as_bytes(), value.as_bytes(), None, None, options);

// Get value
let response = state_store::get(key.as_bytes(), None);

// Delete key
let _ = state_store::del(key.as_bytes(), None, None);

Logging

Gestructureerde logboekregistratie met ernstniveaus:

use wasm_graph_sdk::logger::{self, Level};

logger::log(Level::Info, "my-operator", "Processing started");
logger::log(Level::Error, "my-operator", &format!("Error: {}", error));

Metrics

Met openTelemetry compatibele metrische gegevens:

use wasm_graph_sdk::metrics::{self, CounterValue, HistogramValue, Label};

let labels = vec![Label { key: "module".to_owned(), value: "my-operator".to_owned() }];
let _ = metrics::add_to_counter("requests_total", CounterValue::U64(1), Some(&labels));
let _ = metrics::record_to_histogram("processing_duration", HistogramValue::F64(duration_ms), Some(&labels));

ONNX-inferentie

Zie OnNX-deductie uitvoeren in WebAssembly-gegevensstroomgrafieken om kleine ONNX-modellen in uw modules in te sluiten en uit te voeren voor in-banddeductie.

WIT-schemareferentie

Alle operators implementeren gestandaardiseerde interfaces die zijn gedefinieerd met WebAssembly Interface Types (WIT). U vindt de volledige schema's in de opslagplaats samples.

Operatorinterfaces

Elke operator heeft een init functie voor configuratie en een process functie voor gegevensverwerking:

interface map {
    use types.{data-model, error, module-configuration};
    init: func(configuration: module-configuration) -> bool;
    process: func(message: data-model) -> result<data-model, error>;
}

interface filter {
    use types.{data-model, error, module-configuration};
    init: func(configuration: module-configuration) -> bool;
    process: func(message: data-model) -> result<bool, error>;
}

interface branch {
    use types.{data-model, error, module-configuration};
    use hybrid-logical-clock.{hybrid-logical-clock};
    init: func(configuration: module-configuration) -> bool;
    process: func(timestamp: hybrid-logical-clock, message: data-model) -> result<bool, error>;
}

interface accumulate {
    use types.{data-model, error, module-configuration};
    init: func(configuration: module-configuration) -> bool;
    process: func(staged: data-model, message: list<data-model>) -> result<data-model, error>;
}

Gegevensmodel

Van processor.wit (wasm-graph:processor@1.1.0):

record timestamp {
    timestamp: timespec,        // Physical time (seconds + nanoseconds)
    counter: u64,               // Logical counter for ordering
    node-id: buffer-or-string,  // Originating node
}

record message {
    timestamp: timestamp,
    topic: buffer-or-bytes,
    content-type: option<buffer-or-string>,
    payload: buffer-or-bytes,
    properties: message-properties,
    schema: option<message-schema>,
}

variant data-model {
    buffer-or-bytes(buffer-or-bytes),  // Raw byte data
    message(message),                  // MQTT messages (most common)
    snapshot(snapshot),                // Video/image frames
}

Opmerking

De meeste operators werken met de message variant. Controleer aan het begin van uw process-functie op dit type. De payload maakt gebruik van een hostbufferhandgreep (buffer) voor zero-copy leesbewerkingen of bytes (bytes) die eigendom zijn van een module. Aanroep buffer.read() om hostbytes naar het geheugen van uw module te kopiëren.