Tarefas de classificação usando SynapseML

Este artigo mostra como executar uma tarefa de classificação de texto com dois métodos. Um método usa simples pysparke o outro usa a synapseml biblioteca. Ambos os métodos apresentam o mesmo desempenho, mas destacam como o SynapseML reduz a complexidade do código em comparação com pyspark.

A tarefa prevê se uma revisão do cliente de um livro vendido na Amazon é boa (classificação > 3) ou ruim, com base no texto de revisão. Treine os aprendizes de LogisticRegression com hiperparâmetros diferentes e escolha o melhor modelo.

Pré-requisitos

  • Crie um bloco de anotações.
  • Anexe seu notebook a um lakehouse. No bloco de anotações, selecione Adicionar no painel esquerdo para anexar uma lakehouse existente ou criar uma nova.

Note

Todas as bibliotecas usadas neste artigo (pyspark, synapseml, numpy) são pré-instaladas no runtime do Fabric Spark. Você não precisa instalar nenhum pacote.

Carregar e explorar os dados

Em Fabric notebooks, uma sessão do Spark já está disponível como a variável spark. Carregue o conjunto de dados de revisões do livro da Amazon de um local de Armazenamento de Blobs do Azure público:

rawData = spark.read.parquet(
    "wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)

Verifique se o conjunto de dados foi carregado corretamente:

print(f"Row count: {rawData.count()}")
print(f"Columns: {rawData.columns}")
assert rawData.count() == 10000, "Expected 10,000 rows"
assert set(rawData.columns) == {"text", "rating"}, "Expected columns: text, rating"
print("Data loaded successfully")

Extrair características e processar dados

Os dados reais geralmente têm recursos de vários tipos, por exemplo, texto, numérico e categórico. Para demonstrar como trabalhar com tipos de recursos mistos, adicione dois recursos numéricos ao conjunto de dados: a contagem de palavras da revisão e o comprimento médio da palavra.

Definir UDFs (funções definidas pelo usuário)

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, DoubleType
import numpy as np


def calc_word_count(s):
    return len(s.split())


def calc_word_length(s):
    ss = [len(w) for w in s.split()]
    return round(float(np.mean(ss)), 2)


wordLengthUDF = udf(calc_word_length, DoubleType())
wordCountUDF = udf(calc_word_count, IntegerType())

Aplique UDFs com o UDFTransformer do SynapseML

Use o UDFTransformer do SynapseML para encapsular as UDFs em transformadores compatíveis com pipelines:

from synapse.ml.stages import UDFTransformer

wordLengthTransformer = UDFTransformer(
    inputCol="text", outputCol="wordLength", udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
    inputCol="text", outputCol="wordCount", udf=wordCountUDF
)

Executar o pipeline de atributos

Aplique os dois transformadores e crie uma coluna de rótulo binário a partir da classificação:

from pyspark.ml import Pipeline

data = (
    Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
    .fit(rawData)
    .transform(rawData)
    .withColumn("label", rawData["rating"] > 3)
    .drop("rating")
)

Verifique a extração de atributos:

data.show(5)
print(f"Columns: {data.columns}")
assert "wordLength" in data.columns, "wordLength column missing"
assert "wordCount" in data.columns, "wordCount column missing"
assert "label" in data.columns, "label column missing"
assert "rating" not in data.columns, "rating column should be dropped"
print("Feature extraction successful")

Classificar usando pyspark

Para escolher o melhor classificador logisticRegression usando a pyspark biblioteca, você deve executar explicitamente estas etapas:

  1. Processe os recursos:
    • Faça a tokenização da coluna de texto.
    • Transforme a coluna tokenizada em um vetor usando hashing.
    • Mesclar os recursos numéricos com o vetor.
  2. Converta a coluna de rótulos do tipo booleano para o tipo inteiro.
  3. Treine vários algoritmos de LogisticRegression no train conjunto de dados com hiperparâmetros diferentes.
  4. Compute a área sob a curva ROC (AUC) para cada modelo treinado e selecione o modelo com a métrica mais alta no test conjunto de dados.
  5. Avalie o melhor modelo no conjunto validation.

Destacar e preparar os dados

from pyspark.ml.feature import Tokenizer, HashingTF, VectorAssembler
from pyspark.sql.types import IntegerType

# Tokenize the text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
    inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)

# Merge text and numeric features into one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)

# Select only the label and features columns, cast label to integer
processedData = assembledData.select("label", "features").withColumn(
    "label", assembledData.label.cast(IntegerType())
)

Verifique os dados em destaque:

print(f"Feature vector size: {processedData.first()['features'].size}")
print(f"Label values: {sorted(processedData.select('label').distinct().rdd.flatMap(lambda x: x).collect())}")
assert processedData.first()["features"].size == 10002, "Expected 10000 text + 2 numeric features"
print("Featurization successful")

