Compartilhar via


Receita: Ferramentas de Fundição – Detecção de Anomalias Multivariadas

Esta receita mostra como usar o SynapseML e as Ferramentas de Fundimento, no Apache Spark, para detecção de anomalias multivariadas. A detecção de anomalias multivariadas envolve a detecção de anomalias entre muitas variáveis ou séries temporais, ao mesmo tempo em que contabiliza todas as inter correlações e dependências entre as diferentes variáveis. Esse cenário usa o SynapseML e as Ferramentas Foundry destinado a treinar um modelo para detecção de anomalias multivariadas. Em seguida, você usa o modelo para inferir anomalias multivariadas em um conjunto de dados que contém medidas sintéticas de três sensores de IoT.

Importante

A partir de 20 de setembro de 2023, você não pode criar novos recursos do Detector de Anomalias. O serviço Detector de Anomalias será descontinuado em 1º de outubro de 2026.

Para obter mais informações sobre o Azure AI Anomaly Detector, visite o recurso de informações Anomaly Detector.

Pré-requisitos

  • Uma assinatura Azure – Criar uma gratuitamente
  • Conecte seu notebook a um lakehouse. No lado esquerdo, selecione Adicionar para incluir um lakehouse existente ou criar um novo.

Instalação

Começando com um recurso existente Anomaly Detector , você pode explorar maneiras de lidar com dados de vários formulários.

Criar um recurso do Detector de Anomalias

Observação

Desde 20 de setembro de 2023, você não pode criar novos recursos do Detector de Anomalias. As etapas a seguir se aplicam somente se você tiver um recurso existente do Detector de Anomalias. Para obter uma abordagem de detecção de anomalias multivariada que não exija o serviço Detector de Anomalias, consulte Detecção de Anomalias Multivariadas com Floresta de Isolamento.

  • No portal Azure, selecione Create em seu grupo de recursos e digite Anomaly Detector. Selecione o recurso Detector de Anomalias.
  • Nomeie o recurso e, idealmente, use a mesma região que o restante do grupo de recursos. Use as opções padrão para o restante e selecione Revisar e Criar e, em seguida, Criar.
  • Depois de criar o recurso detector de anomalias, abra-o e selecione o Keys and Endpoints painel na navegação esquerda. Copie a chave do recurso Detector de Anomalias na variável de ambiente ANOMALY_API_KEY ou armazene-a na variável anomalyKey.

Criar um recurso de Conta de Armazenamento

Para salvar dados intermediários, você deve criar uma conta Azure Blob Storage. Dentro dessa conta de armazenamento, crie um contêiner para armazenar os dados intermediários. Anote o nome do contêiner e copie o connection string para esse contêiner. Você precisa dela para preencher posteriormente a containerName variável e a variável de BLOB_CONNECTION_STRING ambiente.

Inserir suas chaves de serviço

Primeiro, configure as variáveis de ambiente para suas chaves de serviço. A próxima célula define as variáveis de ambiente ANOMALY_API_KEY e BLOB_CONNECTION_STRING, com base nos valores armazenados em Azure Key Vault. Se você executar este tutorial em seu próprio ambiente, certifique-se de definir essas variáveis de ambiente antes de continuar:

import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import find_secret

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

Leia as variáveis de ambiente ANOMALY_API_KEY e BLOB_CONNECTION_STRING, e defina as variáveis containerName e location:

# An Anomaly Detector subscription key
anomalyKey = find_secret("anomaly-api-key") # use your own anomaly api key
# Your storage account name
storageName = "anomalydetectiontest" # use your own storage account name
# A connection string to your blob storage account
storageKey = find_secret("madtest-storage-key") # use your own storage key
# A place to save intermediate MVAD results
intermediateSaveDir = (
    "wasbs://madtest@anomalydetectiontest.blob.core.windows.net/intermediateData"
)
# The location of the anomaly detector resource that you created
location = "westus2"

Conecte-se à conta de armazenamento para que o detector de anomalias possa salvar resultados intermediários nessa conta de armazenamento:

spark.sparkContext._jsc.hadoopConfiguration().set(
    f"fs.azure.account.key.{storageName}.blob.core.windows.net", storageKey
)

Importe todos os módulos necessários:

import numpy as np
import pandas as pd

import pyspark
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.types import DoubleType
import matplotlib.pyplot as plt

import synapse.ml
from synapse.ml.services import *

Leia os dados de exemplo em um DataFrame do Spark:

df = (
    spark.read.format("csv")
    .option("header", "true")
    .load("wasbs://publicwasb@mmlspark.blob.core.windows.net/MVAD/sample.csv")
)

df = (
    df.withColumn("sensor_1", col("sensor_1").cast(DoubleType()))
    .withColumn("sensor_2", col("sensor_2").cast(DoubleType()))
    .withColumn("sensor_3", col("sensor_3").cast(DoubleType()))
)

# Let's inspect the dataframe:
df.show(5)

Agora você pode criar um estimator objeto, que você usa para treinar seu modelo. Especifique os horários de início e término para os dados de treinamento. Especifique também as colunas de entrada a serem usadas e o nome da coluna que contém os carimbos de data/hora. Por fim, especifique o número de pontos de dados a serem usados na janela deslizante de detecção de anomalias e defina o connection string para a conta Azure Blob Storage:

trainingStartTime = "2020-06-01T12:00:00Z"
trainingEndTime = "2020-07-02T17:55:00Z"
timestampColumn = "timestamp"
inputColumns = ["sensor_1", "sensor_2", "sensor_3"]

