Migrera från SparkR till sparklyr

SparkR utvecklades som en del av Apache Spark och dess design är bekant för användare av Scala och Python, men potentiellt mindre intuitivt för R-utövare. Dessutom är SparkR inaktuellt i Spark 4.0.

Sparklyr fokuserar däremot på att ge en mer R-vänlig upplevelse. Den använder dplyr syntax, som är bekant för användare av tidyverse med mönster som select(), filter()och mutate() för DataFrame-åtgärder.

sparklyr är det rekommenderade R-paketet för att arbeta med Apache Spark. Den här sidan förklarar skillnaderna mellan SparkR och sparklyr mellan Spark-API:er och innehåller information om kodmigrering.

Miljöinställningar

Installation

Om du befinner dig på Azure Databricks-arbetsytan krävs ingen installation. Läs in sparklyr med library(sparklyr). Information om hur du installerar sparklyr lokalt utanför Azure Databricks finns i Komma igång.

Ansluta till Spark

Anslut till Spark med sparklyr på Databricks-arbetsytan eller lokalt med Databricks Connect:

Arbetsyta:

library(sparklyr)
sc <- spark_connect(method = "databricks")

Databricks Connect:

sc <- spark_connect(method = "databricks_connect")

Mer information och en utökad självstudie om Databricks Connect med sparklyr finns i Komma igång.

Läsa och skriva data

sparklyr har en serie av spark_read_*() och spark_write_*() funktioner för att läsa in och spara data, till skillnad från SparkR:s generiska read.df() och write.df() funktioner. Det finns också unika funktioner för att skapa tillfälliga Spark DataFrames- eller Spark SQL-vyer från R-dataramar i minnet.

Uppgift SparkR sparklyr
Kopiera data till Spark createDataFrame() copy_to()
Skapa tillfällig vy createOrReplaceTempView() Använd invoke() direkt med metoden
Skriva data till tabell saveAsTable() spark_write_table()
Skriva data till ett angivet format write.df() spark_write_<format>()
Läs data från tabellen tableToDF() tbl() eller spark_read_table()
Läsa data från ett angivet format read.df() spark_read_<format>()

Läser in data

Om du vill konvertera en R-dataram till en Spark DataFrame eller skapa en tillfällig vy från en DataFrame för att tillämpa SQL på den:

SparkR

mtcars_df <- createDataFrame(mtcars)

sparklyr

mtcars_tbl <- copy_to(
  sc,
  df = mtcars,
  name = "mtcars_tmp",
  overwrite = TRUE,
  memory = FALSE
)

copy_to() skapar en tillfällig vy med det angivna namnet. Du kan använda namn för att referera till data om du använder SQL direkt (till exempel sdf_sql()). copy_to() Cachelagrar också data genom att ange parametern memory till TRUE.

Skapa vyer

Följande kodexempel visar hur tillfälliga vyer skapas:

SparkR

createOrReplaceTempView(mtcars_df, "mtcars_tmp_view")

sparklyr

spark_dataframe(mtcars_tbl) |>
  invoke("createOrReplaceTempView", "mtcars_tmp_view")

Att skriva in data

Följande kodexempel visar hur data skrivs:

SparkR

# Save a DataFrame to Unity Catalog
saveAsTable(
  mtcars_df,
  tableName = "<catalog>.<schema>.<table>",
  mode = "overwrite"
)

# Save a DataFrame to local filesystem using Delta format
write.df(
  mtcars_df,
  path = "file:/<path/to/save/delta/mtcars>",
  source = "delta",
  mode = "overwrite"
)

sparklyr

# Save tbl_spark to Unity Catalog
spark_write_table(
  mtcars_tbl,
  name = "<catalog>.<schema>.<table>",
  mode = "overwrite"
)

# Save tbl_spark to local filesystem using Delta format
spark_write_delta(
  mtcars_tbl,
  path = "file:/<path/to/save/delta/mtcars>",
  mode = "overwrite"
)

# Use DBI
library(DBI)
dbWriteTable(
  sc,
  value = mtcars_tbl,
  name = "<catalog>.<schema>.<table>",
  overwrite = TRUE
)

Läsa data

Följande kodexempel visar hur data läse:

SparkR

# Load a Unity Catalog table as a DataFrame
tableToDF("<catalog>.<schema>.<table>")

# Load csv file into a DataFrame
read.df(
  path = "file:/<path/to/read/csv/data.csv>",
  source = "csv",
  header = TRUE,
  inferSchema = TRUE
)

# Load Delta from local filesystem as a DataFrame
read.df(
  path = "file:/<path/to/read/delta/mtcars>",
  source = "delta"
)

# Load data from a table using SQL - Databricks recommendeds using `tableToDF`
sql("SELECT * FROM <catalog>.<schema>.<table>")

sparklyr

