Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
Neste artigo, aprende a usar o Apache Spark MLlib para criar uma aplicação de aprendizagem automática que gere análise preditiva num conjunto de dados aberto Azure. O Spark fornece bibliotecas de aprendizado de máquina integradas. Este exemplo usa a classificação por meio de regressão logística.
Este tutorial aborda estas etapas:
- Configurar o caderno e importar
- Carregar e amostrar dados de táxis de Nova Iorque
- Preparar e engenhar funcionalidades
- Codificar características categóricas
- Modelo de regressão logística de comboios
- Avaliar e visualizar os resultados
As principais bibliotecas SparkML e MLlib Spark fornecem muitos utilitários que são úteis para tarefas de aprendizado de máquina. Estes utilitários são adequados para:
- Classificação
- Agrupamento
- Teste de hipóteses e cálculo de estatísticas amostrais
- Regressão
- Decomposição de valor singular (SVD) e análise de componentes principais (PCA)
- Modelagem de tópicos
Pré-requisitos
Obtenha uma assinatura do Microsoft Fabric. Ou inscreva-se para uma avaliação gratuita do Microsoft Fabric.
Faça login no Microsoft Fabric.
Mude para o Fabric usando o seletor de experiência no canto inferior esquerdo da sua página inicial.
- Se necessário, crie uma casa de lago Microsoft Fabric conforme descrito em Crie uma casa de lago em Microsoft Fabric.
- Crie um novo bloco de notas na sua área de trabalho selecionando + e, em seguida, Bloco de Notas. Para mais informações, consulte Criar um caderno.
Compreender a classificação e a regressão logística
A classificação, uma tarefa popular de aprendizado de máquina, envolve classificar os dados de entrada em categorias. Um algoritmo de classificação descobre como atribuir etiquetas aos dados de entrada fornecidos. Por exemplo, um algoritmo de aprendizagem automática poderia aceitar informação de ações como entrada e dividir a ação em duas categorias: ações que deve vender e ações que deve manter.
O algoritmo de regressão logística é útil para classificação. A API de regressão logística do Spark é útil para a classificação binária de dados de entrada em um dos dois grupos. Para obter mais informações sobre regressão logística, consulte Wikipedia.
A regressão logística produz uma função logística que prevê a probabilidade de um vetor de entrada pertencer a um grupo ou a outro.
Exemplo de análise preditiva de dados de táxi de Nova York
Os dados estão disponíveis através do recurso Azure Open Datasets . Este subconjunto de dados hospeda informações sobre viagens de táxi amarelo, incluindo as horas de início, horas de término, locais de início, locais de término, custos de viagem e outros atributos.
Este tutorial utiliza o Apache Spark para analisar os dados sobre as gorjetas das viagens de táxi em Nova Iorque e desenvolver um modelo para prever se uma determinada viagem inclui gorjeta.
Criar um modelo de aprendizado de máquina do Apache Spark
Crie um bloco de anotações PySpark. Para mais informações, consulte Criar um caderno.
Depois de criar o caderno, anexe-o a uma casa do lago selecionando Adicionar casa do lago no painel esquerdo.
Importa os tipos necessários para este caderno. Cola o código seguinte na primeira célula e executa-o.
import matplotlib.pyplot as plt from pyspark.sql.functions import unix_timestamp, date_format, col, when from pyspark.ml import Pipeline from pyspark.ml.feature import RFormula from pyspark.ml.feature import OneHotEncoder, StringIndexer from pyspark.ml.classification import LogisticRegression from pyspark.ml.evaluation import BinaryClassificationEvaluatorVerificar: A célula é concluída sem
ImportError. Se vires um erro, confirma que o teu portátil usa o runtime do PySpark.Usa o MLflow para acompanhar as tuas experiências de aprendizagem automática e as execuções correspondentes. Se o log automático do Microsoft Fabric estiver habilitado, as métricas e os parâmetros correspondentes serão capturados automaticamente.
import mlflowVerificar: A célula termina sem erros. Execute
print(mlflow.__version__)para confirmar que o MLflow está disponível.
Construir o DataFrame de entrada
Este exemplo carrega os dados do armazenamento do Azure Open Datasets num DataFrame do Apache Spark. Depois, aplica-se as operações do Spark para limpar e filtrar o conjunto de dados.
Cole o código seguinte numa nova célula e execute-a para criar um DataFrame Spark. Esta etapa recupera os dados dos táxis amarelos de Nova Iorque filtrados até maio de 2018.
blob_account_name = "azureopendatastorage" blob_container_name = "nyctlc" blob_relative_path = "yellow" wasbs_path = f"wasbs://{blob_container_name}@{blob_account_name}.blob.core.windows.net/{blob_relative_path}" nyc_tlc_df = spark.read.parquet(wasbs_path) \ .filter((col("tpepPickupDateTime") >= "2018-05-01") & (col("tpepPickupDateTime") < "2018-06-01")) \ .repartition(20)Verificar: Execute a célula seguinte para confirmar com sucesso as cargas de dados.
print(f"Loaded {nyc_tlc_df.count()} rows") # Expected output: Loaded approximately 9,000,000+ rowsAmostre o conjunto de dados para acelerar o desenvolvimento e o treino.
# Sample without replacement to avoid duplicates sampled_taxi_df = nyc_tlc_df.sample(False, 0.001, seed=1234)Verificar: Confirme que o tamanho da amostra é gerível.
print(f"Sampled {sampled_taxi_df.count()} rows") # Expected output: Sampled approximately 9,000-10,000 rowsVisualize os dados usando o comando incorporado
display()para explorar a amostra de dados.display(sampled_taxi_df.limit(10))Verificar: Aparece uma tabela com 10 linhas mostrando colunas como
tpepPickupDateTime,fareAmount,tipAmount, etripDistance.
Preparar os dados
A preparação de dados é uma etapa crucial no processo de aprendizado de máquina. Envolve a limpeza, transformação e organização dos dados brutos para os tornar adequados para análise e modelação. Nesta secção, realize vários passos de preparação de dados:
- Filtra o conjunto de dados para remover valores atípicos e incorretos.
- Remove colunas que não são necessárias para treino de modelos.
- Crie novas colunas a partir dos dados brutos.
- Crie uma etiqueta para determinar se uma determinada viagem de táxi inclui uma gorjeta.
Execute o seguinte código para selecionar colunas relevantes, calcular características derivadas e filtrar valores atípicos:
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount',
'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime',
date_format('tpepPickupDateTime', 'HH').cast('integer').alias('pickupHour'),
date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString'),
(unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs'),
(when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
) \
.filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)
& (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)
& (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)
& (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)
& (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)
& (sampled_taxi_df.rateCodeId <= 5)
& (sampled_taxi_df.paymentType.isin({"1", "2"}))
)
Importante
A date_format função utiliza o padrão 'HH' (formato de 24 horas, valores 0-23) em vez de 'hh' (formato de 12 horas, valores 1-12). O formato de 24 horas é necessário para a lógica subsequente de agrupamento por intervalos horários.
Em seguida, adicione a funcionalidade de intervalos horários de tráfego com base na hora do dia:
taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount',
'tripDistance', 'weekdayString', 'pickupHour', 'tripTimeSecs', 'tipped',
when((col('pickupHour') <= 6) | (col('pickupHour') >= 20), "Night")
.when((col('pickupHour') >= 7) & (col('pickupHour') <= 10), "AMRush")
.when((col('pickupHour') >= 11) & (col('pickupHour') <= 15), "Afternoon")
.when((col('pickupHour') >= 16) & (col('pickupHour') <= 19), "PMRush")
.otherwise("Other").alias('trafficTimeBins')
) \
.filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))
Verificar: Confirme que os intervalos de tempo de tráfego estão distribuídos corretamente.
taxi_featurised_df.groupBy('trafficTimeBins').count().show()
# Expected output: Shows counts for Night, AMRush, Afternoon, PMRush categories
Criar um modelo de regressão logística
A tarefa final converte os dados rotulados em um formato que a regressão logística pode manipular. A entrada para um algoritmo de regressão logística deve ter uma estrutura de pares de vetores rótulo/característica, onde o vetor de características é um vetor de números que representam o ponto de dados de entrada.
Converta as colunas trafficTimeBins categóricas e weekdayString em representações inteiras usando a abordagem OneHotEncoder :
# Convert categorical features into numeric representations
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(inputCol="weekdayIndex", outputCol="weekdayVec")
# Apply the encodings to create a new DataFrame
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)
Verificar: Confirme que o DataFrame codificado tem as colunas novas esperadas.
print("Columns:", encoded_final_df.columns)
print(f"Row count: {encoded_final_df.count()}")
# Expected output: Columns list includes 'trafficTimeBinsVec' and 'weekdayVec'
Treinar um modelo de regressão logística
Divida o conjunto de dados num conjunto de treino (70%) e num conjunto de testes (30%):
# Split the DataFrame into training and test sets
trainingFraction = 0.7
testingFraction = (1 - trainingFraction)
seed = 1234
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)
Confirme: Confirme que a divisão resultou em tamanhos razoáveis.
print(f"Training rows: {train_data_df.count()}, Test rows: {test_data_df.count()}")
# Expected output: Approximately 70%/30% split of the encoded data
Crie a fórmula do modelo, treine o modelo de regressão logística e avalie-o utilizando a Área Sob a Curva ROC (Característica Operacional do Recetor):
# Create a logistic regression model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol='label')
# Define the formula: 'tipped' is the response variable, right-hand side are predictors
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType + trafficTimeBinsVec")
# Train the model using a pipeline
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)
# Generate predictions on the test dataset
predictions = lrModel.transform(test_data_df)
# Evaluate using Area Under ROC
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print(f"Area under ROC = {auc}")
Verificar: O resultado mostra um valor AUC. Um modelo com bom desempenho produz um valor próximo de 1,0.
Area under ROC = 0.97 (approximately)
Note
O valor exato do AUC varia consoante a amostra de dados. Valores acima de 0,90 indicam um forte desempenho preditivo para este conjunto de dados.
Criar uma representação visual da previsão
Constrói uma visualização final para interpretar os resultados do modelo. Uma curva ROC apresenta o equilíbrio entre a taxa de verdadeiros positivos e a taxa de falsos positivos.
# Plot the ROC curve from the model training summary
modelSummary = lrModel.stages[-1].summary
# Extract FPR and TPR values as plain lists
roc_data = modelSummary.roc.select('FPR', 'TPR').toPandas()
plt.figure(figsize=(8, 6))
plt.plot([0, 1], [0, 1], 'r--', label='Random classifier')
plt.plot(roc_data['FPR'], roc_data['TPR'], label=f'Logistic Regression (AUC = {auc:.4f})')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve - NYC Taxi Tip Prediction')
plt.legend(loc='lower right')
plt.show()
Verificar: Aparece um gráfico que mostra a curva ROC acima da linha diagonal vermelha tracejada. A curva deve curvar-se para o canto superior esquerdo, indicando um forte desempenho na classificação.
Limpeza de recursos
Depois de terminares este tutorial, apaga o caderno e o lakehouse para libertar a capacidade do espaço de trabalho:
- No seu espaço de trabalho, clique com o botão direito no caderno e selecione Eliminar.
- Se criaste uma casa no lago especificamente para este tutorial, clica com o botão direito e seleciona Apagar.
Para preservar o modelo treinado para uso futuro, adicione o seguinte código antes da limpeza:
# Save the model to the lakehouse
model_path = "abfss://<your-workspace>@onelake.dfs.fabric.microsoft.com/<your-lakehouse>.Lakehouse/Files/models/taxi_tip_model"
lrModel.write().overwrite().save(model_path)
print(f"Model saved to: {model_path}")
Solução de problemas
| Problema | Causa | Solução |
|---|---|---|
Py4JJavaError ao ler Parquet |
Conectividade de rede ao armazenamento de blobs do Azure | Verifica se o teu espaço de trabalho Fabric tem acesso à internet de saída. Tenta reiniciar a sessão do Spark. |
AnalysisException: cannot resolve column |
Erro de digitação no nome da coluna ou incompatibilidade de esquema | Execute nyc_tlc_df.printSchema() para inspecionar as colunas disponíveis. O esquema do conjunto de dados de táxis de Nova Iorque pode mudar entre anos. |
| DataFrame vazio após filtragem | Condições de filtro demasiado restritivas para a janela de dados | Aumenta o intervalo de datas ou verifica sampled_taxi_df.count() antes de filtrar. |
IllegalArgumentException em StringIndexer |
Etiquetas invisíveis durante a transformação | Adicione handleInvalid="skip" às suas StringIndexer chamadas: StringIndexer(inputCol="...", outputCol="...", handleInvalid="skip") |
| AUC baixo (abaixo de 0,6) | Dados insuficientes ou engenharia de características incorreta | Aumente a fração amostral (por exemplo, 0.01 em vez de 0.001) e verifique se trafficTimeBins as categorias estão equilibradas. |
OutOfMemoryError |
Conjunto de dados demasiado grande para a capacidade disponível | Reduza a fração da amostra ou aumente o escalão de capacidade do Fabric. |
| Gráfico ROC não apresentado | Problema com o backend do Matplotlib no notebook | Adicione %matplotlib inline no topo do caderno. |
Conteúdos relacionados
- Use exemplos de IA para criar modelos de aprendizado de máquina: use exemplos de IA
- Rastreie execuções de aprendizado de máquina usando Experimentos: Experimentos de aprendizado de máquina