estimator = (
    FitMultivariateAnomaly()
    .setSubscriptionKey(anomalyKey)
    .setLocation(location)
    .setStartTime(trainingStartTime)
    .setEndTime(trainingEndTime)
    .setIntermediateSaveDir(intermediateSaveDir)
    .setTimestampCol(timestampColumn)
    .setInputCols(inputColumns)
    .setSlidingWindow(200)
)

Ajustar o estimator aos dados:

model = estimator.fit(df)

Depois que o treinamento for concluído, use o modelo para inferência. O código na próxima célula especifica os horários de início e término dos dados nos quais você deseja detectar as anomalias:

inferenceStartTime = "2020-07-02T18:00:00Z"
inferenceEndTime = "2020-07-06T05:15:00Z"

result = (
    model.setStartTime(inferenceStartTime)
    .setEndTime(inferenceEndTime)
    .setOutputCol("results")
    .setErrorCol("errors")
    .setInputCols(inputColumns)
    .setTimestampCol(timestampColumn)
    .transform(df)
)

result.show(5)

Na célula anterior, .show(5) mostra as cinco primeiras linhas de dataframe. Os resultados são todos null porque chegam fora da janela de inferência.

Para mostrar os resultados apenas para os dados inferidos, selecione as colunas necessárias. Em seguida, você pode ordenar as linhas no dataframe por ordem crescente e filtrar o resultado para mostrar apenas as linhas no intervalo da janela de inferência. Aqui, inferenceEndTime corresponde à última linha no dataframe, para que você possa ignorá-la.

Por fim, para plotar melhor os resultados, converta o dataframe do Spark em um dataframe do Pandas:

rdf = (
    result.select(
        "timestamp",
        *inputColumns,
        "results.contributors",
        "results.isAnomaly",
        "results.severity"
    )
    .orderBy("timestamp", ascending=True)
    .filter(col("timestamp") >= lit(inferenceStartTime))
    .toPandas()
)

rdf

Formate a coluna contributors que armazena a pontuação de contribuição de cada sensor para as anomalias detectadas. A próxima célula lida com isso e divide a pontuação de contribuição de cada sensor em sua própria coluna:

def parse(x):
    if type(x) is list:
        return dict([item[::-1] for item in x])
    else:
        return {"series_0": 0, "series_1": 0, "series_2": 0}

rdf["contributors"] = rdf["contributors"].apply(parse)
rdf = pd.concat(
    [rdf.drop(["contributors"], axis=1), pd.json_normalize(rdf["contributors"])], axis=1
)
rdf

Agora você tem as pontuações de contribuição dos sensores 1, 2 e 3 nas colunas series_0, series_1 e series_2, respectivamente.

Para plotar os resultados, execute a próxima célula. O minSeverity parâmetro especifica a gravidade mínima das anomalias a serem plotdas:

minSeverity = 0.1

####### Main Figure #######
plt.figure(figsize=(23, 8))
plt.plot(
    rdf["timestamp"],
    rdf["sensor_1"],
    color="tab:orange",
    linestyle="solid",
    linewidth=2,
    label="sensor_1",
)
plt.plot(
    rdf["timestamp"],
    rdf["sensor_2"],
    color="tab:green",
    linestyle="solid",
    linewidth=2,
    label="sensor_2",
)
plt.plot(
    rdf["timestamp"],
    rdf["sensor_3"],
    color="tab:blue",
    linestyle="solid",
    linewidth=2,
    label="sensor_3",
)
plt.grid(axis="y")
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.legend()

anoms = list(rdf["severity"] >= minSeverity)
_, _, ymin, ymax = plt.axis()
plt.vlines(np.where(anoms), ymin=ymin, ymax=ymax, color="r", alpha=0.8)

plt.legend()
plt.title(
    "A plot of the values from the three sensors with the detected anomalies highlighted in red."
)
plt.show()

####### Severity Figure #######
plt.figure(figsize=(23, 1))
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.plot(
    rdf["timestamp"],
    rdf["severity"],
    color="black",
    linestyle="solid",
    linewidth=2,
    label="Severity score",
)
plt.plot(
    rdf["timestamp"],
    [minSeverity] * len(rdf["severity"]),
    color="red",
    linestyle="dotted",
    linewidth=1,
    label="minSeverity",
)
plt.grid(axis="y")
plt.legend()
plt.ylim([0, 1])
plt.title("Severity of the detected anomalies")
plt.show()

####### Contributors Figure #######
plt.figure(figsize=(23, 1))
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.bar(
    rdf["timestamp"], rdf["series_0"], width=2, color="tab:orange", label="sensor_1"
)
plt.bar(
    rdf["timestamp"],
    rdf["series_1"],
    width=2,
    color="tab:green",
    label="sensor_2",
    bottom=rdf["series_0"],
)
plt.bar(
    rdf["timestamp"],
    rdf["series_2"],
    width=2,
    color="tab:blue",
    label="sensor_3",
    bottom=rdf["series_0"] + rdf["series_1"],
)
plt.grid(axis="y")
plt.legend()
plt.ylim([0, 1])
plt.title("The contribution of each sensor to the detected anomaly")
plt.show()

Captura de tela do gráfico de resultados de detecção de anomalias multivariadas.

Os gráficos mostram os dados brutos dos sensores (dentro da janela de inferência) em laranja, verde e azul. As linhas verticais vermelhas na primeira figura mostram as anomalias detectadas com uma gravidade maior ou igual a minSeverity.

O segundo gráfico mostra a pontuação de gravidade de todas as anomalias detectadas, com o limite minSeverity mostrado na linha vermelha pontilhada.

Finalmente, o último gráfico mostra a contribuição dos dados de cada sensor para as anomalias detectadas. Ele ajuda a diagnosticar e entender a causa mais provável de cada anomalia.