Del via


Bygg en maskinlæringsmodell med Apache Spark MLlib

I denne artikkelen lærer du hvordan du bruker Apache Spark MLlib til å opprette et maskinlæringsprogram som håndterer enkel prediktiv analyse på et Azure-åpent datasett. Spark gir innebygde maskinlæringsbiblioteker. Dette eksemplet bruker klassifisering gjennom logistisk regresjon.

Kjernebibliotekene SparkML og MLlib Spark gir mange verktøy som er nyttige for maskinlæringsoppgaver. Disse verktøyene er egnet for:

  • Klassifisering
  • Klynging
  • Hypotesetesting og beregning av eksempelstatistikk
  • Regresjon
  • Entallsverdikomponering (SVD) og hovedkomponentanalyse (PCA)
  • Emnemodellering

Forstå klassifisering og logistisk regresjon

Klassifisering, en populær maskinlæringsoppgave, innebærer sortering av inndata i kategorier. En klassifiseringsalgoritme bør finne ut hvordan du tilordner etiketter til de angitte inndata. En maskinlæringsalgoritme kan for eksempel godta aksjeinformasjon som inndata, og dele aksjen inn i to kategorier: aksjer som du bør selge og aksjer som du bør beholde.

Algoritmen for logistikkregresjon er nyttig for klassifisering. Spark-logistikkregresjons-API-en er nyttig for binær klassifisering av inndata i én av to grupper. Hvis du vil ha mer informasjon om logistisk regresjon, kan du se Wikipedia.

Logistisk regresjon produserer en logistisk funksjon som kan forutsi sannsynligheten for at en inndatavektor tilhører én gruppe eller den andre.

Eksempel på prediktiv analyse av NYC-taxidata

Dataene er tilgjengelige via Azure Open Datasets-ressursen. Dette delsettet for datasettet inneholder informasjon om gule taxiturer, inkludert starttider, sluttidspunkt, startsteder, sluttsteder, reisekostnader og andre attributter.

Resten av denne artikkelen er avhengig av Apache Spark for først å utføre noen analyser på NYC taxi-trip tips data og deretter utvikle en modell for å forutsi om en bestemt tur inkluderer et tips eller ikke.

Opprette en Apache Spark-maskinlæringsmodell

  1. Opprett en PySpark-notatblokk. Hvis du vil ha mer informasjon, kan du gå til Opprett en notatblokk.

  2. Importer typene som kreves for denne notatblokken.

    import matplotlib.pyplot as plt
    from datetime import datetime
    from dateutil import parser
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml import PipelineModel
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    
  3. Bruk MLflow til å spore maskinlæringseksperimentene dine og tilhørende kjøringer. Hvis Microsoft Fabric Autologging er aktivert, registreres de tilsvarende måledataene og parameterne automatisk.

    import mlflow
    

Konstruere datarammen for inndata

Dette eksemplet laster inn dataene i en Pandas-dataramme, og konverterer dem deretter til en Apache Spark-dataramme. I det formatet kan du bruke andre Apache Spark-operasjoner for å rense og filtrere datasettet.

  1. Lim inn disse linjene i en ny celle, og kjør dem for å opprette en Spark DataFrame. Dette steget henter dataene direkte fra Azure Open Datasets lagring. Du kan filtrere ned disse dataene for å undersøke et spesifikt datavindu. Kodeeksempelet bruker et filter som returnerer én måneds data.

    blob_account_name = "azureopendatastorage"
    blob_container_name = "nyctlc"
    blob_relative_path = "yellow"
    wasbs_path = f"wasbs://{blob_container_name}@{blob_account_name}.blob.core.windows.net/{blob_relative_path}"
    
    nyc_tlc_df = spark.read.parquet(wasbs_path) \
        .filter((col("tpepPickupDateTime") >= "2018-05-01") & (col("tpepPickupDateTime") < "2018-06-06")) \
        .repartition(20)
    
  2. Denne koden reduserer datasettet til ca. 10 000 rader. For å fremskynde utviklingen og treningen, tar koden eksempler fra datasettet for nå.

    # To make development easier, faster, and less expensive, sample down for now
    sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
    
  3. Se på dataene ved hjelp av den innebygde display() kommandoen. Med denne kommandoen kan du enkelt se et datasample, eller grafisk utforske trender i dataene.

    #sampled_taxi_df.show(10)
    display(sampled_taxi_df.limit(10))    
    

Klargjøre dataene

Dataforberedelse er et viktig trinn i maskinlæringsprosessen. Det innebærer rengjøring, transformasjon og organisering av rådata, for å gjøre det egnet for analyse og modellering. I dette kodeeksempelet utfører du flere trinn for klargjøring av data:

  • Filtrer datasettet for å fjerne ytterpunkter og uriktige verdier
  • Fjerne kolonner som ikke er nødvendige for modellopplæring
  • Opprette nye kolonner fra rådataene
  • Generer en etikett for å finne ut om en gitt taxitur innebærer et tips
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
                        , 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
                        , date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
                        , date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
                        , (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
                        , (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                        )\
                .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
                        & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
                        & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
                        & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
                        & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
                        & (sampled_taxi_df.rateCodeId <= 5)
                        & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                        )

Deretter foretar du en ny overføring over dataene for å legge til de endelige funksjonene.

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
                                                , 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
                                                , when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
                                                .when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
                                                .when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
                                                .when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
                                                .otherwise(0).alias('trafficTimeBins')
                                              )\
                                       .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

Opprette en logistisk regresjonsmodell

Den endelige oppgaven konverterer de merkede dataene til et format som logistikkregresjon kan håndtere. Inndataene til en logistisk regresjonsalgoritme må ha en struktur for etikett/funksjonsvektorpar, der funksjonsvektoren er en vektor av tall som representerer inndatapunktet.

Basert på de endelige oppgavekravene må du konvertere de kategoriske kolonnene til tall. Spesifikt, konverter trafficTimeBins og weekdayString kolonnene til heltallsrepresentasjoner. Mange alternativer finnes for å håndtere dette behovet. Dette eksemplet OneHotEncoder omfatter fremgangsmåten:

# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(inputCol="weekdayIndex", outputCol="weekdayVec")

# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

Denne handlingen resulterer i en ny DataFrame med alle kolonner i riktig format for å lære opp en modell.

Lære opp en logistisk regresjonsmodell

Den første oppgaven deler datasettet inn i et opplæringssett, og et test- eller valideringssett.

# Decide on the split between training and test data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234

# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

Når du har to DataFrames, opprett modellformelen og kjør den mot treningsdatarammen. Deretter validerer du mot testdatarammen. Eksperimenter med ulike versjoner av modellformelen for å se effektene av ulike kombinasjoner.

## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')

## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")

## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="prediction", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print("Area under ROC = %s" % auc)

Celleutdataene:

Area under ROC = 0.9749430523917996

Opprette en visuell representasjon av prognosen

Bygg en endelig visualisering for å tolke modellens resultater. En ROC-kurve kan presentere resultatet.

## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary

plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
         modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

Graf som viser ROC-kurven for logistisk regresjon i tipsmodellen.