Receptuur: Gereedschappen voor de gieterij - Multivariate anomalie detectie

Dit recept laat zien hoe u SynapseML en Foundry Tools, in Apache Spark, kunt gebruiken voor anomaliedetectie met meerdere variabelen. Multivariate anomaliedetectie omvat het detecteren van afwijkingen tussen veel variabelen of tijdreeksen, waarbij rekening wordt gehouden met alle intercorrelaties en afhankelijkheden tussen de verschillende variabelen. In dit scenario worden SynapseML en Foundry Tools gebruikt om een model te trainen voor anomaliedetectie met meerdere variabelen. Vervolgens gebruikt u het model om afwijkingen met meerdere variabelen in een gegevensset af te stellen die synthetische metingen van drie IoT-sensoren bevat.

Belangrijk

Vanaf 20 september 2023 kunt u geen nieuwe Anomaly Detector-resources maken. De Anomaly Detector-service wordt op 1 oktober 2026 buiten gebruik gesteld.

Ga naar de informatieresource Anomaly Detector voor meer informatie over de Azure AI Anomaly Detector.

Vereisten

  • Een Azure-abonnement - Maak er gratis een
  • Koppel uw notitieblok aan een lakehouse. Selecteer aan de linkerkant Toevoegen om een bestaand lakehouse toe te voegen of een lakehouse te maken.

Installatie

Vanaf een bestaande Anomaly Detector resource kunt u manieren verkennen om gegevens van verschillende formulieren te verwerken.

Een Anomaly Detector-resource maken

Opmerking

Sinds 20 september 2023 kunt u geen nieuwe Anomaly Detector-resources maken. De volgende stappen zijn alleen van toepassing als u een bestaande Anomaly Detector-resource hebt. Voor een multivariate anomaliedetectiebenadering waarvoor de Anomaly Detector-service niet is vereist, zie Multivariate Anomaly Detection with Isolation Forest.

  • Selecteer in de Azure portal Create in uw resourcegroep en typ Anomaly Detector. Selecteer de Anomaly Detector-resource.
  • Geef de resource een naam en gebruik idealiter dezelfde regio als de rest van uw resourcegroep. Gebruik de standaardopties voor de overige opties en selecteer Beoordelen + Maken en vervolgens Maken.
  • Nadat u de Anomaly Detector-resource hebt gemaakt, opent u deze en selecteert u het Keys and Endpoints deelvenster in het linkernavigatievenster. Kopieer de sleutel voor de Anomaly Detector-resource naar de ANOMALY_API_KEY omgevingsvariabele of sla deze op in de anomalyKey variabele.

Een opslagaccountresource maken

Als u tussenliggende gegevens wilt opslaan, moet u een Azure Blob Storage-account maken. Maak binnen dat opslagaccount een container voor het opslaan van de tussenliggende gegevens. Noteer de containernaam en kopieer de connection string naar die container. U hebt deze nodig om de containerName variabele en de BLOB_CONNECTION_STRING omgevingsvariabele later te vullen.

Voer uw servicesleutels in

Stel eerst de omgevingsvariabelen voor uw servicesleutels in. In de volgende cel worden de omgevingsvariabelen ANOMALY_API_KEY en de omgevingsvariabelen BLOB_CONNECTION_STRING ingesteld op basis van de waarden die zijn opgeslagen in Azure Key Vault. Als u deze zelfstudie uitvoert in uw eigen omgeving, moet u deze omgevingsvariabelen instellen voordat u verdergaat:

import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import find_secret

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

Lees de ANOMALY_API_KEY en BLOB_CONNECTION_STRING omgevingsvariabelen en stel de containerName en location variabelen in:

# An Anomaly Detector subscription key
anomalyKey = find_secret("anomaly-api-key") # use your own anomaly api key
# Your storage account name
storageName = "anomalydetectiontest" # use your own storage account name
# A connection string to your blob storage account
storageKey = find_secret("madtest-storage-key") # use your own storage key
# A place to save intermediate MVAD results
intermediateSaveDir = (
    "wasbs://madtest@anomalydetectiontest.blob.core.windows.net/intermediateData"
)
# The location of the anomaly detector resource that you created
location = "westus2"

