Migreren van SparkR naar sparklyr

SparkR is ontwikkeld als onderdeel van Apache Spark en het ontwerp is bekend bij gebruikers van Scala en Python, maar mogelijk minder intuïtief voor R-beoefenaars. Daarnaast is SparkR afgeschaft in Spark 4.0.

Sparklyr is daarentegen gericht op het bieden van een meer R-vriendelijke ervaring. Het maakt gebruik van dplyr syntaxis, die bekend is voor gebruikers van tidyverse met patronen zoals select(), filter()en mutate() voor DataFrame-bewerkingen.

sparklyr is het aanbevolen R-pakket voor het werken met Apache Spark. Op deze pagina worden verschillen tussen SparkR en sparklyr in Spark-API's uitgelegd en vindt u informatie over codemigratie.

Omgevingsconfiguratie

Installation

Als u zich in de Azure Databricks-werkruimte bevindt, is er geen installatie vereist. Laad sparklyr met library(sparklyr). Zie Aan de slag als u sparklyr lokaal buiten Azure Databricks wilt installeren.

Verbinding maken met Spark

Verbinding maken met Spark met sparklyr in de Databricks-werkruimte of lokaal via Databricks Connect:

Werkruimte:

library(sparklyr)
sc <- spark_connect(method = "databricks")

Databricks Connect:

sc <- spark_connect(method = "databricks_connect")

Zie Aan de slag voor meer informatie en een uitgebreide zelfstudie over Databricks Connect met sparklyr.

Gegevens lezen en schrijven

sparklyr heeft een reeks spark_read_*()- en spark_write_*()-functies om gegevens te laden en op te slaan, in tegenstelling tot de generieke read.df()- en write.df()-functies van SparkR. Er zijn ook unieke functies voor het maken van Spark DataFrames of Spark SQL-tijdelijke weergaven van R-gegevensframes in het geheugen.

Opdracht SparkR sparklyr
Gegevens kopiëren naar Spark createDataFrame() copy_to()
Tijdelijke weergave maken createOrReplaceTempView() Gebruik invoke() rechtstreeks met de methode
Gegevens naar tabel schrijven saveAsTable() spark_write_table()
Gegevens schrijven naar een opgegeven indeling write.df() spark_write_<format>()
Gegevens uit tabel lezen tableToDF() tbl() of spark_read_table()
Gegevens lezen uit een opgegeven indeling read.df() spark_read_<format>()

Gegevens laden

Als u een R-gegevensframe wilt converteren naar een Spark DataFrame, of als u een tijdelijke weergave van een DataFrame wilt maken om er SQL op toe te passen:

SparkR

mtcars_df <- createDataFrame(mtcars)

sparklyr

mtcars_tbl <- copy_to(
  sc,
  df = mtcars,
  name = "mtcars_tmp",
  overwrite = TRUE,
  memory = FALSE
)

copy_to() maakt een tijdelijke weergave met behulp van de opgegeven naam. U kunt de naam gebruiken om te verwijzen naar gegevens als u SQL rechtstreeks gebruikt (bijvoorbeeld sdf_sql()). copy_to() Slaat ook gegevens in de cache op door de memory parameter in te stellen op TRUE.

Weergaven maken

In de volgende codevoorbeelden ziet u hoe tijdelijke weergaven worden gemaakt:

SparkR

createOrReplaceTempView(mtcars_df, "mtcars_tmp_view")

sparklyr

spark_dataframe(mtcars_tbl) |>
  invoke("createOrReplaceTempView", "mtcars_tmp_view")

Gegevens schrijven

In de volgende codevoorbeelden ziet u hoe gegevens worden geschreven:

SparkR

# Save a DataFrame to Unity Catalog
saveAsTable(
  mtcars_df,
  tableName = "<catalog>.<schema>.<table>",
  mode = "overwrite"
)

# Save a DataFrame to local filesystem using Delta format
write.df(
  mtcars_df,
  path = "file:/<path/to/save/delta/mtcars>",
  source = "delta",
  mode = "overwrite"
)

sparklyr

# Save tbl_spark to Unity Catalog
spark_write_table(
  mtcars_tbl,
  name = "<catalog>.<schema>.<table>",
  mode = "overwrite"
)

# Save tbl_spark to local filesystem using Delta format
spark_write_delta(
  mtcars_tbl,
  path = "file:/<path/to/save/delta/mtcars>",
  mode = "overwrite"
)

# Use DBI
library(DBI)
dbWriteTable(
  sc,
  value = mtcars_tbl,
  name = "<catalog>.<schema>.<table>",
  overwrite = TRUE
)

Lezen van de gegevens

In de volgende codevoorbeelden ziet u hoe gegevens worden gelezen:

SparkR

# Load a Unity Catalog table as a DataFrame
tableToDF("<catalog>.<schema>.<table>")

# Load csv file into a DataFrame
read.df(
  path = "file:/<path/to/read/csv/data.csv>",
  source = "csv",
  header = TRUE,
  inferSchema = TRUE
)

