Comprendre les modules WebAssembly (WASM) et les définitions de graphiques pour les graphiques de flux de données

Les graphiques de flux de données dans Opérations Azure IoT traitent les données de télémétrie à la périphérie en les acheminant via une série d’opérateurs tels que des mappages, des filtres et des branches. Vous empaquetez votre logique de traitement personnalisée en tant que modules WebAssembly (WASM) et les associez dans une définition de graphique, afin de pouvoir transformer, filtrer et enrichir des données sans écrire de services complets.

Cet article explique les types d’opérateurs, le modèle de flux de données en temps opportun, la configuration du module, les API hôtes et le schéma WIT qui sous-tend les modules WASM. Pour générer, tester et déboguer des modules localement avec l’extension VS Code ou l’interface dataflow-dev CLI, consultez Générer des modules WASM pour les flux de données.

Opérateurs et modules

Les opérateurs sont les unités de traitement dans un graphique de flux de données. Chaque type a un but spécifique :

Opérateur Objectif Type de retour
Carte Transformer chaque élément de données (par exemple, convertir des unités de température) DataModel
Filtrer Passer ou supprimer des éléments en fonction d’une condition bool
Branche Acheminer les éléments vers deux chemins d’accès différents bool
Accumuler Agréger les éléments dans les fenêtres de temps DataModel
Concatenate Fusionner plusieurs flux tout en préservant l’ordre N/A
Délai Avancez les horodatages pour contrôler le minutage N/A

Un module est le binaire WASM qui implémente un ou plusieurs opérateurs. Par exemple, un seul temperature.wasm module peut fournir à la fois un map opérateur (pour la conversion) et un filter opérateur (pour la vérification de seuil).

Graph Definition → References Module → Provides Operator → Processes Data
     ↓                    ↓               ↓              ↓
"temperature:1.0.0" → temperature.wasm → map function → °F to °C

Cette séparation vous permet de réutiliser le même module avec différentes configurations de graphe, de versionner les modules de manière indépendante et de modifier le comportement par le biais de paramètres de configuration sans reconstruire.

Modèle de flux de données en temps opportun

Les graphiques de flux de données se basent sur le modèle de calcul "Timely dataflow" issu du projet Naiad de Microsoft Research. Chaque élément de données comporte un horodatage d’horloge logique hybride :

record hybrid-logical-clock {
    timestamp: timespec,  // Wall-clock time (secs + nanos)
    counter: u64,         // Logical ordering for same-time events
    node-id: string,      // Originating node
}

Cela vous donne un traitement déterministe (la même entrée produit toujours la même sortie), la sémantique d'exactitude unique et la coordination distribuée à travers les nœuds. Pour obtenir le schéma WIT complet, consultez le dépôt samples.

Pour savoir comment développer des modules WASM avec l’extension VS Code, consultez Générer des modules WASM avec l’extension VS Code.

Opérateurs d’écriture

Opérateur de mappage

Un opérateur de carte transforme chaque élément de données et retourne une copie modifiée. L’exemple de démarrage rapide montre une carte de base. Voici un exemple plus complexe qui utilise des paramètres de configuration :

use std::sync::OnceLock;
use wasm_graph_sdk::logger::{self, Level};
use wasm_graph_sdk::macros::map_operator;

static OUTPUT_UNIT: OnceLock<String> = OnceLock::new();

fn unit_converter_init(configuration: ModuleConfiguration) -> bool {
    let unit = configuration.properties
        .iter()
        .find(|(k, _)| k == "output_unit")
        .map(|(_, v)| v.clone())
        .unwrap_or_else(|| "celsius".to_string());

    OUTPUT_UNIT.set(unit.clone()).unwrap();
    logger::log(Level::Info, "converter", &format!("Output unit: {unit}"));
    true
}

#[map_operator(init = "unit_converter_init")]
fn convert_temperature(input: DataModel) -> Result<DataModel, Error> {
    let DataModel::Message(mut msg) = input else {
        return Err(Error { message: "Expected Message variant".into() });
    };

    let payload = &msg.payload.read();
    let mut data: serde_json::Value = serde_json::from_slice(payload)
        .map_err(|e| Error { message: format!("JSON parse error: {e}") })?;

    if let Some(temp) = data["temperature"]["value"].as_f64() {
        let unit = OUTPUT_UNIT.get().map(|s| s.as_str()).unwrap_or("celsius");
        let converted = match unit {
            "kelvin" => (temp - 32.0) * 5.0 / 9.0 + 273.15,
            _ => (temp - 32.0) * 5.0 / 9.0, // celsius
        };
        data["temperature"]["value"] = serde_json::json!(converted);
        data["temperature"]["unit"] = serde_json::json!(unit);
        let out = serde_json::to_string(&data).unwrap();
        msg.payload = BufferOrBytes::Bytes(out.into_bytes());
    }

    Ok(DataModel::Message(msg))
}

Opérateur de filtre

Un filtre renvoie true pour transmettre des données ou false pour les supprimer.

use std::sync::OnceLock;
use wasm_graph_sdk::macros::filter_operator;
use wasm_graph_sdk::logger::{self, Level};

