Freigeben über


Multivariate Anomaly-Erkennung mit Isolation Forest

In diesem Artikel wird gezeigt, wie Sie SynapseML auf Apache Spark für die multivariate Anomalieerkennung verwenden können. Die Multivariate Anomaly-Erkennung ermöglicht die Erkennung von Anomalien zwischen vielen Variablen oder Zeitserien, wobei alle Interkorrelationen und Abhängigkeiten zwischen den verschiedenen Variablen berücksichtigt werden. In diesem Szenario verwenden wir SynapseML, um ein Isolationsgesamtstrukturmodell für die multivariate Anomalieerkennung zu trainieren. Anschließend verwenden wir das trainierte Modell, um multivariate Anomalien innerhalb eines Datasets abzuleiten, das synthetische Messungen von drei IoT-Sensoren enthält.

Weitere Informationen zum Isolation Forest-Modell finden Sie im Originalpapier von Liu et al..

Voraussetzungen

  • Fügen Sie Ihr Notizbuch an ein Seehaus an. Wählen Sie auf der linken Seite Hinzufügen aus, um ein vorhandenes Seehaus hinzuzufügen oder ein Seehaus zu erstellen.

Bibliotheksimporte

from IPython import get_ipython
from IPython.terminal.interactiveshell import TerminalInteractiveShell
import uuid
import mlflow

from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import *
from pyspark.ml import Pipeline

from synapse.ml.isolationforest import *

from synapse.ml.explainers import *
%matplotlib inline
from pyspark.sql import SparkSession

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

from synapse.ml.core.platform import *

if running_on_synapse():
    shell = TerminalInteractiveShell.instance()
    shell.define_macro("foo", """a,b=10,20""")

Eingangsdaten

# Table inputs
timestampColumn = "timestamp"  # str: the name of the timestamp column in the table
inputCols = [
    "sensor_1",
    "sensor_2",
    "sensor_3",
]  # list(str): the names of the input variables

# Training Start time, and number of days to use for training:
trainingStartTime = (
    "2022-02-24T06:00:00Z"  # datetime: datetime for when to start the training
)
trainingEndTime = (
    "2022-03-08T23:55:00Z"  # datetime: datetime for when to end the training
)
inferenceStartTime = (
    "2022-03-09T09:30:00Z"  # datetime: datetime for when to start the training
)
inferenceEndTime = (
    "2022-03-20T23:55:00Z"  # datetime: datetime for when to end the training
)

# Isolation Forest parameters
contamination = 0.021
num_estimators = 100
max_samples = 256
max_features = 1.0

Daten lesen

df = (
    spark.read.format("csv")
    .option("header", "true")
    .load(
        "wasbs://publicwasb@mmlspark.blob.core.windows.net/generated_sample_mvad_data.csv"
    )
)

Umwandeln von Spalten in geeignete Datentypen

df = (
    df.orderBy(timestampColumn)
    .withColumn("timestamp", F.date_format(timestampColumn, "yyyy-MM-dd'T'HH:mm:ss'Z'"))
    .withColumn("sensor_1", F.col("sensor_1").cast(DoubleType()))
    .withColumn("sensor_2", F.col("sensor_2").cast(DoubleType()))
    .withColumn("sensor_3", F.col("sensor_3").cast(DoubleType()))
    .drop("_c5")
)

display(df)

Vorbereitung von Schulungsdaten

# filter to data with timestamps within the training window
df_train = df.filter(
    (F.col(timestampColumn) >= trainingStartTime)
    & (F.col(timestampColumn) <= trainingEndTime)
)
display(df_train)

Testen der Datenvorbereitung

# filter to data with timestamps within the inference window
df_test = df.filter(
    (F.col(timestampColumn) >= inferenceStartTime)
    & (F.col(timestampColumn) <= inferenceEndTime)
)
display(df_test)

Isolation-Forest-Modell trainieren

isolationForest = (
    IsolationForest()
    .setNumEstimators(num_estimators)
    .setBootstrap(False)
    .setMaxSamples(max_samples)
    .setMaxFeatures(max_features)
    .setFeaturesCol("features")
    .setPredictionCol("predictedLabel")
    .setScoreCol("outlierScore")
    .setContamination(contamination)
    .setContaminationError(0.01 * contamination)
    .setRandomSeed(1)
)

Als Nächstes erstellen wir eine ML-Pipeline zum Trainieren des Isolations-Gesamtstrukturmodells. Außerdem wird gezeigt, wie ein MLflow-Experiment erstellt und das trainierte Modell registriert wird.

Die MLflow-Modellregistrierung ist ausschließlich erforderlich, wenn sie zu einem späteren Zeitpunkt auf das trainierte Modell zugreifen. Für die Schulung des Modells und die Durchführung der Ableitung im selben Notizbuch reicht das Modellobjektmodell aus.

va = VectorAssembler(inputCols=inputCols, outputCol="features")
pipeline = Pipeline(stages=[va, isolationForest])
model = pipeline.fit(df_train)

Durchführen von Inferenzen

Laden des trainierten Isolation Forest-Modells

Durchführen von Inferenz

df_test_pred = model.transform(df_test)
display(df_test_pred)

Vorgefertigter Anomaliedetektor

Azure AI-Anomalie-Detektor

  • Anomaliestatus des letzten Punkts: Generiert ein Modell anhand vorheriger Punkte und ermittelt, ob der letzte Punkt anomal ist. (Scala, Python)
  • Anomalien suchen: Generiert ein Modell anhand einer ganzen Reihe und findet Anomalien in der Reihe. (Scala, Python)