Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Neste artigo, você aprenderá a usar o Apache Spark MLlib para criar um aplicativo de machine learning que lida com a análise preditiva em um conjunto de dados aberto Azure. O Spark fornece bibliotecas de aprendizado de máquina integradas. Este exemplo usa classificação por meio de regressão logística.
Este tutorial aborda estas etapas:
- Configurar o notebook e as importações
- Carregar e amostrar dados de táxi de NYC
- Preparar e desenvolver funcionalidades
- Codificar atributos categóricos
- Treinar modelo de regressão logística
- Avaliar e visualizar resultados
As bibliotecas básicas do Spark SparkML e MLlib fornecem muitos utilitários úteis para tarefas de aprendizado de máquina. Estes utilitários são adequados para:
- Classificação
- Agrupamento
- Teste de hipótese e cálculo de estatísticas de exemplo
- Regressão
- Decomposição de valor singular (SVD) e análise de componente principal (PCA)
- Modelagem de tópico
Pré-requisitos
Obtenha uma assinatura do Microsoft Fabric. Ou inscreva-se para uma avaliação gratuita Microsoft Fabric.
Faça login em Microsoft Fabric.
Alterne para o Fabric usando o alternador de experiência no lado inferior esquerdo da sua página inicial.
- Se necessário, crie uma Microsoft Fabric lakehouse, conforme descrito em Criar uma lakehouse em Microsoft Fabric.
- Crie um novo caderno de notas em seu espaço de trabalho selecionando + e, em seguida, Notebook. Para obter mais informações, consulte Criar um bloco de anotações.
Compreender a classificação e regressão logística
Classificação, uma tarefa popular de aprendizado de máquina que envolve a classificação de dados de entrada em categorias. Um algoritmo de classificação descobre como atribuir rótulos aos dados de entrada fornecidos. Por exemplo, um algoritmo de machine learning pode aceitar informações de ações como entrada e dividir as ações em duas categorias: ações que você deve vender e ações que você deve manter.
O algoritmo de regressão logística é útil para classificação. A API de regressão logística do Spark é útil para classificação binária de dados de entrada em um dos dois grupos. Para obter mais informações sobre a regressão logística, confira a Wikipédia.
A regressão logística produz uma função logística que prevê a probabilidade de que um vetor de entrada pertença a um grupo ou outro.
Exemplo de análise preditiva de dados de táxi de Nova York
Os dados estão disponíveis por meio do recurso Azure Open Datasets. Esse subconjunto do conjunto de dados hospeda informações sobre as corridas de táxi amarelo, incluindo as horas de início e de término, locais de partida e destino e o custo das corridas.
Este tutorial usa o Apache Spark para realizar análises nos dados de gorjetas de corridas de táxi de Nova York e desenvolver um modelo para prever se uma corrida específica inclui gorjeta.
Criar um modelo de machine learning do Apache Spark
Crie um notebook do PySpark. Para obter mais informações, consulte Criar um bloco de anotações.
Depois de criar o notebook, vincule-o a um lakehouse selecionando Adicionar lakehouse no painel esquerdo.
Importe os tipos necessários para este notebook. Cole o código a seguir na primeira célula e execute-o.
import matplotlib.pyplot as plt from pyspark.sql.functions import unix_timestamp, date_format, col, when from pyspark.ml import Pipeline from pyspark.ml.feature import RFormula from pyspark.ml.feature import OneHotEncoder, StringIndexer from pyspark.ml.classification import LogisticRegression from pyspark.ml.evaluation import BinaryClassificationEvaluatorVerifique: A célula é finalizada sem
ImportError. Se você vir um erro, confirme se o notebook usa o runtime do PySpark.Use o MLflow para acompanhar os experimentos de machine learning e as execuções correspondentes. Se o Log Automático do Microsoft Fabric estiver habilitado, as métricas e os parâmetros correspondentes serão capturados automaticamente.
import mlflowVerifique: A célula é executada sem erros. Execute
print(mlflow.__version__)para confirmar se o MLflow está disponível.
Construir o DataFrame de entrada
Este exemplo carrega os dados do armazenamento do Azure Open Datasets para um DataFrame do Apache Spark. Em seguida, você aplica operações do Spark para limpar e filtrar o conjunto de dados.
Cole o código a seguir em uma nova célula e execute-o para criar um DataFrame do Spark. Esta etapa recupera dados de táxi amarelo de NYC filtrados para maio de 2018.
blob_account_name = "azureopendatastorage" blob_container_name = "nyctlc" blob_relative_path = "yellow" wasbs_path = f"wasbs://{blob_container_name}@{blob_account_name}.blob.core.windows.net/{blob_relative_path}" nyc_tlc_df = spark.read.parquet(wasbs_path) \ .filter((col("tpepPickupDateTime") >= "2018-05-01") & (col("tpepPickupDateTime") < "2018-06-01")) \ .repartition(20)Verificar: execute a célula a seguir para confirmar se os dados são carregados com êxito.
print(f"Loaded {nyc_tlc_df.count()} rows") # Expected output: Loaded approximately 9,000,000+ rowsFaça uma amostragem do conjunto de dados para acelerar o desenvolvimento e o treinamento.
# Sample without replacement to avoid duplicates sampled_taxi_df = nyc_tlc_df.sample(False, 0.001, seed=1234)Verifique: confirme se o tamanho da amostra é gerenciável.
print(f"Sampled {sampled_taxi_df.count()} rows") # Expected output: Sampled approximately 9,000-10,000 rowsExiba os dados usando o comando interno
display()para explorar o exemplo de dados.display(sampled_taxi_df.limit(10))Verificar: uma tabela com 10 linhas aparece mostrando colunas como
tpepPickupDateTime,fareAmountetipAmounttripDistance.
Preparar os dados
A preparação de dados é uma etapa crucial no processo de aprendizado de máquina. Envolve a limpeza, a transformação e a organização de dados brutos para torná-los adequados para análise e modelagem. Nesta seção, execute várias etapas de preparação de dados:
- Filtre o conjunto de dados para remover exceções e valores incorretos.
- Remova colunas que não são necessárias para treinamento de modelo.
- Crie novas colunas com base nos dados brutos.
- Gere um rótulo para determinar se uma viagem de táxi específica envolve uma gorjeta.
Execute o código a seguir para selecionar colunas relevantes, recursos derivados de computação e filtrar exceções:
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount',
'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime',
date_format('tpepPickupDateTime', 'HH').cast('integer').alias('pickupHour'),
date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString'),
(unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs'),
(when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
) \
.filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)
& (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)
& (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)
& (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)
& (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)
& (sampled_taxi_df.rateCodeId <= 5)
& (sampled_taxi_df.paymentType.isin({"1", "2"}))
)
Importante
A date_format função usa o padrão 'HH' (formato de 24 horas, valores 0-23) em vez de 'hh' (formato de 12 horas, valores 1 a 12). O formato de 24 horas é necessário para a lógica de agrupamento por faixas de horário a seguir.
Em seguida, adicione o recurso de intervalos de tempo de tráfego com base na hora do dia:
taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount',
'tripDistance', 'weekdayString', 'pickupHour', 'tripTimeSecs', 'tipped',
when((col('pickupHour') <= 6) | (col('pickupHour') >= 20), "Night")
.when((col('pickupHour') >= 7) & (col('pickupHour') <= 10), "AMRush")
.when((col('pickupHour') >= 11) & (col('pickupHour') <= 15), "Afternoon")
.when((col('pickupHour') >= 16) & (col('pickupHour') <= 19), "PMRush")
.otherwise("Other").alias('trafficTimeBins')
) \
.filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))
Verifique: confirme se os intervalos de tempo do tráfego estão distribuídos corretamente.
taxi_featurised_df.groupBy('trafficTimeBins').count().show()
# Expected output: Shows counts for Night, AMRush, Afternoon, PMRush categories
Criar um modelo de regressão logística
A tarefa final é converter os dados rotulados para um formato que possa ser analisado pela regressão logística. A entrada para um algoritmo de regressão logística precisa ser uma estrutura de pares de rótulo/vetor de características, em que o vetor de características é um vetor de números que representa o ponto de entrada.
Converta as colunas categóricas trafficTimeBins e weekdayString em representações de inteiro usando a OneHotEncoder abordagem:
# Convert categorical features into numeric representations
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(inputCol="weekdayIndex", outputCol="weekdayVec")
# Apply the encodings to create a new DataFrame
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)
Verifique: confirme se o DataFrame codificado tem as novas colunas esperadas.
print("Columns:", encoded_final_df.columns)
print(f"Row count: {encoded_final_df.count()}")
# Expected output: Columns list includes 'trafficTimeBinsVec' and 'weekdayVec'
Treinar um modelo de regressão logística
Divida o conjunto de dados em um conjunto de treinamento (70%) e um conjunto de testes (30%):
# Split the DataFrame into training and test sets
trainingFraction = 0.7
testingFraction = (1 - trainingFraction)
seed = 1234
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)
Verifique: Confirme se a divisão produziu tamanhos razoáveis.
print(f"Training rows: {train_data_df.count()}, Test rows: {test_data_df.count()}")
# Expected output: Approximately 70%/30% split of the encoded data
Crie a fórmula do modelo, treine o modelo de regressão logística e avalie-o utilizando a área sob a curva ROC (Receiver Operating Characteristic):
# Create a logistic regression model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol='label')
# Define the formula: 'tipped' is the response variable, right-hand side are predictors
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType + trafficTimeBinsVec")
# Train the model using a pipeline
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)
# Generate predictions on the test dataset
predictions = lrModel.transform(test_data_df)
# Evaluate using Area Under ROC
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print(f"Area under ROC = {auc}")
Verificar: a saída mostra um valor AUC. Um modelo de bom desempenho produz um valor próximo a 1,0.
Area under ROC = 0.97 (approximately)
Note
O valor exato do AUC varia dependendo do exemplo de dados. Valores acima de 0,90 indicam um forte desempenho preditivo para esse conjunto de dados.
Criar uma representação visual da previsão
Crie uma visualização final para interpretar os resultados do modelo. Uma curva ROC apresenta a compensação entre a taxa positiva verdadeira e a taxa falsa positiva.
# Plot the ROC curve from the model training summary
modelSummary = lrModel.stages[-1].summary
# Extract FPR and TPR values as plain lists
roc_data = modelSummary.roc.select('FPR', 'TPR').toPandas()
plt.figure(figsize=(8, 6))
plt.plot([0, 1], [0, 1], 'r--', label='Random classifier')
plt.plot(roc_data['FPR'], roc_data['TPR'], label=f'Logistic Regression (AUC = {auc:.4f})')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve - NYC Taxi Tip Prediction')
plt.legend(loc='lower right')
plt.show()
Verificar: um gráfico é exibido mostrando a curva ROC acima da linha diagonal tracejada vermelha. A curva deve se curvar em direção ao canto superior esquerdo, indicando um forte desempenho de classificação.
Limpar os recursos
Depois de concluir este tutorial, exclua o notebook e o lakehouse para liberar capacidade do workspace:
- No seu espaço de trabalho, clique com o botão direito do mouse no notebook e selecione Excluir.
- Se você criou um lakehouse especificamente para este tutorial, clique com o botão direito do mouse nele e selecione Excluir.
Para preservar o modelo treinado para uso futuro, adicione o seguinte código antes da limpeza:
# Save the model to the lakehouse
model_path = "abfss://<your-workspace>@onelake.dfs.fabric.microsoft.com/<your-lakehouse>.Lakehouse/Files/models/taxi_tip_model"
lrModel.write().overwrite().save(model_path)
print(f"Model saved to: {model_path}")
Solução de problemas
| Questão | Cause | Solução |
|---|---|---|
Py4JJavaError ao ler Parquet |
Conectividade de rede para o Armazenamento de Blobs do Azure | Verifique se o workspace Fabric tem acesso à Internet de saída. Tente reiniciar a sessão do Spark. |
AnalysisException: cannot resolve column |
Erro de digitação no nome da coluna ou incompatibilidade de esquema | Execute nyc_tlc_df.printSchema() para inspecionar as colunas disponíveis. O esquema do conjunto de dados de táxi de NYC pode mudar entre anos. |
| DataFrame vazio após a filtragem | Condições de filtro muito restritivas para a janela de dados | Aumente o intervalo de datas ou verifique sampled_taxi_df.count() antes da filtragem. |
IllegalArgumentException em StringIndexer |
Rótulos não vistos durante a transformação | Adicione handleInvalid="skip" às suas StringIndexer chamadas: StringIndexer(inputCol="...", outputCol="...", handleInvalid="skip") |
| AUC baixo (abaixo de 0,6) | Dados insuficientes ou engenharia de recursos incorreta | Aumente a fração de exemplo (por exemplo, 0.01 em vez de 0.001) e verifique se trafficTimeBins as categorias estão equilibradas. |
OutOfMemoryError |
Conjunto de dados muito grande para capacidade disponível | Reduza a fração de amostragem ou aumente o nível de capacidade do Fabric. |
| Gráfico ROC não exibido | Problema de backend do Matplotlib no notebook | Adicione %matplotlib inline na parte superior do bloco de anotações. |
Conteúdo relacionado
- Use exemplos de IA para criar modelos de machine learning: Usar exemplos de IA
- Acompanhe as execuções de aprendizado de máquina usando os Experimentos: Experimentos de aprendizado de máquina