Kommentar
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
SparkR utvecklades som en del av Apache Spark och dess design är bekant för användare av Scala och Python, men potentiellt mindre intuitivt för R-utövare. Dessutom är SparkR inaktuellt i Spark 4.0.
Sparklyr fokuserar däremot på att ge en mer R-vänlig upplevelse. Den använder dplyr syntax, som är bekant för användare av tidyverse med mönster som select(), filter()och mutate() för DataFrame-åtgärder.
sparklyr är det rekommenderade R-paketet för att arbeta med Apache Spark. Den här sidan förklarar skillnaderna mellan SparkR och sparklyr mellan Spark-API:er och innehåller information om kodmigrering.
Miljöinställningar
Installation
Om du befinner dig på Azure Databricks-arbetsytan krävs ingen installation. Läs in sparklyr med library(sparklyr). Information om hur du installerar sparklyr lokalt utanför Azure Databricks finns i Komma igång.
Ansluta till Spark
Anslut till Spark med sparklyr på Databricks-arbetsytan eller lokalt med Databricks Connect:
Arbetsyta:
library(sparklyr)
sc <- spark_connect(method = "databricks")
Databricks Connect:
sc <- spark_connect(method = "databricks_connect")
Mer information och en utökad självstudie om Databricks Connect med sparklyr finns i Komma igång.
Läsa och skriva data
sparklyr har en serie av spark_read_*() och spark_write_*() funktioner för att läsa in och spara data, till skillnad från SparkR:s generiska read.df() och write.df() funktioner. Det finns också unika funktioner för att skapa tillfälliga Spark DataFrames- eller Spark SQL-vyer från R-dataramar i minnet.
| Uppgift | SparkR | sparklyr |
|---|---|---|
| Kopiera data till Spark | createDataFrame() |
copy_to() |
| Skapa tillfällig vy | createOrReplaceTempView() |
Använd invoke() direkt med metoden |
| Skriva data till tabell | saveAsTable() |
spark_write_table() |
| Skriva data till ett angivet format | write.df() |
spark_write_<format>() |
| Läs data från tabellen | tableToDF() |
tbl() eller spark_read_table() |
| Läsa data från ett angivet format | read.df() |
spark_read_<format>() |
Läser in data
Om du vill konvertera en R-dataram till en Spark DataFrame eller skapa en tillfällig vy från en DataFrame för att tillämpa SQL på den:
SparkR
mtcars_df <- createDataFrame(mtcars)
sparklyr
mtcars_tbl <- copy_to(
sc,
df = mtcars,
name = "mtcars_tmp",
overwrite = TRUE,
memory = FALSE
)
copy_to() skapar en tillfällig vy med det angivna namnet. Du kan använda namn för att referera till data om du använder SQL direkt (till exempel sdf_sql()).
copy_to() Cachelagrar också data genom att ange parametern memory till TRUE.
Skapa vyer
Följande kodexempel visar hur tillfälliga vyer skapas:
SparkR
createOrReplaceTempView(mtcars_df, "mtcars_tmp_view")
sparklyr
spark_dataframe(mtcars_tbl) |>
invoke("createOrReplaceTempView", "mtcars_tmp_view")
Att skriva in data
Följande kodexempel visar hur data skrivs:
SparkR
# Save a DataFrame to Unity Catalog
saveAsTable(
mtcars_df,
tableName = "<catalog>.<schema>.<table>",
mode = "overwrite"
)
# Save a DataFrame to local filesystem using Delta format
write.df(
mtcars_df,
path = "file:/<path/to/save/delta/mtcars>",
source = "delta",
mode = "overwrite"
)
sparklyr
# Save tbl_spark to Unity Catalog
spark_write_table(
mtcars_tbl,
name = "<catalog>.<schema>.<table>",
mode = "overwrite"
)
# Save tbl_spark to local filesystem using Delta format
spark_write_delta(
mtcars_tbl,
path = "file:/<path/to/save/delta/mtcars>",
mode = "overwrite"
)
# Use DBI
library(DBI)
dbWriteTable(
sc,
value = mtcars_tbl,
name = "<catalog>.<schema>.<table>",
overwrite = TRUE
)
Läsa data
Följande kodexempel visar hur data läse:
SparkR
# Load a Unity Catalog table as a DataFrame
tableToDF("<catalog>.<schema>.<table>")
# Load csv file into a DataFrame
read.df(
path = "file:/<path/to/read/csv/data.csv>",
source = "csv",
header = TRUE,
inferSchema = TRUE
)
# Load Delta from local filesystem as a DataFrame
read.df(
path = "file:/<path/to/read/delta/mtcars>",
source = "delta"
)
# Load data from a table using SQL - Databricks recommendeds using `tableToDF`
sql("SELECT * FROM <catalog>.<schema>.<table>")
sparklyr
# Load table from Unity Catalog with dplyr
tbl(sc, "<catalog>.<schema>.<table>")
# or using `in_catalog`
tbl(sc, in_catalog("<catalog>", "<schema>", "<table>"))
# Load csv from local filesystem as tbl_spark
spark_read_csv(
sc,
name = "mtcars_csv",
path = "file:/<path/to/csv/mtcars>",
header = TRUE,
infer_schema = TRUE
)
# Load delta from local filesystem as tbl_spark
spark_read_delta(
sc,
name = "mtcars_delta",
path = "file:/tmp/test/sparklyr1"
)
# Load data using SQL
sdf_sql(sc, "SELECT * FROM <catalog>.<schema>.<table>")
Bearbeta data
Välj och filtrera
SparkR
# Select specific columns
select(mtcars_df, "mpg", "cyl", "hp")
# Filter rows where mpg > 20
filter(mtcars_df, mtcars_df$mpg > 20)
sparklyr
# Select specific columns
mtcars_tbl |>
select(mpg, cyl, hp)
# Filter rows where mpg > 20
mtcars_tbl |>
filter(mpg > 20)
Lägg till kolumner
SparkR
# Add a new column 'power_to_weight' (hp divided by wt)
withColumn(mtcars_df, "power_to_weight", mtcars_df$hp / mtcars_df$wt)
sparklyr
# Add a new column 'power_to_weight' (hp divided by wt)
mtcars_tbl |>
mutate(power_to_weight = hp / wt)
Gruppering och sammansättning
SparkR
# Calculate average mpg and hp by number of cylinders
mtcars_df |>
groupBy("cyl") |>
summarize(
avg_mpg = avg(mtcars_df$mpg),
avg_hp = avg(mtcars_df$hp)
)
sparklyr
# Calculate average mpg and hp by number of cylinders
mtcars_tbl |>
group_by(cyl) |>
summarize(
avg_mpg = mean(mpg),
avg_hp = mean(hp)
)
Ansluter sig
Anta att vi har en annan datauppsättning med cylinderetiketter som vi vill koppla till mtcars.
SparkR
# Create another DataFrame with cylinder labels
cylinders <- data.frame(
cyl = c(4, 6, 8),
cyl_label = c("Four", "Six", "Eight")
)
cylinders_df <- createDataFrame(cylinders)
# Join mtcars_df with cylinders_df
join(
x = mtcars_df,
y = cylinders_df,
mtcars_df$cyl == cylinders_df$cyl,
joinType = "inner"
)
sparklyr
# Create another SparkDataFrame with cylinder labels
cylinders <- data.frame(
cyl = c(4, 6, 8),
cyl_label = c("Four", "Six", "Eight")
)
cylinders_tbl <- copy_to(sc, cylinders, "cylinders", overwrite = TRUE)
# join mtcars_df with cylinders_tbl
mtcars_tbl |>
inner_join(cylinders_tbl, by = join_by(cyl))
Användardefinierade funktioner (UDF:er)
Så här skapar du en anpassad funktion för kategorisering:
# Define the custom function
categorize_hp <- function(df)
df$hp_category <- ifelse(df$hp > 150, "High", "Low") # a real-world example would use case_when() with mutate()
df
SparkR
SparkR kräver att utdataschemat definieras explicit innan en funktion tillämpas:
# Define the schema for the output DataFrame
schema <- structType(
structField("mpg", "double"),
structField("cyl", "double"),
structField("disp", "double"),
structField("hp", "double"),
structField("drat", "double"),
structField("wt", "double"),
structField("qsec", "double"),
structField("vs", "double"),
structField("am", "double"),
structField("gear", "double"),
structField("carb", "double"),
structField("hp_category", "string")
)
# Apply the function across partitions
dapply(
mtcars_df,
func = categorize_hp,
schema = schema
)
# Apply the same function to each group of a DataFrame. Note that the schema is still required.
gapply(
mtcars_df,
cols = "hp",
func = categorize_hp,
schema = schema
)
sparklyr
# Load Arrow to avoid cryptic errors
library(arrow)
# Apply the function over data.
# By default this applies to each partition.
mtcars_tbl |>
spark_apply(f = categorize_hp)
# Apply the function over data
# Use `group_by` to apply data over groups
mtcars_tbl |>
spark_apply(
f = summary,
group_by = "hp" # This isn't changing the resulting output as the functions behavior is applied to rows independently.
)
spark.lapply() vs spark_apply()
I SparkR spark.lapply() körs på R-listor i stället för DataFrames. Det finns ingen direkt motsvarighet i sparklyr, men du kan uppnå liknande beteende med spark_apply() genom att arbeta med en DataFrame som innehåller unika identifierare och gruppering av dessa ID:n. I vissa fall kan radvisa åtgärder också ge jämförbara funktioner. Mer information om spark_apply()finns i Distribuera R-beräkningar.
SparkR
# Define a list of integers
numbers <- list(1, 2, 3, 4, 5)
# Define a function to apply
square <- function(x)
x * x
# Apply the function over list using Spark
spark.lapply(numbers, square)
sparklyr
# Create a DataFrame of given length
sdf <- sdf_len(sc, 5, repartition = 1)
# Apply function to each partition of the DataFrame
# spark_apply() defaults to processing data based on number of partitions.
# In this case it will return a single row due to repartition = 1.
spark_apply(sdf, f = nrow)
# Apply function to each row (option 1)
# To force behaviour like spark.lapply() you can create a DataFrame with N rows and force grouping with group_by set to a unique row identifier. In this case it's the id column automatically generated by sdf_len()). This will return N rows.
spark_apply(sdf, f = nrow, group_by = "id")
# Apply function to each row (option 2)
# This requires writing a function that operates across rows of a data.frame, in some occasions this may be faster relative to option 1. Specifying group_by in optional for this example. This example does not require rowwise(), but is just to illustrate one method to force computations to be for every row.
row_func <- function(df)
df |>
dplyr::rowwise() |>
dplyr::mutate(x = id * 2)
spark_apply(sdf, f = row_func)
Maskininlärning
Fullständiga SparkR- och sparklyr-exempel för maskininlärning finns i Spark ML-guiden och sparklyr-referensen.
Anmärkning
Om du inte använder Spark MLlib rekommenderar Databricks att du använder UDF:er för att träna med det bibliotek du väljer (till exempel xgboost).
Linjär regression
SparkR
# Select features
training_df <- select(mtcars_df, "mpg", "hp", "wt")
# Fit the model using Generalized Linear Model (GLM)
linear_model <- spark.glm(training_df, mpg ~ hp + wt, family = "gaussian")
# View model summary
summary(linear_model)
sparklyr
# Select features
training_tbl <- mtcars_tbl |>
select(mpg, hp, wt)
# Fit the model using Generalized Linear Model (GLM)
linear_model <- training_tbl |>
ml_linear_regression(response = "mpg", features = c("hp", "wt"))
# View model summary
summary(linear_model)
K-means-klusteranalys
SparkR
# Apply KMeans clustering with 3 clusters using mpg and hp as features
kmeans_model <- spark.kmeans(mtcars_df, mpg ~ hp, k = 3)
# Get cluster predictions
predict(kmeans_model, mtcars_df)
sparklyr
# Use mpg and hp as features
features_tbl <- mtcars_tbl |>
select(mpg, hp)
# Assemble features into a vector column
features_vector_tbl <- features_tbl |>
ft_vector_assembler(
input_cols = c("mpg", "hp"),
output_col = "features"
)
# Apply K-Means clustering
kmeans_model <- features_vector_tbl |>
ml_kmeans(features_col = "features", k = 3)
# Get cluster predictions
ml_predict(kmeans_model, features_vector_tbl)
Prestanda och optimering
Samla
Både SparkR och sparklyr använder collect() för att konvertera Spark DataFrames till R-dataramar. Samla bara in små mängder data tillbaka till R-dataramar, annars får Spark-drivrutinen slut på minne.
För att förhindra minnesfel har SparkR inbyggda optimeringar i Databricks Runtime som hjälper till att samla in data eller köra användardefinierade funktioner.
För att säkerställa optimala prestanda med sparklyr för insamling av data och UDF:er på Databricks Runtime-versioner under 14.3 LTS läser du in arrow paketet:
library(arrow)
Partitionering i minnet
SparkR
# Repartition the SparkDataFrame based on 'cyl' column
repartition(mtcars_df, col = mtcars_df$cyl)
# Repartition the SparkDataFrame to number of partitions
repartition(mtcars_df, numPartitions = 10)
# Coalesce the DataFrame to number of partitions
coalesce(mtcars_df, numPartitions = 1)
# Get number of partitions
getNumPartitions(mtcars_df)
sparklyr
# Repartition the tbl_spark based on 'cyl' column
sdf_repartition(mtcars_tbl, partition_by = "cyl")
# Repartition the tbl_spark to number of partitions
sdf_repartition(mtcars_tbl, partitions = 10)
# Coalesce the tbl_spark to number of partitions
sdf_coalesce(mtcars_tbl, partitions = 1)
# Get number of partitions
sdf_num_partitions(mtcars_tbl)
Cache
SparkR
# Cache the DataFrame in memory
cache(mtcars_df)
sparklyr
# Cache the tbl_spark in memory
tbl_cache(sc, name = "mtcars_tmp")