spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From felixche...@apache.org
Subject spark git commit: [SPARK-18013][SPARKR] add crossJoin API
Date Fri, 21 Oct 2016 19:35:44 GMT
Repository: spark
Updated Branches:
  refs/heads/master 4efdc764e -> e21e1c946


[SPARK-18013][SPARKR] add crossJoin API

## What changes were proposed in this pull request?

Add crossJoin and do not default to cross join if joinExpr is left out

## How was this patch tested?

unit test

Author: Felix Cheung <felixcheung_m@hotmail.com>

Closes #15559 from felixcheung/rcrossjoin.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e21e1c94
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e21e1c94
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e21e1c94

Branch: refs/heads/master
Commit: e21e1c946c4b7448fb150cfa2d9419864ae6f9b5
Parents: 4efdc76
Author: Felix Cheung <felixcheung_m@hotmail.com>
Authored: Fri Oct 21 12:35:37 2016 -0700
Committer: Felix Cheung <felixcheung@apache.org>
Committed: Fri Oct 21 12:35:37 2016 -0700

----------------------------------------------------------------------
 R/pkg/NAMESPACE                           |  1 +
 R/pkg/R/DataFrame.R                       | 59 ++++++++++++++++++++------
 R/pkg/R/generics.R                        |  4 ++
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 11 ++++-
 docs/sparkr.md                            |  4 ++
 5 files changed, 64 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e21e1c94/R/pkg/NAMESPACE