const DEFAULT_LOWER: f64 = -40.0;
const DEFAULT_UPPER: f64 = 3422.0;

static LOWER_BOUND: OnceLock<f64> = OnceLock::new();
static UPPER_BOUND: OnceLock<f64> = OnceLock::new();

fn filter_init(configuration: ModuleConfiguration) -> bool {
    for (key, value) in &configuration.properties {
        match key.as_str() {
            "temperature_lower_bound" => {
                if let Ok(v) = value.parse::<f64>() { LOWER_BOUND.set(v).ok(); }
                else { logger::log(Level::Error, "filter", &format!("Invalid lower bound: {value}")); }
            }
            "temperature_upper_bound" => {
                if let Ok(v) = value.parse::<f64>() { UPPER_BOUND.set(v).ok(); }
                else { logger::log(Level::Error, "filter", &format!("Invalid upper bound: {value}")); }
            }
            _ => {}
        }
    }
    true
}

#[filter_operator(init = "filter_init")]
fn filter_temperature(input: DataModel) -> Result<bool, Error> {
    let lower = LOWER_BOUND.get().copied().unwrap_or(DEFAULT_LOWER);
    let upper = UPPER_BOUND.get().copied().unwrap_or(DEFAULT_UPPER);

    let DataModel::Message(msg) = &input else { return Ok(true); };
    let payload = &msg.payload.read();
    let data: serde_json::Value = serde_json::from_slice(payload)
        .map_err(|e| Error { message: format!("JSON parse error: {e}") })?;

    if let Some(temp) = data.get("temperature").and_then(|t| t.get("value")).and_then(|v| v.as_f64()) {
        Ok(temp >= lower && temp <= upper)
    } else {
        Ok(true) // Pass through non-temperature messages
    }
}

Opérateur de branche

Une branche achemine les données vers deux chemins. Retournez false pour le premier arm, true pour le second.

use wasm_graph_sdk::macros::branch_operator;

fn branch_init(_configuration: ModuleConfiguration) -> bool { true }

#[branch_operator(init = "branch_init")]
fn branch_by_type(_timestamp: HybridLogicalClock, input: DataModel) -> Result<bool, Error> {
    let DataModel::Message(msg) = &input else { return Ok(true); };
    let payload = &msg.payload.read();
    let data: serde_json::Value = serde_json::from_slice(payload)
        .map_err(|e| Error { message: format!("JSON parse error: {e}") })?;

    // false = first arm (temperature data), true = second arm (everything else)
    Ok(data.get("temperature").is_none())
}

Paramètres de configuration du module

Vos opérateurs peuvent recevoir des paramètres de configuration d’exécution via la init fonction. Cela vous permet de personnaliser le comportement sans regénérer le module.

La init fonction reçoit un ModuleConfiguration struct :

record module-configuration {
    properties: list<tuple<string, string>>,   // Key-value pairs from graph definition
    module-schemas: list<module-schema>        // Schema definitions if configured
}

La init fonction est appelée une fois lorsque le module se charge. Retournez true pour commencer le traitement, ou false pour signaler une erreur de configuration. Si init renvoie false, l'opérateur ne traitera aucune donnée et le flux de données enregistrera une erreur.

Important

Si votre opérateur dépend des paramètres de configuration (par exemple, des limites de filtre ou des valeurs de seuil), gérez toujours le cas où ils ne sont pas fournis. Utilisez les valeurs par défaut sensibles ou le retour false à partir de init. N’appelez pas unwrap() et ne cédez pas à la panique pour les paramètres manquants, car cela fait planter l’opérateur pendant l’exécution sans message d'erreur clair.

Vous définissez les paramètres dans la section de la définition du graphique moduleConfigurations :

moduleConfigurations:
  - name: module-temperature/filter
    parameters:
      temperature_lower_bound:
        name: temperature_lower_bound
        description: "Minimum valid temperature in Celsius"
      temperature_upper_bound:
        name: temperature_upper_bound
        description: "Maximum valid temperature in Celsius"

Le name champ doit correspondre au nom de l’opérateur dans la section du operations graphique. Pour plus d’informations sur la structure de définition de graphique, consultez Configurer les définitions de graphique WebAssembly.

Taille et performances du module

Les modules WASM s’exécutent dans un environnement bac à sable avec des ressources limitées. Gardez ces instructions à l’esprit :

  • Réduisez les dépendances. Pour Rust, utilisez default-features = false sur serde et serde_json pour réduire la taille binaire. Évitez de tirer de grands crates.
  • La taille du module est importante. Les modules plus petits chargent plus rapidement et utilisent moins de mémoire. Un convertisseur de température classique est ~2 Mo (version Rust) ou ~5 Mo (Python). Utilisez les compilations de version pour la production.
  • Évitez les opérations de blocage. La process fonction doit se terminer rapidement. Le calcul lourd retarde l’intégralité du pipeline de flux de données.
  • Utilisez wasm-tools pour inspecter. Exécutez wasm-tools component wit your-module.wasm pour vérifier que votre module exporte les interfaces attendues avant d’envoyer (push) vers un registre.

