Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
SparkR a été développé dans le cadre d’Apache Spark, et sa conception est familière aux utilisateurs de Scala et Python, mais potentiellement moins intuitive pour les praticiens R. En outre, SparkR est déconseillé dans Spark 4.0.
En revanche, sparklyr se concentre sur la fourniture d’une expérience plus conviviale pour R. La syntaxe dplyr est exploitée, ce qui est familier pour les utilisateurs de tidyverse avec des modèles tels que select(), filter(), et mutate() pour les opérations DataFrame.
sparklyr est le package R recommandé pour l’utilisation d’Apache Spark. Cette page explique les différences entre SparkR et sparklyr entre les API Spark et fournit des informations sur la migration de code.
Configuration de l’environnement
Installation
Si vous êtes dans l’espace de travail Azure Databricks, aucune installation n’est requise. Charger sparklyr avec library(sparklyr). Pour installer sparklyr localement en dehors d’Azure Databricks, consultez Prise en main.
Connexion à Spark
Connectez-vous à Spark avec sparklyr dans l’espace de travail Databricks ou localement à l’aide de Databricks Connect :
Espace de travail :
library(sparklyr)
sc <- spark_connect(method = "databricks")
Databricks Connect :
sc <- spark_connect(method = "databricks_connect")
Pour plus d’informations et un didacticiel étendu sur Databricks Connect avec sparklyr, consultez Prise en main.
Lecture et écriture de données
sparklyr dispose d’une famille de fonctions spark_read_*() et spark_write_*() pour charger et enregistrer des données, contrairement aux fonctions génériques read.df() et write.df() de SparkR. Il existe également des fonctions uniques pour créer des DataFrames Spark ou des vues temporaires Spark SQL à partir de trames de données R en mémoire.
| Tâche | SparkR | sparklyr |
|---|---|---|
| Copier des données dans Spark | createDataFrame() |
copy_to() |
| Créer une vue temporaire | createOrReplaceTempView() |
Utiliser invoke() directement avec la méthode |
| Écrire des données dans une table | saveAsTable() |
spark_write_table() |
| Écrire des données dans un format spécifié | write.df() |
spark_write_<format>() |
| Lire des données à partir d’une table | tableToDF() |
tbl() ou spark_read_table() |
| Lire des données à partir d’un format spécifié | read.df() |
spark_read_<format>() |
Chargement de données
Pour convertir une trame de données R en dataFrame Spark ou pour créer une vue temporaire à partir d’un DataFrame pour l’appliquer à SQL :
SparkR
mtcars_df <- createDataFrame(mtcars)
sparklyr
mtcars_tbl <- copy_to(
sc,
df = mtcars,
name = "mtcars_tmp",
overwrite = TRUE,
memory = FALSE
)
copy_to() crée une vue temporaire à l’aide du nom spécifié. Vous pouvez utiliser le nom pour référencer des données si vous utilisez SQL directement (par exemple). sdf_sql()
copy_to() Met également en cache les données en définissant le memory paramètre sur TRUE.
Création de vues
Les exemples de code suivants montrent comment les vues temporaires sont créées :
SparkR
createOrReplaceTempView(mtcars_df, "mtcars_tmp_view")
sparklyr
spark_dataframe(mtcars_tbl) |>
invoke("createOrReplaceTempView", "mtcars_tmp_view")
Écriture de données
Les exemples de code suivants montrent comment les données sont écrites :
SparkR
# Save a DataFrame to Unity Catalog
saveAsTable(
mtcars_df,
tableName = "<catalog>.<schema>.<table>",
mode = "overwrite"
)
# Save a DataFrame to local filesystem using Delta format
write.df(
mtcars_df,
path = "file:/<path/to/save/delta/mtcars>",
source = "delta",
mode = "overwrite"
)
sparklyr
# Save tbl_spark to Unity Catalog
spark_write_table(
mtcars_tbl,
name = "<catalog>.<schema>.<table>",
mode = "overwrite"
)
# Save tbl_spark to local filesystem using Delta format
spark_write_delta(
mtcars_tbl,
path = "file:/<path/to/save/delta/mtcars>",
mode = "overwrite"
)
# Use DBI
library(DBI)
dbWriteTable(
sc,
value = mtcars_tbl,
name = "<catalog>.<schema>.<table>",
overwrite = TRUE
)
Lecture des données
Les exemples de code suivants montrent comment les données sont lues :
SparkR
# Load a Unity Catalog table as a DataFrame
tableToDF("<catalog>.<schema>.<table>")
# Load csv file into a DataFrame
read.df(
path = "file:/<path/to/read/csv/data.csv>",
source = "csv",
header = TRUE,
inferSchema = TRUE
)
# Load Delta from local filesystem as a DataFrame
read.df(
path = "file:/<path/to/read/delta/mtcars>",
source = "delta"
)
# Load data from a table using SQL - Databricks recommendeds using `tableToDF`
sql("SELECT * FROM <catalog>.<schema>.<table>")
sparklyr
# Load table from Unity Catalog with dplyr
tbl(sc, "<catalog>.<schema>.<table>")
# or using `in_catalog`
tbl(sc, in_catalog("<catalog>", "<schema>", "<table>"))
# Load csv from local filesystem as tbl_spark
spark_read_csv(
sc,
name = "mtcars_csv",
path = "file:/<path/to/csv/mtcars>",
header = TRUE,
infer_schema = TRUE
)
# Load delta from local filesystem as tbl_spark
spark_read_delta(
sc,
name = "mtcars_delta",
path = "file:/tmp/test/sparklyr1"
)
# Load data using SQL
sdf_sql(sc, "SELECT * FROM <catalog>.<schema>.<table>")
Traitement des données
Sélectionner et filtrer
SparkR
# Select specific columns
select(mtcars_df, "mpg", "cyl", "hp")
# Filter rows where mpg > 20
filter(mtcars_df, mtcars_df$mpg > 20)
sparklyr
# Select specific columns
mtcars_tbl |>
select(mpg, cyl, hp)
# Filter rows where mpg > 20
mtcars_tbl |>
filter(mpg > 20)
Ajouter des colonnes
SparkR
# Add a new column 'power_to_weight' (hp divided by wt)
withColumn(mtcars_df, "power_to_weight", mtcars_df$hp / mtcars_df$wt)
sparklyr
# Add a new column 'power_to_weight' (hp divided by wt)
mtcars_tbl |>
mutate(power_to_weight = hp / wt)
Regroupement et agrégation
SparkR
# Calculate average mpg and hp by number of cylinders
mtcars_df |>
groupBy("cyl") |>
summarize(
avg_mpg = avg(mtcars_df$mpg),
avg_hp = avg(mtcars_df$hp)
)
sparklyr
# Calculate average mpg and hp by number of cylinders
mtcars_tbl |>
group_by(cyl) |>
summarize(
avg_mpg = mean(mpg),
avg_hp = mean(hp)
)
Joins
Supposons que nous avons un autre jeu de données avec des étiquettes de cylindres que nous voulons joindre à mtcars.
SparkR
# Create another DataFrame with cylinder labels
cylinders <- data.frame(
cyl = c(4, 6, 8),
cyl_label = c("Four", "Six", "Eight")
)
cylinders_df <- createDataFrame(cylinders)
# Join mtcars_df with cylinders_df
join(
x = mtcars_df,
y = cylinders_df,
mtcars_df$cyl == cylinders_df$cyl,
joinType = "inner"
)
sparklyr
# Create another SparkDataFrame with cylinder labels
cylinders <- data.frame(
cyl = c(4, 6, 8),
cyl_label = c("Four", "Six", "Eight")
)
cylinders_tbl <- copy_to(sc, cylinders, "cylinders", overwrite = TRUE)
# join mtcars_df with cylinders_tbl
mtcars_tbl |>
inner_join(cylinders_tbl, by = join_by(cyl))
Fonctions définies par l’utilisateur (UDF)
Pour créer une fonction personnalisée pour la catégorisation :
# Define the custom function
categorize_hp <- function(df)
df$hp_category <- ifelse(df$hp > 150, "High", "Low") # a real-world example would use case_when() with mutate()
df
SparkR
SparkR nécessite de définir explicitement le schéma de sortie avant d’appliquer une fonction :
# Define the schema for the output DataFrame
schema <- structType(
structField("mpg", "double"),
structField("cyl", "double"),
structField("disp", "double"),
structField("hp", "double"),
structField("drat", "double"),
structField("wt", "double"),
structField("qsec", "double"),
structField("vs", "double"),
structField("am", "double"),
structField("gear", "double"),
structField("carb", "double"),
structField("hp_category", "string")
)
# Apply the function across partitions
dapply(
mtcars_df,
func = categorize_hp,
schema = schema
)
# Apply the same function to each group of a DataFrame. Note that the schema is still required.
gapply(
mtcars_df,
cols = "hp",
func = categorize_hp,
schema = schema
)
sparklyr
# Load Arrow to avoid cryptic errors
library(arrow)
# Apply the function over data.
# By default this applies to each partition.
mtcars_tbl |>
spark_apply(f = categorize_hp)
# Apply the function over data
# Use `group_by` to apply data over groups
mtcars_tbl |>
spark_apply(
f = summary,
group_by = "hp" # This isn't changing the resulting output as the functions behavior is applied to rows independently.
)
spark.lapply() vs spark_apply()
Dans SparkR, spark.lapply() fonctionne sur des listes R plutôt que sur des DataFrames. Il n’existe pas d’équivalent direct dans sparklyr, mais vous pouvez obtenir un comportement similaire avec spark_apply() un DataFrame qui inclut des identificateurs uniques et un regroupement par ces ID. Dans certains cas, les opérations ligne par ligne peuvent également fournir des fonctionnalités comparables. Pour plus d’informations sur spark_apply(), consultez Distribution de calculs R.
SparkR
# Define a list of integers
numbers <- list(1, 2, 3, 4, 5)
# Define a function to apply
square <- function(x)
x * x
# Apply the function over list using Spark
spark.lapply(numbers, square)
sparklyr
# Create a DataFrame of given length
sdf <- sdf_len(sc, 5, repartition = 1)
# Apply function to each partition of the DataFrame
# spark_apply() defaults to processing data based on number of partitions.
# In this case it will return a single row due to repartition = 1.
spark_apply(sdf, f = nrow)
# Apply function to each row (option 1)
# To force behaviour like spark.lapply() you can create a DataFrame with N rows and force grouping with group_by set to a unique row identifier. In this case it's the id column automatically generated by sdf_len()). This will return N rows.
spark_apply(sdf, f = nrow, group_by = "id")
# Apply function to each row (option 2)
# This requires writing a function that operates across rows of a data.frame, in some occasions this may be faster relative to option 1. Specifying group_by in optional for this example. This example does not require rowwise(), but is just to illustrate one method to force computations to be for every row.
row_func <- function(df)
df |>
dplyr::rowwise() |>
dplyr::mutate(x = id * 2)
spark_apply(sdf, f = row_func)
Apprentissage automatique
Les exemples SparkR complets et sparklyr pour le Machine Learning se trouvent dans le Guide spark ML et la référence sparklyr.
Note
Si vous n’utilisez pas Spark MLlib, Databricks recommande d’utiliser des UDF pour entraîner avec la bibliothèque de votre choix (par exemple xgboost).
Régression linéaire
SparkR
# Select features
training_df <- select(mtcars_df, "mpg", "hp", "wt")
# Fit the model using Generalized Linear Model (GLM)
linear_model <- spark.glm(training_df, mpg ~ hp + wt, family = "gaussian")
# View model summary
summary(linear_model)
sparklyr
# Select features
training_tbl <- mtcars_tbl |>
select(mpg, hp, wt)
# Fit the model using Generalized Linear Model (GLM)
linear_model <- training_tbl |>
ml_linear_regression(response = "mpg", features = c("hp", "wt"))
# View model summary
summary(linear_model)
Clustering de K-moyennes
SparkR
# Apply KMeans clustering with 3 clusters using mpg and hp as features
kmeans_model <- spark.kmeans(mtcars_df, mpg ~ hp, k = 3)
# Get cluster predictions
predict(kmeans_model, mtcars_df)
sparklyr
# Use mpg and hp as features
features_tbl <- mtcars_tbl |>
select(mpg, hp)
# Assemble features into a vector column
features_vector_tbl <- features_tbl |>
ft_vector_assembler(
input_cols = c("mpg", "hp"),
output_col = "features"
)
# Apply K-Means clustering
kmeans_model <- features_vector_tbl |>
ml_kmeans(features_col = "features", k = 3)
# Get cluster predictions
ml_predict(kmeans_model, features_vector_tbl)
Niveau de performance et optimisation
En cours de collecte
SparkR et sparklyr utilisent collect() pour convertir des dataFrames Spark en trames de données R. Collectez uniquement de petites quantités de données vers des data frames R, sinon le pilote Spark manquera de mémoire.
Pour éviter les erreurs de mémoire insuffisante, SparkR a des optimisations intégrées dans Databricks Runtime qui aident à collecter des données ou à exécuter des fonctions définies par l’utilisateur.
Pour garantir des performances optimales avec sparklyr pour collecter des données et des UDFs sur les versions Databricks Runtime inférieures à 14.3 LTS, chargez le paquet arrow :
library(arrow)
Partitionnement en mémoire
SparkR
# Repartition the SparkDataFrame based on 'cyl' column
repartition(mtcars_df, col = mtcars_df$cyl)
# Repartition the SparkDataFrame to number of partitions
repartition(mtcars_df, numPartitions = 10)
# Coalesce the DataFrame to number of partitions
coalesce(mtcars_df, numPartitions = 1)
# Get number of partitions
getNumPartitions(mtcars_df)
sparklyr
# Repartition the tbl_spark based on 'cyl' column
sdf_repartition(mtcars_tbl, partition_by = "cyl")
# Repartition the tbl_spark to number of partitions
sdf_repartition(mtcars_tbl, partitions = 10)
# Coalesce the tbl_spark to number of partitions
sdf_coalesce(mtcars_tbl, partitions = 1)
# Get number of partitions
sdf_num_partitions(mtcars_tbl)
Mise en cache
SparkR
# Cache the DataFrame in memory
cache(mtcars_df)
sparklyr
# Cache the tbl_spark in memory
tbl_cache(sc, name = "mtcars_tmp")