# Load table from Unity Catalog with dplyr
tbl(sc, "<catalog>.<schema>.<table>")

# or using `in_catalog`
tbl(sc, in_catalog("<catalog>", "<schema>", "<table>"))

# Load csv from local filesystem as tbl_spark
spark_read_csv(
  sc,
  name = "mtcars_csv",
  path = "file:/<path/to/csv/mtcars>",
  header = TRUE,
  infer_schema = TRUE
)

# Load delta from local filesystem as tbl_spark
spark_read_delta(
  sc,
  name = "mtcars_delta",
  path = "file:/tmp/test/sparklyr1"
)

# Load data using SQL
sdf_sql(sc, "SELECT * FROM <catalog>.<schema>.<table>")

Bearbeta data

Välj och filtrera

SparkR

# Select specific columns
select(mtcars_df, "mpg", "cyl", "hp")

# Filter rows where mpg > 20
filter(mtcars_df, mtcars_df$mpg > 20)

sparklyr

# Select specific columns
mtcars_tbl |>
  select(mpg, cyl, hp)

# Filter rows where mpg > 20
mtcars_tbl |>
  filter(mpg > 20)

Lägg till kolumner

SparkR

# Add a new column 'power_to_weight' (hp divided by wt)
withColumn(mtcars_df, "power_to_weight", mtcars_df$hp / mtcars_df$wt)

sparklyr

# Add a new column 'power_to_weight' (hp divided by wt)
mtcars_tbl |>
  mutate(power_to_weight = hp / wt)

Gruppering och sammansättning

SparkR

# Calculate average mpg and hp by number of cylinders
mtcars_df |>
  groupBy("cyl") |>
  summarize(
    avg_mpg = avg(mtcars_df$mpg),
    avg_hp = avg(mtcars_df$hp)
  )

sparklyr

# Calculate average mpg and hp by number of cylinders
mtcars_tbl |>
  group_by(cyl) |>
  summarize(
    avg_mpg = mean(mpg),
    avg_hp = mean(hp)
  )

Ansluter sig

Anta att vi har en annan datauppsättning med cylinderetiketter som vi vill koppla till mtcars.

SparkR

# Create another DataFrame with cylinder labels
cylinders <- data.frame(
  cyl = c(4, 6, 8),
  cyl_label = c("Four", "Six", "Eight")
)
cylinders_df <- createDataFrame(cylinders)

# Join mtcars_df with cylinders_df
join(
  x = mtcars_df,
  y = cylinders_df,
  mtcars_df$cyl == cylinders_df$cyl,
  joinType = "inner"
)

sparklyr

# Create another SparkDataFrame with cylinder labels
cylinders <- data.frame(
  cyl = c(4, 6, 8),
  cyl_label = c("Four", "Six", "Eight")
)
cylinders_tbl <- copy_to(sc, cylinders, "cylinders", overwrite = TRUE)

# join mtcars_df with cylinders_tbl
mtcars_tbl |>
  inner_join(cylinders_tbl, by = join_by(cyl))

Användardefinierade funktioner (UDF:er)

Så här skapar du en anpassad funktion för kategorisering:

# Define the custom function
categorize_hp <- function(df)
  df$hp_category <- ifelse(df$hp > 150, "High", "Low") # a real-world example would use case_when() with mutate()
  df

SparkR

SparkR kräver att utdataschemat definieras explicit innan en funktion tillämpas:

# Define the schema for the output DataFrame
schema <- structType(
  structField("mpg", "double"),
  structField("cyl", "double"),
  structField("disp", "double"),
  structField("hp", "double"),
  structField("drat", "double"),
  structField("wt", "double"),
  structField("qsec", "double"),
  structField("vs", "double"),
  structField("am", "double"),
  structField("gear", "double"),
  structField("carb", "double"),
  structField("hp_category", "string")
)

# Apply the function across partitions
dapply(
  mtcars_df,
  func = categorize_hp,
  schema = schema
)

# Apply the same function to each group of a DataFrame. Note that the schema is still required.
gapply(
  mtcars_df,
  cols = "hp",
  func = categorize_hp,
  schema = schema
)

sparklyr

# Load Arrow to avoid cryptic errors
library(arrow)

# Apply the function over data.
# By default this applies to each partition.
mtcars_tbl |>
  spark_apply(f = categorize_hp)

# Apply the function over data
# Use `group_by` to apply data over groups
mtcars_tbl |>
  spark_apply(
    f = summary,
    group_by = "hp" # This isn't changing the resulting output as the functions behavior is applied to rows independently.
  )

spark.lapply() vs spark_apply()

