Crie um modelo de aprendizado de máquina com o Apache Spark MLlib

Neste artigo, aprende a usar o Apache Spark MLlib para criar uma aplicação de aprendizagem automática que gere análise preditiva num conjunto de dados aberto Azure. O Spark fornece bibliotecas de aprendizado de máquina integradas. Este exemplo usa a classificação por meio de regressão logística.

Este tutorial aborda estas etapas:

  • Configurar o caderno e importar
  • Carregar e amostrar dados de táxis de Nova Iorque
  • Preparar e engenhar funcionalidades
  • Codificar características categóricas
  • Modelo de regressão logística de comboios
  • Avaliar e visualizar os resultados

As principais bibliotecas SparkML e MLlib Spark fornecem muitos utilitários que são úteis para tarefas de aprendizado de máquina. Estes utilitários são adequados para:

  • Classificação
  • Agrupamento
  • Teste de hipóteses e cálculo de estatísticas amostrais
  • Regressão
  • Decomposição de valor singular (SVD) e análise de componentes principais (PCA)
  • Modelagem de tópicos

Pré-requisitos

Compreender a classificação e a regressão logística

A classificação, uma tarefa popular de aprendizado de máquina, envolve classificar os dados de entrada em categorias. Um algoritmo de classificação descobre como atribuir etiquetas aos dados de entrada fornecidos. Por exemplo, um algoritmo de aprendizagem automática poderia aceitar informação de ações como entrada e dividir a ação em duas categorias: ações que deve vender e ações que deve manter.

O algoritmo de regressão logística é útil para classificação. A API de regressão logística do Spark é útil para a classificação binária de dados de entrada em um dos dois grupos. Para obter mais informações sobre regressão logística, consulte Wikipedia.

A regressão logística produz uma função logística que prevê a probabilidade de um vetor de entrada pertencer a um grupo ou a outro.

Exemplo de análise preditiva de dados de táxi de Nova York

Os dados estão disponíveis através do recurso Azure Open Datasets . Este subconjunto de dados hospeda informações sobre viagens de táxi amarelo, incluindo as horas de início, horas de término, locais de início, locais de término, custos de viagem e outros atributos.

Este tutorial utiliza o Apache Spark para analisar os dados sobre as gorjetas das viagens de táxi em Nova Iorque e desenvolver um modelo para prever se uma determinada viagem inclui gorjeta.

Criar um modelo de aprendizado de máquina do Apache Spark

  1. Crie um bloco de anotações PySpark. Para mais informações, consulte Criar um caderno.

    Depois de criar o caderno, anexe-o a uma casa do lago selecionando Adicionar casa do lago no painel esquerdo.

  2. Importa os tipos necessários para este caderno. Cola o código seguinte na primeira célula e executa-o.

    import matplotlib.pyplot as plt
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    

    Verificar: A célula é concluída sem ImportError. Se vires um erro, confirma que o teu portátil usa o runtime do PySpark.

  3. Usa o MLflow para acompanhar as tuas experiências de aprendizagem automática e as execuções correspondentes. Se o log automático do Microsoft Fabric estiver habilitado, as métricas e os parâmetros correspondentes serão capturados automaticamente.

    import mlflow
    

    Verificar: A célula termina sem erros. Execute print(mlflow.__version__) para confirmar que o MLflow está disponível.

Construir o DataFrame de entrada

Este exemplo carrega os dados do armazenamento do Azure Open Datasets num DataFrame do Apache Spark. Depois, aplica-se as operações do Spark para limpar e filtrar o conjunto de dados.

  1. Cole o código seguinte numa nova célula e execute-a para criar um DataFrame Spark. Esta etapa recupera os dados dos táxis amarelos de Nova Iorque filtrados até maio de 2018.

    blob_account_name = "azureopendatastorage"
    blob_container_name = "nyctlc"
    blob_relative_path = "yellow"
    wasbs_path = f"wasbs://{blob_container_name}@{blob_account_name}.blob.core.windows.net/{blob_relative_path}"
    
    nyc_tlc_df = spark.read.parquet(wasbs_path) \
        .filter((col("tpepPickupDateTime") >= "2018-05-01") & (col("tpepPickupDateTime") < "2018-06-01")) \
        .repartition(20)
    

    Verificar: Execute a célula seguinte para confirmar com sucesso as cargas de dados.

    print(f"Loaded {nyc_tlc_df.count()} rows")
    # Expected output: Loaded approximately 9,000,000+ rows
    
  2. Amostre o conjunto de dados para acelerar o desenvolvimento e o treino.

    # Sample without replacement to avoid duplicates
    sampled_taxi_df = nyc_tlc_df.sample(False, 0.001, seed=1234)
    

    Verificar: Confirme que o tamanho da amostra é gerível.

    print(f"Sampled {sampled_taxi_df.count()} rows")
    # Expected output: Sampled approximately 9,000-10,000 rows
    
  3. Visualize os dados usando o comando incorporado display() para explorar a amostra de dados.

    display(sampled_taxi_df.limit(10))
    

    Verificar: Aparece uma tabela com 10 linhas mostrando colunas como tpepPickupDateTime, fareAmount, tipAmount, e tripDistance.

