spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shiva...@apache.org
Subject spark git commit: [SPARK-11395][SPARKR] Support over and window specification in SparkR.
Date Fri, 06 May 2016 01:49:47 GMT
Repository: spark
Updated Branches:
  refs/heads/master 7f5922aa4 -> 157a49aa4


[SPARK-11395][SPARKR] Support over and window specification in SparkR.

This PR:
1. Implement WindowSpec S4 class.
2. Implement Window.partitionBy() and Window.orderBy() as utility functions to create WindowSpec
objects.
3. Implement over() of Column class.

Author: Sun Rui <rui.sun@intel.com>
Author: Sun Rui <sunrui2016@gmail.com>

Closes #10094 from sun-rui/SPARK-11395.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/157a49aa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/157a49aa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/157a49aa

Branch: refs/heads/master
Commit: 157a49aa410dc1870cd171148d317084c5a90d23
Parents: 7f5922a
Author: Sun Rui <rui.sun@intel.com>
Authored: Thu May 5 18:49:43 2016 -0700
Committer: Shivaram Venkataraman <shivaram@cs.berkeley.edu>
Committed: Thu May 5 18:49:43 2016 -0700

----------------------------------------------------------------------
 R/pkg/DESCRIPTION                         |   2 +
 R/pkg/NAMESPACE                           |  10 ++
 R/pkg/R/DataFrame.R                       |   4 +-
 R/pkg/R/WindowSpec.R                      | 188 +++++++++++++++++++++++++
 R/pkg/R/generics.R                        |  29 +++-
 R/pkg/R/pairRDD.R                         |   4 +-
 R/pkg/R/window.R                          |  98 +++++++++++++
 R/pkg/inst/tests/testthat/test_sparkSQL.R |  36 +++++
 8 files changed, 364 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/157a49aa/R/pkg/DESCRIPTION
----------------------------------------------------------------------
diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION
index 7179438..963a1bb 100644
--- a/R/pkg/DESCRIPTION
+++ b/R/pkg/DESCRIPTION
@@ -26,6 +26,7 @@ Collate:
     'pairRDD.R'
     'DataFrame.R'
     'SQLContext.R'
+    'WindowSpec.R'
     'backend.R'
     'broadcast.R'
     'client.R'
@@ -38,4 +39,5 @@ Collate:
     'stats.R'
     'types.R'
     'utils.R'
+    'window.R'
 RoxygenNote: 5.0.1