I SparkR spark.lapply() körs på R-listor i stället för DataFrames. Det finns ingen direkt motsvarighet i sparklyr, men du kan uppnå liknande beteende med spark_apply() genom att arbeta med en DataFrame som innehåller unika identifierare och gruppering av dessa ID:n. I vissa fall kan radvisa åtgärder också ge jämförbara funktioner. Mer information om spark_apply()finns i Distribuera R-beräkningar.

SparkR

# Define a list of integers
numbers <- list(1, 2, 3, 4, 5)

# Define a function to apply
square <- function(x)
  x * x

# Apply the function over list using Spark
spark.lapply(numbers, square)

sparklyr

# Create a DataFrame of given length
sdf <- sdf_len(sc, 5, repartition = 1)

# Apply function to each partition of the DataFrame
# spark_apply() defaults to processing data based on number of partitions.
# In this case it will return a single row due to repartition = 1.
spark_apply(sdf, f = nrow)

# Apply function to each row (option 1)
# To force behaviour like spark.lapply() you can create a DataFrame with N rows and force grouping with group_by set to a unique row identifier. In this case it's the id column automatically generated by sdf_len()). This will return N rows.
spark_apply(sdf, f = nrow, group_by = "id")

# Apply function to each row (option 2)
# This requires writing a function that operates across rows of a data.frame, in some occasions this may be faster relative to option 1. Specifying group_by in optional for this example. This example does not require rowwise(), but is just to illustrate one method to force computations to be for every row.
row_func <- function(df)
  df |>
    dplyr::rowwise() |>
    dplyr::mutate(x = id * 2)

spark_apply(sdf, f = row_func)

Maskininlärning

Fullständiga SparkR- och sparklyr-exempel för maskininlärning finns i Spark ML-guiden och sparklyr-referensen.

Anmärkning

Om du inte använder Spark MLlib rekommenderar Databricks att du använder UDF:er för att träna med det bibliotek du väljer (till exempel xgboost).

Linjär regression

SparkR

# Select features
training_df <- select(mtcars_df, "mpg", "hp", "wt")

# Fit the model using Generalized Linear Model (GLM)
linear_model <- spark.glm(training_df, mpg ~ hp + wt, family = "gaussian")

# View model summary
summary(linear_model)

sparklyr

# Select features
training_tbl <- mtcars_tbl |>
  select(mpg, hp, wt)

# Fit the model using Generalized Linear Model (GLM)
linear_model <- training_tbl |>
  ml_linear_regression(response = "mpg", features = c("hp", "wt"))

# View model summary
summary(linear_model)

K-means-klusteranalys

SparkR

# Apply KMeans clustering with 3 clusters using mpg and hp as features
kmeans_model <- spark.kmeans(mtcars_df, mpg ~ hp, k = 3)

# Get cluster predictions
predict(kmeans_model, mtcars_df)

sparklyr

# Use mpg and hp as features
features_tbl <- mtcars_tbl |>
  select(mpg, hp)

# Assemble features into a vector column
features_vector_tbl <- features_tbl |>
  ft_vector_assembler(
    input_cols = c("mpg", "hp"),
    output_col = "features"
  )

# Apply K-Means clustering
kmeans_model <- features_vector_tbl |>
  ml_kmeans(features_col = "features", k = 3)

# Get cluster predictions
ml_predict(kmeans_model, features_vector_tbl)

Prestanda och optimering

Samla

Både SparkR och sparklyr använder collect() för att konvertera Spark DataFrames till R-dataramar. Samla bara in små mängder data tillbaka till R-dataramar, annars får Spark-drivrutinen slut på minne.

För att förhindra minnesfel har SparkR inbyggda optimeringar i Databricks Runtime som hjälper till att samla in data eller köra användardefinierade funktioner.

För att säkerställa optimala prestanda med sparklyr för insamling av data och UDF:er på Databricks Runtime-versioner under 14.3 LTS läser du in arrow paketet:

library(arrow)

Partitionering i minnet

SparkR

# Repartition the SparkDataFrame based on 'cyl' column
repartition(mtcars_df, col = mtcars_df$cyl)

# Repartition the SparkDataFrame to number of partitions
repartition(mtcars_df, numPartitions = 10)

# Coalesce the DataFrame to number of partitions
coalesce(mtcars_df, numPartitions = 1)

# Get number of partitions
getNumPartitions(mtcars_df)

sparklyr

# Repartition the tbl_spark based on 'cyl' column
sdf_repartition(mtcars_tbl, partition_by = "cyl")

# Repartition the tbl_spark to number of partitions
sdf_repartition(mtcars_tbl, partitions = 10)

# Coalesce the tbl_spark to number of partitions
sdf_coalesce(mtcars_tbl, partitions = 1)

# Get number of partitions
sdf_num_partitions(mtcars_tbl)

Cache

SparkR

# Cache the DataFrame in memory
cache(mtcars_df)

sparklyr

# Cache the tbl_spark in memory
tbl_cache(sc, name = "mtcars_tmp")