Maak verbinding met het opslagaccount, zodat de anomaliedetector tussenliggende resultaten in dat opslagaccount kan opslaan:

spark.sparkContext._jsc.hadoopConfiguration().set(
    f"fs.azure.account.key.{storageName}.blob.core.windows.net", storageKey
)

Importeer alle benodigde modules:

import numpy as np
import pandas as pd

import pyspark
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.types import DoubleType
import matplotlib.pyplot as plt

import synapse.ml
from synapse.ml.services import *

Lees de voorbeeldgegevens in een Spark DataFrame:

df = (
    spark.read.format("csv")
    .option("header", "true")
    .load("wasbs://publicwasb@mmlspark.blob.core.windows.net/MVAD/sample.csv")
)

df = (
    df.withColumn("sensor_1", col("sensor_1").cast(DoubleType()))
    .withColumn("sensor_2", col("sensor_2").cast(DoubleType()))
    .withColumn("sensor_3", col("sensor_3").cast(DoubleType()))
)

# Let's inspect the dataframe:
df.show(5)

U kunt nu een estimator object maken dat u gebruikt om uw model te trainen. Geef de begin- en eindtijden op voor de trainingsgegevens. Geef ook de invoerkolommen op die moeten worden gebruikt en de naam van de kolom die de tijdstempels bevat. Geef ten slotte het aantal gegevenspunten op dat moet worden gebruikt in het schuifvenster voor anomaliedetectie en stel de connection string in op het Azure Blob Storage-account:

trainingStartTime = "2020-06-01T12:00:00Z"
trainingEndTime = "2020-07-02T17:55:00Z"
timestampColumn = "timestamp"
inputColumns = ["sensor_1", "sensor_2", "sensor_3"]

estimator = (
    FitMultivariateAnomaly()
    .setSubscriptionKey(anomalyKey)
    .setLocation(location)
    .setStartTime(trainingStartTime)
    .setEndTime(trainingEndTime)
    .setIntermediateSaveDir(intermediateSaveDir)
    .setTimestampCol(timestampColumn)
    .setInputCols(inputColumns)
    .setSlidingWindow(200)
)

Pas de estimator aan op de gegevens:

model = estimator.fit(df)

Zodra de training is voltooid, gebruikt u het model voor deductie. De code in de volgende cel geeft de begin- en eindtijden op voor de gegevens waarin u de afwijkingen wilt detecteren:

inferenceStartTime = "2020-07-02T18:00:00Z"
inferenceEndTime = "2020-07-06T05:15:00Z"

result = (
    model.setStartTime(inferenceStartTime)
    .setEndTime(inferenceEndTime)
    .setOutputCol("results")
    .setErrorCol("errors")
    .setInputCols(inputColumns)
    .setTimestampCol(timestampColumn)
    .transform(df)
)

result.show(5)

In de vorige cel .show(5) worden de eerste vijf gegevensframerijen weergegeven. De resultaten zijn allemaal null omdat ze buiten het inferentievenster vallen.

Om de resultaten alleen voor de afgeleide gegevens weer te geven, selecteert u de benodigde kolommen. Vervolgens kunt u de rijen in het gegevensframe op oplopende volgorde rangschikken en het resultaat filteren om alleen de rijen in het deductievensterbereik weer te geven. Hier komt inferenceEndTime de laatste rij in het dataframe overeen, zodat u deze kunt negeren.

Als u de resultaten beter wilt uitzetten, converteert u het Spark-gegevensframe naar een Pandas-dataframe:

rdf = (
    result.select(
        "timestamp",
        *inputColumns,
        "results.contributors",
        "results.isAnomaly",
        "results.severity"
    )
    .orderBy("timestamp", ascending=True)
    .filter(col("timestamp") >= lit(inferenceStartTime))
    .toPandas()
)

rdf

