Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Les graphiques de flux de données dans Opérations Azure IoT traitent les données de télémétrie à la périphérie en les acheminant via une série d’opérateurs tels que des mappages, des filtres et des branches. Vous empaquetez votre logique de traitement personnalisée en tant que modules WebAssembly (WASM) et les associez dans une définition de graphique, afin de pouvoir transformer, filtrer et enrichir des données sans écrire de services complets.
Cet article explique les types d’opérateurs, le modèle de flux de données en temps opportun, la configuration du module, les API hôtes et le schéma WIT qui sous-tend les modules WASM. Pour générer, tester et déboguer des modules localement avec l’extension VS Code ou l’interface dataflow-dev CLI, consultez Générer des modules WASM pour les flux de données.
Opérateurs et modules
Les opérateurs sont les unités de traitement dans un graphique de flux de données. Chaque type a un but spécifique :
| Opérateur | Objectif | Type de retour |
|---|---|---|
| Carte | Transformer chaque élément de données (par exemple, convertir des unités de température) | DataModel |
| Filtrer | Passer ou supprimer des éléments en fonction d’une condition | bool |
| Branche | Acheminer les éléments vers deux chemins d’accès différents | bool |
| Accumuler | Agréger les éléments dans les fenêtres de temps | DataModel |
| Concatenate | Fusionner plusieurs flux tout en préservant l’ordre | N/A |
| Délai | Avancez les horodatages pour contrôler le minutage | N/A |
Un module est le binaire WASM qui implémente un ou plusieurs opérateurs. Par exemple, un seul temperature.wasm module peut fournir à la fois un map opérateur (pour la conversion) et un filter opérateur (pour la vérification de seuil).
Graph Definition → References Module → Provides Operator → Processes Data
↓ ↓ ↓ ↓
"temperature:1.0.0" → temperature.wasm → map function → °F to °C
Cette séparation vous permet de réutiliser le même module avec différentes configurations de graphe, de versionner les modules de manière indépendante et de modifier le comportement par le biais de paramètres de configuration sans reconstruire.
Modèle de flux de données en temps opportun
Les graphiques de flux de données se basent sur le modèle de calcul "Timely dataflow" issu du projet Naiad de Microsoft Research. Chaque élément de données comporte un horodatage d’horloge logique hybride :
record hybrid-logical-clock {
timestamp: timespec, // Wall-clock time (secs + nanos)
counter: u64, // Logical ordering for same-time events
node-id: string, // Originating node
}
Cela vous donne un traitement déterministe (la même entrée produit toujours la même sortie), la sémantique d'exactitude unique et la coordination distribuée à travers les nœuds. Pour obtenir le schéma WIT complet, consultez le dépôt samples.
Pour savoir comment développer des modules WASM avec l’extension VS Code, consultez Générer des modules WASM avec l’extension VS Code.
Opérateurs d’écriture
Opérateur de mappage
Un opérateur de carte transforme chaque élément de données et retourne une copie modifiée. L’exemple de démarrage rapide montre une carte de base. Voici un exemple plus complexe qui utilise des paramètres de configuration :
use std::sync::OnceLock;
use wasm_graph_sdk::logger::{self, Level};
use wasm_graph_sdk::macros::map_operator;
static OUTPUT_UNIT: OnceLock<String> = OnceLock::new();
fn unit_converter_init(configuration: ModuleConfiguration) -> bool {
let unit = configuration.properties
.iter()
.find(|(k, _)| k == "output_unit")
.map(|(_, v)| v.clone())
.unwrap_or_else(|| "celsius".to_string());
OUTPUT_UNIT.set(unit.clone()).unwrap();
logger::log(Level::Info, "converter", &format!("Output unit: {unit}"));
true
}
#[map_operator(init = "unit_converter_init")]
fn convert_temperature(input: DataModel) -> Result<DataModel, Error> {
let DataModel::Message(mut msg) = input else {
return Err(Error { message: "Expected Message variant".into() });
};
let payload = &msg.payload.read();
let mut data: serde_json::Value = serde_json::from_slice(payload)
.map_err(|e| Error { message: format!("JSON parse error: {e}") })?;
if let Some(temp) = data["temperature"]["value"].as_f64() {
let unit = OUTPUT_UNIT.get().map(|s| s.as_str()).unwrap_or("celsius");
let converted = match unit {
"kelvin" => (temp - 32.0) * 5.0 / 9.0 + 273.15,
_ => (temp - 32.0) * 5.0 / 9.0, // celsius
};
data["temperature"]["value"] = serde_json::json!(converted);
data["temperature"]["unit"] = serde_json::json!(unit);
let out = serde_json::to_string(&data).unwrap();
msg.payload = BufferOrBytes::Bytes(out.into_bytes());
}
Ok(DataModel::Message(msg))
}
Opérateur de filtre
Un filtre renvoie true pour transmettre des données ou false pour les supprimer.
use std::sync::OnceLock;
use wasm_graph_sdk::macros::filter_operator;
use wasm_graph_sdk::logger::{self, Level};
const DEFAULT_LOWER: f64 = -40.0;
const DEFAULT_UPPER: f64 = 3422.0;
static LOWER_BOUND: OnceLock<f64> = OnceLock::new();
static UPPER_BOUND: OnceLock<f64> = OnceLock::new();
fn filter_init(configuration: ModuleConfiguration) -> bool {
for (key, value) in &configuration.properties {
match key.as_str() {
"temperature_lower_bound" => {
if let Ok(v) = value.parse::<f64>() { LOWER_BOUND.set(v).ok(); }
else { logger::log(Level::Error, "filter", &format!("Invalid lower bound: {value}")); }
}
"temperature_upper_bound" => {
if let Ok(v) = value.parse::<f64>() { UPPER_BOUND.set(v).ok(); }
else { logger::log(Level::Error, "filter", &format!("Invalid upper bound: {value}")); }
}
_ => {}
}
}
true
}
#[filter_operator(init = "filter_init")]
fn filter_temperature(input: DataModel) -> Result<bool, Error> {
let lower = LOWER_BOUND.get().copied().unwrap_or(DEFAULT_LOWER);
let upper = UPPER_BOUND.get().copied().unwrap_or(DEFAULT_UPPER);
let DataModel::Message(msg) = &input else { return Ok(true); };
let payload = &msg.payload.read();
let data: serde_json::Value = serde_json::from_slice(payload)
.map_err(|e| Error { message: format!("JSON parse error: {e}") })?;
if let Some(temp) = data.get("temperature").and_then(|t| t.get("value")).and_then(|v| v.as_f64()) {
Ok(temp >= lower && temp <= upper)
} else {
Ok(true) // Pass through non-temperature messages
}
}
Opérateur de branche
Une branche achemine les données vers deux chemins. Retournez false pour le premier arm, true pour le second.
use wasm_graph_sdk::macros::branch_operator;
fn branch_init(_configuration: ModuleConfiguration) -> bool { true }
#[branch_operator(init = "branch_init")]
fn branch_by_type(_timestamp: HybridLogicalClock, input: DataModel) -> Result<bool, Error> {
let DataModel::Message(msg) = &input else { return Ok(true); };
let payload = &msg.payload.read();
let data: serde_json::Value = serde_json::from_slice(payload)
.map_err(|e| Error { message: format!("JSON parse error: {e}") })?;
// false = first arm (temperature data), true = second arm (everything else)
Ok(data.get("temperature").is_none())
}
Paramètres de configuration du module
Vos opérateurs peuvent recevoir des paramètres de configuration d’exécution via la init fonction. Cela vous permet de personnaliser le comportement sans regénérer le module.
La init fonction reçoit un ModuleConfiguration struct :
record module-configuration {
properties: list<tuple<string, string>>, // Key-value pairs from graph definition
module-schemas: list<module-schema> // Schema definitions if configured
}
La init fonction est appelée une fois lorsque le module se charge. Retournez true pour commencer le traitement, ou false pour signaler une erreur de configuration. Si init renvoie false, l'opérateur ne traitera aucune donnée et le flux de données enregistrera une erreur.
Important
Si votre opérateur dépend des paramètres de configuration (par exemple, des limites de filtre ou des valeurs de seuil), gérez toujours le cas où ils ne sont pas fournis. Utilisez les valeurs par défaut sensibles ou le retour false à partir de init. N’appelez pas unwrap() et ne cédez pas à la panique pour les paramètres manquants, car cela fait planter l’opérateur pendant l’exécution sans message d'erreur clair.
Vous définissez les paramètres dans la section de la définition du graphique moduleConfigurations :
moduleConfigurations:
- name: module-temperature/filter
parameters:
temperature_lower_bound:
name: temperature_lower_bound
description: "Minimum valid temperature in Celsius"
temperature_upper_bound:
name: temperature_upper_bound
description: "Maximum valid temperature in Celsius"
Le name champ doit correspondre au nom de l’opérateur dans la section du operations graphique. Pour plus d’informations sur la structure de définition de graphique, consultez Configurer les définitions de graphique WebAssembly.
Taille et performances du module
Les modules WASM s’exécutent dans un environnement bac à sable avec des ressources limitées. Gardez ces instructions à l’esprit :
- Réduisez les dépendances. Pour Rust, utilisez
default-features = falsesurserdeetserde_jsonpour réduire la taille binaire. Évitez de tirer de grands crates. - La taille du module est importante. Les modules plus petits chargent plus rapidement et utilisent moins de mémoire. Un convertisseur de température classique est ~2 Mo (version Rust) ou ~5 Mo (Python). Utilisez les compilations de version pour la production.
- Évitez les opérations de blocage. La
processfonction doit se terminer rapidement. Le calcul lourd retarde l’intégralité du pipeline de flux de données. - Utilisez
wasm-toolspour inspecter. Exécutezwasm-tools component wit your-module.wasmpour vérifier que votre module exporte les interfaces attendues avant d’envoyer (push) vers un registre.
Contrôle de version et CI/CD
Utilisez le contrôle de version sémantique pour vos modules et définitions de graphe. Le graphique de flux de données fait référence à des artefacts par nom et par balise (par exemple, temperature:1.0.0), afin de pouvoir mettre à jour les modules sans modifier les définitions de graphique en envoyant une nouvelle version avec la même balise.
Pour les builds automatisées, un pipeline classique ressemble à ceci :
- Générez le module WASM (utilisez le générateur Docker pour la cohérence).
- Exécutez
wasm-tools component witpour vérifier les interfaces exportées. - Exécutez des tests unitaires sur votre logique principale. Pour plus d’informations, consultez Les modules TEST WASM.
- Effectuez un envoi vers votre registre avec ORAS, en étiquetant avec la version de build.
- (Facultatif) Mettez à jour la référence d’artefact et l’envoi de la définition de graphique.
Le graphique de flux de données récupère automatiquement les nouvelles versions de module envoyées à la même balise sans nécessiter de redéploiement. Consultez Mettre à jour un module dans un graphique en cours d’exécution.
API d’hôte
Vos modules WASM peuvent utiliser des API hôtes pour la gestion de l’état, la journalisation et les métriques.
Magasin d’état
Conserver les données entre les appels process à l'aide de la mémoire d'état distribuée :
use wasm_graph_sdk::state_store;
// Set value (fire-and-forget; state_store returns StateStoreError, not types::Error)
let options = state_store::SetOptions {
conditions: state_store::SetConditions::Unconditional,
expires: None,
};
let _ = state_store::set(key.as_bytes(), value.as_bytes(), None, None, options);
// Get value
let response = state_store::get(key.as_bytes(), None);
// Delete key
let _ = state_store::del(key.as_bytes(), None, None);
Logging
Journalisation structurée avec niveaux de gravité :
use wasm_graph_sdk::logger::{self, Level};
logger::log(Level::Info, "my-operator", "Processing started");
logger::log(Level::Error, "my-operator", &format!("Error: {}", error));
Metrics
Ouvrir des métriques compatibles OpenTelemetry :
use wasm_graph_sdk::metrics::{self, CounterValue, HistogramValue, Label};
let labels = vec![Label { key: "module".to_owned(), value: "my-operator".to_owned() }];
let _ = metrics::add_to_counter("requests_total", CounterValue::U64(1), Some(&labels));
let _ = metrics::record_to_histogram("processing_duration", HistogramValue::F64(duration_ms), Some(&labels));
Inférence ONNX
Pour incorporer et exécuter de petits modèles ONNX à l’intérieur de vos modules pour l’inférence en bande, consultez Exécuter l’inférence ONNX dans les graphiques de flux de données WebAssembly.
Informations de référence sur le schéma WIT
Tous les opérateurs implémentent des interfaces standardisées définies en utilisant WebAssembly Interface Types (WIT). Vous trouverez les schémas complets dans le référentiel samples.
Interfaces d’opérateur
Chaque opérateur a une init fonction pour la configuration et une process fonction pour la gestion des données :
interface map {
use types.{data-model, error, module-configuration};
init: func(configuration: module-configuration) -> bool;
process: func(message: data-model) -> result<data-model, error>;
}
interface filter {
use types.{data-model, error, module-configuration};
init: func(configuration: module-configuration) -> bool;
process: func(message: data-model) -> result<bool, error>;
}
interface branch {
use types.{data-model, error, module-configuration};
use hybrid-logical-clock.{hybrid-logical-clock};
init: func(configuration: module-configuration) -> bool;
process: func(timestamp: hybrid-logical-clock, message: data-model) -> result<bool, error>;
}
interface accumulate {
use types.{data-model, error, module-configuration};
init: func(configuration: module-configuration) -> bool;
process: func(staged: data-model, message: list<data-model>) -> result<data-model, error>;
}
Modèle de données
À partir de processor.wit (wasm-graph:processor@1.1.0) :
record timestamp {
timestamp: timespec, // Physical time (seconds + nanoseconds)
counter: u64, // Logical counter for ordering
node-id: buffer-or-string, // Originating node
}
record message {
timestamp: timestamp,
topic: buffer-or-bytes,
content-type: option<buffer-or-string>,
payload: buffer-or-bytes,
properties: message-properties,
schema: option<message-schema>,
}
variant data-model {
buffer-or-bytes(buffer-or-bytes), // Raw byte data
message(message), // MQTT messages (most common)
snapshot(snapshot), // Video/image frames
}
Note
La plupart des opérateurs fonctionnent avec la message variante. Recherchez ce type au début de votre fonction process. La charge utile utilise soit un handle de mémoire tampon hôte (buffer) pour les lectures sans copie, soit des octets gérés par le module (bytes). Appelez buffer.read() pour copier les octets de l’hôte dans la mémoire de votre module.
Contenu connexe
- Générer des modules WASM pour les flux de données
- Créer des graphiques WASM à états avec le stockage d'états
- Utiliser le Registre de schémas avec des modules WASM
- Débogage des modules WASM
- Tester les modules WASM
- Configurer des définitions de graphiques
- Déployer des définitions de graphiques
- Inférence ONNX dans les modules WASM
- Utiliser WASM dans les graphiques de flux de données