http://git-wip-us.apache.org/repos/asf/spark/blob/157a49aa/R/pkg/NAMESPACE
----------------------------------------------------------------------
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 73f7c59..1432ab8 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -216,6 +216,7 @@ exportMethods("%in%",
               "next_day",
               "ntile",
               "otherwise",
+              "over",
               "percent_rank",
               "pmod",
               "quarter",
@@ -315,3 +316,12 @@ export("structField",
        "structType.jobj",
        "structType.structField",
        "print.structType")
+
+exportClasses("WindowSpec")
+
+export("partitionBy",
+       "rowsBetween",
+       "rangeBetween")
+
+export("window.partitionBy",
+       "window.orderBy")

http://git-wip-us.apache.org/repos/asf/spark/blob/157a49aa/R/pkg/R/DataFrame.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index fcf473a..43c46b8 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -1749,8 +1749,8 @@ setMethod("arrange",
 #' @export
 setMethod("orderBy",
           signature(x = "SparkDataFrame", col = "characterOrColumn"),
-          function(x, col) {
-            arrange(x, col)
+          function(x, col, ...) {
+            arrange(x, col, ...)
           })
 
 #' Filter

http://git-wip-us.apache.org/repos/asf/spark/blob/157a49aa/R/pkg/R/WindowSpec.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/WindowSpec.R b/R/pkg/R/WindowSpec.R
new file mode 100644
index 0000000..581176a
--- /dev/null
+++ b/R/pkg/R/WindowSpec.R
@@ -0,0 +1,188 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# WindowSpec.R - WindowSpec class and methods implemented in S4 OO classes
+
+#' @include generics.R jobj.R column.R
+NULL
+
+#' @title S4 class that represents a WindowSpec
+#' @description WindowSpec can be created by using window.partitionBy()
+#'              or window.orderBy()
+#' @rdname WindowSpec
+#' @seealso \link{window.partitionBy}, \link{window.orderBy}
+#'
+#' @param sws A Java object reference to the backing Scala WindowSpec
+#' @export
+setClass("WindowSpec",
+         slots = list(sws = "jobj"))
+
+setMethod("initialize", "WindowSpec", function(.Object, sws) {
+  .Object@sws <- sws
+  .Object
+})
+
+windowSpec <- function(sws) {
+  stopifnot(class(sws) == "jobj")
+  new("WindowSpec", sws)
+}
+
+#' @rdname show
+setMethod("show", "WindowSpec",
+          function(object) {
+            cat("WindowSpec", callJMethod(object@sws, "toString"), "\n")
+          })
+
+#' partitionBy
+#'
+#' Defines the partitioning columns in a WindowSpec.
+#'
+#' @param x a WindowSpec
+#' @return a WindowSpec
+#' @rdname partitionBy
+#' @name partitionBy
+#' @family windowspec_method
+#' @export
+#' @examples
+#' \dontrun{
+#'   partitionBy(ws, "col1", "col2")
+#'   partitionBy(ws, df$col1, df$col2)
+#' }
+setMethod("partitionBy",
+          signature(x = "WindowSpec"),
+          function(x, col, ...) {
+            stopifnot (class(col) %in% c("character", "Column"))
+
+            if (class(col) == "character") {
+              windowSpec(callJMethod(x@sws, "partitionBy", col, list(...)))
+            } else {
+              jcols <- lapply(list(col, ...), function(c) {
+                c@jc
+              })
+              windowSpec(callJMethod(x@sws, "partitionBy", jcols))
+            }
+          })
+
+#' orderBy
+#'
+#' Defines the ordering columns in a WindowSpec.
+#'
+#' @param x a WindowSpec
+#' @return a WindowSpec
+#' @rdname arrange
+#' @name orderBy
+#' @family windowspec_method
+#' @export
+#' @examples
+#' \dontrun{
+#'   orderBy(ws, "col1", "col2")
+#'   orderBy(ws, df$col1, df$col2)
+#' }
+setMethod("orderBy",
+          signature(x = "WindowSpec", col = "character"),
+          function(x, col, ...) {
+            windowSpec(callJMethod(x@sws, "orderBy", col, list(...)))
+          })
+
+#' @rdname arrange
+#' @name orderBy
+#' @export
+setMethod("orderBy",
+          signature(x = "WindowSpec", col = "Column"),
+          function(x, col, ...) {
+            jcols <- lapply(list(col, ...), function(c) {
+              c@jc
+            })
+            windowSpec(callJMethod(x@sws, "orderBy", jcols))
+          })
+
+#' rowsBetween
+#'
+#' Defines the frame boundaries, from `start` (inclusive) to `end` (inclusive).
+#' 
+#' Both `start` and `end` are relative positions from the current row. For example, "0" means
+#' "current row", while "-1" means the row before the current row, and "5" means the fifth row
+#' after the current row.
+#'
+#' @param x a WindowSpec
+#' @param start boundary start, inclusive.
+#'              The frame is unbounded if this is the minimum long value.
+#' @param end boundary end, inclusive.
+#'            The frame is unbounded if this is the maximum long value.
+#' @return a WindowSpec
+#' @rdname rowsBetween
+#' @name rowsBetween
+#' @family windowspec_method
+#' @export
+#' @examples
+#' \dontrun{
+#'   rowsBetween(ws, 0, 3)
+#' }
+setMethod("rowsBetween",
+          signature(x = "WindowSpec", start = "numeric", end = "numeric"),
+          function(x, start, end) {
+            # "start" and "end" should be long, due to serde limitation,
+            # limit "start" and "end" as integer now
+            windowSpec(callJMethod(x@sws, "rowsBetween", as.integer(start), as.integer(end)))
+          })
+
+#' rangeBetween
+#'
+#' Defines the frame boundaries, from `start` (inclusive) to `end` (inclusive).
+#' 
+#' Both `start` and `end` are relative from the current row. For example, "0" means "current row",
+#' while "-1" means one off before the current row, and "5" means the five off after the
+#' current row.
+#'
+#' @param x a WindowSpec
+#' @param start boundary start, inclusive.
+#'              The frame is unbounded if this is the minimum long value.
+#' @param end boundary end, inclusive.
+#'            The frame is unbounded if this is the maximum long value.
+#' @return a WindowSpec
+#' @rdname rangeBetween
+#' @name rangeBetween
+#' @family windowspec_method
+#' @export
+#' @examples
+#' \dontrun{
+#'   rangeBetween(ws, 0, 3)
+#' }
+setMethod("rangeBetween",
+          signature(x = "WindowSpec", start = "numeric", end = "numeric"),
+          function(x, start, end) {
+            # "start" and "end" should be long, due to serde limitation,
+            # limit "start" and "end" as integer now
+            windowSpec(callJMethod(x@sws, "rangeBetween", as.integer(start), as.integer(end)))
+          })
+
+# Note that over is a method of Column class, but it is placed here to
+# avoid Roxygen circular-dependency between class Column and WindowSpec.
+
+#' over
+#'
+#' Define a windowing column. 
+#'
+#' @rdname over
+#' @name over
+#' @family colum_func
+#' @export
+setMethod("over",
+          signature(x = "Column", window = "WindowSpec"),
+          function(x, window) {
+            column(callJMethod(x@jc, "over", window@sws))
+          })

http://git-wip-us.apache.org/repos/asf/spark/blob/157a49aa/R/pkg/R/generics.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 3db1ac0..8563be1 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -339,9 +339,9 @@ setGeneric("join", function(x, y, ...) { standardGeneric("join") })
 # @export
 setGeneric("leftOuterJoin", function(x, y, numPartitions) { standardGeneric("leftOuterJoin") })
 
-# @rdname partitionBy
-# @export
-setGeneric("partitionBy", function(x, numPartitions, ...) { standardGeneric("partitionBy") })
+#' @rdname partitionBy
+#' @export
+setGeneric("partitionBy", function(x, ...) { standardGeneric("partitionBy") })
 
 # @rdname reduceByKey
 # @seealso groupByKey
@@ -533,7 +533,7 @@ setGeneric("mutate", function(.data, ...) {standardGeneric("mutate") })
 
 #' @rdname arrange
 #' @export
-setGeneric("orderBy", function(x, col) { standardGeneric("orderBy") })
+setGeneric("orderBy", function(x, col, ...) { standardGeneric("orderBy") })
 
 #' @rdname schema
 #' @export
@@ -733,6 +733,27 @@ setGeneric("when", function(condition, value) { standardGeneric("when") })
 #' @export
 setGeneric("otherwise", function(x, value) { standardGeneric("otherwise") })
 
+#' @rdname over
+#' @export
+setGeneric("over", function(x, window) { standardGeneric("over") })
+
+###################### WindowSpec Methods ##########################
+
+#' @rdname rowsBetween
+#' @export
+setGeneric("rowsBetween", function(x, start, end) { standardGeneric("rowsBetween") })
+
+#' @rdname rangeBetween
+#' @export
+setGeneric("rangeBetween", function(x, start, end) { standardGeneric("rangeBetween") })
+
+#' @rdname window.partitionBy
+#' @export
+setGeneric("window.partitionBy", function(col, ...) { standardGeneric("window.partitionBy") })
+
+#' @rdname window.orderBy
+#' @export
+setGeneric("window.orderBy", function(col, ...) { standardGeneric("window.orderBy") })
 
 ###################### Expression Function Methods ##########################
 

http://git-wip-us.apache.org/repos/asf/spark/blob/157a49aa/R/pkg/R/pairRDD.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/pairRDD.R b/R/pkg/R/pairRDD.R
index 4075ef4..d39775c 100644
--- a/R/pkg/R/pairRDD.R
+++ b/R/pkg/R/pairRDD.R
@@ -205,8 +205,10 @@ setMethod("flatMapValues",
 #' @aliases partitionBy,RDD,integer-method
 #' @noRd
 setMethod("partitionBy",
-          signature(x = "RDD", numPartitions = "numeric"),
+          signature(x = "RDD"),
           function(x, numPartitions, partitionFunc = hashCode) {
+            stopifnot(is.numeric(numPartitions))
+
             partitionFunc <- cleanClosure(partitionFunc)
             serializedHashFuncBytes <- serialize(partitionFunc, connection = NULL)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/157a49aa/R/pkg/R/window.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/window.R b/R/pkg/R/window.R
new file mode 100644
index 0000000..7ecf70a
--- /dev/null
+++ b/R/pkg/R/window.R
@@ -0,0 +1,98 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# window.R - Utility functions for defining window in DataFrames
+
+#' window.partitionBy
+#'
+#' Creates a WindowSpec with the partitioning defined.
+#'
+#' @rdname window.partitionBy
+#' @name window.partitionBy
+#' @export
+#' @examples
+#' \dontrun{
+#'   ws <- window.partitionBy("key1", "key2")
+#'   df1 <- select(df, over(lead("value", 1), ws))
+#'
+#'   ws <- window.partitionBy(df$key1, df$key2)
+#'   df1 <- select(df, over(lead("value", 1), ws))
+#' }
+setMethod("window.partitionBy",
+          signature(col = "character"),
+          function(col, ...) {
+            windowSpec(
+              callJStatic("org.apache.spark.sql.expressions.Window",
+                          "partitionBy",
+                          col,
+                          list(...)))
+          })
+
+#' @rdname window.partitionBy
+#' @name window.partitionBy
+#' @export
+setMethod("window.partitionBy",
+          signature(col = "Column"),
+          function(col, ...) {
+            jcols <- lapply(list(col, ...), function(c) {
+              c@jc
+            })
+            windowSpec(
+              callJStatic("org.apache.spark.sql.expressions.Window",
+                          "partitionBy",
+                          jcols))
+          })
+
+#' window.orderBy
+#'
+#' Creates a WindowSpec with the ordering defined.
+#'
+#' @rdname window.orderBy
+#' @name window.orderBy
+#' @export
+#' @examples
+#' \dontrun{
+#'   ws <- window.orderBy("key1", "key2")
+#'   df1 <- select(df, over(lead("value", 1), ws))
+#'
+#'   ws <- window.orderBy(df$key1, df$key2)
+#'   df1 <- select(df, over(lead("value", 1), ws))
+#' }
+setMethod("window.orderBy",
+          signature(col = "character"),
+          function(col, ...) {
+            windowSpec(
+              callJStatic("org.apache.spark.sql.expressions.Window",
+                          "orderBy",
+                          col,
+                          list(...)))
+          })
+
+#' @rdname window.orderBy
+#' @name window.orderBy
+#' @export
+setMethod("window.orderBy",
+          signature(col = "Column"),
+          function(col, ...) {
+            jcols <- lapply(list(col, ...), function(c) {
+              c@jc
+            })
+            windowSpec(
+              callJStatic("org.apache.spark.sql.expressions.Window",
+                          "orderBy",
+                          jcols))
+          })

http://git-wip-us.apache.org/repos/asf/spark/blob/157a49aa/R/pkg/inst/tests/testthat/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 3b6a27c..0f67bc2 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -2118,6 +2118,42 @@ test_that("repartition by columns on DataFrame", {
   expect_equal(nrow(df1), 2)
 })
 
+test_that("Window functions on a DataFrame", {
+  ssc <- callJMethod(sc, "sc")
+  hiveCtx <- tryCatch({
+    newJObject("org.apache.spark.sql.hive.test.TestHiveContext", ssc)
+  },
+  error = function(err) {
+    skip("Hive is not build with SparkSQL, skipped")
+  })
+
+  df <- createDataFrame(hiveCtx,
+                        list(list(1L, "1"), list(2L, "2"), list(1L, "1"), list(2L, "2")),
+                        schema = c("key", "value"))
+  ws <- orderBy(window.partitionBy("key"), "value")
+  result <- collect(select(df, over(lead("key", 1), ws), over(lead("value", 1), ws)))
+  names(result) <- c("key", "value")
+  expected <- data.frame(key = c(1L, NA, 2L, NA),
+                       value = c("1", NA, "2", NA),
+                       stringsAsFactors = FALSE)
+  expect_equal(result, expected)
+
+  ws <- orderBy(window.partitionBy(df$key), df$value)
+  result <- collect(select(df, over(lead("key", 1), ws), over(lead("value", 1), ws)))
+  names(result) <- c("key", "value")
+  expect_equal(result, expected)
+
+  ws <- partitionBy(window.orderBy("value"), "key")
+  result <- collect(select(df, over(lead("key", 1), ws), over(lead("value", 1), ws)))
+  names(result) <- c("key", "value")
+  expect_equal(result, expected)
+
+  ws <- partitionBy(window.orderBy(df$value), df$key)
+  result <- collect(select(df, over(lead("key", 1), ws), over(lead("value", 1), ws)))
+  names(result) <- c("key", "value")
+  expect_equal(result, expected)
+})
+
 unlink(parquetPath)
 unlink(jsonPath)
 unlink(jsonPathNa)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message