Freigeben über


Erstellen eines Machine Learning-Modells mit Apache Spark MLlib

In diesem Artikel erfahren Sie, wie Sie mithilfe von Apache Spark MLlib eine Machine-Learning-Anwendung erstellen, die eine einfache Vorhersageanalyse für ein Azure Open Dataset ausführt. Spark bietet integrierte Machine Learning-Bibliotheken. In diesem Beispiel wird eine Klassifizierung mittels logistischer Regression verwendet.

Die SparkML und MLlib Spark-Kernbibliotheken bieten viele Dienstprogramme, die für Machine Learning Aufgaben nützlich sind. Diese Dienstprogramme eignen sich für:

  • Klassifizierung
  • Clusterbildung
  • Testen von Hypothesen und Berechnen von Beispielstatistiken
  • Regression
  • Singulärwertzerlegung (Singular Value Decomposition, SVD) und Hauptkomponentenanalyse (Principal Component Analysis, PCA)
  • Themenmodellierung

Grundlegendes zu Klassifizierung und logistischer Regression

Bei der Klassifizierung, einer beliebten Aufgabe des Machine Learning, werden die Eingabedaten in Kategorien sortiert. Ein Klassifizierungsalgorithmus sollte herausfinden, wie die bereitgestellten Eingabedaten Bezeichnungen zugewiesen werden. So könnte ein Machine Learning-Algorithmus beispielsweise Börsendaten als Eingabe akzeptieren und die Daten in zwei Kategorien einteilen: Aktien, die Sie verkaufen sollten, und solche, die Sie behalten sollten.

Der Algorithmus Logistische Regression ist nützlich für die Klassifizierung. Die API für die logistische Regression von Spark ist nützlich für eine binäre Klassifizierung der Eingabedaten in einer von zwei Gruppen. Weitere Informationen zur logistischen Regression finden Sie in Wikipedia.

Die logistische Regression erzeugt eine logistische Funktion, die die Wahrscheinlichkeit vorhersagen kann, dass ein Eingabevektor zu einer Gruppe gehört.

Beispiel für eine Vorhersageanalyse von NYC-Taxidaten

Die Daten sind über die Ressource Azure Open Datasets verfügbar. Diese Teilmenge des Datasets enthält Informationen zu Taxifahrten von Yellow Cabs, einschließlich Informationen zu den Start- und Endzeiten, den Start- und Zielorten, den Fahrtkosten und anderer Attribute.

Der Rest dieses Artikels stützt sich auf Apache Spark, um einige Analysen der NYC-Taxi-Trinkgelddaten durchzuführen und dann ein Modell zu entwickeln, um vorherzusagen, ob eine bestimmte Reise ein Trinkgeld enthält oder nicht.

Erstellen eines Apache Spark-Machine Learning-Modells

  1. Erstellen Sie ein PySpark-Notebook. Weitere Informationen finden Sie unter Erstellen eines Notizbuchs.

  2. Importieren Sie die für dieses Notebook erforderlichen Typen.

    import matplotlib.pyplot as plt
    from datetime import datetime
    from dateutil import parser
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml import PipelineModel
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    
  3. Verwenden Sie MLflow , um Ihre Machine Learning-Experimente und die entsprechenden Läufe nachzuverfolgen. Wenn die automatische Protokollierung von Microsoft Fabric aktiviert ist, werden die entsprechenden Metriken und Parameter automatisch erfasst.

    import mlflow
    

Erstellen des Eingabedatenrahmens