----------------------------------------------------------------------
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 5960c62..8718185 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -71,6 +71,7 @@ exportMethods("arrange",
               "covar_samp",
               "covar_pop",
               "createOrReplaceTempView",
+              "crossJoin",
               "crosstab",
               "dapply",
               "dapplyCollect",

http://git-wip-us.apache.org/repos/asf/spark/blob/e21e1c94/R/pkg/R/DataFrame.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 801d2ed..8910a4b 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -2271,12 +2271,13 @@ setMethod("dropDuplicates",
 
 #' Join
 #'
-#' Join two SparkDataFrames based on the given join expression.
+#' Joins two SparkDataFrames based on the given join expression.
 #'
 #' @param x A SparkDataFrame
 #' @param y A SparkDataFrame
 #' @param joinExpr (Optional) The expression used to perform the join. joinExpr must be a
-#' Column expression. If joinExpr is omitted, join() will perform a Cartesian join
+#' Column expression. If joinExpr is omitted, the default, inner join is attempted and an error is
+#' thrown if it would be a Cartesian Product. For Cartesian join, use crossJoin instead.
 #' @param joinType The type of join to perform. The following join types are available:
 #' 'inner', 'outer', 'full', 'fullouter', leftouter', 'left_outer', 'left',
 #' 'right_outer', 'rightouter', 'right', and 'leftsemi'. The default joinType is "inner".
@@ -2285,23 +2286,24 @@ setMethod("dropDuplicates",
 #' @aliases join,SparkDataFrame,SparkDataFrame-method
 #' @rdname join
 #' @name join
-#' @seealso \link{merge}
+#' @seealso \link{merge} \link{crossJoin}
 #' @export
 #' @examples
 #'\dontrun{
 #' sparkR.session()
 #' df1 <- read.json(path)
 #' df2 <- read.json(path2)
-#' join(df1, df2) # Performs a Cartesian
 #' join(df1, df2, df1$col1 == df2$col2) # Performs an inner join based on expression
 #' join(df1, df2, df1$col1 == df2$col2, "right_outer")
+#' join(df1, df2) # Attempts an inner join
 #' }
 #' @note join since 1.4.0
 setMethod("join",
           signature(x = "SparkDataFrame", y = "SparkDataFrame"),
           function(x, y, joinExpr = NULL, joinType = NULL) {
             if (is.null(joinExpr)) {
-              sdf <- callJMethod(x@sdf, "crossJoin", y@sdf)
+              # this may not fail until the planner checks for Cartesian join later on.
+              sdf <- callJMethod(x@sdf, "join", y@sdf)
             } else {
               if (class(joinExpr) != "Column") stop("joinExpr must be a Column")
               if (is.null(joinType)) {
@@ -2322,22 +2324,52 @@ setMethod("join",
             dataFrame(sdf)
           })
 
+#' CrossJoin
+#'
+#' Returns Cartesian Product on two SparkDataFrames.
+#'
+#' @param x A SparkDataFrame
+#' @param y A SparkDataFrame
+#' @return A SparkDataFrame containing the result of the join operation.
+#' @family SparkDataFrame functions
+#' @aliases crossJoin,SparkDataFrame,SparkDataFrame-method
+#' @rdname crossJoin
+#' @name crossJoin
+#' @seealso \link{merge} \link{join}
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' df1 <- read.json(path)
+#' df2 <- read.json(path2)
+#' crossJoin(df1, df2) # Performs a Cartesian
+#' }
+#' @note crossJoin since 2.1.0
+setMethod("crossJoin",
+          signature(x = "SparkDataFrame", y = "SparkDataFrame"),
+          function(x, y) {
+            sdf <- callJMethod(x@sdf, "crossJoin", y@sdf)
+            dataFrame(sdf)
+          })
+
 #' Merges two data frames
 #'
 #' @name merge
-#' @param x the first data frame to be joined
-#' @param y the second data frame to be joined
+#' @param x the first data frame to be joined.
+#' @param y the second data frame to be joined.
 #' @param by a character vector specifying the join columns. If by is not
 #'   specified, the common column names in \code{x} and \code{y} will be used.
+#'   If by or both by.x and by.y are explicitly set to NULL or of length 0, the Cartesian
+#'   Product of x and y will be returned.
 #' @param by.x a character vector specifying the joining columns for x.
 #' @param by.y a character vector specifying the joining columns for y.
 #' @param all a boolean value setting \code{all.x} and \code{all.y}
 #'            if any of them are unset.
 #' @param all.x a boolean value indicating whether all the rows in x should
-#'              be including in the join
+#'              be including in the join.
 #' @param all.y a boolean value indicating whether all the rows in y should
-#'              be including in the join
-#' @param sort a logical argument indicating whether the resulting columns should be sorted
+#'              be including in the join.
+#' @param sort a logical argument indicating whether the resulting columns should be sorted.
 #' @param suffixes a string vector of length 2 used to make colnames of
 #'                 \code{x} and \code{y} unique.
 #'                 The first element is appended to each colname of \code{x}.
@@ -2351,20 +2383,21 @@ setMethod("join",
 #' @family SparkDataFrame functions
 #' @aliases merge,SparkDataFrame,SparkDataFrame-method
 #' @rdname merge
-#' @seealso \link{join}
+#' @seealso \link{join} \link{crossJoin}
 #' @export
 #' @examples
 #'\dontrun{
 #' sparkR.session()
 #' df1 <- read.json(path)
 #' df2 <- read.json(path2)
-#' merge(df1, df2) # Performs a Cartesian
+#' merge(df1, df2) # Performs an inner join by common columns
 #' merge(df1, df2, by = "col1") # Performs an inner join based on expression
 #' merge(df1, df2, by.x = "col1", by.y = "col2", all.y = TRUE)
 #' merge(df1, df2, by.x = "col1", by.y = "col2", all.x = TRUE)
 #' merge(df1, df2, by.x = "col1", by.y = "col2", all.x = TRUE, all.y = TRUE)
 #' merge(df1, df2, by.x = "col1", by.y = "col2", all = TRUE, sort = FALSE)
 #' merge(df1, df2, by = "col1", all = TRUE, suffixes = c("-X", "-Y"))
+#' merge(df1, df2, by = NULL) # Performs a Cartesian join
 #' }
 #' @note merge since 1.5.0
 setMethod("merge",
@@ -2401,7 +2434,7 @@ setMethod("merge",
               joinY <- by
             } else {
               # if by or both by.x and by.y have length 0, use Cartesian Product
-              joinRes <- join(x, y)
+              joinRes <- crossJoin(x, y)
               return (joinRes)
             }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e21e1c94/R/pkg/R/generics.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 810aea9..5549cd7 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -468,6 +468,10 @@ setGeneric("createOrReplaceTempView",
              standardGeneric("createOrReplaceTempView")
            })
 
+# @rdname crossJoin
+# @export
+setGeneric("crossJoin", function(x, y) { standardGeneric("crossJoin") })
+
 #' @rdname dapply
 #' @export
 setGeneric("dapply", function(x, func, schema) { standardGeneric("dapply") })

http://git-wip-us.apache.org/repos/asf/spark/blob/e21e1c94/R/pkg/inst/tests/testthat/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 1c80686..3a987cd 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -1572,7 +1572,7 @@ test_that("filter() on a DataFrame", {
   #expect_true(is.ts(filter(1:100, rep(1, 3)))) # nolint
 })
 
-test_that("join() and merge() on a DataFrame", {
+test_that("join(), crossJoin() and merge() on a DataFrame", {
   df <- read.json(jsonPath)
 
   mockLines2 <- c("{\"name\":\"Michael\", \"test\": \"yes\"}",
@@ -1583,7 +1583,14 @@ test_that("join() and merge() on a DataFrame", {
   writeLines(mockLines2, jsonPath2)
   df2 <- read.json(jsonPath2)
 
-  joined <- join(df, df2)
+  # inner join, not cartesian join
+  expect_equal(count(where(join(df, df2), df$name == df2$name)), 3)
+  # cartesian join
+  expect_error(tryCatch(count(join(df, df2)), error = function(e) { stop(e) }),
+               paste0(".*(org.apache.spark.sql.AnalysisException: Detected cartesian product for",
+                      " INNER join between logical plans).*"))
+
+  joined <- crossJoin(df, df2)
   expect_equal(names(joined), c("age", "name", "name", "test"))
   expect_equal(count(joined), 12)
   expect_equal(names(collect(joined)), c("age", "name", "name", "test"))

http://git-wip-us.apache.org/repos/asf/spark/blob/e21e1c94/docs/sparkr.md
----------------------------------------------------------------------
diff --git a/docs/sparkr.md b/docs/sparkr.md
index 340e7f7..c1829ef 100644
--- a/docs/sparkr.md
+++ b/docs/sparkr.md
@@ -591,3 +591,7 @@ You can inspect the search path in R with [`search()`](https://stat.ethz.ch/R-ma
  - The method `registerTempTable` has been deprecated to be replaced by `createOrReplaceTempView`.
  - The method `dropTempTable` has been deprecated to be replaced by `dropTempView`.
  - The `sc` SparkContext parameter is no longer required for these functions: `setJobGroup`, `clearJobGroup`, `cancelJobGroup`
+
+## Upgrading to SparkR 2.1.0
+
+ - `join` no longer performs Cartesian Product by default, use `crossJoin` instead.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message