spark-commits mailing list archives

From gurwls...@apache.org
Subject spark git commit: [SPARK-23770][R] Exposes repartitionByRange in SparkR
Date Thu, 29 Mar 2018 10:38:30 GMT
Repository: spark
Updated Branches:
  refs/heads/master 641aec68e -> 505480cb5


[SPARK-23770][R] Exposes repartitionByRange in SparkR

## What changes were proposed in this pull request?

This PR proposes to expose `repartitionByRange`.

```R
> df <- createDataFrame(iris)
...
> getNumPartitions(repartitionByRange(df, 3, col = df$Species))
[1] 3
```
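
A quick way to see how the rows were distributed is to tag each row with its partition index and count per partition. This is only an illustrative sketch, assuming a running SparkR session; `spark_partition_id` is the existing SparkR column function, and the column name `pid` is arbitrary:

```R
df <- createDataFrame(iris)
ranged <- repartitionByRange(df, 3, col = df$Species)
# Tag each row with the partition it landed in, then count rows per partition.
withPid <- withColumn(ranged, "pid", spark_partition_id())
showDF(count(groupBy(withPid, "pid")))
```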

## How was this patch tested?

Manually tested, and unit tests were added. The difference from `repartition` can be seen
below:

```R
> df <- createDataFrame(mtcars)
> take(repartition(df, 10, df$wt), 3)
   mpg cyl  disp  hp drat    wt  qsec vs am gear carb
1 14.3   8 360.0 245 3.21 3.570 15.84  0  0    3    4
2 10.4   8 460.0 215 3.00 5.424 17.82  0  0    3    4
3 32.4   4  78.7  66 4.08 2.200 19.47  1  1    4    1
> take(repartitionByRange(df, 10, df$wt), 3)
   mpg cyl disp hp drat    wt  qsec vs am gear carb
1 30.4   4 75.7 52 4.93 1.615 18.52  1  1    4    2
2 33.9   4 71.1 65 4.22 1.835 19.90  1  1    4    1
3 27.3   4 79.0 66 4.08 1.935 18.90  1  1    4    1
```
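
The outputs differ as expected: `repartition` hash-partitions rows by `wt`, while `repartitionByRange` sorts rows into contiguous, non-overlapping `wt` ranges, so the first partitions hold the lightest cars. A sketch to inspect the per-partition `wt` boundaries (again assuming a SparkR session; `pid` is just an illustrative column name):

```R
df <- createDataFrame(mtcars)
ranged <- withColumn(repartitionByRange(df, 10, df$wt), "pid", spark_partition_id())
# Per-partition min/max of wt; under range partitioning these intervals
# should not overlap, unlike the hash-based repartition(df, 10, df$wt).
showDF(agg(groupBy(ranged, "pid"), min(ranged$wt), max(ranged$wt)))
```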

Author: hyukjinkwon <gurwls223@apache.org>

Closes #20902 from HyukjinKwon/r-repartitionByRange.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/505480cb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/505480cb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/505480cb

Branch: refs/heads/master
Commit: 505480cb578af9f23acc77bc82348afc9d8468e8
Parents: 641aec6
Author: hyukjinkwon <gurwls223@apache.org>
Authored: Thu Mar 29 19:38:28 2018 +0900
Committer: hyukjinkwon <gurwls223@apache.org>
Committed: Thu Mar 29 19:38:28 2018 +0900

----------------------------------------------------------------------
 R/pkg/NAMESPACE                       |  1 +
 R/pkg/R/DataFrame.R                   | 65 +++++++++++++++++++++++++++++-
 R/pkg/R/generics.R                    |  3 ++
 R/pkg/tests/fulltests/test_sparkSQL.R | 45 +++++++++++++++++++++
 4 files changed, 112 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/505480cb/R/pkg/NAMESPACE
----------------------------------------------------------------------
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index c51eb0f..190c50e 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -151,6 +151,7 @@ exportMethods("arrange",
               "registerTempTable",
               "rename",
               "repartition",
+              "repartitionByRange",
               "rollup",
               "sample",
               "sample_frac",

http://git-wip-us.apache.org/repos/asf/spark/blob/505480cb/R/pkg/R/DataFrame.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index c485202..a1c9495 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -687,7 +687,7 @@ setMethod("storageLevel",
 #' @rdname coalesce
 #' @name coalesce
 #' @aliases coalesce,SparkDataFrame-method
-#' @seealso \link{repartition}
+#' @seealso \link{repartition}, \link{repartitionByRange}
 #' @examples
 #'\dontrun{
 #' sparkR.session()
@@ -723,7 +723,7 @@ setMethod("coalesce",
 #' @rdname repartition
 #' @name repartition
 #' @aliases repartition,SparkDataFrame-method
-#' @seealso \link{coalesce}
+#' @seealso \link{coalesce}, \link{repartitionByRange}
 #' @examples
 #'\dontrun{
 #' sparkR.session()
@@ -759,6 +759,67 @@ setMethod("repartition",
             dataFrame(sdf)
           })
 
+
+#' Repartition by range
+#'
+#' The following options for repartition by range are possible:
+#' \itemize{
+#'  \item{1.} {Return a new SparkDataFrame range partitioned by
+#'                      the given columns into \code{numPartitions}.}
+#'  \item{2.} {Return a new SparkDataFrame range partitioned by the given column(s),
+#'                      using \code{spark.sql.shuffle.partitions} as number of partitions.}
+#'}
+#'
+#' @param x a SparkDataFrame.
+#' @param numPartitions the number of partitions to use.
+#' @param col the column by which the range partitioning will be performed.
+#' @param ... additional column(s) to be used in the range partitioning.
+#'
+#' @family SparkDataFrame functions
+#' @rdname repartitionByRange
+#' @name repartitionByRange
+#' @aliases repartitionByRange,SparkDataFrame-method
+#' @seealso \link{repartition}, \link{coalesce}
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' path <- "path/to/file.json"
+#' df <- read.json(path)
+#' newDF <- repartitionByRange(df, col = df$col1, df$col2)
+#' newDF <- repartitionByRange(df, 3L, col = df$col1, df$col2)
+#'}
+#' @note repartitionByRange since 2.4.0
+setMethod("repartitionByRange",
+          signature(x = "SparkDataFrame"),
+          function(x, numPartitions = NULL, col = NULL, ...) {
+            if (!is.null(numPartitions) && !is.null(col)) {
+              # number of partitions and columns both are specified
+              if (is.numeric(numPartitions) && class(col) == "Column") {
+                cols <- list(col, ...)
+                jcol <- lapply(cols, function(c) { c@jc })
+                sdf <- callJMethod(x@sdf, "repartitionByRange", numToInt(numPartitions), jcol)
+              } else {
+                stop(paste("numPartitions and col must be numeric and Column; however, got",
+                           class(numPartitions), "and", class(col)))
+              }
+            } else if (!is.null(col))  {
+              # only columns are specified
+              if (class(col) == "Column") {
+                cols <- list(col, ...)
+                jcol <- lapply(cols, function(c) { c@jc })
+                sdf <- callJMethod(x@sdf, "repartitionByRange", jcol)
+              } else {
+                stop(paste("col must be Column; however, got", class(col)))
+              }
+            } else if (!is.null(numPartitions)) {
+              # only numPartitions is specified
+              stop("At least one partition-by column must be specified.")
+            } else {
+              stop("Please, specify a column(s) or the number of partitions with a column(s)")
+            }
+            dataFrame(sdf)
+          })
+
 #' toJSON
 #'
 #' Converts a SparkDataFrame into a SparkDataFrame of JSON string.

http://git-wip-us.apache.org/repos/asf/spark/blob/505480cb/R/pkg/R/generics.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 6fba4b6..974beff 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -531,6 +531,9 @@ setGeneric("rename", function(x, ...) { standardGeneric("rename") })
 #' @rdname repartition
 setGeneric("repartition", function(x, ...) { standardGeneric("repartition") })
 
+#' @rdname repartitionByRange
+setGeneric("repartitionByRange", function(x, ...) { standardGeneric("repartitionByRange")
})
+
 #' @rdname sample
 setGeneric("sample",
            function(x, withReplacement = FALSE, fraction, seed) {

http://git-wip-us.apache.org/repos/asf/spark/blob/505480cb/R/pkg/tests/fulltests/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index 439191a..7105469 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -3104,6 +3104,51 @@ test_that("repartition by columns on DataFrame", {
   })
 })
 
+test_that("repartitionByRange on a DataFrame", {
+  # The tasks here launch R workers with shuffles. So, we decrease the number of shuffle
+  # partitions to reduce the number of the tasks to speed up the test. This is particularly
+  # slow on Windows because the R workers are unable to be forked. See also SPARK-21693.
+  conf <- callJMethod(sparkSession, "conf")
+  shufflepartitionsvalue <- callJMethod(conf, "get", "spark.sql.shuffle.partitions")
+  callJMethod(conf, "set", "spark.sql.shuffle.partitions", "5")
+  tryCatch({
+    df <- createDataFrame(mtcars)
+    expect_error(repartitionByRange(df, "haha", df$mpg),
+                 "numPartitions and col must be numeric and Column.*")
+    expect_error(repartitionByRange(df),
+                 ".*specify a column.*or the number of partitions with a column.*")
+    expect_error(repartitionByRange(df, col = "haha"),
+                 "col must be Column; however, got.*")
+    expect_error(repartitionByRange(df, 3),
+                 "At least one partition-by column must be specified.")
+
+    # The order of rows should be different with a normal repartition.
+    actual <- repartitionByRange(df, 3, df$mpg)
+    expect_equal(getNumPartitions(actual), 3)
+    expect_false(identical(collect(actual), collect(repartition(df, 3, df$mpg))))
+
+    actual <- repartitionByRange(df, col = df$mpg)
+    expect_false(identical(collect(actual), collect(repartition(df, col = df$mpg))))
+
+    # They should have same data.
+    actual <- collect(repartitionByRange(df, 3, df$mpg))
+    actual <- actual[order(actual$mpg), ]
+    expected <- collect(repartition(df, 3, df$mpg))
+    expected <- expected[order(expected$mpg), ]
+    expect_true(all(actual == expected))
+
+    actual <- collect(repartitionByRange(df, col = df$mpg))
+    actual <- actual[order(actual$mpg), ]
+    expected <- collect(repartition(df, col = df$mpg))
+    expected <- expected[order(expected$mpg), ]
+    expect_true(all(actual == expected))
+  },
+  finally = {
+    # Resetting the conf back to default value
+    callJMethod(conf, "set", "spark.sql.shuffle.partitions", shufflepartitionsvalue)
+  })
+})
+
 test_that("coalesce, repartition, numPartitions", {
   df <- as.DataFrame(cars, numPartitions = 5)
   expect_equal(getNumPartitions(df), 5)