Formatteer de contributors-kolom die de bijdragescores van elke sensor voor de gedetecteerde afwijkingen opslaat. De volgende cel verwerkt dit en splitst de bijdragescore van elke sensor in een eigen kolom:

def parse(x):
    if type(x) is list:
        return dict([item[::-1] for item in x])
    else:
        return {"series_0": 0, "series_1": 0, "series_2": 0}

rdf["contributors"] = rdf["contributors"].apply(parse)
rdf = pd.concat(
    [rdf.drop(["contributors"], axis=1), pd.json_normalize(rdf["contributors"])], axis=1
)
rdf

U hebt nu de bijdragescores van sensoren 1, 2 en 3 in respectievelijk de series_0, series_1en series_2 kolommen.

Als u de resultaten wilt uitzetten, voert u de volgende cel uit. De minSeverity parameter geeft de minimale ernst van de afwijkingen op die moeten worden uitgezet:

minSeverity = 0.1

####### Main Figure #######
plt.figure(figsize=(23, 8))
plt.plot(
    rdf["timestamp"],
    rdf["sensor_1"],
    color="tab:orange",
    linestyle="solid",
    linewidth=2,
    label="sensor_1",
)
plt.plot(
    rdf["timestamp"],
    rdf["sensor_2"],
    color="tab:green",
    linestyle="solid",
    linewidth=2,
    label="sensor_2",
)
plt.plot(
    rdf["timestamp"],
    rdf["sensor_3"],
    color="tab:blue",
    linestyle="solid",
    linewidth=2,
    label="sensor_3",
)
plt.grid(axis="y")
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.legend()

anoms = list(rdf["severity"] >= minSeverity)
_, _, ymin, ymax = plt.axis()
plt.vlines(np.where(anoms), ymin=ymin, ymax=ymax, color="r", alpha=0.8)

plt.legend()
plt.title(
    "A plot of the values from the three sensors with the detected anomalies highlighted in red."
)
plt.show()

####### Severity Figure #######
plt.figure(figsize=(23, 1))
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.plot(
    rdf["timestamp"],
    rdf["severity"],
    color="black",
    linestyle="solid",
    linewidth=2,
    label="Severity score",
)
plt.plot(
    rdf["timestamp"],
    [minSeverity] * len(rdf["severity"]),
    color="red",
    linestyle="dotted",
    linewidth=1,
    label="minSeverity",
)
plt.grid(axis="y")
plt.legend()
plt.ylim([0, 1])
plt.title("Severity of the detected anomalies")
plt.show()

####### Contributors Figure #######
plt.figure(figsize=(23, 1))
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.bar(
    rdf["timestamp"], rdf["series_0"], width=2, color="tab:orange", label="sensor_1"
)
plt.bar(
    rdf["timestamp"],
    rdf["series_1"],
    width=2,
    color="tab:green",
    label="sensor_2",
    bottom=rdf["series_0"],
)
plt.bar(
    rdf["timestamp"],
    rdf["series_2"],
    width=2,
    color="tab:blue",
    label="sensor_3",
    bottom=rdf["series_0"] + rdf["series_1"],
)
plt.grid(axis="y")
plt.legend()
plt.ylim([0, 1])
plt.title("The contribution of each sensor to the detected anomaly")
plt.show()

Schermopname van de plot met multivariate anomaliedetectieresultaten.

De plots tonen de onbewerkte gegevens van de sensoren (in het inferentievenster) in oranje, groen en blauw. De rode verticale lijnen in de eerste afbeelding tonen de gedetecteerde afwijkingen met een ernst die groter is dan of gelijk is aan minSeverity.

In het tweede diagram ziet u de ernstscore van alle gedetecteerde afwijkingen, met de minSeverity drempelwaarde die wordt weergegeven in de rode stippellijn.

Ten slotte toont de laatste plot de bijdrage van de gegevens van elke sensor aan de gedetecteerde afwijkingen. Het helpt bij het vaststellen en begrijpen van de meest waarschijnlijke oorzaak van elke anomalie.