Kommentar
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Dataflödesdiagram i Azure IoT Operations bearbeta telemetridata vid gränsen genom att dirigera dem via en serie operatorer som kartor, filter och grenar. Du paketera din anpassade bearbetningslogik som WebAssembly-moduler (WASM) och koppla ihop dem i en grafdefinition, så att du kan transformera, filtrera och berika data utan att skriva fullständiga tjänster.
Den här artikeln beskriver operatortyperna, dataflödesmodellen i rätt tid, modulkonfigurationen, värd-API:er och WIT-schemat som ligger till grund för WASM-moduler. Information om hur du skapar, testar och felsöker moduler lokalt med VS Code-tillägget eller dataflow-dev CLI finns i Skapa WASM-moduler för dataflöden.
Operatorer och moduler
Operatorer är bearbetningsenheterna i ett dataflödesdiagram. Varje typ har ett specifikt syfte:
| Operatör | Avsikt | Returtyp |
|---|---|---|
| Karta | Transformera varje dataobjekt (till exempel konvertera temperaturenheter) | DataModel |
| Filter | Skicka eller ta bort objekt utifrån ett villkor | bool |
| Filial | Dirigera objekt till två olika sökvägar | bool |
| Ackumulera | Aggregera objekt inom tidsfönster | DataModel |
| Concatenate | Sammanfoga flera strömmar samtidigt som ordningen bevaras | Inte tillämpligt |
| Försening | Flytta fram tidsstämplar för att kontrollera tidsinställningen | Inte tillämpligt |
En modul är WASM-binärfilen som implementerar en eller flera operatorer. En enskild temperature.wasm modul kan till exempel tillhandahålla både en map operator (för konvertering) och en filter operator (för tröskelvärdeskontroll).
Graph Definition → References Module → Provides Operator → Processes Data
↓ ↓ ↓ ↓
"temperature:1.0.0" → temperature.wasm → map function → °F to °C
Med den här separationen kan du återanvända samma modul med olika grafkonfigurationer, versionsmoduler oberoende av varandra och ändra beteendet genom konfigurationsparametrar utan att återskapa den.
Tidsrelaterad dataflödesmodell
Dataflödesdiagram bygger på den beräkningsmodellen för Timely dataflow från Microsoft Researchs Naiad-projekt. Varje datapunkt har en tidsstämpel med hybridlogisk klocka:
record hybrid-logical-clock {
timestamp: timespec, // Wall-clock time (secs + nanos)
counter: u64, // Logical ordering for same-time events
node-id: string, // Originating node
}
Detta ger dig deterministisk bearbetning (samma indata ger alltid samma utdata), exakt en gång semantik och distribuerad samordning mellan noder. Det fullständiga WIT-schemat finns i lagringsplatsen samples.
Information om hur du utvecklar WASM-moduler med VS Code-tillägget finns i Skapa WASM-moduler med VS Code-tillägget.
Skriv operatorer
Kartoperator
En kartoperator transformerar varje dataobjekt och returnerar en ändrad kopia. I snabbstartsexemplet visas en grundläggande karta. Här är ett mer komplext exempel som använder konfigurationsparametrar:
use std::sync::OnceLock;
use wasm_graph_sdk::logger::{self, Level};
use wasm_graph_sdk::macros::map_operator;
static OUTPUT_UNIT: OnceLock<String> = OnceLock::new();
fn unit_converter_init(configuration: ModuleConfiguration) -> bool {
let unit = configuration.properties
.iter()
.find(|(k, _)| k == "output_unit")
.map(|(_, v)| v.clone())
.unwrap_or_else(|| "celsius".to_string());
OUTPUT_UNIT.set(unit.clone()).unwrap();
logger::log(Level::Info, "converter", &format!("Output unit: {unit}"));
true
}
#[map_operator(init = "unit_converter_init")]
fn convert_temperature(input: DataModel) -> Result<DataModel, Error> {
let DataModel::Message(mut msg) = input else {
return Err(Error { message: "Expected Message variant".into() });
};
let payload = &msg.payload.read();
let mut data: serde_json::Value = serde_json::from_slice(payload)
.map_err(|e| Error { message: format!("JSON parse error: {e}") })?;
if let Some(temp) = data["temperature"]["value"].as_f64() {
let unit = OUTPUT_UNIT.get().map(|s| s.as_str()).unwrap_or("celsius");
let converted = match unit {
"kelvin" => (temp - 32.0) * 5.0 / 9.0 + 273.15,
_ => (temp - 32.0) * 5.0 / 9.0, // celsius
};
data["temperature"]["value"] = serde_json::json!(converted);
data["temperature"]["unit"] = serde_json::json!(unit);
let out = serde_json::to_string(&data).unwrap();
msg.payload = BufferOrBytes::Bytes(out.into_bytes());
}
Ok(DataModel::Message(msg))
}
Filteroperatorn
Ett filter returnerar true för att låta data passera eller false för att bortse från det.
use std::sync::OnceLock;
use wasm_graph_sdk::macros::filter_operator;
use wasm_graph_sdk::logger::{self, Level};
const DEFAULT_LOWER: f64 = -40.0;
const DEFAULT_UPPER: f64 = 3422.0;
static LOWER_BOUND: OnceLock<f64> = OnceLock::new();
static UPPER_BOUND: OnceLock<f64> = OnceLock::new();
fn filter_init(configuration: ModuleConfiguration) -> bool {
for (key, value) in &configuration.properties {
match key.as_str() {
"temperature_lower_bound" => {
if let Ok(v) = value.parse::<f64>() { LOWER_BOUND.set(v).ok(); }
else { logger::log(Level::Error, "filter", &format!("Invalid lower bound: {value}")); }
}
"temperature_upper_bound" => {
if let Ok(v) = value.parse::<f64>() { UPPER_BOUND.set(v).ok(); }
else { logger::log(Level::Error, "filter", &format!("Invalid upper bound: {value}")); }
}
_ => {}
}
}
true
}
#[filter_operator(init = "filter_init")]
fn filter_temperature(input: DataModel) -> Result<bool, Error> {
let lower = LOWER_BOUND.get().copied().unwrap_or(DEFAULT_LOWER);
let upper = UPPER_BOUND.get().copied().unwrap_or(DEFAULT_UPPER);
let DataModel::Message(msg) = &input else { return Ok(true); };
let payload = &msg.payload.read();
let data: serde_json::Value = serde_json::from_slice(payload)
.map_err(|e| Error { message: format!("JSON parse error: {e}") })?;
if let Some(temp) = data.get("temperature").and_then(|t| t.get("value")).and_then(|v| v.as_f64()) {
Ok(temp >= lower && temp <= upper)
} else {
Ok(true) // Pass through non-temperature messages
}
}
Förgreningoperator
En gren dirigerar data till två sökvägar. Gå tillbaka false för den första armen, true för den andra.
use wasm_graph_sdk::macros::branch_operator;
fn branch_init(_configuration: ModuleConfiguration) -> bool { true }
#[branch_operator(init = "branch_init")]
fn branch_by_type(_timestamp: HybridLogicalClock, input: DataModel) -> Result<bool, Error> {
let DataModel::Message(msg) = &input else { return Ok(true); };
let payload = &msg.payload.read();
let data: serde_json::Value = serde_json::from_slice(payload)
.map_err(|e| Error { message: format!("JSON parse error: {e}") })?;
// false = first arm (temperature data), true = second arm (everything else)
Ok(data.get("temperature").is_none())
}
Modulkonfigurationsparametrar
Dina operatorer kan ta emot körningskonfigurationsparametrar via init funktionen. På så sätt kan du anpassa beteendet utan att återskapa modulen.
Funktionen init tar emot en ModuleConfiguration struktur:
record module-configuration {
properties: list<tuple<string, string>>, // Key-value pairs from graph definition
module-schemas: list<module-schema> // Schema definitions if configured
}
Funktionen init anropas en gång när modulen läses in. Återgå true till att börja bearbeta eller false för att signalera ett konfigurationsfel. Om init returnerar falsebearbetar operatorn inga data och dataflödet loggar ett fel.
Viktigt!
Om din operatör är beroende av konfigurationsparametrar (till exempel filtergränser eller tröskelvärden) hanterar du alltid det fall där de inte tillhandahålls. Använd lämpliga standardvärden eller returnera false från init. Anropa inte unwrap() eller oroa dig för saknade parametrar, eftersom detta kraschar operatorn vid körning utan något tydligt felmeddelande.
Du definierar parametrarna i grafdefinitionens moduleConfigurations avsnitt:
moduleConfigurations:
- name: module-temperature/filter
parameters:
temperature_lower_bound:
name: temperature_lower_bound
description: "Minimum valid temperature in Celsius"
temperature_upper_bound:
name: temperature_upper_bound
description: "Maximum valid temperature in Celsius"
Fältet name måste matcha operatornamnet i diagrammets operations avsnitt. Mer information om diagramdefinitionsstruktur finns i Konfigurera WebAssembly-grafdefinitioner.
Modulstorlek och prestanda
WASM-moduler körs i en sandbox-miljö med begränsade resurser. Tänk på följande riktlinjer:
- Minimera beroenden. För Rust använder du
default-features = falsepåserdeochserde_jsonför att minska binär storlek. Undvik att dra stora lådor. - Modulstorleken är viktig. Mindre moduler läses in snabbare och använder mindre minne. En typisk temperaturkonverterare är ~2 MB (Rust-version) eller ~5 MB (Python). Använd släppversioner för produktion.
- Undvik blockeringsåtgärder. Funktionen
processbör slutföras snabbt. Stora beräkningar fördröjer hela dataflödespipelinen. - Använd
wasm-toolsför att inspektera. Körwasm-tools component wit your-module.wasmför att verifiera att modulen exporterar de förväntade gränssnitten innan du skickar till ett register.
Versionshantering och CI/CD
Använd semantisk versionshantering för dina moduler och grafdefinitioner. Dataflödesdiagrammet refererar till artefakter efter namn och tagg (till exempel temperature:1.0.0), så att du kan uppdatera moduler utan att ändra diagramdefinitioner genom att skicka en ny version med samma tagg.
För automatiserade versioner ser en typisk pipeline ut så här:
- Skapa WASM-modulen (använd Docker Builder för konsekvens).
- Kör
wasm-tools component witför att verifiera exporterade gränssnitt. - Kör enhetstester mot din kärnlogik. Mer information finns i Testa WASM-moduler.
- Skicka till din register med ORAS och tagga med buildversionen.
- (Valfritt) Uppdatera artefaktreferensen i grafdefinitionen och pusha.
Dataflödesdiagrammet hämtar automatiskt nya modulversioner som skickas till samma tagg utan att en omdistribution krävs. Se Uppdatera en modul i ett diagram som körs.
Värd-API:er
Dina WASM-moduler kan använda värd-API:er för tillståndshantering, loggning och mått.
Tillståndslager
Spara data mellan process anrop med hjälp av den distribuerade tillståndslagringen.
use wasm_graph_sdk::state_store;
// Set value (fire-and-forget; state_store returns StateStoreError, not types::Error)
let options = state_store::SetOptions {
conditions: state_store::SetConditions::Unconditional,
expires: None,
};
let _ = state_store::set(key.as_bytes(), value.as_bytes(), None, None, options);
// Get value
let response = state_store::get(key.as_bytes(), None);
// Delete key
let _ = state_store::del(key.as_bytes(), None, None);
Loggar
Strukturerad loggning med allvarlighetsnivåer:
use wasm_graph_sdk::logger::{self, Level};
logger::log(Level::Info, "my-operator", "Processing started");
logger::log(Level::Error, "my-operator", &format!("Error: {}", error));
Metrics
OpenTelemetry-kompatibla mått:
use wasm_graph_sdk::metrics::{self, CounterValue, HistogramValue, Label};
let labels = vec![Label { key: "module".to_owned(), value: "my-operator".to_owned() }];
let _ = metrics::add_to_counter("requests_total", CounterValue::U64(1), Some(&labels));
let _ = metrics::record_to_histogram("processing_duration", HistogramValue::F64(duration_ms), Some(&labels));
ONNX-slutsatsdragning
Information om hur du bäddar in och kör små ONNX-modeller i dina moduler för in-band-slutsatsdragning finns i Köra ONNX-slutsatsdragning i WebAssembly-dataflödesdiagram.
WIT-schemareferens
Alla operatorer implementerar standardiserade gränssnitt som definierats med hjälp av WebAssembly Interface Types (WIT). Du hittar de fullständiga schemana på lagringsplatsen samples.
Operatorgränssnitt
Varje operator har en init funktion för konfiguration och en process funktion för datahantering:
interface map {
use types.{data-model, error, module-configuration};
init: func(configuration: module-configuration) -> bool;
process: func(message: data-model) -> result<data-model, error>;
}
interface filter {
use types.{data-model, error, module-configuration};
init: func(configuration: module-configuration) -> bool;
process: func(message: data-model) -> result<bool, error>;
}
interface branch {
use types.{data-model, error, module-configuration};
use hybrid-logical-clock.{hybrid-logical-clock};
init: func(configuration: module-configuration) -> bool;
process: func(timestamp: hybrid-logical-clock, message: data-model) -> result<bool, error>;
}
interface accumulate {
use types.{data-model, error, module-configuration};
init: func(configuration: module-configuration) -> bool;
process: func(staged: data-model, message: list<data-model>) -> result<data-model, error>;
}
Datamodell
Från processor.wit (wasm-graph:processor@1.1.0):
record timestamp {
timestamp: timespec, // Physical time (seconds + nanoseconds)
counter: u64, // Logical counter for ordering
node-id: buffer-or-string, // Originating node
}
record message {
timestamp: timestamp,
topic: buffer-or-bytes,
content-type: option<buffer-or-string>,
payload: buffer-or-bytes,
properties: message-properties,
schema: option<message-schema>,
}
variant data-model {
buffer-or-bytes(buffer-or-bytes), // Raw byte data
message(message), // MQTT messages (most common)
snapshot(snapshot), // Video/image frames
}
Anmärkning
De flesta operatorer arbetar med varianten message . Kontrollera den här typen i början av din process-funktion. Nyttolasten använder antingen ett värdbufferthandtag (buffer) för läsningar utan kopiering eller modulägda byte (bytes). Anropa buffer.read() för att kopiera värdbyte till modulens minne.