In diesem Beispiel werden die Daten in einen Pandas-Datenrahmen geladen und dann in einen Apache Spark-Datenrahmen konvertiert. In diesem Format können Sie andere Apache Spark-Vorgänge anwenden, um das Dataset zu bereinigen und zu filtern.

  1. Fügen Sie diese Zeilen in eine neue Zelle ein, und führen Sie sie aus, um einen Spark DataFrame zu erstellen. In diesem Schritt werden die Daten direkt aus dem Azure Open Datasets-Speicher abgerufen. Sie können diese Daten nach unten filtern, um ein bestimmtes Datenfenster zu untersuchen. Im Codebeispiel wird ein Filter verwendet, der einen einzelnen Monat mit Daten zurückgibt.

    blob_account_name = "azureopendatastorage"
    blob_container_name = "nyctlc"
    blob_relative_path = "yellow"
    wasbs_path = f"wasbs://{blob_container_name}@{blob_account_name}.blob.core.windows.net/{blob_relative_path}"
    
    nyc_tlc_df = spark.read.parquet(wasbs_path) \
        .filter((col("tpepPickupDateTime") >= "2018-05-01") & (col("tpepPickupDateTime") < "2018-06-06")) \
        .repartition(20)
    
  2. Dieser Code reduziert das Dataset auf etwa 10.000 Zeilen. Um die Entwicklung und Schulung zu beschleunigen, werden die Codebeispiele durch Verkleinerung des Datasets optimiert.

    # To make development easier, faster, and less expensive, sample down for now
    sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
    
  3. Sehen Sie sich die Daten mithilfe des integrierten display() Befehls an. Mit diesem Befehl können Sie ganz einfach ein Datenbeispiel anzeigen oder Trends in den Daten grafisch untersuchen.

    #sampled_taxi_df.show(10)
    display(sampled_taxi_df.limit(10))    
    

Vorbereiten der Daten

Die Datenaufbereitung ist ein wichtiger Schritt im Machine Learning-Prozess. Sie umfasst das Bereinigen, die Transformation und Organisation von Rohdaten, um sie für die Analyse und Modellierung geeignet zu machen. In diesem Codebeispiel führen Sie mehrere Datenaufbereitungsschritte aus:

  • Filtern des Datensatzes zum Entfernen von Ausreißern und falschen Werten
  • Entfernen von Spalten, die für das Modelltraining nicht benötigt werden
  • Erstellen neuer Spalten aus den Rohdaten
  • Erstellen Sie ein Label, um festzustellen, ob eine bestimmte Taxifahrt ein Trinkgeld beinhaltet.
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
                        , 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
                        , date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
                        , date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
                        , (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
                        , (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                        )\
                .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
                        & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
                        & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
                        & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
                        & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
                        & (sampled_taxi_df.rateCodeId <= 5)
                        & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                        )

Führen Sie anschließend einen zweiten Durchlauf über die Daten durch, um die endgültigen Merkmale hinzuzufügen.

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
                                                , 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
                                                , when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
                                                .when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
                                                .when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
                                                .when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
                                                .otherwise(0).alias('trafficTimeBins')
                                              )\
                                       .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

Erstellen eines logistischen Regressionsmodells

Die letzte Aufgabe besteht darin, die Daten mit Label in ein Format zu konvertieren, das die logistische Regression verarbeiten kann. Die Eingabe für einen logistischen Regressionsalgorithmus muss eine Struktur von Paaren aus Bezeichnung und Featurevektor aufweisen, wobei der Featurevektor aus Zahlen besteht, die den Eingabepunkt darstellen.

Basierend auf den endgültigen Aufgabenanforderungen müssen Sie die kategorisierten Spalten in Zahlen konvertieren. Konvertieren Sie die Spalten trafficTimeBins und weekdayString insbesondere in ganzzahlige Darstellungen. Für diese Anforderung stehen viele Optionen zur Verfügung. In diesem Beispiel wird der OneHotEncoder Ansatz berücksichtigt:

# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(inputCol="weekdayIndex", outputCol="weekdayVec")

# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

Diese Aktion führt zu einem neuen Datenrahmen mit allen Spalten im richtigen Format, um ein Modell zu trainieren.

Trainieren eines logistischen Regressionsmodells

Die erste Aufgabe teilt das Dataset in ein Trainingsdataset und ein Test- oder Validierungsdataset auf.

# Decide on the split between training and test data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234

# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

Nachdem Sie über zwei DataFrames verfügen, erstellen Sie die Modellformel, und führen Sie sie mit dem Schulungsdatenframe aus. Validieren Sie anschließend mit dem Test-DataFrame. Experimentieren Sie mit verschiedenen Versionen der Modellformel, um die Auswirkungen verschiedener Kombinationen zu ermitteln.

## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')

## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")

## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="prediction", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print("Area under ROC = %s" % auc)

Die Zellenausgabe:

Area under ROC = 0.9749430523917996

Erstellen einer visuellen Darstellung der Vorhersage

Erstellen Sie eine endgültige Visualisierung, um die Modellergebnisse zu interpretieren. Eine ROC-Kurve kann das Ergebnis darstellen.

## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary

plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
         modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

Diagramm: ROC-Kurve für die logistische Regression im Trinkgeldmodell