Introduction
In this note we will show how to speed up work in R by partitioning data and using process-level parallelization. We will show the technique with three different R packages: rqdatatable, data.table, and dplyr. The methods shown will also work with base-R and other packages.
For each of the above packages we speed up work by using wrapr::execute_parallel, which in turn uses wrapr::partition_tables to partition unrelated data.frame rows and then distribute them to different processors for execution. rqdatatable::ex_data_table_parallel conveniently bundles all of these steps together when working with rquery pipelines.
The partitioning is specified by the user, who prepares a grouping column that tells the system which sets of rows must be kept together for the calculation to be correct. We are going to try to demonstrate everything with simple code examples and minimal discussion.
Keep in mind: unless the pipeline steps have non-trivial cost, the overhead of partitioning and distributing the work may overwhelm any parallel speedup. Also, data.table itself already seems to exploit some thread-level parallelism (notice user time is greater than elapsed time). That being said, in this note we will demonstrate a synthetic example where computation is expensive due to a blow-up in an intermediate join step.
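To build intuition for what the grouping column does, here is a minimal base-R sketch (an illustration added for this write-up, not part of the wrapr implementation) of the partition / process / recombine pattern that the tools above automate: rows are split by the grouping column, each piece can be worked on independently, and the results are stacked back together.

# illustration only: partition rows by a grouping column,
# apply a function to each independent piece, then recombine
partition_demo <- function(d, group_col, f) {
  pieces  <- split(d, d[[group_col]])   # one data.frame per group
  results <- lapply(pieces, f)          # each piece is independent work
  do.call(rbind, results)               # stack the results back together
}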
Our example
First we set up our execution environment and example (some details: OSX 10.13.4 on a 2.8 GHz Intel Core i5 Mac Mini (Late 2015 model) with 8GB RAM and hybrid disk drive).
library("rqdatatable")
## Loading required package: rquery
library("microbenchmark")
library("ggplot2")
library("WVPlots")
suppressPackageStartupMessages(library("dplyr"))
## Warning: package 'dplyr' was built under R version 3.5.1
base::date()
## [1] "Sun Jul 8 09:05:25 2018"
R.version.string
## [1] "R version 3.5.0 (2018-04-23)"
parallel::detectCores()
## [1] 4
packageVersion("parallel")
## [1] '3.5.0'
packageVersion("rqdatatable")
## [1] '0.1.2'
packageVersion("rquery")
## [1] '0.5.1'
packageVersion("dplyr")
## [1] '0.7.6'
ncore <- parallel::detectCores()
print(ncore)
## [1] 4
cl <- parallel::makeCluster(ncore)
print(cl)
## socket cluster with 4 nodes on host 'localhost'
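One housekeeping aside (not part of the timing session below, so it is left commented out): when the worker cluster is no longer needed it should be shut down so the worker processes exit.

# run at the very end of the session, after all parallel work is complete
# parallel::stopCluster(cl)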
set.seed(2362)
mk_example <- function(nkey, nrep, ngroup = 20) {
  keys <- paste0("key_", seq_len(nkey))
  key_group <- sample(as.character(seq_len(ngroup)),
                      length(keys), replace = TRUE)
  names(key_group) <- keys
  key_table <- data.frame(
    key = rep(keys, nrep),
    stringsAsFactors = FALSE)
  key_table$data <- runif(nrow(key_table))
  instance_table <- data.frame(
    key = rep(keys, nrep),
    stringsAsFactors = FALSE)
  instance_table$id <- seq_len(nrow(instance_table))
  instance_table$info <- runif(nrow(instance_table))
  # groups should be no finer than keys
  key_table$key_group <- key_group[key_table$key]
  instance_table$key_group <- key_group[instance_table$key]
  list(key_table = key_table,
       instance_table = instance_table)
}
dlist <- mk_example(10, 10)
data <- dlist$instance_table
annotation <- dlist$key_table
rquery / rqdatatable
rquery and rqdatatable can implement a non-trivial calculation as follows.
# possible data lookup: find rows that
# have lookup data <= info
optree <- local_td(data) %.>%
  natural_join(.,
               local_td(annotation),
               jointype = "INNER",
               by = "key") %.>%
  select_rows_nse(., data <= info) %.>%
  pick_top_k(.,
             k = 1,
             partitionby = "id",
             orderby = "data",
             reverse = "data",
             keep_order_column = FALSE) %.>%
  orderby(., "id")
cat(format(optree))
## table('data';
## key,
## id,
## info,
## key_group) %.>%
## natural_join(.,
## table('annotation';
## key,
## data,
## key_group),
## j= INNER, by= key) %.>%
## select_rows(.,
## data <= info) %.>%
## extend(.,
## row_number := row_number(),
## p= id,
## o= "data" DESC) %.>%
## select_rows(.,
## row_number <= 1) %.>%
## drop_columns(.,
## row_number) %.>%
## orderby(., id)
res1 <- ex_data_table(optree)
head(res1)
## data id info key key_group
## 1: 0.9152014 1 0.9860654 key_1 20
## 2: 0.5599810 2 0.5857570 key_2 8
## 3: 0.3011882 3 0.3334490 key_3 10
## 4: 0.3650987 4 0.3960980 key_4 5
## 5: 0.1469254 5 0.1753649 key_5 14
## 6: 0.2567631 6 0.3510280 key_6 7
nrow(res1)
## [1] 94
And we can execute the operations in parallel.
parallel::clusterEvalQ(cl,
library("rqdatatable"))
## [[1]]
## [1] "rqdatatable" "rquery" "stats" "graphics" "grDevices"
## [6] "utils" "datasets" "methods" "base"
##
## [[2]]
## [1] "rqdatatable" "rquery" "stats" "graphics" "grDevices"
## [6] "utils" "datasets" "methods" "base"
##
## [[3]]
## [1] "rqdatatable" "rquery" "stats" "graphics" "grDevices"
## [6] "utils" "datasets" "methods" "base"
##
## [[4]]
## [1] "rqdatatable" "rquery" "stats" "graphics" "grDevices"
## [6] "utils" "datasets" "methods" "base"
res2 <- ex_data_table_parallel(optree,
"key_group",
cl)
head(res2)
## data id info key key_group
## 1: 0.9152014 1 0.9860654 key_1 20
## 2: 0.5599810 2 0.5857570 key_2 8
## 3: 0.3011882 3 0.3334490 key_3 10
## 4: 0.3650987 4 0.3960980 key_4 5
## 5: 0.1469254 5 0.1753649 key_5 14
## 6: 0.2567631 6 0.3510280 key_6 7
nrow(res2)
## [1] 94
data.table
data.table can implement the same calculation.
library("data.table")
##
## Attaching package: 'data.table'
## The following objects are masked from 'package:dplyr':
##
## between, first, last
packageVersion("data.table")
## [1] '1.11.4'
data_table_f <- function(data, annotation) {
  data <- data.table::as.data.table(data)
  annotation <- data.table::as.data.table(annotation)
  joined <- merge(data, annotation,
                  by = "key",
                  all = FALSE,
                  allow.cartesian = TRUE)
  joined <- joined[joined$data <= joined$info, ]
  data.table::setorderv(joined, cols = "data")
  joined <- joined[, .SD[.N], id]
  data.table::setorderv(joined, cols = "id")
}
resdt <- data_table_f(data, annotation)
head(resdt)
## id key info key_group.x data key_group.y
## 1: 1 key_1 0.9860654 20 0.9152014 20
## 2: 2 key_2 0.5857570 8 0.5599810 8
## 3: 3 key_3 0.3334490 10 0.3011882 10
## 4: 4 key_4 0.3960980 5 0.3650987 5
## 5: 5 key_5 0.1753649 14 0.1469254 14
## 6: 6 key_6 0.3510280 7 0.2567631 7
nrow(resdt)
## [1] 94
We can also run data.table in parallel using wrapr::execute_parallel.
parallel::clusterEvalQ(cl, library("data.table"))
## [[1]]
## [1] "data.table" "rqdatatable" "rquery" "stats" "graphics"
## [6] "grDevices" "utils" "datasets" "methods" "base"
##
## [[2]]
## [1] "data.table" "rqdatatable" "rquery" "stats" "graphics"
## [6] "grDevices" "utils" "datasets" "methods" "base"
##
## [[3]]
## [1] "data.table" "rqdatatable" "rquery" "stats" "graphics"
## [6] "grDevices" "utils" "datasets" "methods" "base"
##
## [[4]]
## [1] "data.table" "rqdatatable" "rquery" "stats" "graphics"
## [6] "grDevices" "utils" "datasets" "methods" "base"
parallel::clusterExport(cl, "data_table_f")
dt_f <- function(tables_list) {
  data <- tables_list$data
  annotation <- tables_list$annotation
  data_table_f(data, annotation)
}

data_table_parallel_f <- function(data, annotation) {
  respdt <- wrapr::execute_parallel(
    tables = list(data = data,
                  annotation = annotation),
    f = dt_f,
    partition_column = "key_group",
    cl = cl) %.>%
    data.table::rbindlist(.)
  data.table::setorderv(respdt, cols = "id")
  respdt
}
respdt <- data_table_parallel_f(data, annotation)
head(respdt)
## id key info key_group.x data key_group.y
## 1: 1 key_1 0.9860654 20 0.9152014 20
## 2: 2 key_2 0.5857570 8 0.5599810 8
## 3: 3 key_3 0.3334490 10 0.3011882 10
## 4: 4 key_4 0.3960980 5 0.3650987 5
## 5: 5 key_5 0.1753649 14 0.1469254 14
## 6: 6 key_6 0.3510280 7 0.2567631 7
nrow(respdt)
## [1] 94
dplyr
dplyr can also implement the example.
dplyr_pipeline <- function(data, annotation) {
  res <- data %>%
    inner_join(annotation, by = "key") %>%
    filter(data <= info) %>%
    group_by(id) %>%
    arrange(-data) %>%
    mutate(rownum = row_number()) %>%
    ungroup() %>%
    filter(rownum == 1) %>%
    arrange(id)
  res
}
resd <- dplyr_pipeline(data, annotation)
head(resd)
## # A tibble: 6 x 7
## key id info key_group.x data key_group.y rownum
## <chr> <int> <dbl> <chr> <dbl> <chr> <int>
## 1 key_1 1 0.986 20 0.915 20 1
## 2 key_2 2 0.586 8 0.560 8 1
## 3 key_3 3 0.333 10 0.301 10 1
## 4 key_4 4 0.396 5 0.365 5 1
## 5 key_5 5 0.175 14 0.147 14 1
## 6 key_6 6 0.351 7 0.257 7 1
nrow(resd)
## [1] 94
And we can use wrapr::execute_parallel to parallelize the dplyr solution.
parallel::clusterEvalQ(cl, library("dplyr"))
## [[1]]
## [1] "dplyr" "data.table" "rqdatatable" "rquery" "stats"
## [6] "graphics" "grDevices" "utils" "datasets" "methods"
## [11] "base"
##
## [[2]]
## [1] "dplyr" "data.table" "rqdatatable" "rquery" "stats"
## [6] "graphics" "grDevices" "utils" "datasets" "methods"
## [11] "base"
##
## [[3]]
## [1] "dplyr" "data.table" "rqdatatable" "rquery" "stats"
## [6] "graphics" "grDevices" "utils" "datasets" "methods"
## [11] "base"
##
## [[4]]
## [1] "dplyr" "data.table" "rqdatatable" "rquery" "stats"
## [6] "graphics" "grDevices" "utils" "datasets" "methods"
## [11] "base"
parallel::clusterExport(cl, "dplyr_pipeline")
dplyr_f <- function(tables_list) {
  data <- tables_list$data
  annotation <- tables_list$annotation
  dplyr_pipeline(data, annotation)
}

dplyr_parallel_f <- function(data, annotation) {
  respdt <- wrapr::execute_parallel(
    tables = list(data = data,
                  annotation = annotation),
    f = dplyr_f,
    partition_column = "key_group",
    cl = cl) %>%
    dplyr::bind_rows() %>%
    arrange(id)
}
respdplyr <- dplyr_parallel_f(data, annotation)
head(respdplyr)
## # A tibble: 6 x 7
## key id info key_group.x data key_group.y rownum
## <chr> <int> <dbl> <chr> <dbl> <chr> <int>
## 1 key_1 1 0.986 20 0.915 20 1
## 2 key_2 2 0.586 8 0.560 8 1
## 3 key_3 3 0.333 10 0.301 10 1
## 4 key_4 4 0.396 5 0.365 5 1
## 5 key_5 5 0.175 14 0.147 14 1
## 6 key_6 6 0.351 7 0.257 7 1
nrow(respdplyr)
## [1] 94
Benchmark
We can benchmark the various realizations.
dlist <- mk_example(300, 300)
data <- dlist$instance_table
annotation <- dlist$key_table
timings <- microbenchmark(
  data_table_parallel =
    nrow(data_table_parallel_f(data, annotation)),
  data_table = nrow(data_table_f(data, annotation)),
  rqdatatable_parallel =
    nrow(ex_data_table_parallel(optree, "key_group", cl)),
  rqdatatable = nrow(ex_data_table(optree)),
  dplyr_parallel =
    nrow(dplyr_parallel_f(data, annotation)),
  dplyr = nrow(dplyr_pipeline(data, annotation)),
  times = 10L)
saveRDS(timings, "Parallel_rqdatatable_timings.RDS")
Conclusion
print(timings)
## Unit: seconds
##                  expr       min        lq      mean    median        uq       max neval
##   data_table_parallel  5.274560  5.457105  5.609827  5.546554  5.686829  6.265888    10
##            data_table  9.401677  9.496280  9.701807  9.541218  9.748159 10.419316    10
##  rqdatatable_parallel  7.165216  7.497561  7.587663  7.563883  7.761987  7.949404    10
##           rqdatatable 12.490469 12.700474 13.320480 12.898154 14.229233 14.282269    10
##        dplyr_parallel  6.492262  6.572062  6.784865  6.787277  6.875076  7.328223    10
##                 dplyr 20.056555 20.450064 20.647073 20.564529 20.800350 21.332103    10
# autoplot(timings)
timings <- as.data.frame(timings)
timings$seconds <- timings$time/1e+9
ScatterBoxPlotH(timings,
xvar = "seconds", yvar = "expr",
title="task duration by method")
[Plot: task duration by method]
The benchmark timings show parallelized data.table is the fastest, followed by parallelized dplyr, and then parallelized rqdatatable. In the non-parallelized case data.table is the fastest, followed by rqdatatable, and then dplyr.
A reason dplyr sees a greater speedup relative to its own non-parallel implementation (yet does not beat data.table) is that data.table starts out already multi-threaded: data.table is exploiting some parallelism even before we add the process-level parallelism, and hence sees less of an additional speedup (though it remains the fastest).
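As an aside (these calls are part of data.table's OpenMP support and were not adjusted in the timings above), data.table's own thread-level parallelism can be inspected and controlled directly:

# how many threads data.table is currently using
data.table::getDTthreads()

# restrict data.table to one thread, e.g. to avoid over-subscribing cores
# when layering process-level parallelism on top
data.table::setDTthreads(1)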
rquery pipelines exhibit superior performance on big data systems (Spark, PostgreSQL, Amazon Redshift, and hopefully soon Google BigQuery), and rqdatatable supplies a very good in-memory implementation of the rquery system based on data.table. rquery also speeds up solution development by supplying higher-order operators and early debugging features.
In this note we have demonstrated simple procedures to reliably parallelize any of rqdatatable, data.table, or dplyr.
Note: we did not include alternatives such as multidplyr or dtplyr in the timings, as they did not appear to work on this example.
Note: Matt Summersgill pointed out that for this sort of problem data.table is far more effective than I originally showed. rqdatatable cannot take advantage of this technique, because I have not yet implemented theta-joins (non-equi joins) in the rqdatatable dialect of rquery (it is on my TODO list).
Materials
A re-rendering of this article can be found here, source code here, and raw timings here.
Comments
Thank you for the great article. It is heart-warming to see that the historic arch-enemy of potential R converts (speed) is now facing formidable challenges in the form of packages.
Totally unrelated to the main purpose of the article, but why did the dplyr package still produce a message despite the use of suppressPackageStartupMessages()?
Guessing the message in question is a message, but not considered a “startup message.”
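For completeness, a small hedged note (not from the original thread): the line in the setup output above is reported by R as a warning ("Warning: package 'dplyr' was built under R version 3.5.1"), so silencing it would additionally require suppressWarnings():

# suppress both startup messages and load-time warnings (illustration)
suppressWarnings(
  suppressPackageStartupMessages(
    library("dplyr")))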
Nice examples, thanks for sharing! A couple thoughts:

- Users can avoid making a copy and reduce memory usage by using setDT() instead of as.data.table() within functions, with the caveat that the tables in the global environment will be modified as a side effect of the function. Depending on your use case, these side effects may or may not be acceptable, but the speed-up and reduced memory usage may be well worth it. You can also set keys in setDT(), and sorting ahead of time can provide a little performance boost on the merge.

- Non-equi joins (see the "on" argument documentation in ?data.table) can accomplish the same result here with faster execution and reduced memory usage. This is one of the lesser known features of [.data.table, but in cases where performance is critical or you're bumping up against memory limitations, these can make a non-trivial difference.

- Subsets on .SD are not optimized in the data.table internals due to the need for flexibility, so if you use grouped indices (.I) to subset the main table you can also get a performance improvement.

Putting those things together, the function below uses less than half the memory and runs more than twice as fast as the original data_table_f() example with identical results (only difference being column order).

data_table_f2 <- function(data, annotation) {
  setDT(data, key = c("key", "info"))
  setDT(annotation, key = c("key", "data"))
  joined2 <- data[annotation,
                  on = .(key, info >= data),
                  .(id,
                    key,
                    info = x.info,
                    key_group.x = x.key_group,
                    data = i.data,
                    key_group.y = i.key_group),
                  allow.cartesian = TRUE,
                  nomatch = 0]
  setorder(joined2, data)
  joined2[joined2[, .I[.N], keyby = .(id)]$V1]
}
resdt2 <- data_table_f2(data, annotation)
identical(resdt[,.(id,key,info,key_group.x,data,key_group.y)],
resdt2[,.(id,key,info,key_group.x,data,key_group.y)])
# TRUE
Wow, thanks for the note!

I hadn't tried a non-equi or theta join yet (it is not yet implemented in rqdatatable). But you are right, it really performs on this example. I have re-run with your function instead of mine and the results are great: non-parallel data.table is faster than the other methods, and speeds up still more with the process-level parallelism.

data.table really is the gift that keeps on giving when it comes to performance; it's amazing how much power is packed between those brackets! I am going to start looking for opportunities to try out wrapr::execute_parallel(), the partitioning syntax seems pretty painless and the overhead looks pretty low, exciting stuff!

P.S. For future reference, the data.table contributors use the Twitter hashtag #rdatatable, I know they appreciate the support!
Thanks!
Now that I think about it, the partitioning is essentially a by-hand base::split(). Is there a data.table-enhanced version of that? If so I could switch over in some update.

There is a split S3 method for data.table objects. From the documentation: "Split method for data.table. Faster and more flexible. Be aware that processing list of data.tables will be generally much slower than manipulation in single data.table by group using by argument, read more on data.table." I'm a little hazy on how the dispatch would be handled within a packaged function (specifically, not sure if you would need to explicitly use data.table::split() to take advantage?).
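(For reference, a minimal sketch of that method, added here for illustration: split.data.table accepts a by argument, and ordinary S3 dispatch applies whenever the object being split is a data.table.)

# split a data.table into a list of per-group data.tables (illustration)
pieces <- split(data.table::as.data.table(data), by = "key_group")
length(pieces)  # one element per distinct key_group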
On my device, parallel times were always higher. However, this package worked as expected: http://www.business-science.io/code-tools/2016/12/18/multidplyr.html. Any ideas?
Thanks for the note.
I took care that R and all packages were up to date (latest CRAN version if on CRAN, else latest from GitHub; with the exception of rquery, of which we are using a development version) for these examples. I took care to note down most package versions.

multidplyr appears to work unless you try a join. I have the example in the linked materials. So I am assuming you ran some multidplyr example other than the one in my linked materials. If not, I'd be happy to take a correction and include multidplyr in my timings.

As far as parallel being slower for you: that would be a symptom of the problem partition and transport cost being higher than the task cost. So it may mean that the problem is not at a scale considered difficult for your device.
The problem was that I was using a smaller number of tables. It works.
Ah, good news. Thanks!