Ett dataflödesdiagram är en sammansättningsbar bearbetningspipeline som transformerar data när de flyttas mellan källor och mål. Ett standarddataflöde följer en fast sekvens av berika, filtrera och kartlägga. Med ett dataflödesdiagram kan du länka transformeringar i valfri ordning, förgrena till parallella sökvägar och aggregera data över tidsfönster.
Den här artikeln går igenom hur du skapar ett dataflödesdiagram steg för steg. En översikt över dataflödesdiagram och tillgängliga transformeringar finns i Översikt över dataflödesdiagram.
Viktigt!
Dataflödesdiagram stöder för närvarande endast MQTT-, Kafka- och OpenTelemetry-slutpunkter. Andra slutpunktstyper som Data Lake, Microsoft Fabric OneLake, Azure Data Explorer och Local Storage stöds inte. Mer information finns i Kända problem.
Förutsättningar
Skapa ett dataflödesdiagram
Ett dataflödesdiagram innehåller tre typer av element: källor som tar in data, transformerar den och mål som skickar ut dem. Anslut dem i den ordning du vill att data ska flöda.
I driftmiljön går du till din Azure IoT Operations-instans.
Välj Dataflödesdiagram>Skapa dataflödesdiagram.
Ange ett namn för dataflödesdiagrammet och välj en dataflödesprofil. Standardprofilen är markerad som standard.
Skapa din pipeline genom att lägga till objekt på duken.
Lägg till en källa: Välj källslutpunkten och konfigurera ämnena att prenumerera på för inkommande meddelanden.
-
Lägg till transformeringar: Välj en eller flera transformeringar för att bearbeta data. Tillgängliga transformeringar är map, filter, branch, concatenate och window. Mer information om varje transformeringstyp finns i Översikt över dataflödesdiagram.
Lägg till ett mål: Välj målslutpunkten och konfigurera ämnet eller sökvägen för att skicka bearbetade data till.
Anslut elementen i den ordning du vill att data ska flöda.
Välj Spara för att distribuera dataflödesdiagrammet.
Skapa en Bicep-fil .bicep med följande struktur. Det här exemplet skapar ett dataflödesdiagram som läser temperaturdata, konverterar dem till Fahrenheit och skickar dem till ett målämne.
param aioInstanceName string = '<AIO_INSTANCE_NAME>'
param customLocationName string = '<CUSTOM_LOCATION_NAME>'
resource aioInstance 'Microsoft.IoTOperations/instances@2025-10-01' existing = {
name: aioInstanceName
}
resource customLocation 'Microsoft.ExtendedLocation/customLocations@2021-08-31-preview' existing = {
name: customLocationName
}
resource defaultDataflowProfile 'Microsoft.IoTOperations/instances/dataflowProfiles@2025-10-01' existing = {
parent: aioInstance
name: 'default'
}
resource dataflowGraph 'Microsoft.IoTOperations/instances/dataflowProfiles/dataflowGraphs@2025-10-01' = {
parent: defaultDataflowProfile
name: 'temperature-processing'
extendedLocation: {
name: customLocation.id
type: 'CustomLocation'
}
properties: {
profileRef: 'default'
mode: 'Enabled'
nodes: [
{
nodeType: 'Source'
name: 'sensors'
sourceSettings: {
endpointRef: 'default'
dataSources: [
'telemetry/temperature'
]
}
}
{
nodeType: 'Graph'
name: 'convert'
graphSettings: {
registryEndpointRef: 'default'
artifact: 'azureiotoperations/graph-dataflow-map:1.0.0'
configuration: [
{
key: 'rules'
value: '{"map":[{"inputs":["*"],"output":"*"},{"inputs":["temperature"],"output":"temperature_f","expression":"cToF($1)"}]}'
}
]
}
}
{
nodeType: 'Destination'
name: 'output'
destinationSettings: {
endpointRef: 'default'
dataDestination: 'telemetry/converted'
}
}
]
nodeConnections: [
{
from: { name: 'sensors' }
to: { name: 'convert' }
}
{
from: { name: 'convert' }
to: { name: 'output' }
}
]
}
}
Distribuera Bicep-filen:
az deployment group create --resource-group <RESOURCE_GROUP> --template-file <FILE>.bicep
Viktigt!
Användning av Kubernetes-distributionsmanifest stöds inte i produktionsmiljöer och bör endast användas för felsökning och testning.
Skapa en Kubernetes-manifestfil .yaml med följande struktur. Det här exemplet skapar ett dataflödesdiagram som läser temperaturdata, konverterar dem till Fahrenheit och skickar dem till ett målämne.
apiVersion: connectivity.iotoperations.azure.com/v1
kind: DataflowGraph
metadata:
name: temperature-processing
namespace: azure-iot-operations
spec:
profileRef: default
nodes:
- nodeType: Source
name: sensors
sourceSettings:
endpointRef: default
dataSources:
- telemetry/temperature
- nodeType: Graph
name: convert
graphSettings:
registryEndpointRef: default
artifact: azureiotoperations/graph-dataflow-map:1.0.0
configuration:
- key: rules
value: |
{
"map": [
{
"inputs": ["*"],
"output": "*"
},
{
"inputs": ["temperature"],
"output": "temperature_f",
"expression": "cToF($1)"
}
]
}
- nodeType: Destination
name: output
destinationSettings:
endpointRef: default
dataDestination: telemetry/converted
nodeConnections:
- from:
name: sensors
to:
name: convert
- from:
name: convert
to:
name: output
Använd manifestet:
kubectl apply -f <FILE>.yaml
Källan definierar var data kommer in i pipelinen. Ange en slutpunktsreferens och ett eller flera avsnitt.
I dataflödesgrafredigeraren väljer du källelementet och konfigurerar:
| Inställning |
Beskrivning |
|
Slutpunkt |
Den dataflödesslutpunkt som ska användas. Välj standard för den lokala MQTT-koordinatorn. |
|
Topics |
Ett eller flera avsnitt att prenumerera på för inkommande meddelanden. |
{
nodeType: 'Source'
name: 'sensors'
sourceSettings: {
endpointRef: 'default'
dataSources: [
'telemetry/temperature'
'telemetry/humidity'
]
}
}
Viktigt!
Användning av Kubernetes-distributionsmanifest stöds inte i produktionsmiljöer och bör endast användas för felsökning och testning.
- nodeType: Source
name: sensors
sourceSettings:
endpointRef: default
dataSources:
- telemetry/temperature
- telemetry/humidity
Transformeringar bearbetar data mellan källan och målet. Varje transformering refererar till en inbyggd artefakt och konfigureras med regler.
De tillgängliga inbyggda transformerna är:
| Omvandla |
Artifact |
Beskrivning |
| Karta |
azureiotoperations/graph-dataflow-map:1.0.0 |
Byt namn på, omstrukturera, beräkna och kopiera fält |
| Filter |
azureiotoperations/graph-dataflow-filter:1.0.0 |
Ta bort meddelanden som matchar ett villkor |
| Filial |
azureiotoperations/graph-dataflow-branch:1.0.0 |
Dirigera meddelanden till en true eller false sökväg |
| Concatenate |
azureiotoperations/graph-dataflow-concatenate:1.0.0 |
Sammanfoga förgrenade banor igen |
| Fönster |
azureiotoperations/graph-dataflow-window:1.0.0 |
Aggregera data över ett tidsintervall |
Detaljerad konfiguration av varje transformeringstyp finns i:
I dataflödesdiagramredigeraren väljer du Lägg till transformering och väljer transformeringstyp. Konfigurera reglerna i det visuella redigeringsprogrammet.
Varje transformering är en nod med nodeType: 'Graph'. Egenskapen configuration skickar regler som en JSON-sträng:
{
nodeType: 'Graph'
name: 'convert'
graphSettings: {
registryEndpointRef: 'default'
artifact: 'azureiotoperations/graph-dataflow-map:1.0.0'
configuration: [
{
key: 'rules'
value: '{"map":[{"inputs":["temperature"],"output":"temperature_f","expression":"cToF($1)"}]}'
}
]
}
}
Viktigt!
Användning av Kubernetes-distributionsmanifest stöds inte i produktionsmiljöer och bör endast användas för felsökning och testning.
Varje transformering är en nod med nodeType: Graph. Egenskapen configuration skickar regler som en JSON-sträng:
- nodeType: Graph
name: convert
graphSettings:
registryEndpointRef: default
artifact: azureiotoperations/graph-dataflow-map:1.0.0
configuration:
- key: rules
value: |
{
"map": [
{
"inputs": ["temperature"],
"output": "temperature_f",
"expression": "cToF($1)"
}
]
}
Du kan länka valfritt antal transformeringar. Anslut dem i avsnittet nodeConnections i den ordning du vill att data ska flöda:
Dra anslutningar mellan transformeringar på arbetsytan för att definiera bearbetningsordningen.
nodeConnections: [
{ from: { name: 'sensors' }, to: { name: 'remove-bad-data' } }
{ from: { name: 'remove-bad-data' }, to: { name: 'convert' } }
{ from: { name: 'convert' }, to: { name: 'output' } }
]
Viktigt!
Användning av Kubernetes-distributionsmanifest stöds inte i produktionsmiljöer och bör endast användas för felsökning och testning.
nodeConnections:
- from: { name: sensors }
to: { name: remove-bad-data }
- from: { name: remove-bad-data }
to: { name: convert }
- from: { name: convert }
to: { name: output }
Målet definierar var bearbetade data skickas. Ange en slutpunktsreferens och ett ämne eller en sökväg.
Välj målelementet och konfigurera:
| Inställning |
Beskrivning |
|
Slutpunkt |
Dataflödesslutpunkten som data ska skickas till. |
|
Topic |
Ämnet eller sökvägen för att publicera bearbetade data till. |
{
nodeType: 'Destination'
name: 'output'
destinationSettings: {
endpointRef: 'default'
dataDestination: 'telemetry/processed'
}
}
Viktigt!
Användning av Kubernetes-distributionsmanifest stöds inte i produktionsmiljöer och bör endast användas för felsökning och testning.
- nodeType: Destination
name: output
destinationSettings:
endpointRef: default
dataDestination: telemetry/processed
Dynamisk ämnesroutning baserat på meddelandeinnehåll finns i Dirigera meddelanden till olika ämnen.
Kontrollera att dataflödesdiagrammet fungerar
När du har distribuerat ett dataflödesdiagram kontrollerar du att det körs:
I driftmiljön väljer du dataflödesdiagrammet för att visa dess status. Ett felfritt diagram visar tillståndet Körs .
Kontrollera resursens DataflowGraph status:
az resource show --resource-group <RESOURCE_GROUP> --resource-type Microsoft.IoTOperations/instances/dataflowProfiles/dataflowGraphs --name <GRAPH_NAME> --parent instances/<INSTANCE_NAME>/dataflowProfiles/<PROFILE_NAME>
Viktigt!
Användning av Kubernetes-distributionsmanifest stöds inte i produktionsmiljöer och bör endast användas för felsökning och testning.
kubectl get dataflowgraph temperature-processing -n azure-iot-operations
Kontrollera om det finns några fel i poddloggarna:
kubectl logs -l app=dataflow -n azure-iot-operations --tail=50
Nästa steg