Förstå WASM-moduler (WebAssembly) och diagramdefinitioner för dataflödesdiagram

Dataflödesdiagram i Azure IoT Operations bearbeta telemetridata vid gränsen genom att dirigera dem via en serie operatorer som kartor, filter och grenar. Du paketera din anpassade bearbetningslogik som WebAssembly-moduler (WASM) och koppla ihop dem i en grafdefinition, så att du kan transformera, filtrera och berika data utan att skriva fullständiga tjänster.

Den här artikeln beskriver operatortyperna, dataflödesmodellen i rätt tid, modulkonfigurationen, värd-API:er och WIT-schemat som ligger till grund för WASM-moduler. Information om hur du skapar, testar och felsöker moduler lokalt med VS Code-tillägget eller dataflow-dev CLI finns i Skapa WASM-moduler för dataflöden.

Operatorer och moduler

Operatorer är bearbetningsenheterna i ett dataflödesdiagram. Varje typ har ett specifikt syfte:

Operatör Avsikt Returtyp
Karta Transformera varje dataobjekt (till exempel konvertera temperaturenheter) DataModel
Filter Skicka eller ta bort objekt utifrån ett villkor bool
Filial Dirigera objekt till två olika sökvägar bool
Ackumulera Aggregera objekt inom tidsfönster DataModel
Concatenate Sammanfoga flera strömmar samtidigt som ordningen bevaras Inte tillämpligt
Försening Flytta fram tidsstämplar för att kontrollera tidsinställningen Inte tillämpligt

En modul är WASM-binärfilen som implementerar en eller flera operatorer. En enskild temperature.wasm modul kan till exempel tillhandahålla både en map operator (för konvertering) och en filter operator (för tröskelvärdeskontroll).

Graph Definition → References Module → Provides Operator → Processes Data
     ↓                    ↓               ↓              ↓
"temperature:1.0.0" → temperature.wasm → map function → °F to °C

Med den här separationen kan du återanvända samma modul med olika grafkonfigurationer, versionsmoduler oberoende av varandra och ändra beteendet genom konfigurationsparametrar utan att återskapa den.

Tidsrelaterad dataflödesmodell

Dataflödesdiagram bygger på den beräkningsmodellen för Timely dataflow från Microsoft Researchs Naiad-projekt. Varje datapunkt har en tidsstämpel med hybridlogisk klocka:

record hybrid-logical-clock {
    timestamp: timespec,  // Wall-clock time (secs + nanos)
    counter: u64,         // Logical ordering for same-time events
    node-id: string,      // Originating node
}

Detta ger dig deterministisk bearbetning (samma indata ger alltid samma utdata), exakt en gång semantik och distribuerad samordning mellan noder. Det fullständiga WIT-schemat finns i lagringsplatsen samples.

Information om hur du utvecklar WASM-moduler med VS Code-tillägget finns i Skapa WASM-moduler med VS Code-tillägget.

Skriv operatorer

Kartoperator

En kartoperator transformerar varje dataobjekt och returnerar en ändrad kopia. I snabbstartsexemplet visas en grundläggande karta. Här är ett mer komplext exempel som använder konfigurationsparametrar:

use std::sync::OnceLock;
use wasm_graph_sdk::logger::{self, Level};
use wasm_graph_sdk::macros::map_operator;

static OUTPUT_UNIT: OnceLock<String> = OnceLock::new();

fn unit_converter_init(configuration: ModuleConfiguration) -> bool {
    let unit = configuration.properties
        .iter()
        .find(|(k, _)| k == "output_unit")
        .map(|(_, v)| v.clone())
        .unwrap_or_else(|| "celsius".to_string());

    OUTPUT_UNIT.set(unit.clone()).unwrap();
    logger::log(Level::Info, "converter", &format!("Output unit: {unit}"));
    true
}

#[map_operator(init = "unit_converter_init")]
fn convert_temperature(input: DataModel) -> Result<DataModel, Error> {
    let DataModel::Message(mut msg) = input else {
        return Err(Error { message: "Expected Message variant".into() });
    };

    let payload = &msg.payload.read();
    let mut data: serde_json::Value = serde_json::from_slice(payload)
        .map_err(|e| Error { message: format!("JSON parse error: {e}") })?;

    if let Some(temp) = data["temperature"]["value"].as_f64() {
        let unit = OUTPUT_UNIT.get().map(|s| s.as_str()).unwrap_or("celsius");
        let converted = match unit {
            "kelvin" => (temp - 32.0) * 5.0 / 9.0 + 273.15,
            _ => (temp - 32.0) * 5.0 / 9.0, // celsius
        };
        data["temperature"]["value"] = serde_json::json!(converted);
        data["temperature"]["unit"] = serde_json::json!(unit);
        let out = serde_json::to_string(&data).unwrap();
        msg.payload = BufferOrBytes::Bytes(out.into_bytes());
    }

    Ok(DataModel::Message(msg))
}

Filteroperatorn