Preparar os dados

A preparação de dados é uma etapa crucial no processo de aprendizado de máquina. Envolve a limpeza, transformação e organização dos dados brutos para os tornar adequados para análise e modelação. Nesta secção, realize vários passos de preparação de dados:

  • Filtra o conjunto de dados para remover valores atípicos e incorretos.
  • Remove colunas que não são necessárias para treino de modelos.
  • Crie novas colunas a partir dos dados brutos.
  • Crie uma etiqueta para determinar se uma determinada viagem de táxi inclui uma gorjeta.

Execute o seguinte código para selecionar colunas relevantes, calcular características derivadas e filtrar valores atípicos:

taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount',
                    'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime',
                    date_format('tpepPickupDateTime', 'HH').cast('integer').alias('pickupHour'),
                    date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString'),
                    (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs'),
                    (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                    ) \
            .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)
                    & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)
                    & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)
                    & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)
                    & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)
                    & (sampled_taxi_df.rateCodeId <= 5)
                    & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                    )

Importante

A date_format função utiliza o padrão 'HH' (formato de 24 horas, valores 0-23) em vez de 'hh' (formato de 12 horas, valores 1-12). O formato de 24 horas é necessário para a lógica subsequente de agrupamento por intervalos horários.

Em seguida, adicione a funcionalidade de intervalos horários de tráfego com base na hora do dia:

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount',
                                    'tripDistance', 'weekdayString', 'pickupHour', 'tripTimeSecs', 'tipped',
                                    when((col('pickupHour') <= 6) | (col('pickupHour') >= 20), "Night")
                                    .when((col('pickupHour') >= 7) & (col('pickupHour') <= 10), "AMRush")
                                    .when((col('pickupHour') >= 11) & (col('pickupHour') <= 15), "Afternoon")
                                    .when((col('pickupHour') >= 16) & (col('pickupHour') <= 19), "PMRush")
                                    .otherwise("Other").alias('trafficTimeBins')
                                    ) \
                            .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

Verificar: Confirme que os intervalos de tempo de tráfego estão distribuídos corretamente.

taxi_featurised_df.groupBy('trafficTimeBins').count().show()
# Expected output: Shows counts for Night, AMRush, Afternoon, PMRush categories

Criar um modelo de regressão logística

A tarefa final converte os dados rotulados em um formato que a regressão logística pode manipular. A entrada para um algoritmo de regressão logística deve ter uma estrutura de pares de vetores rótulo/característica, onde o vetor de características é um vetor de números que representam o ponto de dados de entrada.

Converta as colunas trafficTimeBins categóricas e weekdayString em representações inteiras usando a abordagem OneHotEncoder :

# Convert categorical features into numeric representations
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(inputCol="weekdayIndex", outputCol="weekdayVec")

# Apply the encodings to create a new DataFrame
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

Verificar: Confirme que o DataFrame codificado tem as colunas novas esperadas.

print("Columns:", encoded_final_df.columns)
print(f"Row count: {encoded_final_df.count()}")
# Expected output: Columns list includes 'trafficTimeBinsVec' and 'weekdayVec'

Treinar um modelo de regressão logística

Divida o conjunto de dados num conjunto de treino (70%) e num conjunto de testes (30%):

# Split the DataFrame into training and test sets
trainingFraction = 0.7
testingFraction = (1 - trainingFraction)
seed = 1234

train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

Confirme: Confirme que a divisão resultou em tamanhos razoáveis.

