Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Cet article explique comment utiliser Apache Spark MLlib pour créer une application d’apprentissage automatique qui traite l’analyse prédictive simple sur un jeu de données ouvert Azure. Spark fournit des bibliothèques d’apprentissage automatique intégrées. Cet exemple utilise la classification par régression logistique.
Les bibliothèques Spark de base SparkML et MLlib fournissent de nombreux utilitaires utiles pour les tâches d’apprentissage automatique. Ces utilitaires conviennent pour les tâches suivantes :
- classification
- Regroupement
- Test d'hypothèse et calcul des statistiques d'échantillon
- régression ;
- Décomposition de valeur singulière (SVD) et analyse des composants principaux (PCA)
- Modélisation de rubrique
Comprendre la classification et la régression logistique
Une classification, tâche d’apprentissage automatique très courante, implique le tri de données d’entrée par catégories. Un algorithme de classification doit déterminer comment affecter des étiquettes aux données d’entrée fournies. Par exemple, un algorithme d’apprentissage automatique pourrait accepter des informations de stock en entrée et diviser le stock en deux catégories : le stock à vendre et le stock que vous devriez conserver.
L’algorithme Régression logistique est utile pour la classification. L’API de régression logistique Spark est utile pour la classification binaire des données d’entrée dans l’un des deux groupes. Pour plus d’informations sur la régression logistique, consultez Wikipedia.
La régression logistique génère une fonction logistique qui peut prédire la probabilité qu’un vecteur d’entrée appartienne à un groupe ou à l’autre.
Exemple d’analyse prédictive des données des taxis de New York
Les données sont disponibles via la ressource Azure Open Datasets. Ce sous-ensemble du jeu de données contient des informations sur les courses de taxis jaunes, notamment les heures et les lieux de départ et d’arrivée, le coût des courses et d’autres attributs.
Le reste de cet article s’appuie sur Apache Spark pour effectuer dans un premier temps une analyse sur les données de pourboires des taxis de New York, puis développer un modèle pour prédire si un voyage particulier inclut ou non un pourboire.
Créer un modèle d’apprentissage automatique Apache Spark
Créez un notebook PySpark. Pour plus d’informations, consultez Créer un notebook.
Importez les types nécessaires pour ce notebook.
import matplotlib.pyplot as plt from datetime import datetime from dateutil import parser from pyspark.sql.functions import unix_timestamp, date_format, col, when from pyspark.ml import Pipeline from pyspark.ml import PipelineModel from pyspark.ml.feature import RFormula from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer from pyspark.ml.classification import LogisticRegression from pyspark.ml.evaluation import BinaryClassificationEvaluatorUtilisez MLflow pour suivre vos expériences machine learning et les exécutions correspondantes. Si la journalisation automatique Microsoft Fabric est activée, les métriques et paramètres correspondants sont automatiquement capturés.
import mlflow
Construire le DataFrame d’entrée
Cet exemple charge les données dans un dataframe Pandas, puis le convertit en dataframe Apache Spark. Dans ce format, vous pouvez appliquer d’autres opérations Apache Spark pour nettoyer et filtrer le jeu de données.
Collez ces lignes dans une nouvelle cellule et exécutez-les pour créer un DataFrame Spark. Cette étape récupère les données directement à partir du stockage Azure Open Datasets. Vous pouvez filtrer ces données pour examiner une fenêtre spécifique de données. L’exemple de code utilise un filtre qui retourne un mois de données unique.
blob_account_name = "azureopendatastorage" blob_container_name = "nyctlc" blob_relative_path = "yellow" wasbs_path = f"wasbs://{blob_container_name}@{blob_account_name}.blob.core.windows.net/{blob_relative_path}" nyc_tlc_df = spark.read.parquet(wasbs_path) \ .filter((col("tpepPickupDateTime") >= "2018-05-01") & (col("tpepPickupDateTime") < "2018-06-06")) \ .repartition(20)Le code réduit le jeu de données à environ 10 000 lignes. Pour accélérer le développement et l’entraînement, le code réduit la taille du jeu de données provisoirement.
# To make development easier, faster, and less expensive, sample down for now sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)Examinez les données à l'aide de la commande intégrée
display(). Avec cette commande, vous pouvez facilement afficher un exemple de données ou explorer graphiquement les tendances dans les données.#sampled_taxi_df.show(10) display(sampled_taxi_df.limit(10))
Préparer les données
La préparation des données est une étape cruciale du processus d’apprentissage automatique. Cela passe par le nettoyage, la transformation et l’organisation des données brutes pour les rendre adaptées à l’analyse et à la modélisation. Dans cet exemple de code, plusieurs étapes de préparation des données sont effectuées :
- suppression des valeurs hors norme et des valeurs incorrectes en filtrant le jeu de données ;
- suppression des colonnes non nécessaires à l’entraînement de modèle ;
- création de colonnes à partir des données brutes ; et
- génération d’une étiquette pour déterminer si un trajet donné en taxi implique un pourboire.
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
, 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
, date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
, date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
, (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
, (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
)\
.filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
& (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
& (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
& (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
& (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
& (sampled_taxi_df.rateCodeId <= 5)
& (sampled_taxi_df.paymentType.isin({"1", "2"}))
)
Ensuite, effectuez une deuxième passe sur les données pour ajouter les caractéristiques finales.
taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
, 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
, when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
.when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
.when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
.when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
.otherwise(0).alias('trafficTimeBins')
)\
.filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))
Créer un modèle de régression logistique
La dernière tâche convertit les données étiquetées dans un format qui peut être analysé par régression logistique. L’entrée dans un algorithme de régression logistique doit avoir une structure de paires de vecteurs étiquette/caractéristique, où le vecteur caractéristique est un vecteur de nombres qui représente le point d’entrée.
En fonction des exigences de tâche finales, vous devez convertir les colonnes catégorielles en nombres. Plus précisément, convertissez les colonnes trafficTimeBins et weekdayString en représentations entières. De nombreuses options sont disponibles pour gérer cette exigence. Cet exemple implique l’approche OneHotEncoder :
# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(inputCol="weekdayIndex", outputCol="weekdayVec")
# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)
Cette action génère un nouveau DataFrame dont les colonnes sont toutes dans un format adapté à l’entraînement d’un modèle.
Entraîner un modèle de régression logistique
La première tâche divise le jeu de données en un jeu d’entraînement et un jeu de test ou de validation.
# Decide on the split between training and test data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234
# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)
Une fois que vous avez deux DataFrames, créez la formule de modèle et exécutez-la sur le DataFrame d’apprentissage. Validez ensuite sur le DataFrame de test. Faites des essais avec différentes versions de la formule du modèle pour voir l’effet de différentes combinaisons.
## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')
## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")
## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)
## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="prediction", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print("Area under ROC = %s" % auc)
La cellule produit :
Area under ROC = 0.9749430523917996
Créer une représentation visuelle de la prédiction
Créez une visualisation finale pour interpréter les résultats du modèle. Une courbe ROC peut présenter le résultat.
## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary
plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()
Graphique montrant la courbe ROC pour la régression logistique dans le modèle de pointe.
Contenu connexe
- Utiliser des exemples d’IA pour créer des modèles d’apprentissage automatique : Utiliser des exemples d’IA
- Suivre les exécutions d’apprentissage automatique à l’aide d’Expériences : Expériences d’apprentissage automatique