Note
Access to this page requires authorization. You can try signing in or changing directories.
Access to this page requires authorization. You can try changing directories.
This article shows you how to develop custom WebAssembly (WASM) modules for Azure IoT Operations data flow graphs. Build a module in Rust or Python, push it to a registry, deploy it on your cluster, and verify data flows through it end to end.
Important
Data flow graphs currently only support MQTT, Kafka, and OpenTelemetry endpoints. Other endpoint types like Azure Data Lake, Microsoft Fabric OneLake, Azure Data Explorer, and local storage aren't supported. For more information, see Known issues.
Quickstart: build, deploy, and verify a WASM module
This section walks you through the complete lifecycle: write a temperature converter, build it, push it to a registry, deploy a DataflowGraph that uses it, send test data, and confirm the output. If you want to skip building and use prebuilt modules instead, see Deploy prebuilt modules from a public registry.
Prerequisites
- An Azure IoT Operations instance deployed on an Arc-enabled Kubernetes cluster. See Deploy Azure IoT Operations.
- A registry endpoint configured to point to a container registry. See Configure registry endpoints.
- ORAS CLI installed for pushing artifacts to the registry.
mosquitto_pubandmosquitto_sub(or another MQTT client) for testing.
Choose your development language and install the required tools:
# Install Rust toolchain
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y
# Add WASM target (required for Azure IoT Operations WASM components)
rustup target add wasm32-wasip2
# Install build tools for validating and packaging WASM artifacts
cargo install wasm-tools --version '=1.201.0' --locked
Step 1: Write the module
Create a temperature converter that transforms Fahrenheit to Celsius.
cargo new --lib temperature-converter
cd temperature-converter
Create .cargo/config.toml to configure the SDK registry:
[registries]
aio-wg = { index = "sparse+https://pkgs.dev.azure.com/azure-iot-sdks/iot-operations/_packaging/preview/Cargo/index/" }
[build]
target = "wasm32-wasip2"
Tip
Adding [build] target = "wasm32-wasip2" to your .cargo/config.toml means you don't need to pass --target wasm32-wasip2 on every cargo build command. The Azure Samples dataflow graphs repository uses this pattern.
Edit Cargo.toml:
[package]
name = "temperature-converter"
version = "0.1.0"
edition = "2021"
[dependencies]
wit-bindgen = "0.22"
wasm_graph_sdk = { version = "=1.1.3", registry = "aio-wg" }
serde = { version = "1", default-features = false, features = ["derive"] }
serde_json = { version = "1", default-features = false, features = ["alloc"] }
[lib]
crate-type = ["cdylib"]
Write src/lib.rs:
use serde_json::{json, Value};
use wasm_graph_sdk::logger::{self, Level};
use wasm_graph_sdk::macros::map_operator;
fn fahrenheit_to_celsius_init(_configuration: ModuleConfiguration) -> bool {
logger::log(Level::Info, "temperature-converter", "Init invoked");
true
}
#[map_operator(init = "fahrenheit_to_celsius_init")]
fn fahrenheit_to_celsius(input: DataModel) -> Result<DataModel, Error> {
let DataModel::Message(mut result) = input else {
return Err(Error {
message: "Unexpected input type".to_string(),
});
};
let payload = &result.payload.read();
if let Ok(data_str) = std::str::from_utf8(payload) {
if let Ok(mut data) = serde_json::from_str::<Value>(data_str) {
if let Some(temp) = data["temperature"]["value"].as_f64() {
let celsius = (temp - 32.0) * 5.0 / 9.0;
data["temperature"] = json!({
"value_celsius": celsius,
"original_fahrenheit": temp
});
if let Ok(output_str) = serde_json::to_string(&data) {
result.payload = BufferOrBytes::Bytes(output_str.into_bytes());
}
}
}
}
Ok(DataModel::Message(result))
}
Step 2: Build the WASM module
cargo build --release --target wasm32-wasip2
cp target/wasm32-wasip2/release/temperature_converter.wasm .
You can also use the Docker builders for reproducible builds in CI/CD. See Docker builds.
Step 3: Push to a registry
Push your built module and a graph definition to a container registry.
First, create a graph definition file graph-simple.yaml:
metadata:
name: "Temperature converter"
description: "Converts temperature from Fahrenheit to Celsius"
version: "1.0.0"
$schema: "https://www.schemastore.org/aio-wasm-graph-config-1.0.0.json"
vendor: "Contoso"
moduleRequirements:
apiVersion: "1.1.0"
runtimeVersion: "1.1.0"
moduleConfigurations:
- name: module-temperature/map
parameters: {}
operations:
- operationType: "source"
name: "source"
- operationType: "map"
name: "module-temperature/map"
module: "temperature:1.0.0"
- operationType: "sink"
name: "sink"
connections:
- from:
name: "source"
to:
name: "module-temperature/map"
- from:
name: "module-temperature/map"
to:
name: "sink"
Then push both artifacts:
# Push the WASM module (Rust output is in target/wasm32-wasip2/release/)
oras push <YOUR_REGISTRY>/temperature:1.0.0 \
--artifact-type application/vnd.module.wasm.content.layer.v1+wasm \
temperature_converter.wasm:application/wasm
# Push the graph definition
oras push <YOUR_REGISTRY>/graph-simple:1.0.0 \
--config /dev/null:application/vnd.microsoft.aio.graph.v1+yaml \
graph-simple.yaml:application/yaml \
--disable-path-validation
Replace <YOUR_REGISTRY> with your registry (for example, myacr.azurecr.io).
Tip
For a quick test without a private registry, you can use the prebuilt modules at ghcr.io/azure-samples/explore-iot-operations. See Deploy prebuilt modules.
Step 4: Deploy a DataflowGraph
Create and apply a DataflowGraph resource that reads from an MQTT topic, processes data through your module, and writes to another topic.
apiVersion: connectivity.iotoperations.azure.com/v1
kind: DataflowGraph
metadata:
name: temperature-graph
namespace: azure-iot-operations
spec:
profileRef: default
nodes:
- nodeType: Source
name: mqtt-source
sourceSettings:
endpointRef: default
dataSources:
- thermostats/temperature
- nodeType: Graph
name: temperature-converter
graphSettings:
registryEndpointRef: my-registry-endpoint
artifact: graph-simple:1.0.0
configuration:
- key: temperature_lower_bound
value: "-40"
- key: temperature_upper_bound
value: "3422"
- nodeType: Destination
name: mqtt-destination
destinationSettings:
endpointRef: default
dataDestination: thermostats/temperature/converted
nodeConnections:
- from:
name: mqtt-source
to:
name: temperature-converter
- from:
name: temperature-converter
to:
name: mqtt-destination
kubectl apply -f temperature-graph.yaml
Step 5: Test end to end
Open two terminals. In one, subscribe to the output topic:
mosquitto_sub -h localhost -t "thermostats/temperature/converted" -v
In the other, publish a test message:
mosquitto_pub -h localhost -t "thermostats/temperature" \
-m '{"temperature": {"value": 72, "unit": "F"}}'
You should see converted output. The exact format depends on which language you used:
If you don't see output, check the dataflow pod logs:
kubectl logs -l app=aio-dataflow -n azure-iot-operations --tail=50
Now that you've seen the full lifecycle, the rest of this article covers each step in depth.
Concepts
Operators and modules
Operators are the processing units in a data flow graph. Each type serves a specific purpose:
| Operator | Purpose | Return type |
|---|---|---|
| Map | Transform each data item (for example, convert temperature units) | DataModel |
| Filter | Pass or drop items based on a condition | bool |
| Branch | Route items to two different paths | bool |
| Accumulate | Aggregate items within time windows | DataModel |
| Concatenate | Merge multiple streams while preserving order | N/A |
| Delay | Advance timestamps to control timing | N/A |
A module is the WASM binary that implements one or more operators. For example, a single temperature.wasm module can provide both a map operator (for conversion) and a filter operator (for threshold checking).
Graph Definition → References Module → Provides Operator → Processes Data
↓ ↓ ↓ ↓
"temperature:1.0.0" → temperature.wasm → map function → °F to °C
This separation lets you reuse the same module with different graph configurations, version modules independently, and change behavior through configuration parameters without rebuilding.
Timely dataflow model
Data flow graphs build on the Timely dataflow computational model from Microsoft Research's Naiad project. Every data item carries a hybrid logical clock timestamp:
record hybrid-logical-clock {
timestamp: timespec, // Wall-clock time (secs + nanos)
counter: u64, // Logical ordering for same-time events
node-id: string, // Originating node
}
This gives you deterministic processing (same input always produces same output), exactly-once semantics, and distributed coordination across nodes. For the complete WIT schema, see the samples repository.
To learn how to develop WASM modules with the VS Code extension, see Build WASM modules with VS Code extension.
Write operators
Map operator
A map operator transforms each data item and returns a modified copy. The quickstart example shows a basic map. Here's a more complex example that uses configuration parameters:
use std::sync::OnceLock;
use wasm_graph_sdk::logger::{self, Level};
use wasm_graph_sdk::macros::map_operator;
static OUTPUT_UNIT: OnceLock<String> = OnceLock::new();
fn unit_converter_init(configuration: ModuleConfiguration) -> bool {
let unit = configuration.properties
.iter()
.find(|(k, _)| k == "output_unit")
.map(|(_, v)| v.clone())
.unwrap_or_else(|| "celsius".to_string());
OUTPUT_UNIT.set(unit.clone()).unwrap();
logger::log(Level::Info, "converter", &format!("Output unit: {unit}"));
true
}
#[map_operator(init = "unit_converter_init")]
fn convert_temperature(input: DataModel) -> Result<DataModel, Error> {
let DataModel::Message(mut msg) = input else {
return Err(Error { message: "Expected Message variant".into() });
};
let payload = &msg.payload.read();
let mut data: serde_json::Value = serde_json::from_slice(payload)
.map_err(|e| Error { message: format!("JSON parse error: {e}") })?;
if let Some(temp) = data["temperature"]["value"].as_f64() {
let unit = OUTPUT_UNIT.get().map(|s| s.as_str()).unwrap_or("celsius");
let converted = match unit {
"kelvin" => (temp - 32.0) * 5.0 / 9.0 + 273.15,
_ => (temp - 32.0) * 5.0 / 9.0, // celsius
};
data["temperature"]["value"] = serde_json::json!(converted);
data["temperature"]["unit"] = serde_json::json!(unit);
let out = serde_json::to_string(&data).unwrap();
msg.payload = BufferOrBytes::Bytes(out.into_bytes());
}
Ok(DataModel::Message(msg))
}
Filter operator
A filter returns true to pass data through or false to drop it.
use std::sync::OnceLock;
use wasm_graph_sdk::macros::filter_operator;
use wasm_graph_sdk::logger::{self, Level};
const DEFAULT_LOWER: f64 = -40.0;
const DEFAULT_UPPER: f64 = 3422.0;
static LOWER_BOUND: OnceLock<f64> = OnceLock::new();
static UPPER_BOUND: OnceLock<f64> = OnceLock::new();
fn filter_init(configuration: ModuleConfiguration) -> bool {
for (key, value) in &configuration.properties {
match key.as_str() {
"temperature_lower_bound" => {
if let Ok(v) = value.parse::<f64>() { LOWER_BOUND.set(v).ok(); }
else { logger::log(Level::Error, "filter", &format!("Invalid lower bound: {value}")); }
}
"temperature_upper_bound" => {
if let Ok(v) = value.parse::<f64>() { UPPER_BOUND.set(v).ok(); }
else { logger::log(Level::Error, "filter", &format!("Invalid upper bound: {value}")); }
}
_ => {}
}
}
true
}
#[filter_operator(init = "filter_init")]
fn filter_temperature(input: DataModel) -> Result<bool, Error> {
let lower = LOWER_BOUND.get().copied().unwrap_or(DEFAULT_LOWER);
let upper = UPPER_BOUND.get().copied().unwrap_or(DEFAULT_UPPER);
let DataModel::Message(msg) = &input else { return Ok(true); };
let payload = &msg.payload.read();
let data: serde_json::Value = serde_json::from_slice(payload)
.map_err(|e| Error { message: format!("JSON parse error: {e}") })?;
if let Some(temp) = data.get("temperature").and_then(|t| t.get("value")).and_then(|v| v.as_f64()) {
Ok(temp >= lower && temp <= upper)
} else {
Ok(true) // Pass through non-temperature messages
}
}
Branch operator
A branch routes data to two paths. Return false for the first arm, true for the second.
use wasm_graph_sdk::macros::branch_operator;
fn branch_init(_configuration: ModuleConfiguration) -> bool { true }
#[branch_operator(init = "branch_init")]
fn branch_by_type(_timestamp: HybridLogicalClock, input: DataModel) -> Result<bool, Error> {
let DataModel::Message(msg) = &input else { return Ok(true); };
let payload = &msg.payload.read();
let data: serde_json::Value = serde_json::from_slice(payload)
.map_err(|e| Error { message: format!("JSON parse error: {e}") })?;
// false = first arm (temperature data), true = second arm (everything else)
Ok(data.get("temperature").is_none())
}
Module configuration parameters
Your operators can receive runtime configuration parameters through the init function. This lets you customize behavior without rebuilding the module.
The init function receives a ModuleConfiguration struct:
record module-configuration {
properties: list<tuple<string, string>>, // Key-value pairs from graph definition
module-schemas: list<module-schema> // Schema definitions if configured
}
The init function is called once when the module loads. Return true to start processing, or false to signal a configuration error. If init returns false, the operator won't process any data and the dataflow logs an error.
Important
If your operator depends on configuration parameters (for example, filter bounds or threshold values), always handle the case where they aren't provided. Use sensible defaults or return false from init. Don't call unwrap() or panic on missing parameters, because this crashes the operator at runtime with no clear error message.
You define the parameters in the graph definition's moduleConfigurations section:
moduleConfigurations:
- name: module-temperature/filter
parameters:
temperature_lower_bound:
name: temperature_lower_bound
description: "Minimum valid temperature in Celsius"
temperature_upper_bound:
name: temperature_upper_bound
description: "Maximum valid temperature in Celsius"
The name field must match the operator name in the graph's operations section. For more about graph definition structure, see Configure WebAssembly graph definitions.
Build options
Docker builds
Use containerized builds for CI/CD or when you don't want to install the full toolchain locally. The Docker images include all dependencies and schemas.
# Release build
docker run --rm -v "$(pwd):/workspace" \
ghcr.io/azure-samples/explore-iot-operations/rust-wasm-builder \
--app-name temperature-converter
# Debug build (includes symbols)
docker run --rm -v "$(pwd):/workspace" \
ghcr.io/azure-samples/explore-iot-operations/rust-wasm-builder \
--app-name temperature-converter --build-mode debug
--app-name must match your crate name from Cargo.toml. See Rust Docker builder docs for more options.
Module size and performance
WASM modules run in a sandboxed environment with limited resources. Keep these guidelines in mind:
- Minimize dependencies. For Rust, use
default-features = falseonserdeandserde_jsonto reduce binary size. Avoid pulling in large crates. - Module size matters. Smaller modules load faster and use less memory. A typical temperature converter is ~2 MB (Rust release) or ~5 MB (Python). Use release builds for production.
- Avoid blocking operations. The
processfunction should complete quickly. Heavy computation delays the entire dataflow pipeline. - Use
wasm-toolsto inspect. Runwasm-tools component wit your-module.wasmto verify your module exports the expected interfaces before pushing to a registry.
Versioning and CI/CD
Use semantic versioning for your modules and graph definitions. The dataflow graph references artifacts by name and tag (for example, temperature:1.0.0), so you can update modules without changing graph definitions by pushing a new version with the same tag.
For automated builds, a typical pipeline looks like:
- Build the WASM module (use the Docker builder for consistency).
- Run
wasm-tools component witto verify exported interfaces. - Run unit tests against your core logic (see Testing).
- Push to your registry with ORAS, tagging with the build version.
- (Optional) Update the graph definition's artifact reference and push.
The dataflow graph automatically picks up new module versions pushed to the same tag without requiring a redeployment. See Update a module in a running graph.
Host APIs
Your WASM modules can use host APIs for state management, logging, and metrics.
State store
Persist data across process calls using the distributed state store:
use wasm_graph_sdk::state_store;
// Set value (fire-and-forget; state_store returns StateStoreError, not types::Error)
let options = state_store::SetOptions {
conditions: state_store::SetConditions::Unconditional,
expires: None,
};
let _ = state_store::set(key.as_bytes(), value.as_bytes(), None, None, options);
// Get value
let response = state_store::get(key.as_bytes(), None);
// Delete key
let _ = state_store::del(key.as_bytes(), None, None);
Logging
Structured logging with severity levels:
use wasm_graph_sdk::logger::{self, Level};
logger::log(Level::Info, "my-operator", "Processing started");
logger::log(Level::Error, "my-operator", &format!("Error: {}", error));
Metrics
OpenTelemetry-compatible metrics:
use wasm_graph_sdk::metrics::{self, CounterValue, HistogramValue, Label};
let labels = vec![Label { key: "module".to_owned(), value: "my-operator".to_owned() }];
let _ = metrics::add_to_counter("requests_total", CounterValue::U64(1), Some(&labels));
let _ = metrics::record_to_histogram("processing_duration", HistogramValue::F64(duration_ms), Some(&labels));
ONNX inference
To embed and run small ONNX models inside your modules for in-band inference, see Run ONNX inference in WebAssembly data flow graphs.
WIT schema reference
All operators implement standardized interfaces defined using WebAssembly Interface Types (WIT). You can find the complete schemas in the samples repository.
Operator interfaces
Every operator has an init function for configuration and a process function for data handling:
interface map {
use types.{data-model, error, module-configuration};
init: func(configuration: module-configuration) -> bool;
process: func(message: data-model) -> result<data-model, error>;
}
interface filter {
use types.{data-model, error, module-configuration};
init: func(configuration: module-configuration) -> bool;
process: func(message: data-model) -> result<bool, error>;
}
interface branch {
use types.{data-model, error, module-configuration};
use hybrid-logical-clock.{hybrid-logical-clock};
init: func(configuration: module-configuration) -> bool;
process: func(timestamp: hybrid-logical-clock, message: data-model) -> result<bool, error>;
}
interface accumulate {
use types.{data-model, error, module-configuration};
init: func(configuration: module-configuration) -> bool;
process: func(staged: data-model, message: list<data-model>) -> result<data-model, error>;
}
Data model
From processor.wit (wasm-graph:processor@1.1.0):
record timestamp {
timestamp: timespec, // Physical time (seconds + nanoseconds)
counter: u64, // Logical counter for ordering
node-id: buffer-or-string, // Originating node
}
record message {
timestamp: timestamp,
topic: buffer-or-bytes,
content-type: option<buffer-or-string>,
payload: buffer-or-bytes,
properties: message-properties,
schema: option<message-schema>,
}
variant data-model {
buffer-or-bytes(buffer-or-bytes), // Raw byte data
message(message), // MQTT messages (most common)
snapshot(snapshot), // Video/image frames
}
Note
Most operators work with the message variant. Check for this type at the start of your process function. The payload uses either a host buffer handle (buffer) for zero-copy reads or module-owned bytes (bytes). Call buffer.read() to copy host bytes into your module's memory.
Test your modules
Unit testing
Extract your core logic into plain functions that you can test without WASM:
// In src/lib.rs - extract the conversion logic
pub fn fahrenheit_to_celsius(f: f64) -> f64 {
(f - 32.0) * 5.0 / 9.0
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_boiling_point() {
assert!((fahrenheit_to_celsius(212.0) - 100.0).abs() < 0.001);
}
#[test]
fn test_freezing_point() {
assert!((fahrenheit_to_celsius(32.0) - 0.0).abs() < 0.001);
}
#[test]
fn test_body_temperature() {
assert!((fahrenheit_to_celsius(98.6) - 37.0).abs() < 0.001);
}
}
cargo test # Runs tests without WASM target
Inspect WASM output
Verify your module exports the expected interfaces before pushing to a registry:
wasm-tools component wit your-module.wasm
This shows the WIT interfaces your module implements. Verify you see the expected map, filter, or branch export.
End-to-end testing on a cluster
For integration testing, deploy your module to a development cluster and use MQTT to send test data:
- Push the module to a test registry.
- Deploy a DataflowGraph pointing at the test registry.
- Subscribe to the output topic:
mosquitto_sub -h localhost -t "output/topic" -v - Publish test messages:
mosquitto_pub -h localhost -t "input/topic" -m '{"temperature": {"value": 72}}' - Verify the output matches expectations.
- Check pod logs for errors:
kubectl logs -l app=aio-dataflow -n azure-iot-operations --tail=50
Troubleshoot
Build errors
| Error | Cause | Fix |
|---|---|---|
error[E0463]: can't find crate for std |
Missing WASM target | Run rustup target add wasm32-wasip2 |
error: no matching package found for wasm_graph_sdk |
Missing cargo registry | Add the [registries] block to .cargo/config.toml as shown in Quickstart step 1 |
componentize-py can't find WIT files |
Schema path wrong | Use -d flag with the full path to the schema directory. All .wit files must be present because they reference each other. |
componentize-py version mismatch |
Bindings generated with different version | Delete the generated bindings directory and regenerate with the same componentize-py version |
wasm-tools component check fails |
Wrong target or missing component adapter | Ensure you're using wasm32-wasip2 (not wasm32-wasi or wasm32-unknown-unknown) |
Runtime errors
| Symptom | Cause | Fix |
|---|---|---|
| Operator crashes with WASM backtrace | Missing or invalid configuration parameters | Add defensive parsing in init with defaults. See Module configuration parameters. |
init returns false, dataflow won't start |
Configuration validation failed | Check dataflow logs for error messages. Verify moduleConfigurations names match your code. |
| Module loads but produces no output | process returning errors or filter dropping everything |
Add logging in process to trace data flow. |
Unexpected input type |
Module received wrong data-model variant |
Add a type check at the start of process and handle unexpected variants. |
| Module works alone but crashes in complex graph | Missing config when reused across nodes | Each graph node needs its own moduleConfigurations entry. |
Common pitfalls
- Forgetting
--artifact-typeon ORAS push. Without it, the operations experience UI won't display your module correctly. - Mismatched
nameinmoduleConfigurations. The name must be<module>/<operator>(for example,module-temperature/filter), matching the graph definition'soperationssection. - Using
wasm32-wasiinstead ofwasm32-wasip2. Azure IoT Operations requires the WASI Preview 2 target. - Python: working outside the samples repo without copying the schema directory. All
.witfiles must be co-located because they reference each other.
Next steps
- Configure WebAssembly graph definitions for graph structure and configuration
- Deploy WASM modules and graph definitions for registry and deployment details
- Use WebAssembly with data flow graphs for DataflowGraph resource configuration
- Build WASM modules with VS Code extension for IDE-based development
- Run ONNX inference in WASM for ML model integration
- Azure IoT Operations WASM samples on GitHub
- Data flow graph samples with schema validation and WIT composition on GitHub