# Load Delta from local filesystem as a DataFrame
read.df(
  path = "file:/<path/to/read/delta/mtcars>",
  source = "delta"
)

# Load data from a table using SQL - Databricks recommendeds using `tableToDF`
sql("SELECT * FROM <catalog>.<schema>.<table>")

sparklyr

# Load table from Unity Catalog with dplyr
tbl(sc, "<catalog>.<schema>.<table>")

# or using `in_catalog`
tbl(sc, in_catalog("<catalog>", "<schema>", "<table>"))

# Load csv from local filesystem as tbl_spark
spark_read_csv(
  sc,
  name = "mtcars_csv",
  path = "file:/<path/to/csv/mtcars>",
  header = TRUE,
  infer_schema = TRUE
)

# Load delta from local filesystem as tbl_spark
spark_read_delta(
  sc,
  name = "mtcars_delta",
  path = "file:/tmp/test/sparklyr1"
)

# Load data using SQL
sdf_sql(sc, "SELECT * FROM <catalog>.<schema>.<table>")

Gegevens verwerken

Selecteren en filteren

SparkR

# Select specific columns
select(mtcars_df, "mpg", "cyl", "hp")

# Filter rows where mpg > 20
filter(mtcars_df, mtcars_df$mpg > 20)

sparklyr

# Select specific columns
mtcars_tbl |>
  select(mpg, cyl, hp)

# Filter rows where mpg > 20
mtcars_tbl |>
  filter(mpg > 20)

Kolommen toevoegen

SparkR

# Add a new column 'power_to_weight' (hp divided by wt)
withColumn(mtcars_df, "power_to_weight", mtcars_df$hp / mtcars_df$wt)

sparklyr

# Add a new column 'power_to_weight' (hp divided by wt)
mtcars_tbl |>
  mutate(power_to_weight = hp / wt)

Groepering en aggregatie

SparkR

# Calculate average mpg and hp by number of cylinders
mtcars_df |>
  groupBy("cyl") |>
  summarize(
    avg_mpg = avg(mtcars_df$mpg),
    avg_hp = avg(mtcars_df$hp)
  )

sparklyr

# Calculate average mpg and hp by number of cylinders
mtcars_tbl |>
  group_by(cyl) |>
  summarize(
    avg_mpg = mean(mpg),
    avg_hp = mean(hp)
  )

[Samenvoegingen]

Stel dat we een andere gegevensset hebben met cilinderlabels die we willen koppelen aan mtcars.

SparkR

# Create another DataFrame with cylinder labels
cylinders <- data.frame(
  cyl = c(4, 6, 8),
  cyl_label = c("Four", "Six", "Eight")
)
cylinders_df <- createDataFrame(cylinders)

# Join mtcars_df with cylinders_df
join(
  x = mtcars_df,
  y = cylinders_df,
  mtcars_df$cyl == cylinders_df$cyl,
  joinType = "inner"
)

sparklyr

# Create another SparkDataFrame with cylinder labels
cylinders <- data.frame(
  cyl = c(4, 6, 8),
  cyl_label = c("Four", "Six", "Eight")
)
cylinders_tbl <- copy_to(sc, cylinders, "cylinders", overwrite = TRUE)

# join mtcars_df with cylinders_tbl
mtcars_tbl |>
  inner_join(cylinders_tbl, by = join_by(cyl))