Ett filter returnerar true för att låta data passera eller false för att bortse från det.

use std::sync::OnceLock;
use wasm_graph_sdk::macros::filter_operator;
use wasm_graph_sdk::logger::{self, Level};

const DEFAULT_LOWER: f64 = -40.0;
const DEFAULT_UPPER: f64 = 3422.0;

static LOWER_BOUND: OnceLock<f64> = OnceLock::new();
static UPPER_BOUND: OnceLock<f64> = OnceLock::new();

fn filter_init(configuration: ModuleConfiguration) -> bool {
    for (key, value) in &configuration.properties {
        match key.as_str() {
            "temperature_lower_bound" => {
                if let Ok(v) = value.parse::<f64>() { LOWER_BOUND.set(v).ok(); }
                else { logger::log(Level::Error, "filter", &format!("Invalid lower bound: {value}")); }
            }
            "temperature_upper_bound" => {
                if let Ok(v) = value.parse::<f64>() { UPPER_BOUND.set(v).ok(); }
                else { logger::log(Level::Error, "filter", &format!("Invalid upper bound: {value}")); }
            }
            _ => {}
        }
    }
    true
}

#[filter_operator(init = "filter_init")]
fn filter_temperature(input: DataModel) -> Result<bool, Error> {
    let lower = LOWER_BOUND.get().copied().unwrap_or(DEFAULT_LOWER);
    let upper = UPPER_BOUND.get().copied().unwrap_or(DEFAULT_UPPER);

    let DataModel::Message(msg) = &input else { return Ok(true); };
    let payload = &msg.payload.read();
    let data: serde_json::Value = serde_json::from_slice(payload)
        .map_err(|e| Error { message: format!("JSON parse error: {e}") })?;

    if let Some(temp) = data.get("temperature").and_then(|t| t.get("value")).and_then(|v| v.as_f64()) {
        Ok(temp >= lower && temp <= upper)
    } else {
        Ok(true) // Pass through non-temperature messages
    }
}

Förgreningoperator

En gren dirigerar data till två sökvägar. Gå tillbaka false för den första armen, true för den andra.

use wasm_graph_sdk::macros::branch_operator;

fn branch_init(_configuration: ModuleConfiguration) -> bool { true }

#[branch_operator(init = "branch_init")]
fn branch_by_type(_timestamp: HybridLogicalClock, input: DataModel) -> Result<bool, Error> {
    let DataModel::Message(msg) = &input else { return Ok(true); };
    let payload = &msg.payload.read();
    let data: serde_json::Value = serde_json::from_slice(payload)
        .map_err(|e| Error { message: format!("JSON parse error: {e}") })?;

    // false = first arm (temperature data), true = second arm (everything else)
    Ok(data.get("temperature").is_none())
}

Modulkonfigurationsparametrar

Dina operatorer kan ta emot körningskonfigurationsparametrar via init funktionen. På så sätt kan du anpassa beteendet utan att återskapa modulen.

Funktionen init tar emot en ModuleConfiguration struktur:

record module-configuration {
    properties: list<tuple<string, string>>,   // Key-value pairs from graph definition
    module-schemas: list<module-schema>        // Schema definitions if configured
}

Funktionen init anropas en gång när modulen läses in. Återgå true till att börja bearbeta eller false för att signalera ett konfigurationsfel. Om init returnerar falsebearbetar operatorn inga data och dataflödet loggar ett fel.

Viktigt!

Om din operatör är beroende av konfigurationsparametrar (till exempel filtergränser eller tröskelvärden) hanterar du alltid det fall där de inte tillhandahålls. Använd lämpliga standardvärden eller returnera false från init. Anropa inte unwrap() eller oroa dig för saknade parametrar, eftersom detta kraschar operatorn vid körning utan något tydligt felmeddelande.

Du definierar parametrarna i grafdefinitionens moduleConfigurations avsnitt:

moduleConfigurations:
  - name: module-temperature/filter
    parameters:
      temperature_lower_bound:
        name: temperature_lower_bound
        description: "Minimum valid temperature in Celsius"
      temperature_upper_bound:
        name: temperature_upper_bound
        description: "Maximum valid temperature in Celsius"

Fältet name måste matcha operatornamnet i diagrammets operations avsnitt. Mer information om diagramdefinitionsstruktur finns i Konfigurera WebAssembly-grafdefinitioner.

Modulstorlek och prestanda

WASM-moduler körs i en sandbox-miljö med begränsade resurser. Tänk på följande riktlinjer:

  • Minimera beroenden. För Rust använder du default-features = falseserde och serde_json för att minska binär storlek. Undvik att dra stora lådor.
  • Modulstorleken är viktig. Mindre moduler läses in snabbare och använder mindre minne. En typisk temperaturkonverterare är ~2 MB (Rust-version) eller ~5 MB (Python). Använd släppversioner för produktion.
  • Undvik blockeringsåtgärder. Funktionen process bör slutföras snabbt. Stora beräkningar fördröjer hela dataflödespipelinen.
  • Använd wasm-tools för att inspektera. Kör wasm-tools component wit your-module.wasm för att verifiera att modulen exporterar de förväntade gränssnitten innan du skickar till ett register.

