Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Questo articolo illustra come usare SynapseML in Apache Spark per il rilevamento anomalie multivariato. Il rilevamento delle anomalie multivariate consente il rilevamento di anomalie tra molte variabili o timeseries, tenendo conto di tutte le correlazioni e le dipendenze tra le diverse variabili. In questo scenario, utilizziamo SynapseML per addestrare un modello di foresta di isolamento per il rilevamento di anomalie multivariate e successivamente utilizzare il modello addestrato per identificare anomalie multivariate all'interno di un set di dati contenente misurazioni sintetiche da tre sensori IoT.
Per maggiori informazioni sul modello Isolation Forest, vedere il documento originale di Liu et al.
Prerequisiti
- Collega il notebook a un lakehouse. Sul lato sinistro, selezionare Aggiungi per aggiungere un lakehouse esistente o creare un lakehouse.
Importazioni di raccolte
from IPython import get_ipython
from IPython.terminal.interactiveshell import TerminalInteractiveShell
import uuid
import mlflow
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import *
from pyspark.ml import Pipeline
from synapse.ml.isolationforest import *
from synapse.ml.explainers import *
%matplotlib inline
from pyspark.sql import SparkSession
# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
from synapse.ml.core.platform import *
if running_on_synapse():
shell = TerminalInteractiveShell.instance()
shell.define_macro("foo", """a,b=10,20""")
Dati di input
# Table inputs
timestampColumn = "timestamp" # str: the name of the timestamp column in the table
inputCols = [
"sensor_1",
"sensor_2",
"sensor_3",
] # list(str): the names of the input variables
# Training Start time, and number of days to use for training:
trainingStartTime = (
"2022-02-24T06:00:00Z" # datetime: datetime for when to start the training
)
trainingEndTime = (
"2022-03-08T23:55:00Z" # datetime: datetime for when to end the training
)
inferenceStartTime = (
"2022-03-09T09:30:00Z" # datetime: datetime for when to start the training
)
inferenceEndTime = (
"2022-03-20T23:55:00Z" # datetime: datetime for when to end the training
)
# Isolation Forest parameters
contamination = 0.021
num_estimators = 100
max_samples = 256
max_features = 1.0
Leggere i dati
df = (
spark.read.format("csv")
.option("header", "true")
.load(
"wasbs://publicwasb@mmlspark.blob.core.windows.net/generated_sample_mvad_data.csv"
)
)
convertire le colonne in tipi di dati appropriati
df = (
df.orderBy(timestampColumn)
.withColumn("timestamp", F.date_format(timestampColumn, "yyyy-MM-dd'T'HH:mm:ss'Z'"))
.withColumn("sensor_1", F.col("sensor_1").cast(DoubleType()))
.withColumn("sensor_2", F.col("sensor_2").cast(DoubleType()))
.withColumn("sensor_3", F.col("sensor_3").cast(DoubleType()))
.drop("_c5")
)
display(df)
Preparazione dei dati di addestramento
# filter to data with timestamps within the training window
df_train = df.filter(
(F.col(timestampColumn) >= trainingStartTime)
& (F.col(timestampColumn) <= trainingEndTime)
)
display(df_train)
Preparazione dei dati di test
# filter to data with timestamps within the inference window
df_test = df.filter(
(F.col(timestampColumn) >= inferenceStartTime)
& (F.col(timestampColumn) <= inferenceEndTime)
)
display(df_test)
Addestrare il modello Isolation Forest
isolationForest = (
IsolationForest()
.setNumEstimators(num_estimators)
.setBootstrap(False)
.setMaxSamples(max_samples)
.setMaxFeatures(max_features)
.setFeaturesCol("features")
.setPredictionCol("predictedLabel")
.setScoreCol("outlierScore")
.setContamination(contamination)
.setContaminationError(0.01 * contamination)
.setRandomSeed(1)
)
Successivamente, creiamo una pipeline di apprendimento automatico per eseguire il training del modello Isolation Forest. Viene inoltre illustrato come creare un esperimento MLflow e registrare il modello sottoposto a training.
La registrazione del modello MLflow è strettamente necessaria solo se si accede al modello sottoposto a training in un secondo momento. Per il training del modello e l'esecuzione dell'inferenza nello stesso notebook, il modello a oggetti del modello è sufficiente.
va = VectorAssembler(inputCols=inputCols, outputCol="features")
pipeline = Pipeline(stages=[va, isolationForest])
model = pipeline.fit(df_train)
Eseguire l'inferenza
Caricare il modello Isolation Forest addestrato
Eseguire l'inferenza
df_test_pred = model.transform(df_test)
display(df_test_pred)
Rilevatore di anomalie preconfezionato
- Stato anomalie del punto più recente: genera un modello usando i punti precedenti e determina se il punto più recente è anomalo (Scala, Python)
- Trovare anomalie: genera un modello usando un'intera serie e trova anomalie nella serie (Scala, Python)
Contenuto correlato
- Come creare un motore di ricerca con SynapseML
- Come usare Gli strumenti SynapseML e Foundry per il rilevamento delle anomalie multivariate - Analizzare le serie temporali
Come usare SynapseML per ottimizzare gli iperparametri