Condividi tramite


Rilevamento anomalie multivariate con foresta di isolamento

Questo articolo illustra come usare SynapseML in Apache Spark per il rilevamento anomalie multivariato. Il rilevamento delle anomalie multivariate consente il rilevamento di anomalie tra molte variabili o timeseries, tenendo conto di tutte le correlazioni e le dipendenze tra le diverse variabili. In questo scenario, utilizziamo SynapseML per addestrare un modello di foresta di isolamento per il rilevamento di anomalie multivariate e successivamente utilizzare il modello addestrato per identificare anomalie multivariate all'interno di un set di dati contenente misurazioni sintetiche da tre sensori IoT.

Per maggiori informazioni sul modello Isolation Forest, vedere il documento originale di Liu et al.

Prerequisiti

  • Collega il notebook a un lakehouse. Sul lato sinistro, selezionare Aggiungi per aggiungere un lakehouse esistente o creare un lakehouse.

Importazioni di raccolte

from IPython import get_ipython
from IPython.terminal.interactiveshell import TerminalInteractiveShell
import uuid
import mlflow

from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import *
from pyspark.ml import Pipeline

from synapse.ml.isolationforest import *

from synapse.ml.explainers import *
%matplotlib inline
from pyspark.sql import SparkSession

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

from synapse.ml.core.platform import *

if running_on_synapse():
    shell = TerminalInteractiveShell.instance()
    shell.define_macro("foo", """a,b=10,20""")

Dati di input

# Table inputs
timestampColumn = "timestamp"  # str: the name of the timestamp column in the table
inputCols = [
    "sensor_1",
    "sensor_2",
    "sensor_3",
]  # list(str): the names of the input variables

# Training Start time, and number of days to use for training:
trainingStartTime = (
    "2022-02-24T06:00:00Z"  # datetime: datetime for when to start the training
)
trainingEndTime = (
    "2022-03-08T23:55:00Z"  # datetime: datetime for when to end the training
)
inferenceStartTime = (
    "2022-03-09T09:30:00Z"  # datetime: datetime for when to start the training
)
inferenceEndTime = (
    "2022-03-20T23:55:00Z"  # datetime: datetime for when to end the training
)

# Isolation Forest parameters
contamination = 0.021
num_estimators = 100
max_samples = 256
max_features = 1.0

Leggere i dati

df = (
    spark.read.format("csv")
    .option("header", "true")
    .load(
        "wasbs://publicwasb@mmlspark.blob.core.windows.net/generated_sample_mvad_data.csv"
    )
)

convertire le colonne in tipi di dati appropriati

df = (
    df.orderBy(timestampColumn)
    .withColumn("timestamp", F.date_format(timestampColumn, "yyyy-MM-dd'T'HH:mm:ss'Z'"))
    .withColumn("sensor_1", F.col("sensor_1").cast(DoubleType()))
    .withColumn("sensor_2", F.col("sensor_2").cast(DoubleType()))
    .withColumn("sensor_3", F.col("sensor_3").cast(DoubleType()))
    .drop("_c5")
)

display(df)

Preparazione dei dati di addestramento

# filter to data with timestamps within the training window
df_train = df.filter(
    (F.col(timestampColumn) >= trainingStartTime)
    & (F.col(timestampColumn) <= trainingEndTime)
)
display(df_train)

Preparazione dei dati di test

# filter to data with timestamps within the inference window
df_test = df.filter(
    (F.col(timestampColumn) >= inferenceStartTime)
    & (F.col(timestampColumn) <= inferenceEndTime)
)
display(df_test)

Addestrare il modello Isolation Forest

isolationForest = (
    IsolationForest()
    .setNumEstimators(num_estimators)
    .setBootstrap(False)
    .setMaxSamples(max_samples)
    .setMaxFeatures(max_features)
    .setFeaturesCol("features")
    .setPredictionCol("predictedLabel")
    .setScoreCol("outlierScore")
    .setContamination(contamination)
    .setContaminationError(0.01 * contamination)
    .setRandomSeed(1)
)

Successivamente, creiamo una pipeline di apprendimento automatico per eseguire il training del modello Isolation Forest. Viene inoltre illustrato come creare un esperimento MLflow e registrare il modello sottoposto a training.

La registrazione del modello MLflow è strettamente necessaria solo se si accede al modello sottoposto a training in un secondo momento. Per il training del modello e l'esecuzione dell'inferenza nello stesso notebook, il modello a oggetti del modello è sufficiente.

va = VectorAssembler(inputCols=inputCols, outputCol="features")
pipeline = Pipeline(stages=[va, isolationForest])
model = pipeline.fit(df_train)

Eseguire l'inferenza

Caricare il modello Isolation Forest addestrato

Eseguire l'inferenza

df_test_pred = model.transform(df_test)
display(df_test_pred)

Rilevatore di anomalie preconfezionato

Azure AI Anomaly Detector

  • Stato anomalie del punto più recente: genera un modello usando i punti precedenti e determina se il punto più recente è anomalo (Scala, Python)
  • Trovare anomalie: genera un modello usando un'intera serie e trova anomalie nella serie (Scala, Python)