Versionshantering och CI/CD

Använd semantisk versionshantering för dina moduler och grafdefinitioner. Dataflödesdiagrammet refererar till artefakter efter namn och tagg (till exempel temperature:1.0.0), så att du kan uppdatera moduler utan att ändra diagramdefinitioner genom att skicka en ny version med samma tagg.

För automatiserade versioner ser en typisk pipeline ut så här:

  1. Skapa WASM-modulen (använd Docker Builder för konsekvens).
  2. Kör wasm-tools component wit för att verifiera exporterade gränssnitt.
  3. Kör enhetstester mot din kärnlogik. Mer information finns i Testa WASM-moduler.
  4. Skicka till din register med ORAS och tagga med buildversionen.
  5. (Valfritt) Uppdatera artefaktreferensen i grafdefinitionen och pusha.

Dataflödesdiagrammet hämtar automatiskt nya modulversioner som skickas till samma tagg utan att en omdistribution krävs. Se Uppdatera en modul i ett diagram som körs.

Värd-API:er

Dina WASM-moduler kan använda värd-API:er för tillståndshantering, loggning och mått.

Tillståndslager

Spara data mellan process anrop med hjälp av den distribuerade tillståndslagringen.

use wasm_graph_sdk::state_store;

// Set value (fire-and-forget; state_store returns StateStoreError, not types::Error)
let options = state_store::SetOptions {
    conditions: state_store::SetConditions::Unconditional,
    expires: None,
};
let _ = state_store::set(key.as_bytes(), value.as_bytes(), None, None, options);

// Get value
let response = state_store::get(key.as_bytes(), None);

// Delete key
let _ = state_store::del(key.as_bytes(), None, None);

Loggar

Strukturerad loggning med allvarlighetsnivåer:

use wasm_graph_sdk::logger::{self, Level};

logger::log(Level::Info, "my-operator", "Processing started");
logger::log(Level::Error, "my-operator", &format!("Error: {}", error));

Metrics

OpenTelemetry-kompatibla mått:

use wasm_graph_sdk::metrics::{self, CounterValue, HistogramValue, Label};

let labels = vec![Label { key: "module".to_owned(), value: "my-operator".to_owned() }];
let _ = metrics::add_to_counter("requests_total", CounterValue::U64(1), Some(&labels));
let _ = metrics::record_to_histogram("processing_duration", HistogramValue::F64(duration_ms), Some(&labels));

ONNX-slutsatsdragning

Information om hur du bäddar in och kör små ONNX-modeller i dina moduler för in-band-slutsatsdragning finns i Köra ONNX-slutsatsdragning i WebAssembly-dataflödesdiagram.

WIT-schemareferens

Alla operatorer implementerar standardiserade gränssnitt som definierats med hjälp av WebAssembly Interface Types (WIT). Du hittar de fullständiga schemana på lagringsplatsen samples.

Operatorgränssnitt

Varje operator har en init funktion för konfiguration och en process funktion för datahantering:

interface map {
    use types.{data-model, error, module-configuration};
    init: func(configuration: module-configuration) -> bool;
    process: func(message: data-model) -> result<data-model, error>;
}

interface filter {
    use types.{data-model, error, module-configuration};
    init: func(configuration: module-configuration) -> bool;
    process: func(message: data-model) -> result<bool, error>;
}

interface branch {
    use types.{data-model, error, module-configuration};
    use hybrid-logical-clock.{hybrid-logical-clock};
    init: func(configuration: module-configuration) -> bool;
    process: func(timestamp: hybrid-logical-clock, message: data-model) -> result<bool, error>;
}

interface accumulate {
    use types.{data-model, error, module-configuration};
    init: func(configuration: module-configuration) -> bool;
    process: func(staged: data-model, message: list<data-model>) -> result<data-model, error>;
}

Datamodell

Från processor.wit (wasm-graph:processor@1.1.0):

record timestamp {
    timestamp: timespec,        // Physical time (seconds + nanoseconds)
    counter: u64,               // Logical counter for ordering
    node-id: buffer-or-string,  // Originating node
}

record message {
    timestamp: timestamp,
    topic: buffer-or-bytes,
    content-type: option<buffer-or-string>,
    payload: buffer-or-bytes,
    properties: message-properties,
    schema: option<message-schema>,
}

variant data-model {
    buffer-or-bytes(buffer-or-bytes),  // Raw byte data
    message(message),                  // MQTT messages (most common)
    snapshot(snapshot),                // Video/image frames
}

Anmärkning

De flesta operatorer arbetar med varianten message . Kontrollera den här typen i början av din process-funktion. Nyttolasten använder antingen ett värdbufferthandtag (buffer) för läsningar utan kopiering eller modulägda byte (bytes). Anropa buffer.read() för att kopiera värdbyte till modulens minne.