Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Questa ricetta illustra come usare Gli strumenti SynapseML e Foundry, in Apache Spark, per il rilevamento di anomalie multivariate. Il rilevamento delle anomalie multivariate comporta il rilevamento di anomalie tra molte variabili o serie temporali, tenendo conto di tutte le correlazioni e delle dipendenze tra le diverse variabili. Questo scenario usa Gli strumenti SynapseML e Foundry per eseguire il training di un modello per il rilevamento anomalie multivariato. Si usa quindi il modello per dedurre anomalie multivariate all'interno di un set di dati che contiene misurazioni sintetiche da tre sensori IoT.
Importante
A partire dal 20 settembre 2023 non è possibile creare nuove risorse rilevamento anomalie. Il servizio Rilevamento anomalie viene ritirato il 1° ottobre 2026.
Per ulteriori informazioni su Azure AI Anomaly Detector, visitare la risorsa informativa Anomaly Detector.
Prerequisiti
- Una sottoscrizione Azure - Crearne una gratuitamente
- Collega il notebook a un lakehouse. Sul lato sinistro, selezionare Aggiungi per aggiungere un lakehouse esistente o creare un lakehouse.
Installazione
A partire da una risorsa esistente Anomaly Detector , è possibile esplorare i modi per gestire i dati di vari moduli.
Creare una risorsa Rilevatore di Anomalie
Annotazioni
Dal 20 settembre 2023 non è possibile creare nuove risorse rilevamento anomalie. La procedura seguente si applica solo se si dispone di una risorsa Rilevamento anomalie esistente. Per un approccio di rilevamento anomalie multivariato che non richiede il servizio Rilevamento anomalie, vedere Rilevamento anomalie multivariato con foresta di isolamento.
- Nel portale di Azure selezionare Create nel gruppo di risorse e quindi digitare Anomaly Detector. Selezionare la risorsa di Rilevamento anomalie.
- Denominare la risorsa e idealmente usare la stessa area del resto del gruppo di risorse. Usare le opzioni predefinite per il resto, quindi selezionare Rivedi e crea e quindi Crea.
- Dopo aver creato la risorsa Anomaly Detector, aprirla e selezionare il
Keys and Endpointspannello nel riquadro di navigazione a sinistra. Copiare la chiave per la risorsa Rilevamento anomalie nella variabile di ambienteANOMALY_API_KEYoppure archiviarla nella variabileanomalyKey.
Crea un account di archiviazione risorsa
Per salvare i dati intermedi, è necessario creare un account Azure Blob Storage. All'interno dell'account di archiviazione creare un contenitore per l'archiviazione dei dati intermedi. Prendere nota del nome del contenitore e copiare il connection string in tale contenitore. È necessario in un secondo momento popolare la variabile containerName e la variabile di ambiente BLOB_CONNECTION_STRING.
Immettere le chiavi del servizio
Prima di tutto, configurare le variabili di ambiente per le chiavi del servizio. La cella successiva imposta le variabili di ambiente ANOMALY_API_KEY e BLOB_CONNECTION_STRING, in base ai valori archiviati in Azure Key Vault. Se si esegue questa esercitazione nel proprio ambiente, assicurarsi di impostare queste variabili di ambiente prima di procedere:
import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import find_secret
# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
Leggere le variabili di ambiente ANOMALY_API_KEY e BLOB_CONNECTION_STRING e impostare le variabili containerName e location.
# An Anomaly Detector subscription key
anomalyKey = find_secret("anomaly-api-key") # use your own anomaly api key
# Your storage account name
storageName = "anomalydetectiontest" # use your own storage account name
# A connection string to your blob storage account
storageKey = find_secret("madtest-storage-key") # use your own storage key
# A place to save intermediate MVAD results
intermediateSaveDir = (
"wasbs://madtest@anomalydetectiontest.blob.core.windows.net/intermediateData"
)
# The location of the anomaly detector resource that you created
location = "westus2"
Connettersi all'account di archiviazione, in modo che il rilevamento anomalie possa salvare i risultati intermedi in tale account di archiviazione:
spark.sparkContext._jsc.hadoopConfiguration().set(
f"fs.azure.account.key.{storageName}.blob.core.windows.net", storageKey
)
Importare tutti i moduli necessari:
import numpy as np
import pandas as pd
import pyspark
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.types import DoubleType
import matplotlib.pyplot as plt
import synapse.ml
from synapse.ml.services import *
Leggere i dati di esempio in un dataframe Spark:
df = (
spark.read.format("csv")
.option("header", "true")
.load("wasbs://publicwasb@mmlspark.blob.core.windows.net/MVAD/sample.csv")
)
df = (
df.withColumn("sensor_1", col("sensor_1").cast(DoubleType()))
.withColumn("sensor_2", col("sensor_2").cast(DoubleType()))
.withColumn("sensor_3", col("sensor_3").cast(DoubleType()))
)
# Let's inspect the dataframe:
df.show(5)
È ora possibile creare un estimator oggetto, che viene usato per eseguire il training del modello. Specificare l'ora di inizio e di fine per i dati di training. Specificare anche le colonne di input da usare e il nome della colonna che contiene i timestamp. Infine, specificare il numero di punti dati da usare nella finestra temporale scorrevole per il rilevamento delle anomalie e impostare la stringa di connessione all'account di archiviazione BLOB di Azure.
trainingStartTime = "2020-06-01T12:00:00Z"
trainingEndTime = "2020-07-02T17:55:00Z"
timestampColumn = "timestamp"
inputColumns = ["sensor_1", "sensor_2", "sensor_3"]
estimator = (
FitMultivariateAnomaly()
.setSubscriptionKey(anomalyKey)
.setLocation(location)
.setStartTime(trainingStartTime)
.setEndTime(trainingEndTime)
.setIntermediateSaveDir(intermediateSaveDir)
.setTimestampCol(timestampColumn)
.setInputCols(inputColumns)
.setSlidingWindow(200)
)
Adattare estimator ai dati:
model = estimator.fit(df)
Al termine dell'addestramento, usare il modello per svolgere inferenze. Il codice nella cella successiva specifica l'ora di inizio e di fine per i dati in cui si desidera rilevare le anomalie:
inferenceStartTime = "2020-07-02T18:00:00Z"
inferenceEndTime = "2020-07-06T05:15:00Z"
result = (
model.setStartTime(inferenceStartTime)
.setEndTime(inferenceEndTime)
.setOutputCol("results")
.setErrorCol("errors")
.setInputCols(inputColumns)
.setTimestampCol(timestampColumn)
.transform(df)
)
result.show(5)
Nella cella precedente vengono .show(5) visualizzate le prime cinque righe del dataframe. I risultati sono tutti null perché si trovano fuori dalla finestra di inferenza.
Per visualizzare i risultati solo per i dati dedotti, selezionare le colonne necessarie. È quindi possibile ordinare le righe nel dataframe in ordine crescente e filtrare il risultato in modo da visualizzare solo le righe nell'intervallo della finestra di inferenza. In questo caso, inferenceEndTime corrisponde all'ultima riga nel dataframe, in modo da poterla ignorare.
Infine, per tracciare meglio i risultati, convertire il frame di dati Spark in un dataframe Pandas:
rdf = (
result.select(
"timestamp",
*inputColumns,
"results.contributors",
"results.isAnomaly",
"results.severity"
)
.orderBy("timestamp", ascending=True)
.filter(col("timestamp") >= lit(inferenceStartTime))
.toPandas()
)
rdf
Formattare la colonna contributors che archivia il punteggio di contributo da ogni sensore alle anomalie rilevate. La cella successiva gestisce questa operazione e suddivide il punteggio di contributo di ogni sensore nella propria colonna:
def parse(x):
if type(x) is list:
return dict([item[::-1] for item in x])
else:
return {"series_0": 0, "series_1": 0, "series_2": 0}
rdf["contributors"] = rdf["contributors"].apply(parse)
rdf = pd.concat(
[rdf.drop(["contributors"], axis=1), pd.json_normalize(rdf["contributors"])], axis=1
)
rdf
Sono ora disponibili i punteggi di contributo dei sensori 1, 2 e 3 rispettivamente nelle series_0colonne , series_1e series_2 .
Per tracciare i risultati, eseguire la cella successiva. Il minSeverity parametro specifica la gravità minima delle anomalie da tracciare:
minSeverity = 0.1
####### Main Figure #######
plt.figure(figsize=(23, 8))
plt.plot(
rdf["timestamp"],
rdf["sensor_1"],
color="tab:orange",
linestyle="solid",
linewidth=2,
label="sensor_1",
)
plt.plot(
rdf["timestamp"],
rdf["sensor_2"],
color="tab:green",
linestyle="solid",
linewidth=2,
label="sensor_2",
)
plt.plot(
rdf["timestamp"],
rdf["sensor_3"],
color="tab:blue",
linestyle="solid",
linewidth=2,
label="sensor_3",
)
plt.grid(axis="y")
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.legend()
anoms = list(rdf["severity"] >= minSeverity)
_, _, ymin, ymax = plt.axis()
plt.vlines(np.where(anoms), ymin=ymin, ymax=ymax, color="r", alpha=0.8)
plt.legend()
plt.title(
"A plot of the values from the three sensors with the detected anomalies highlighted in red."
)
plt.show()
####### Severity Figure #######
plt.figure(figsize=(23, 1))
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.plot(
rdf["timestamp"],
rdf["severity"],
color="black",
linestyle="solid",
linewidth=2,
label="Severity score",
)
plt.plot(
rdf["timestamp"],
[minSeverity] * len(rdf["severity"]),
color="red",
linestyle="dotted",
linewidth=1,
label="minSeverity",
)
plt.grid(axis="y")
plt.legend()
plt.ylim([0, 1])
plt.title("Severity of the detected anomalies")
plt.show()
####### Contributors Figure #######
plt.figure(figsize=(23, 1))
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.bar(
rdf["timestamp"], rdf["series_0"], width=2, color="tab:orange", label="sensor_1"
)
plt.bar(
rdf["timestamp"],
rdf["series_1"],
width=2,
color="tab:green",
label="sensor_2",
bottom=rdf["series_0"],
)
plt.bar(
rdf["timestamp"],
rdf["series_2"],
width=2,
color="tab:blue",
label="sensor_3",
bottom=rdf["series_0"] + rdf["series_1"],
)
plt.grid(axis="y")
plt.legend()
plt.ylim([0, 1])
plt.title("The contribution of each sensor to the detected anomaly")
plt.show()
I tracciati mostrano i dati non elaborati dei sensori (all'interno della finestra di inferenza) in arancione, verde e blu. Le linee verticali rosse nella prima figura mostrano le anomalie rilevate con una gravità maggiore o uguale a minSeverity.
Il secondo grafico mostra il punteggio di gravità di tutte le anomalie rilevate, con la soglia minSeverity visualizzata nella linea rossa punteggiata.
Infine, l'ultimo tracciato mostra il contributo dei dati di ogni sensore alle anomalie rilevate. Consente di diagnosticare e comprendere la causa più probabile di ogni anomalia.
Contenuto correlato
- Multivariate Anomaly Detection with Isolation Forest non richiede la risorsa Azure AI Anomaly Detector.
- Come usare LightGBM con SynapseML
- Come usare gli strumenti Foundry con SynapseML
- Come usare SynapseML per ottimizzare gli iperparametri