print(f"Training rows: {train_data_df.count()}, Test rows: {test_data_df.count()}")
# Expected output: Approximately 70%/30% split of the encoded data

Crie a fórmula do modelo, treine o modelo de regressão logística e avalie-o utilizando a Área Sob a Curva ROC (Característica Operacional do Recetor):

# Create a logistic regression model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol='label')

# Define the formula: 'tipped' is the response variable, right-hand side are predictors
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType + trafficTimeBinsVec")

# Train the model using a pipeline
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

# Generate predictions on the test dataset
predictions = lrModel.transform(test_data_df)

# Evaluate using Area Under ROC
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print(f"Area under ROC = {auc}")

Verificar: O resultado mostra um valor AUC. Um modelo com bom desempenho produz um valor próximo de 1,0.

Area under ROC = 0.97 (approximately)

Note

O valor exato do AUC varia consoante a amostra de dados. Valores acima de 0,90 indicam um forte desempenho preditivo para este conjunto de dados.

Criar uma representação visual da previsão

Constrói uma visualização final para interpretar os resultados do modelo. Uma curva ROC apresenta o equilíbrio entre a taxa de verdadeiros positivos e a taxa de falsos positivos.

# Plot the ROC curve from the model training summary
modelSummary = lrModel.stages[-1].summary

# Extract FPR and TPR values as plain lists
roc_data = modelSummary.roc.select('FPR', 'TPR').toPandas()

plt.figure(figsize=(8, 6))
plt.plot([0, 1], [0, 1], 'r--', label='Random classifier')
plt.plot(roc_data['FPR'], roc_data['TPR'], label=f'Logistic Regression (AUC = {auc:.4f})')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve - NYC Taxi Tip Prediction')
plt.legend(loc='lower right')
plt.show()

Verificar: Aparece um gráfico que mostra a curva ROC acima da linha diagonal vermelha tracejada. A curva deve curvar-se para o canto superior esquerdo, indicando um forte desempenho na classificação.

Gráfico que mostra a curva ROC para regressão logística no modelo de ponta.

Limpeza de recursos

Depois de terminares este tutorial, apaga o caderno e o lakehouse para libertar a capacidade do espaço de trabalho:

  1. No seu espaço de trabalho, clique com o botão direito no caderno e selecione Eliminar.
  2. Se criaste uma casa no lago especificamente para este tutorial, clica com o botão direito e seleciona Apagar.

Para preservar o modelo treinado para uso futuro, adicione o seguinte código antes da limpeza:

# Save the model to the lakehouse
model_path = "abfss://<your-workspace>@onelake.dfs.fabric.microsoft.com/<your-lakehouse>.Lakehouse/Files/models/taxi_tip_model"
lrModel.write().overwrite().save(model_path)
print(f"Model saved to: {model_path}")

Solução de problemas

Problema Causa Solução
Py4JJavaError ao ler Parquet Conectividade de rede ao armazenamento de blobs do Azure Verifica se o teu espaço de trabalho Fabric tem acesso à internet de saída. Tenta reiniciar a sessão do Spark.
AnalysisException: cannot resolve column Erro de digitação no nome da coluna ou incompatibilidade de esquema Execute nyc_tlc_df.printSchema() para inspecionar as colunas disponíveis. O esquema do conjunto de dados de táxis de Nova Iorque pode mudar entre anos.
DataFrame vazio após filtragem Condições de filtro demasiado restritivas para a janela de dados Aumenta o intervalo de datas ou verifica sampled_taxi_df.count() antes de filtrar.
IllegalArgumentException em StringIndexer Etiquetas invisíveis durante a transformação Adicione handleInvalid="skip" às suas StringIndexer chamadas: StringIndexer(inputCol="...", outputCol="...", handleInvalid="skip")
AUC baixo (abaixo de 0,6) Dados insuficientes ou engenharia de características incorreta Aumente a fração amostral (por exemplo, 0.01 em vez de 0.001) e verifique se trafficTimeBins as categorias estão equilibradas.
OutOfMemoryError Conjunto de dados demasiado grande para a capacidade disponível Reduza a fração da amostra ou aumente o escalão de capacidade do Fabric.
Gráfico ROC não apresentado Problema com o backend do Matplotlib no notebook Adicione %matplotlib inline no topo do caderno.