Create data flows in Azure IoT Operations

Important

This page includes instructions for managing Azure IoT Operations components using Kubernetes deployment manifests, which is in PREVIEW. This feature is provided with several limitations, and shouldn't be used for production workloads.

See the Supplemental Terms of Use for Microsoft Azure Previews for legal terms that apply to Azure features that are in beta, preview, or otherwise not yet released into general availability.

A data flow is the path that data takes from the source to the destination with optional transformations. You can configure the data flow by creating a Data flow custom resource or using the operations experience web UI. A data flow is made up of three parts: the source, the transformation, and the destination.

flowchart LR
  subgraph Source
  A[DataflowEndpoint]
  end
  subgraph BuiltInTransformation
  direction LR
  Datasets --> Filter
  Filter --> Map
  end
  subgraph Destination
  B[DataflowEndpoint]
  end
  Source --> BuiltInTransformation
  BuiltInTransformation --> Destination

Diagram of a data flow showing flow from source to transform then destination.

To define the source and destination, you need to configure the data flow endpoints. The transformation is optional and can include operations like enriching the data, filtering the data, and mapping the data to another field.

Important

Each data flow must have the Azure IoT Operations local MQTT broker default endpoint as either the source or destination.

You can use the operations experience in Azure IoT Operations to create a data flow. The operations experience provides a visual interface to configure the data flow. You can also create a data flow by using a Bicep file or a Kubernetes YAML file.
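A minimal Kubernetes deployment manifest (preview) can be sketched as follows. This is illustrative only: the resource name, namespace, topics, and API version are assumptions and can vary by release.

```yaml
# Illustrative sketch of a Dataflow custom resource (preview schema).
# The API version, names, and topics are placeholders.
apiVersion: connectivity.iotoperations.azure.com/v1
kind: Dataflow
metadata:
  name: my-dataflow
  namespace: azure-iot-operations
spec:
  profileRef: default          # use the default data flow profile
  mode: Enabled
  operations:
    - operationType: Source
      sourceSettings:
        endpointRef: default   # default local MQTT broker endpoint
        dataSources:
          - azure-iot-operations/data/thermostat
    - operationType: Destination
      destinationSettings:
        endpointRef: default
        dataDestination: factory
```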

Continue reading to learn how to configure the source, transformation, and destination.

Prerequisites

As soon as you have an instance of Azure IoT Operations, you can deploy data flows by using the default data flow profile and endpoint. However, you might want to configure data flow profiles and endpoints to customize the data flow.

Data flow profile

If you don't need different scaling settings for your data flows, use the default data flow profile provided by Azure IoT Operations. Avoid associating too many data flows with a single data flow profile. If you have a large number of data flows, distribute them across multiple data flow profiles to reduce the risk of exceeding the data flow profile configuration size limit of 70.

To learn how to configure a new data flow profile, see Configure data flow profiles.

Data flow endpoints

You need data flow endpoints to configure the source and destination for the data flow. To get started quickly, use the default data flow endpoint for the local MQTT broker. You can also create other types of data flow endpoints like Kafka, Event Hubs, OpenTelemetry, or Azure Data Lake Storage. For more information, see Configure data flow endpoints.

Get started

When you have the prerequisites, you can start creating a data flow.

  1. To create a data flow in the operations experience, select Data flow > Create data flow.

  2. Select the placeholder name new-data-flow to set the data flow properties. Enter the name of the data flow and choose the data flow profile to use. The default data flow profile is selected by default. For more information on data flow profiles, see Configure data flow profiles.

    Screenshot of the operations experience interface where a user names the data flow and selects a profile for it.

    Important

    You can only choose the data flow profile when creating a data flow. You can't change the data flow profile after the data flow is created. If you want to change the data flow profile of an existing data flow, delete the original data flow and create a new one with the new data flow profile.

  3. Configure the source, transformation, and destination endpoint for the data flow by selecting the items in the data flow diagram.

    Screenshot of the operations experience interface displaying a data flow diagram with a source endpoint, transformation stage, and destination endpoint.

Review the following sections to learn how to configure the operation types of the data flow.

Source

Configure the source endpoint and data sources (topics) for the data flow. You can use the default MQTT broker, an asset, or a custom MQTT or Kafka endpoint as the source.

For complete configuration details, including MQTT topic wildcards, shared subscriptions, Kafka topics, and source schema, see Configure a data flow source.

If you don't use the default endpoint as the source, you must use it as the destination. For more information about using the local MQTT broker endpoint, see Data flows must use local MQTT broker endpoint.

Request disk persistence

Disk persistence keeps data flow processing state across restarts. For configuration details, see Configure disk persistence.

Transformation

The transformation operation is where you transform the data from the source before you send it to the destination. Transformations are optional. If you don't need to make changes to the data, don't include the transformation operation in the data flow configuration. Multiple transformations chain together in stages regardless of the order in which you specify them in the configuration. The order of the stages is always:

  1. Enrich: Add more data to the source data given a dataset and condition to match.
  2. Filter: Filter the data based on a condition.
  3. Map, Compute, Rename, or New property: Move data from one field to another with an optional conversion.

This section is an introduction to data flow transforms. For more detailed information, see Map data by using data flows, and Enrich data by using data flows.
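The staged ordering above can be sketched in a manifest's transformation settings. Field names assume the preview schema; the dataset key, fields, and expressions are illustrative:

```yaml
# Stages always run in this order: datasets (enrich), filter, map.
builtInTransformationSettings:
  datasets:                        # 1. Enrich
    - key: assetDataset            # dataset key in the state store (placeholder)
      inputs:
        - deviceId                 # field in the source data
        - $context(assetDataset).asset
      expression: $1 == $2
  filter:                          # 2. Filter
    - inputs:
        - temperature
      expression: $1 > 20          # keep messages where this is true
  map:                             # 3. Map
    - inputs:
        - temperature
      output: temperatureCelsius
      expression: ($1 - 32) * 5/9
```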

In the operations experience, select Data flow > Add transform (optional).

Screenshot of the operations experience interface showing the addition of a transformation stage to a data flow.

Enrich: Add reference data

To enrich the data, first add a reference dataset in the Azure IoT Operations state store. The dataset adds extra data to the source data based on a condition. The condition is specified as a field in the source data that matches a field in the dataset.

You can load sample data into the state store by using the state store CLI. Key names in the state store correspond to a dataset in the data flow configuration.

Currently, the Enrich stage isn't supported in the operations experience.

If the dataset has a record with the asset field, similar to the following example:

{
  "asset": "thermostat1",
  "location": "room1",
  "manufacturer": "Contoso"
}

Then, data from the source with a deviceId field matching thermostat1 has the location and manufacturer fields available in the filter and map stages.

For more information about condition syntax, see Enrich data by using data flows.
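Assuming the preview manifest schema, a dataset condition that matches the record above on deviceId might be sketched like this (the assetDataset key is hypothetical):

```yaml
datasets:
  - key: assetDataset                    # key loaded into the state store
    inputs:
      - deviceId                         # field in the source data
      - $context(assetDataset).asset     # field in the dataset record
    expression: $1 == $2                 # enrich when deviceId matches asset
```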

Filter: Filter data based on a condition

Use the filter stage to drop messages that don't meet a condition. You can define multiple filter rules with input fields and boolean expressions.

For complete configuration details and examples, see Filter data in a data flow.
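As a sketch under the preview manifest schema (the fields and threshold are placeholders), a filter rule keeps only messages for which the expression evaluates to true:

```yaml
filter:
  - inputs:
      - temperature
      - humidity
    expression: ($1 * $2) < 100000   # messages failing this condition are dropped
```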

Map: Move data from one field to another

To map the data to another field with optional conversion, use the map operation. Specify the conversion as a formula that uses the fields in the source data.

In the operations experience, you can currently map data by using Compute, Rename, and New property transforms.

Compute

Use the Compute transform to apply a formula to the source data and store the result in an output field.

  1. Under Transform (optional), select Compute > Add.

    Screenshot using operations experience to add a compute transform.

  2. Enter the required settings.

    • Select formula: Choose an existing formula from the dropdown, or select Custom to enter a formula manually.
    • Output: Specify the output display name for the result.
    • Formula: Enter the formula to apply to the source data.
    • Description: Provide a description for the transformation.
    • Last known value: Optionally, use the last known value if the current value isn't available.

    Enter or edit a formula in the Formula field. The formula can use the fields in the source data. Enter @ or press Ctrl + Space to choose datapoints from a dropdown. For built-in formulas, select the <dataflow> placeholder to see the list of available datapoints.

    Enter MQTT metadata properties by using the format @$metadata.user_properties.<property> or @$metadata.topic. Enter $metadata headers by using the format @$metadata.<header>. The $metadata syntax is only needed for MQTT properties that are part of the message header. For more information, see field references.

    The formula can use the fields in the source data. For example, you could use the temperature field in the source data to convert the temperature to Celsius and store it in the temperatureCelsius output field.

  3. Select Apply.
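The Celsius example can be sketched as a map entry, assuming the preview manifest schema and a source that reports Fahrenheit:

```yaml
map:
  - inputs:
      - temperature                # source field (assumed Fahrenheit)
    output: temperatureCelsius
    expression: ($1 - 32) * 5/9
```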

Rename

Use the Rename transform to give a datapoint in the source data a new name. Use the new name in the subsequent stages of the data flow.

  1. Under Transform (optional), select Rename > Add.

    Screenshot using operations experience to rename a datapoint.

  2. Enter the required settings.

    • Datapoint: Select a datapoint from the dropdown or enter a $metadata header.
    • New datapoint name: Enter the new name for the datapoint.
    • Description: Provide a description for the transformation.

    Enter MQTT metadata properties by using the format @$metadata.user_properties.<property> or @$metadata.topic. Enter $metadata headers by using the format @$metadata.<header>. The $metadata syntax is only needed for MQTT properties that are part of the message header. For more information, see field references.

  3. Select Apply.
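In a manifest, a rename can be sketched as a map entry with no expression (assuming the preview schema; the names are placeholders):

```yaml
map:
  - inputs:
      - temp              # original datapoint name
    output: temperature   # new datapoint name
```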

New property

Use the New property transform to add a new property to the source data. Use the new property in the subsequent stages of the data flow.

  1. Under Transform (optional), select New property > Add.

    Screenshot using operations experience to add a new property.

  2. Enter the required settings.

    • Property key: Enter the key for the new property.
    • Property value: Enter the value for the new property.
    • Description: Provide a description for the new property.
  3. Select Apply.
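A new property with a constant value might be sketched as a map entry with no inputs. This is an assumption based on the preview schema; the key and value are placeholders:

```yaml
map:
  - inputs: []                 # no source inputs; creates a new property
    output: location           # property key
    expression: "'factory-1'"  # constant value (quoted string literal)
```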

To learn more, see Map data by using data flows.

Remove

By default, the output schema includes all datapoints. Remove any datapoint from the destination by using the Remove transform.

  1. Under Transform (optional), select Remove.

  2. Select the datapoint to remove from the output schema.

    Screenshot using operations experience to remove the weight datapoint from the output schema.

  3. Select Apply.

To learn more, see Map data by using data flows.

Serialize data according to a schema

If you want to serialize the data before sending it to the destination, specify a schema and serialization format. For details, see Serialize the output with a schema.

Destination

Configure the destination endpoint and data destination (topic, container, or table) for the data flow. You can use any supported endpoint type as the destination, including MQTT, Kafka, Azure Data Lake Storage, Microsoft Fabric, Azure Data Explorer, and local storage.

For complete configuration details, including the data destination table, dynamic destination topics, and output serialization, see Configure a data flow destination.

To send data to a destination other than the local MQTT broker, create a data flow endpoint. To learn how, see Configure data flow endpoints.

Important

Storage endpoints require a schema for serialization. To use data flow with Microsoft Fabric OneLake, Azure Data Lake Storage, Azure Data Explorer, or Local Storage, you must specify a schema reference.
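Assuming the preview manifest schema, the schema reference and serialization format can be sketched alongside the transformation settings (the schema reference and format here are placeholders):

```yaml
builtInTransformationSettings:
  serializationFormat: Parquet
  schemaRef: aio-sr://exampleNamespace/exampleSchema:1   # placeholder schema reference
```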

Example

The following example is a data flow configuration that uses the MQTT endpoint for the source and destination. The source filters the data from the MQTT topic azure-iot-operations/data/thermostat. The transformation converts the temperature to Fahrenheit and filters the data where the temperature multiplied by the humidity is less than 100000. The destination sends the data to the MQTT topic factory.

Screenshot showing the operations experience data flow example with a source endpoint, transforms, and a destination endpoint.
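The example can be sketched as a preview manifest. The resource name, API version, and pass-through map entry are illustrative assumptions:

```yaml
apiVersion: connectivity.iotoperations.azure.com/v1   # version may vary by release
kind: Dataflow
metadata:
  name: thermostat-flow
  namespace: azure-iot-operations
spec:
  profileRef: default
  operations:
    - operationType: Source
      sourceSettings:
        endpointRef: default
        dataSources:
          - azure-iot-operations/data/thermostat
    - operationType: BuiltInTransformation
      builtInTransformationSettings:
        filter:
          - inputs:
              - temperature
              - humidity
            expression: ($1 * $2) < 100000
        map:
          - inputs:
              - '*'
            output: '*'                    # pass all other fields through
          - inputs:
              - temperature
            output: temperatureFahrenheit
            expression: ($1 * 9/5) + 32
    - operationType: Destination
      destinationSettings:
        endpointRef: default
        dataDestination: factory
```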

To see more examples of data flow configurations, see Azure REST API - Data flow and the quickstart Bicep.

Verify a data flow is working

To verify the data flow is working, follow Tutorial: Bi-directional MQTT bridge to Azure Event Grid.

Export data flow configuration

To export the data flow configuration, use the operations experience or export the data flow custom resource.

Select the data flow you want to export, then select Export from the toolbar.

Screenshot of the operations experience interface showing the export option for a configured data flow.

Proper data flow configuration

To ensure the data flow works as expected, verify the following conditions:

  • The default MQTT data flow endpoint must be used as either the source or destination.
  • The data flow profile exists and is referenced in the data flow configuration.
  • The source is either an MQTT endpoint, Kafka endpoint, or an asset. You can't use storage type endpoints as a source.
  • When you use Event Grid as the source, set the data flow profile instance count to 1 because the Event Grid MQTT broker doesn't support shared subscriptions.
  • When you use Event Hubs as the source, each event hub in the namespace is a separate Kafka topic and you must specify each as the data source.
  • Transformation, if used, is configured with proper syntax, including proper escaping of special characters.
  • When you use storage type endpoints as destination, a schema is specified.
  • When you use dynamic destination topics for MQTT endpoints, ensure that topic variables reference valid segments.

Next steps