Treinar e avaliar modelos

from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression

# Split the data into train, test, and validation sets
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train models with different regularization parameters
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []

# Train each model and evaluate on the test set
for learner in logisticRegressions:
    model = learner.fit(train)
    models.append(model)
    scoredData = model.transform(test)
    metrics.append(evaluator.evaluate(scoredData))

bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]

# Evaluate the best model on the validation dataset
scoredVal = bestModel.transform(validation)
validationAUC = evaluator.evaluate(scoredVal)
print(f"Best model's AUC on validation set = {validationAUC:.4f}")

Verifique os resultados:

print(f"Number of models trained: {len(models)}")
print(f"Best regularization parameter: {lrHyperParams[metrics.index(bestMetric)]}")
print(f"Test AUC scores: {[f'{m:.4f}' for m in metrics]}")
assert 0.5 < validationAUC <= 1.0, f"AUC {validationAUC} is outside expected range (0.5, 1.0]"
print(f"pyspark classification complete - AUC: {validationAUC:.4f}")

Note

Os valores AUC exatos dependem da divisão aleatória. Espere valores entre 0,65 e 0,85.

Classificar usando SynapseML

A synapseml abordagem obtém o mesmo resultado com menos etapas. O SynapseML cuida internamente da engenharia de atributos, o que reduz o código que você precisa escrever:

  1. O estimador TrainClassifier faz a extração de características dos dados internamente, desde que as colunas nos conjuntos de dados train, test e validation representem as características.
  2. O FindBestModel avaliador localiza o melhor modelo de um pool de modelos treinados avaliando o test desempenho no conjunto de dados com a métrica especificada.
  3. O ComputeModelStatistics transformador calcula várias métricas em um conjunto de dados pontuado (nesse caso, o validation conjunto de dados) ao mesmo tempo.
from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel
from pyspark.ml.classification import LogisticRegression

# Split the raw feature data (SynapseML handles featurization internally)
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train models with different regularization parameters
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
    TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
    for lrm in logisticRegressions
]

# Select the best model based on AUC
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)

# Compute metrics on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
    "Best model's AUC on validation set = "
    + "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)

Verifique os resultados do SynapseML:

auc_value = metrics.first()["AUC"]
print(f"Available metrics: {metrics.columns}")
assert 0.5 < auc_value <= 1.0, f"AUC {auc_value} is outside expected range (0.5, 1.0]"
print(f"SynapseML classification complete - AUC: {auc_value:.4f}")

Note

As abordagens pyspark e SynapseML devem produzir valores AUC semelhantes, pois treinam o mesmo tipo de modelo com os mesmos hiperparâmetros nos mesmos dados.

Comparar as duas abordagens

Aspect pyspark SynapseML
Processamento de funcionalidades Manual (do Tokenizer ao HashingTF ao VectorAssembler) Automático (manipulado por TrainClassifier)
Seleção de modelo Loop manual com avaliador Integrado FindBestModel
Computação de métricas Métrica única por chamada de avaliação Várias métricas com ComputeModelStatistics
Linhas de código Cerca de 30 linhas Aproximadamente 15 linhas
Resultado Mesma AUC Mesma AUC

Solução de problemas

Questão Cause Resolução
AnalysisException: Path does not exist A URL pública de armazenamento de blobs está temporariamente indisponível Aguarde alguns minutos e tente novamente. Verificar a conectividade executando spark.read.parquet("wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet").count()
IllegalArgumentException: Field "features" does not exist Os nomes das colunas de recursos não correspondem entre transformadores Verificar nomes de colunas executando data.columns antes da etapa VectorAssembler
NameError: name 'LogisticRegression' is not defined Declaração de importação ausente Adicionar from pyspark.ml.classification import LogisticRegression na parte superior da célula
ModuleNotFoundError: No module named 'synapse.ml' O notebook não está usando Fabric runtime do Spark Verifique se o notebook usa Fabric Runtime 1.2 ou posterior. Selecione Ambiente na faixa de opções para verificar.
AUC baixo (abaixo de 0,6) Problema na divisão dos dados ou problemas de convergência Verifique a distribuição dos rótulos com data.groupBy("label").count().show(). Espere um conjunto de dados aproximadamente equilibrado.
Py4JJavaError: An error occurred while calling erro interno do Java/Spark Verifique a interface do usuário do Spark para obter logs de erros detalhados. Reinicie a sessão do Spark selecionando Session>Stop session e, em seguida, execute novamente todas as células.

Limpar os recursos

Se você criou um novo lakehouse para este artigo e não precisa mais dele:

  1. Em seu workspace, clique com o botão direito do mouse no nome do lakehouse.
  2. Selecione Excluir.
  3. Confirme a exclusão.

O bloco de anotações permanece em seu workspace, a menos que você o exclua separadamente.