Door de gebruiker gedefinieerde functies (UDF's)

Een aangepaste functie maken voor categorisatie:

# Define the custom function
categorize_hp <- function(df)
  df$hp_category <- ifelse(df$hp > 150, "High", "Low") # a real-world example would use case_when() with mutate()
  df

SparkR

SparkR vereist het expliciet definiëren van het uitvoerschema voordat u een functie toepast:

# Define the schema for the output DataFrame
schema <- structType(
  structField("mpg", "double"),
  structField("cyl", "double"),
  structField("disp", "double"),
  structField("hp", "double"),
  structField("drat", "double"),
  structField("wt", "double"),
  structField("qsec", "double"),
  structField("vs", "double"),
  structField("am", "double"),
  structField("gear", "double"),
  structField("carb", "double"),
  structField("hp_category", "string")
)

# Apply the function across partitions
dapply(
  mtcars_df,
  func = categorize_hp,
  schema = schema
)

# Apply the same function to each group of a DataFrame. Note that the schema is still required.
gapply(
  mtcars_df,
  cols = "hp",
  func = categorize_hp,
  schema = schema
)

sparklyr

# Load Arrow to avoid cryptic errors
library(arrow)

# Apply the function over data.
# By default this applies to each partition.
mtcars_tbl |>
  spark_apply(f = categorize_hp)

# Apply the function over data
# Use `group_by` to apply data over groups
mtcars_tbl |>
  spark_apply(
    f = summary,
    group_by = "hp" # This isn't changing the resulting output as the functions behavior is applied to rows independently.
  )

spark.lapply() versus spark_apply()

Werkt in SparkR spark.lapply() op R-lijsten in plaats van DataFrames. Er is geen direct equivalent in sparklyr, maar u kunt vergelijkbaar gedrag realiseren door te werken met een DataFrame dat UNIQUE ID's bevat en te groeperen op deze ID's. In sommige gevallen kunnen bewerkingen op rijgewijze wijze ook vergelijkbare functionaliteit bieden. Zie spark_apply() voor meer informatie.

SparkR

# Define a list of integers
numbers <- list(1, 2, 3, 4, 5)

# Define a function to apply
square <- function(x)
  x * x

# Apply the function over list using Spark
spark.lapply(numbers, square)

sparklyr

# Create a DataFrame of given length
sdf <- sdf_len(sc, 5, repartition = 1)

# Apply function to each partition of the DataFrame
# spark_apply() defaults to processing data based on number of partitions.
# In this case it will return a single row due to repartition = 1.
spark_apply(sdf, f = nrow)

# Apply function to each row (option 1)
# To force behaviour like spark.lapply() you can create a DataFrame with N rows and force grouping with group_by set to a unique row identifier. In this case it's the id column automatically generated by sdf_len()). This will return N rows.
spark_apply(sdf, f = nrow, group_by = "id")

# Apply function to each row (option 2)
# This requires writing a function that operates across rows of a data.frame, in some occasions this may be faster relative to option 1. Specifying group_by in optional for this example. This example does not require rowwise(), but is just to illustrate one method to force computations to be for every row.
row_func <- function(df)
  df |>
    dplyr::rowwise() |>
    dplyr::mutate(x = id * 2)

spark_apply(sdf, f = row_func)

Machinaal leren

Volledige SparkR- en sparklyr-voorbeelden voor machine learning staan in de Spark ML-handleiding en sparklyr-verwijzing.

Opmerking

Als u Spark MLlib niet gebruikt, raadt Databricks u aan om UDF's te gebruiken om te trainen met de bibliotheek van uw keuze (bijvoorbeeld xgboost).

Lineaire regressie

SparkR

# Select features
training_df <- select(mtcars_df, "mpg", "hp", "wt")

# Fit the model using Generalized Linear Model (GLM)
linear_model <- spark.glm(training_df, mpg ~ hp + wt, family = "gaussian")

# View model summary
summary(linear_model)

sparklyr

# Select features
training_tbl <- mtcars_tbl |>
  select(mpg, hp, wt)

# Fit the model using Generalized Linear Model (GLM)
linear_model <- training_tbl |>
  ml_linear_regression(response = "mpg", features = c("hp", "wt"))

# View model summary
summary(linear_model)

K-means-clustering

SparkR

# Apply KMeans clustering with 3 clusters using mpg and hp as features
kmeans_model <- spark.kmeans(mtcars_df, mpg ~ hp, k = 3)

# Get cluster predictions
predict(kmeans_model, mtcars_df)

sparklyr

# Use mpg and hp as features
features_tbl <- mtcars_tbl |>
  select(mpg, hp)

# Assemble features into a vector column
features_vector_tbl <- features_tbl |>
  ft_vector_assembler(
    input_cols = c("mpg", "hp"),
    output_col = "features"
  )

# Apply K-Means clustering
kmeans_model <- features_vector_tbl |>
  ml_kmeans(features_col = "features", k = 3)

# Get cluster predictions
ml_predict(kmeans_model, features_vector_tbl)

Prestaties en optimalisering

Verzamelen

SparkR en sparklyr maken gebruik van collect() om Spark DataFrames naar R-gegevensframes te converteren. Verzamel alleen kleine hoeveelheden gegevens terug naar R-gegevensframes of het Spark-stuurprogramma heeft onvoldoende geheugen.

Om geheugenfouten te voorkomen, beschikt SparkR over ingebouwde optimalisaties in Databricks Runtime waarmee u gegevens kunt verzamelen of door de gebruiker gedefinieerde functies kunt uitvoeren.

Laad het arrow-pakket om optimale prestaties met sparklyr te waarborgen voor het verzamelen van gegevens en het gebruik van UDF's op Databricks Runtime-versies lager dan 14.3 LTS.

library(arrow)

Partitionering in het geheugen

SparkR

# Repartition the SparkDataFrame based on 'cyl' column
repartition(mtcars_df, col = mtcars_df$cyl)

# Repartition the SparkDataFrame to number of partitions
repartition(mtcars_df, numPartitions = 10)

# Coalesce the DataFrame to number of partitions
coalesce(mtcars_df, numPartitions = 1)

# Get number of partitions
getNumPartitions(mtcars_df)

sparklyr

# Repartition the tbl_spark based on 'cyl' column
sdf_repartition(mtcars_tbl, partition_by = "cyl")

# Repartition the tbl_spark to number of partitions
sdf_repartition(mtcars_tbl, partitions = 10)

# Coalesce the tbl_spark to number of partitions
sdf_coalesce(mtcars_tbl, partitions = 1)

# Get number of partitions
sdf_num_partitions(mtcars_tbl)

Cachebeheer

SparkR

# Cache the DataFrame in memory
cache(mtcars_df)

sparklyr

# Cache the tbl_spark in memory
tbl_cache(sc, name = "mtcars_tmp")