Contrôle de version et CI/CD

Utilisez le contrôle de version sémantique pour vos modules et définitions de graphe. Le graphique de flux de données fait référence à des artefacts par nom et par balise (par exemple, temperature:1.0.0), afin de pouvoir mettre à jour les modules sans modifier les définitions de graphique en envoyant une nouvelle version avec la même balise.

Pour les builds automatisées, un pipeline classique ressemble à ceci :

  1. Générez le module WASM (utilisez le générateur Docker pour la cohérence).
  2. Exécutez wasm-tools component wit pour vérifier les interfaces exportées.
  3. Exécutez des tests unitaires sur votre logique principale. Pour plus d’informations, consultez Les modules TEST WASM.
  4. Effectuez un envoi vers votre registre avec ORAS, en étiquetant avec la version de build.
  5. (Facultatif) Mettez à jour la référence d’artefact et l’envoi de la définition de graphique.

Le graphique de flux de données récupère automatiquement les nouvelles versions de module envoyées à la même balise sans nécessiter de redéploiement. Consultez Mettre à jour un module dans un graphique en cours d’exécution.

API d’hôte

Vos modules WASM peuvent utiliser des API hôtes pour la gestion de l’état, la journalisation et les métriques.

Magasin d’état

Conserver les données entre les appels process à l'aide de la mémoire d'état distribuée :

use wasm_graph_sdk::state_store;

// Set value (fire-and-forget; state_store returns StateStoreError, not types::Error)
let options = state_store::SetOptions {
    conditions: state_store::SetConditions::Unconditional,
    expires: None,
};
let _ = state_store::set(key.as_bytes(), value.as_bytes(), None, None, options);

// Get value
let response = state_store::get(key.as_bytes(), None);

// Delete key
let _ = state_store::del(key.as_bytes(), None, None);

Logging

Journalisation structurée avec niveaux de gravité :

use wasm_graph_sdk::logger::{self, Level};

logger::log(Level::Info, "my-operator", "Processing started");
logger::log(Level::Error, "my-operator", &format!("Error: {}", error));

Metrics

Ouvrir des métriques compatibles OpenTelemetry :

use wasm_graph_sdk::metrics::{self, CounterValue, HistogramValue, Label};

let labels = vec![Label { key: "module".to_owned(), value: "my-operator".to_owned() }];
let _ = metrics::add_to_counter("requests_total", CounterValue::U64(1), Some(&labels));
let _ = metrics::record_to_histogram("processing_duration", HistogramValue::F64(duration_ms), Some(&labels));

Inférence ONNX

Pour incorporer et exécuter de petits modèles ONNX à l’intérieur de vos modules pour l’inférence en bande, consultez Exécuter l’inférence ONNX dans les graphiques de flux de données WebAssembly.

Informations de référence sur le schéma WIT

Tous les opérateurs implémentent des interfaces standardisées définies en utilisant WebAssembly Interface Types (WIT). Vous trouverez les schémas complets dans le référentiel samples.

Interfaces d’opérateur

Chaque opérateur a une init fonction pour la configuration et une process fonction pour la gestion des données :

interface map {
    use types.{data-model, error, module-configuration};
    init: func(configuration: module-configuration) -> bool;
    process: func(message: data-model) -> result<data-model, error>;
}

interface filter {
    use types.{data-model, error, module-configuration};
    init: func(configuration: module-configuration) -> bool;
    process: func(message: data-model) -> result<bool, error>;
}

interface branch {
    use types.{data-model, error, module-configuration};
    use hybrid-logical-clock.{hybrid-logical-clock};
    init: func(configuration: module-configuration) -> bool;
    process: func(timestamp: hybrid-logical-clock, message: data-model) -> result<bool, error>;
}

interface accumulate {
    use types.{data-model, error, module-configuration};
    init: func(configuration: module-configuration) -> bool;
    process: func(staged: data-model, message: list<data-model>) -> result<data-model, error>;
}

Modèle de données

À partir de processor.wit (wasm-graph:processor@1.1.0) :

record timestamp {
    timestamp: timespec,        // Physical time (seconds + nanoseconds)
    counter: u64,               // Logical counter for ordering
    node-id: buffer-or-string,  // Originating node
}

record message {
    timestamp: timestamp,
    topic: buffer-or-bytes,
    content-type: option<buffer-or-string>,
    payload: buffer-or-bytes,
    properties: message-properties,
    schema: option<message-schema>,
}

variant data-model {
    buffer-or-bytes(buffer-or-bytes),  // Raw byte data
    message(message),                  // MQTT messages (most common)
    snapshot(snapshot),                // Video/image frames
}

Note

La plupart des opérateurs fonctionnent avec la message variante. Recherchez ce type au début de votre fonction process. La charge utile utilise soit un handle de mémoire tampon hôte (buffer) pour les lectures sans copie, soit des octets gérés par le module (bytes). Appelez buffer.read() pour copier les octets de l’hôte dans la mémoire de votre module.