Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Este artigo mostra como executar uma tarefa de classificação de texto com dois métodos. Um método usa simples pysparke o outro usa a synapseml biblioteca. Ambos os métodos apresentam o mesmo desempenho, mas destacam como o SynapseML reduz a complexidade do código em comparação com pyspark.
A tarefa prevê se uma revisão do cliente de um livro vendido na Amazon é boa (classificação > 3) ou ruim, com base no texto de revisão. Treine os aprendizes de LogisticRegression com hiperparâmetros diferentes e escolha o melhor modelo.
Pré-requisitos
Obtenha uma assinatura do Microsoft Fabric. Ou inscreva-se para uma avaliação gratuita Microsoft Fabric.
Faça login em Microsoft Fabric.
Alterne para o Fabric usando o alternador de experiência no lado inferior esquerdo da sua página inicial.
- Crie um bloco de anotações.
- Anexe seu notebook a um lakehouse. No bloco de anotações, selecione Adicionar no painel esquerdo para anexar uma lakehouse existente ou criar uma nova.
Note
Todas as bibliotecas usadas neste artigo (pyspark, synapseml, numpy) são pré-instaladas no runtime do Fabric Spark. Você não precisa instalar nenhum pacote.
Carregar e explorar os dados
Em Fabric notebooks, uma sessão do Spark já está disponível como a variável spark. Carregue o conjunto de dados de revisões do livro da Amazon de um local de Armazenamento de Blobs do Azure público:
rawData = spark.read.parquet(
"wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)
Verifique se o conjunto de dados foi carregado corretamente:
print(f"Row count: {rawData.count()}")
print(f"Columns: {rawData.columns}")
assert rawData.count() == 10000, "Expected 10,000 rows"
assert set(rawData.columns) == {"text", "rating"}, "Expected columns: text, rating"
print("Data loaded successfully")
Extrair características e processar dados
Os dados reais geralmente têm recursos de vários tipos, por exemplo, texto, numérico e categórico. Para demonstrar como trabalhar com tipos de recursos mistos, adicione dois recursos numéricos ao conjunto de dados: a contagem de palavras da revisão e o comprimento médio da palavra.
Definir UDFs (funções definidas pelo usuário)
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, DoubleType
import numpy as np
def calc_word_count(s):
return len(s.split())
def calc_word_length(s):
ss = [len(w) for w in s.split()]
return round(float(np.mean(ss)), 2)
wordLengthUDF = udf(calc_word_length, DoubleType())
wordCountUDF = udf(calc_word_count, IntegerType())
Aplique UDFs com o UDFTransformer do SynapseML
Use o UDFTransformer do SynapseML para encapsular as UDFs em transformadores compatíveis com pipelines:
from synapse.ml.stages import UDFTransformer
wordLengthTransformer = UDFTransformer(
inputCol="text", outputCol="wordLength", udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
inputCol="text", outputCol="wordCount", udf=wordCountUDF
)
Executar o pipeline de atributos
Aplique os dois transformadores e crie uma coluna de rótulo binário a partir da classificação:
from pyspark.ml import Pipeline
data = (
Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
.fit(rawData)
.transform(rawData)
.withColumn("label", rawData["rating"] > 3)
.drop("rating")
)
Verifique a extração de atributos:
data.show(5)
print(f"Columns: {data.columns}")
assert "wordLength" in data.columns, "wordLength column missing"
assert "wordCount" in data.columns, "wordCount column missing"
assert "label" in data.columns, "label column missing"
assert "rating" not in data.columns, "rating column should be dropped"
print("Feature extraction successful")
Classificar usando pyspark
Para escolher o melhor classificador logisticRegression usando a pyspark biblioteca, você deve executar explicitamente estas etapas:
- Processe os recursos:
- Faça a tokenização da coluna de texto.
- Transforme a coluna tokenizada em um vetor usando hashing.
- Mesclar os recursos numéricos com o vetor.
- Converta a coluna de rótulos do tipo booleano para o tipo inteiro.
- Treine vários algoritmos de LogisticRegression no
trainconjunto de dados com hiperparâmetros diferentes. - Compute a área sob a curva ROC (AUC) para cada modelo treinado e selecione o modelo com a métrica mais alta no
testconjunto de dados. - Avalie o melhor modelo no conjunto
validation.
Destacar e preparar os dados
from pyspark.ml.feature import Tokenizer, HashingTF, VectorAssembler
from pyspark.sql.types import IntegerType
# Tokenize the text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)
# Merge text and numeric features into one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)
# Select only the label and features columns, cast label to integer
processedData = assembledData.select("label", "features").withColumn(
"label", assembledData.label.cast(IntegerType())
)
Verifique os dados em destaque:
print(f"Feature vector size: {processedData.first()['features'].size}")
print(f"Label values: {sorted(processedData.select('label').distinct().rdd.flatMap(lambda x: x).collect())}")
assert processedData.first()["features"].size == 10002, "Expected 10000 text + 2 numeric features"
print("Featurization successful")
Treinar e avaliar modelos
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
# Split the data into train, test, and validation sets
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)
# Train models with different regularization parameters
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []
# Train each model and evaluate on the test set
for learner in logisticRegressions:
model = learner.fit(train)
models.append(model)
scoredData = model.transform(test)
metrics.append(evaluator.evaluate(scoredData))
bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]
# Evaluate the best model on the validation dataset
scoredVal = bestModel.transform(validation)
validationAUC = evaluator.evaluate(scoredVal)
print(f"Best model's AUC on validation set = {validationAUC:.4f}")
Verifique os resultados:
print(f"Number of models trained: {len(models)}")
print(f"Best regularization parameter: {lrHyperParams[metrics.index(bestMetric)]}")
print(f"Test AUC scores: {[f'{m:.4f}' for m in metrics]}")
assert 0.5 < validationAUC <= 1.0, f"AUC {validationAUC} is outside expected range (0.5, 1.0]"
print(f"pyspark classification complete - AUC: {validationAUC:.4f}")
Note
Os valores AUC exatos dependem da divisão aleatória. Espere valores entre 0,65 e 0,85.
Classificar usando SynapseML
A synapseml abordagem obtém o mesmo resultado com menos etapas. O SynapseML cuida internamente da engenharia de atributos, o que reduz o código que você precisa escrever:
- O estimador
TrainClassifierfaz a extração de características dos dados internamente, desde que as colunas nos conjuntos de dadostrain,testevalidationrepresentem as características. - O
FindBestModelavaliador localiza o melhor modelo de um pool de modelos treinados avaliando otestdesempenho no conjunto de dados com a métrica especificada. - O
ComputeModelStatisticstransformador calcula várias métricas em um conjunto de dados pontuado (nesse caso, ovalidationconjunto de dados) ao mesmo tempo.
from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel
from pyspark.ml.classification import LogisticRegression
# Split the raw feature data (SynapseML handles featurization internally)
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)
# Train models with different regularization parameters
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
for lrm in logisticRegressions
]
# Select the best model based on AUC
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)
# Compute metrics on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
"Best model's AUC on validation set = "
+ "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)
Verifique os resultados do SynapseML:
auc_value = metrics.first()["AUC"]
print(f"Available metrics: {metrics.columns}")
assert 0.5 < auc_value <= 1.0, f"AUC {auc_value} is outside expected range (0.5, 1.0]"
print(f"SynapseML classification complete - AUC: {auc_value:.4f}")
Note
As abordagens pyspark e SynapseML devem produzir valores AUC semelhantes, pois treinam o mesmo tipo de modelo com os mesmos hiperparâmetros nos mesmos dados.
Comparar as duas abordagens
| Aspect | pyspark | SynapseML |
|---|---|---|
| Processamento de funcionalidades | Manual (do Tokenizer ao HashingTF ao VectorAssembler) | Automático (manipulado por TrainClassifier) |
| Seleção de modelo | Loop manual com avaliador | Integrado FindBestModel |
| Computação de métricas | Métrica única por chamada de avaliação | Várias métricas com ComputeModelStatistics |
| Linhas de código | Cerca de 30 linhas | Aproximadamente 15 linhas |
| Resultado | Mesma AUC | Mesma AUC |
Solução de problemas
| Questão | Cause | Resolução |
|---|---|---|
AnalysisException: Path does not exist |
A URL pública de armazenamento de blobs está temporariamente indisponível | Aguarde alguns minutos e tente novamente. Verificar a conectividade executando spark.read.parquet("wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet").count() |
IllegalArgumentException: Field "features" does not exist |
Os nomes das colunas de recursos não correspondem entre transformadores | Verificar nomes de colunas executando data.columns antes da etapa VectorAssembler |
NameError: name 'LogisticRegression' is not defined |
Declaração de importação ausente | Adicionar from pyspark.ml.classification import LogisticRegression na parte superior da célula |
ModuleNotFoundError: No module named 'synapse.ml' |
O notebook não está usando Fabric runtime do Spark | Verifique se o notebook usa Fabric Runtime 1.2 ou posterior. Selecione Ambiente na faixa de opções para verificar. |
| AUC baixo (abaixo de 0,6) | Problema na divisão dos dados ou problemas de convergência | Verifique a distribuição dos rótulos com data.groupBy("label").count().show(). Espere um conjunto de dados aproximadamente equilibrado. |
Py4JJavaError: An error occurred while calling |
erro interno do Java/Spark | Verifique a interface do usuário do Spark para obter logs de erros detalhados. Reinicie a sessão do Spark selecionando Session>Stop session e, em seguida, execute novamente todas as células. |
Limpar os recursos
Se você criou um novo lakehouse para este artigo e não precisa mais dele:
- Em seu workspace, clique com o botão direito do mouse no nome do lakehouse.
- Selecione Excluir.
- Confirme a exclusão.
O bloco de anotações permanece em seu workspace, a menos que você o exclua separadamente.