Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Gegevensstroomgrafieken in Azure IoT-bewerkingen telemetriegegevens aan de rand verwerken door deze te routeren via een reeks operators, zoals kaarten, filters en vertakkingen. U verpakt uw aangepaste verwerkingslogica als WASM-modules (WebAssembly) en verbindt deze in een grafiekdefinitie, zodat u gegevens kunt transformeren, filteren en verrijken zonder volledige services te schrijven.
In dit artikel worden de operatortypen, het tijdige gegevensstroommodel, moduleconfiguratie, host-API's en het WIT-schema beschreven dat WASM-modules ondersteunt. Als u modules lokaal wilt bouwen, testen en fouten wilt opsporen met de VS Code-extensie of de dataflow-dev CLI, raadpleegt u WASM-modules bouwen voor gegevensstromen.
Operators en modules
Operators zijn de verwerkingseenheden in een gegevensstroomgrafiek. Elk type heeft een specifiek doel:
| Operator | Purpose | Terugbrengtype |
|---|---|---|
| Kaart | Elk gegevensitem transformeren (bijvoorbeeld temperatuureenheden converteren) | DataModel |
| Filter | Items doorgeven of verwijderen op basis van een voorwaarde | bool |
| Branch | Items naar twee verschillende paden routeren | bool |
| Verzamelen | Items aggregeren binnen tijdvensters | DataModel |
| Samenvoegen | Meerdere streams samenvoegen terwijl de volgorde behouden blijft | N/A |
| vertraging | Tijdstempels vooruitzetten om de timing te regelen | N/A |
Een module is het BINAIRE WASM-bestand dat een of meer operators implementeert. Een enkele temperature.wasm module kan bijvoorbeeld zowel een map operator (voor conversie) als een filter operator (voor drempelwaardecontrole) bieden.
Graph Definition → References Module → Provides Operator → Processes Data
↓ ↓ ↓ ↓
"temperature:1.0.0" → temperature.wasm → map function → °F to °C
Met deze scheiding kunt u dezelfde module opnieuw gebruiken met verschillende grafiekconfiguraties, versiemodules onafhankelijk en gedrag wijzigen via configuratieparameters zonder opnieuw te bouwen.
Tijdig gegevensstroommodel
Gegevensstroomgrafen bouwen voort op het Timely-gegevensstroom rekenmodel van Microsoft Research's Naiad-project. Elk gegevensitem heeft een tijdstempel voor een hybride logische klok:
record hybrid-logical-clock {
timestamp: timespec, // Wall-clock time (secs + nanos)
counter: u64, // Logical ordering for same-time events
node-id: string, // Originating node
}
Dit geeft u deterministische verwerking (dezelfde invoer produceert altijd dezelfde uitvoer), exact één keer semantiek en gedistribueerde coördinatie tussen knooppunten. Zie de opslagplaats samples voor het volledige WIT-schema.
Schrijfoperators
Kaartoperator
Een kaartoperator transformeert elk gegevensitem en retourneert een gewijzigde kopie. In het snelstartvoorbeeld ziet u een basiskaart. Hier volgt een complexer voorbeeld waarin configuratieparameters worden gebruikt:
use std::sync::OnceLock;
use wasm_graph_sdk::logger::{self, Level};
use wasm_graph_sdk::macros::map_operator;
static OUTPUT_UNIT: OnceLock<String> = OnceLock::new();
fn unit_converter_init(configuration: ModuleConfiguration) -> bool {
let unit = configuration.properties
.iter()
.find(|(k, _)| k == "output_unit")
.map(|(_, v)| v.clone())
.unwrap_or_else(|| "celsius".to_string());
OUTPUT_UNIT.set(unit.clone()).unwrap();
logger::log(Level::Info, "converter", &format!("Output unit: {unit}"));
true
}
#[map_operator(init = "unit_converter_init")]
fn convert_temperature(input: DataModel) -> Result<DataModel, Error> {
let DataModel::Message(mut msg) = input else {
return Err(Error { message: "Expected Message variant".into() });
};
let payload = &msg.payload.read();
let mut data: serde_json::Value = serde_json::from_slice(payload)
.map_err(|e| Error { message: format!("JSON parse error: {e}") })?;
if let Some(temp) = data["temperature"]["value"].as_f64() {
let unit = OUTPUT_UNIT.get().map(|s| s.as_str()).unwrap_or("celsius");
let converted = match unit {
"kelvin" => (temp - 32.0) * 5.0 / 9.0 + 273.15,
_ => (temp - 32.0) * 5.0 / 9.0, // celsius
};
data["temperature"]["value"] = serde_json::json!(converted);
data["temperature"]["unit"] = serde_json::json!(unit);
let out = serde_json::to_string(&data).unwrap();
msg.payload = BufferOrBytes::Bytes(out.into_bytes());
}
Ok(DataModel::Message(msg))
}
Filteroperator
Een filter keert true terug om gegevens door te geven of false om het te verwijderen.
use std::sync::OnceLock;
use wasm_graph_sdk::macros::filter_operator;
use wasm_graph_sdk::logger::{self, Level};
const DEFAULT_LOWER: f64 = -40.0;
const DEFAULT_UPPER: f64 = 3422.0;
static LOWER_BOUND: OnceLock<f64> = OnceLock::new();
static UPPER_BOUND: OnceLock<f64> = OnceLock::new();
fn filter_init(configuration: ModuleConfiguration) -> bool {
for (key, value) in &configuration.properties {
match key.as_str() {
"temperature_lower_bound" => {
if let Ok(v) = value.parse::<f64>() { LOWER_BOUND.set(v).ok(); }
else { logger::log(Level::Error, "filter", &format!("Invalid lower bound: {value}")); }
}
"temperature_upper_bound" => {
if let Ok(v) = value.parse::<f64>() { UPPER_BOUND.set(v).ok(); }
else { logger::log(Level::Error, "filter", &format!("Invalid upper bound: {value}")); }
}
_ => {}
}
}
true
}
#[filter_operator(init = "filter_init")]
fn filter_temperature(input: DataModel) -> Result<bool, Error> {
let lower = LOWER_BOUND.get().copied().unwrap_or(DEFAULT_LOWER);
let upper = UPPER_BOUND.get().copied().unwrap_or(DEFAULT_UPPER);
let DataModel::Message(msg) = &input else { return Ok(true); };
let payload = &msg.payload.read();
let data: serde_json::Value = serde_json::from_slice(payload)
.map_err(|e| Error { message: format!("JSON parse error: {e}") })?;
if let Some(temp) = data.get("temperature").and_then(|t| t.get("value")).and_then(|v| v.as_f64()) {
Ok(temp >= lower && temp <= upper)
} else {
Ok(true) // Pass through non-temperature messages
}
}
Vertakkingsoperator
Een vertakking routeert gegevens naar twee paden. Geef false voor de eerste arm terug, true voor de tweede.
use wasm_graph_sdk::macros::branch_operator;
fn branch_init(_configuration: ModuleConfiguration) -> bool { true }
#[branch_operator(init = "branch_init")]
fn branch_by_type(_timestamp: HybridLogicalClock, input: DataModel) -> Result<bool, Error> {
let DataModel::Message(msg) = &input else { return Ok(true); };
let payload = &msg.payload.read();
let data: serde_json::Value = serde_json::from_slice(payload)
.map_err(|e| Error { message: format!("JSON parse error: {e}") })?;
// false = first arm (temperature data), true = second arm (everything else)
Ok(data.get("temperature").is_none())
}
Moduleconfiguratieparameters
Uw operators kunnen runtimeconfiguratieparameters ontvangen via de init functie. Hiermee kunt u het gedrag aanpassen zonder de module opnieuw op te bouwen.
De init functie ontvangt een ModuleConfiguration struct:
record module-configuration {
properties: list<tuple<string, string>>, // Key-value pairs from graph definition
module-schemas: list<module-schema> // Schema definitions if configured
}
De init functie wordt eenmaal aangeroepen wanneer de module wordt geladen. Ga terug true om de verwerking te starten of false om een configuratiefout te signaleren. Als initfalse retourneert, zal de operator geen gegevens verwerken en zal de gegevensstroom een fout loggen.
Belangrijk
Als uw operator afhankelijk is van configuratieparameters (bijvoorbeeld filtergrenzen of drempelwaarden), moet u altijd het geval verwerken waarin deze niet worden opgegeven. Gebruik verstandige standaardwaarden of retourneer false van init. Roep unwrap() niet aan en raak niet in paniek bij ontbrekende parameters, omdat de operator hierdoor tijdens uitvoering vastloopt zonder duidelijke foutmelding.
U definieert de parameters in de sectie van moduleConfigurations de grafiekdefinitie:
moduleConfigurations:
- name: module-temperature/filter
parameters:
temperature_lower_bound:
name: temperature_lower_bound
description: "Minimum valid temperature in Celsius"
temperature_upper_bound:
name: temperature_upper_bound
description: "Maximum valid temperature in Celsius"
Het name veld moet overeenkomen met de operatornaam in de grafieksectie operations . Zie WebAssembly-grafiekdefinities configureren voor meer informatie over grafiekdefinitiestructuur.
Modulegrootte en -prestaties
WASM-modules worden uitgevoerd in een sandboxomgeving met beperkte resources. Houd rekening met deze richtlijnen:
- Minimaliseer afhankelijkheden. Gebruik voor Rust
default-features = falseaanserdeenserde_jsonom de binaire grootte te verminderen. Vermijd het trekken aan grote kratten. - Modulegrootte is belangrijk. Kleinere modules laden sneller en gebruiken minder geheugen. Een typische temperatuurconverter is ~2 MB (Rust-release) of ~5 MB (Python). Release-builds gebruiken voor productie.
- Voorkom blokkeringsbewerkingen. De
processfunctie moet snel worden voltooid. Zware berekening vertraagt de volledige gegevensstroompijplijn. - Gebruik
wasm-toolsom te bekijken. Voer deze opdracht uitwasm-tools component wit your-module.wasmom te controleren of de module de verwachte interfaces exporteert voordat u naar een register pusht.
Versiebeheer en CI/CD
Gebruik semantische versiebeheer voor uw modules en grafiekdefinities. De gegevensstroomgrafiek verwijst naar artefacten op naam en tag (bijvoorbeeld temperature:1.0.0), zodat u modules kunt bijwerken zonder grafiekdefinities te wijzigen door een nieuwe versie met dezelfde tag te pushen.
Voor geautomatiseerde builds ziet een typische pijplijn er als volgt uit:
- Bouw de WASM-module (gebruik de Docker-opbouwfunctie voor consistentie).
- Voer deze opdracht uit
wasm-tools component witom geëxporteerde interfaces te controleren. - Voer eenheidstests uit op basis van uw kernlogica. Zie Test WASM-modules voor meer informatie.
- Push naar uw register met ORAS en taggen met de buildversie.
- (Optioneel) Werk de artefactreferentie van de grafiekdefinitie bij en voer de push uit.
In de gegevensstroomgrafiek worden automatisch nieuwe moduleversies opgehaald die naar dezelfde tag worden gepusht zonder dat een nieuwe implementatie is vereist. Zie Een module bijwerken in een actieve grafiek.
Host-API's
Uw WASM-modules kunnen host-API's gebruiken voor statusbeheer, logboekregistratie en metrische gegevens.
Staatopslag
Gegevens behouden tussen process aanroepen met behulp van het gedistribueerde statusarchief:
use wasm_graph_sdk::state_store;
// Set value (fire-and-forget; state_store returns StateStoreError, not types::Error)
let options = state_store::SetOptions {
conditions: state_store::SetConditions::Unconditional,
expires: None,
};
let _ = state_store::set(key.as_bytes(), value.as_bytes(), None, None, options);
// Get value
let response = state_store::get(key.as_bytes(), None);
// Delete key
let _ = state_store::del(key.as_bytes(), None, None);
Logging
Gestructureerde logboekregistratie met ernstniveaus:
use wasm_graph_sdk::logger::{self, Level};
logger::log(Level::Info, "my-operator", "Processing started");
logger::log(Level::Error, "my-operator", &format!("Error: {}", error));
Metrics
Met openTelemetry compatibele metrische gegevens:
use wasm_graph_sdk::metrics::{self, CounterValue, HistogramValue, Label};
let labels = vec![Label { key: "module".to_owned(), value: "my-operator".to_owned() }];
let _ = metrics::add_to_counter("requests_total", CounterValue::U64(1), Some(&labels));
let _ = metrics::record_to_histogram("processing_duration", HistogramValue::F64(duration_ms), Some(&labels));
ONNX-inferentie
Zie OnNX-deductie uitvoeren in WebAssembly-gegevensstroomgrafieken om kleine ONNX-modellen in uw modules in te sluiten en uit te voeren voor in-banddeductie.
WIT-schemareferentie
Alle operators implementeren gestandaardiseerde interfaces die zijn gedefinieerd met WebAssembly Interface Types (WIT). U vindt de volledige schema's in de opslagplaats samples.
Operatorinterfaces
Elke operator heeft een init functie voor configuratie en een process functie voor gegevensverwerking:
interface map {
use types.{data-model, error, module-configuration};
init: func(configuration: module-configuration) -> bool;
process: func(message: data-model) -> result<data-model, error>;
}
interface filter {
use types.{data-model, error, module-configuration};
init: func(configuration: module-configuration) -> bool;
process: func(message: data-model) -> result<bool, error>;
}
interface branch {
use types.{data-model, error, module-configuration};
use hybrid-logical-clock.{hybrid-logical-clock};
init: func(configuration: module-configuration) -> bool;
process: func(timestamp: hybrid-logical-clock, message: data-model) -> result<bool, error>;
}
interface accumulate {
use types.{data-model, error, module-configuration};
init: func(configuration: module-configuration) -> bool;
process: func(staged: data-model, message: list<data-model>) -> result<data-model, error>;
}
Gegevensmodel
Van processor.wit (wasm-graph:processor@1.1.0):
record timestamp {
timestamp: timespec, // Physical time (seconds + nanoseconds)
counter: u64, // Logical counter for ordering
node-id: buffer-or-string, // Originating node
}
record message {
timestamp: timestamp,
topic: buffer-or-bytes,
content-type: option<buffer-or-string>,
payload: buffer-or-bytes,
properties: message-properties,
schema: option<message-schema>,
}
variant data-model {
buffer-or-bytes(buffer-or-bytes), // Raw byte data
message(message), // MQTT messages (most common)
snapshot(snapshot), // Video/image frames
}
Opmerking
De meeste operators werken met de message variant. Controleer aan het begin van uw process-functie op dit type. De payload maakt gebruik van een hostbufferhandgreep (buffer) voor zero-copy leesbewerkingen of bytes (bytes) die eigendom zijn van een module. Aanroep buffer.read() om hostbytes naar het geheugen van uw module te kopiëren.
Verwante inhoud
- WASM-modules bouwen voor gegevensstromen
- Stateful WASM-grafieken maken met de statusopslag
- Schemaregister gebruiken met WASM-modules
- Fouten opsporen in WASM-modules
- WASM-modules testen
- Grafiekdefinities configureren
- Grafiekdefinities implementeren
- ONNX-inference in WASM-modules
- WASM gebruiken in gegevensstroomgrafieken