spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gurwls...@apache.org
Subject spark git commit: [SPARK-21640][SQL][PYTHON][R][FOLLOWUP] Add errorifexists in SparkR and other documentations
Date Thu, 09 Nov 2017 06:00:36 GMT
Repository: spark
Updated Branches:
  refs/heads/master d01044233 -> 695647bf2


[SPARK-21640][SQL][PYTHON][R][FOLLOWUP] Add errorifexists in SparkR and other documentations

## What changes were proposed in this pull request?

This PR proposes to add `errorifexists` to the SparkR API and to fix the remaining places that
describe the save mode, mainly in the API documentation.

This PR also replaces `convertToJSaveMode` with `setWriteMode` so that the mode string is passed
as-is to the JVM, where it is resolved by:

https://github.com/apache/spark/blob/b034f2565f72aa73c9f0be1e49d148bb4cf05153/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala#L72-L82

and removes the duplicated mode-mapping logic here:

https://github.com/apache/spark/blob/3f958a99921d149fb9fdf7ba7e78957afdad1405/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala#L187-L194

## How was this patch tested?

Manually checked the built documentation. The affected places were mainly found with
`` grep -r `error` `` and `grep -r 'error'`.

Also, unit tests were added in `test_sparkSQL.R`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #19673 from HyukjinKwon/SPARK-21640-followup.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/695647bf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/695647bf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/695647bf

Branch: refs/heads/master
Commit: 695647bf2ebda56f9effb7fcdd875490132ea012
Parents: d010442
Author: hyukjinkwon <gurwls223@gmail.com>
Authored: Thu Nov 9 15:00:31 2017 +0900
Committer: hyukjinkwon <gurwls223@gmail.com>
Committed: Thu Nov 9 15:00:31 2017 +0900

----------------------------------------------------------------------
 R/pkg/R/DataFrame.R                             | 79 ++++++++++++--------
 R/pkg/R/utils.R                                 |  9 ---
 R/pkg/tests/fulltests/test_sparkSQL.R           |  8 ++
 R/pkg/tests/fulltests/test_utils.R              |  8 --
 python/pyspark/sql/readwriter.py                | 25 ++++---
 .../org/apache/spark/sql/DataFrameWriter.scala  |  2 +-
 .../org/apache/spark/sql/api/r/SQLUtils.scala   |  9 ---
 7 files changed, 71 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/695647bf/R/pkg/R/DataFrame.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 763c8d2..b8d732a 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -58,14 +58,23 @@ setMethod("initialize", "SparkDataFrame", function(.Object, sdf, isCached) {
 #' Set options/mode and then return the write object
 #' @noRd
 setWriteOptions <- function(write, path = NULL, mode = "error", ...) {
-    options <- varargsToStrEnv(...)
-    if (!is.null(path)) {
-      options[["path"]] <- path
-    }
-    jmode <- convertToJSaveMode(mode)
-    write <- callJMethod(write, "mode", jmode)
-    write <- callJMethod(write, "options", options)
-    write
+  options <- varargsToStrEnv(...)
+  if (!is.null(path)) {
+    options[["path"]] <- path
+  }
+  write <- setWriteMode(write, mode)
+  write <- callJMethod(write, "options", options)
+  write
+}
+
+#' Set mode and then return the write object
+#' @noRd
+setWriteMode <- function(write, mode) {
+  if (!is.character(mode)) {
+    stop("mode should be character or omitted. It is 'error' by default.")
+  }
+  write <- handledCallJMethod(write, "mode", mode)
+  write
 }
 
 #' @export
@@ -556,9 +565,8 @@ setMethod("registerTempTable",
 setMethod("insertInto",
           signature(x = "SparkDataFrame", tableName = "character"),
           function(x, tableName, overwrite = FALSE) {
-            jmode <- convertToJSaveMode(ifelse(overwrite, "overwrite", "append"))
             write <- callJMethod(x@sdf, "write")
-            write <- callJMethod(write, "mode", jmode)
+            write <- setWriteMode(write, ifelse(overwrite, "overwrite", "append"))
             invisible(callJMethod(write, "insertInto", tableName))
           })
 
@@ -810,7 +818,8 @@ setMethod("toJSON",
 #'
 #' @param x A SparkDataFrame
 #' @param path The directory where the file is saved
-#' @param mode one of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default)
+#' @param mode one of 'append', 'overwrite', 'error', 'errorifexists', 'ignore'
+#'             save mode (it is 'error' by default)
 #' @param ... additional argument(s) passed to the method.
 #'
 #' @family SparkDataFrame functions
@@ -841,7 +850,8 @@ setMethod("write.json",
 #'
 #' @param x A SparkDataFrame
 #' @param path The directory where the file is saved
-#' @param mode one of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default)
+#' @param mode one of 'append', 'overwrite', 'error', 'errorifexists', 'ignore'
+#'             save mode (it is 'error' by default)
 #' @param ... additional argument(s) passed to the method.
 #'
 #' @family SparkDataFrame functions
@@ -872,7 +882,8 @@ setMethod("write.orc",
 #'
 #' @param x A SparkDataFrame
 #' @param path The directory where the file is saved
-#' @param mode one of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default)
+#' @param mode one of 'append', 'overwrite', 'error', 'errorifexists', 'ignore'
+#'             save mode (it is 'error' by default)
 #' @param ... additional argument(s) passed to the method.
 #'
 #' @family SparkDataFrame functions
@@ -917,7 +928,8 @@ setMethod("saveAsParquetFile",
 #'
 #' @param x A SparkDataFrame
 #' @param path The directory where the file is saved
-#' @param mode one of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default)
+#' @param mode one of 'append', 'overwrite', 'error', 'errorifexists', 'ignore'
+#'             save mode (it is 'error' by default)
 #' @param ... additional argument(s) passed to the method.
 #'
 #' @family SparkDataFrame functions
@@ -2871,18 +2883,19 @@ setMethod("except",
 #' Additionally, mode is used to specify the behavior of the save operation when data already
 #' exists in the data source. There are four modes:
 #' \itemize{
-#'   \item append: Contents of this SparkDataFrame are expected to be appended to existing data.
-#'   \item overwrite: Existing data is expected to be overwritten by the contents of this
+#'   \item 'append': Contents of this SparkDataFrame are expected to be appended to existing data.
+#'   \item 'overwrite': Existing data is expected to be overwritten by the contents of this
 #'         SparkDataFrame.
-#'   \item error: An exception is expected to be thrown.
-#'   \item ignore: The save operation is expected to not save the contents of the SparkDataFrame
+#'   \item 'error' or 'errorifexists': An exception is expected to be thrown.
+#'   \item 'ignore': The save operation is expected to not save the contents of the SparkDataFrame
 #'         and to not change the existing data.
 #' }
 #'
 #' @param df a SparkDataFrame.
 #' @param path a name for the table.
 #' @param source a name for external data source.
-#' @param mode one of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default)
+#' @param mode one of 'append', 'overwrite', 'error', 'errorifexists', 'ignore'
+#'             save mode (it is 'error' by default)
 #' @param ... additional argument(s) passed to the method.
 #'
 #' @family SparkDataFrame functions
@@ -2940,17 +2953,18 @@ setMethod("saveDF",
 #'
 #' Additionally, mode is used to specify the behavior of the save operation when
 #' data already exists in the data source. There are four modes: \cr
-#'  append: Contents of this SparkDataFrame are expected to be appended to existing data. \cr
-#'  overwrite: Existing data is expected to be overwritten by the contents of this
+#'  'append': Contents of this SparkDataFrame are expected to be appended to existing data. \cr
+#'  'overwrite': Existing data is expected to be overwritten by the contents of this
 #'     SparkDataFrame. \cr
-#'  error: An exception is expected to be thrown. \cr
-#'  ignore: The save operation is expected to not save the contents of the SparkDataFrame
+#'  'error' or 'errorifexists': An exception is expected to be thrown. \cr
+#'  'ignore': The save operation is expected to not save the contents of the SparkDataFrame
 #'     and to not change the existing data. \cr
 #'
 #' @param df a SparkDataFrame.
 #' @param tableName a name for the table.
 #' @param source a name for external data source.
-#' @param mode one of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default).
+#' @param mode one of 'append', 'overwrite', 'error', 'errorifexists', 'ignore'
+#'             save mode (it is 'error' by default)
 #' @param ... additional option(s) passed to the method.
 #'
 #' @family SparkDataFrame functions
@@ -2972,12 +2986,11 @@ setMethod("saveAsTable",
             if (is.null(source)) {
               source <- getDefaultSqlSource()
             }
-            jmode <- convertToJSaveMode(mode)
             options <- varargsToStrEnv(...)
 
             write <- callJMethod(df@sdf, "write")
             write <- callJMethod(write, "format", source)
-            write <- callJMethod(write, "mode", jmode)
+            write <- setWriteMode(write, mode)
             write <- callJMethod(write, "options", options)
             invisible(callJMethod(write, "saveAsTable", tableName))
           })
@@ -3544,18 +3557,19 @@ setMethod("histogram",
 #' Also, mode is used to specify the behavior of the save operation when
 #' data already exists in the data source. There are four modes:
 #' \itemize{
-#'   \item append: Contents of this SparkDataFrame are expected to be appended to existing data.
-#'   \item overwrite: Existing data is expected to be overwritten by the contents of this
+#'   \item 'append': Contents of this SparkDataFrame are expected to be appended to existing data.
+#'   \item 'overwrite': Existing data is expected to be overwritten by the contents of this
 #'         SparkDataFrame.
-#'   \item error: An exception is expected to be thrown.
-#'   \item ignore: The save operation is expected to not save the contents of the SparkDataFrame
+#'   \item 'error' or 'errorifexists': An exception is expected to be thrown.
+#'   \item 'ignore': The save operation is expected to not save the contents of the SparkDataFrame
 #'         and to not change the existing data.
 #' }
 #'
 #' @param x a SparkDataFrame.
 #' @param url JDBC database url of the form \code{jdbc:subprotocol:subname}.
 #' @param tableName yhe name of the table in the external database.
-#' @param mode one of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default).
+#' @param mode one of 'append', 'overwrite', 'error', 'errorifexists', 'ignore'
+#'             save mode (it is 'error' by default)
 #' @param ... additional JDBC database connection properties.
 #' @family SparkDataFrame functions
 #' @rdname write.jdbc
@@ -3572,10 +3586,9 @@ setMethod("histogram",
 setMethod("write.jdbc",
           signature(x = "SparkDataFrame", url = "character", tableName = "character"),
           function(x, url, tableName, mode = "error", ...) {
-            jmode <- convertToJSaveMode(mode)
             jprops <- varargsToJProperties(...)
             write <- callJMethod(x@sdf, "write")
-            write <- callJMethod(write, "mode", jmode)
+            write <- setWriteMode(write, mode)
             invisible(handledCallJMethod(write, "jdbc", url, tableName, jprops))
           })
 

http://git-wip-us.apache.org/repos/asf/spark/blob/695647bf/R/pkg/R/utils.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/utils.R b/R/pkg/R/utils.R
index 4b71699..fa40992 100644
--- a/R/pkg/R/utils.R
+++ b/R/pkg/R/utils.R
@@ -736,15 +736,6 @@ splitString <- function(input) {
   Filter(nzchar, unlist(strsplit(input, ",|\\s")))
 }
 
-convertToJSaveMode <- function(mode) {
- allModes <- c("append", "overwrite", "error", "ignore")
- if (!(mode %in% allModes)) {
-   stop('mode should be one of "append", "overwrite", "error", "ignore"')  # nolint
- }
- jmode <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "saveMode", mode)
- jmode
-}
-
 varargsToJProperties <- function(...) {
   pairs <- list(...)
   props <- newJObject("java.util.Properties")

http://git-wip-us.apache.org/repos/asf/spark/blob/695647bf/R/pkg/tests/fulltests/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index 0c8118a..a0dbd47 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -630,6 +630,10 @@ test_that("read/write json files", {
     jsonPath2 <- tempfile(pattern = "jsonPath2", fileext = ".json")
     write.df(df, jsonPath2, "json", mode = "overwrite")
 
+    # Test errorifexists
+    expect_error(write.df(df, jsonPath2, "json", mode = "errorifexists"),
+                 "analysis error - path file:.*already exists")
+
     # Test write.json
     jsonPath3 <- tempfile(pattern = "jsonPath3", fileext = ".json")
     write.json(df, jsonPath3)
@@ -1371,6 +1375,9 @@ test_that("test HiveContext", {
     expect_equal(count(df5), 3)
     unlink(parquetDataPath)
 
+    # Invalid mode
+    expect_error(saveAsTable(df, "parquetest", "parquet", mode = "abc", path = parquetDataPath),
+                 "illegal argument - Unknown save mode: abc")
     unsetHiveContext()
   }
 })
@@ -3303,6 +3310,7 @@ test_that("Call DataFrameWriter.save() API in Java without path and check argume
               "Error in orc : analysis error - path file:.*already exists")
   expect_error(write.parquet(df, jsonPath),
               "Error in parquet : analysis error - path file:.*already exists")
+  expect_error(write.parquet(df, jsonPath, mode = 123), "mode should be character or omitted.")
 
   # Arguments checking in R side.
   expect_error(write.df(df, "data.tmp", source = c(1, 2)),

http://git-wip-us.apache.org/repos/asf/spark/blob/695647bf/R/pkg/tests/fulltests/test_utils.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_utils.R b/R/pkg/tests/fulltests/test_utils.R
index af81423..fb394b8 100644
--- a/R/pkg/tests/fulltests/test_utils.R
+++ b/R/pkg/tests/fulltests/test_utils.R
@@ -158,14 +158,6 @@ test_that("varargsToJProperties", {
   expect_equal(callJMethod(jprops, "size"), 0L)
 })
 
-test_that("convertToJSaveMode", {
-  s <- convertToJSaveMode("error")
-  expect_true(class(s) == "jobj")
-  expect_match(capture.output(print.jobj(s)), "Java ref type org.apache.spark.sql.SaveMode id ")
-  expect_error(convertToJSaveMode("foo"),
-    'mode should be one of "append", "overwrite", "error", "ignore"') #nolint
-})
-
 test_that("captureJVMException", {
   method <- "createStructField"
   expect_error(tryCatch(callJStatic("org.apache.spark.sql.api.r.SQLUtils", method,

http://git-wip-us.apache.org/repos/asf/spark/blob/695647bf/python/pyspark/sql/readwriter.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 3d87567..a75bdf8 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -540,7 +540,7 @@ class DataFrameWriter(OptionUtils):
 
         * `append`: Append contents of this :class:`DataFrame` to existing data.
         * `overwrite`: Overwrite existing data.
-        * `error`: Throw an exception if data already exists.
+        * `error` or `errorifexists`: Throw an exception if data already exists.
         * `ignore`: Silently ignore this operation if data already exists.
 
         >>> df.write.mode('append').parquet(os.path.join(tempfile.mkdtemp(), 'data'))
@@ -675,7 +675,8 @@ class DataFrameWriter(OptionUtils):
             * ``append``: Append contents of this :class:`DataFrame` to existing data.
             * ``overwrite``: Overwrite existing data.
             * ``ignore``: Silently ignore this operation if data already exists.
-            * ``error`` (default case): Throw an exception if data already exists.
+            * ``error`` or ``errorifexists`` (default case): Throw an exception if data already \
+                exists.
         :param partitionBy: names of partitioning columns
         :param options: all other string options
 
@@ -713,12 +714,13 @@ class DataFrameWriter(OptionUtils):
 
         * `append`: Append contents of this :class:`DataFrame` to existing data.
         * `overwrite`: Overwrite existing data.
-        * `error`: Throw an exception if data already exists.
+        * `error` or `errorifexists`: Throw an exception if data already exists.
         * `ignore`: Silently ignore this operation if data already exists.
 
         :param name: the table name
         :param format: the format used to save
-        :param mode: one of `append`, `overwrite`, `error`, `ignore` (default: error)
+        :param mode: one of `append`, `overwrite`, `error`, `errorifexists`, `ignore` \
+                     (default: error)
         :param partitionBy: names of partitioning columns
         :param options: all other string options
         """
@@ -741,7 +743,8 @@ class DataFrameWriter(OptionUtils):
             * ``append``: Append contents of this :class:`DataFrame` to existing data.
             * ``overwrite``: Overwrite existing data.
             * ``ignore``: Silently ignore this operation if data already exists.
-            * ``error`` (default case): Throw an exception if data already exists.
+            * ``error`` or ``errorifexists`` (default case): Throw an exception if data already \
+                exists.
         :param compression: compression codec to use when saving to file. This can be one of the
                             known case-insensitive shorten names (none, bzip2, gzip, lz4,
                             snappy and deflate).
@@ -771,7 +774,8 @@ class DataFrameWriter(OptionUtils):
             * ``append``: Append contents of this :class:`DataFrame` to existing data.
             * ``overwrite``: Overwrite existing data.
             * ``ignore``: Silently ignore this operation if data already exists.
-            * ``error`` (default case): Throw an exception if data already exists.
+            * ``error`` or ``errorifexists`` (default case): Throw an exception if data already \
+                exists.
         :param partitionBy: names of partitioning columns
         :param compression: compression codec to use when saving to file. This can be one of the
                             known case-insensitive shorten names (none, snappy, gzip, and lzo).
@@ -814,7 +818,8 @@ class DataFrameWriter(OptionUtils):
             * ``append``: Append contents of this :class:`DataFrame` to existing data.
             * ``overwrite``: Overwrite existing data.
             * ``ignore``: Silently ignore this operation if data already exists.
-            * ``error`` (default case): Throw an exception if data already exists.
+            * ``error`` or ``errorifexists`` (default case): Throw an exception if data already \
+                exists.
 
         :param compression: compression codec to use when saving to file. This can be one of the
                             known case-insensitive shorten names (none, bzip2, gzip, lz4,
@@ -874,7 +879,8 @@ class DataFrameWriter(OptionUtils):
             * ``append``: Append contents of this :class:`DataFrame` to existing data.
             * ``overwrite``: Overwrite existing data.
             * ``ignore``: Silently ignore this operation if data already exists.
-            * ``error`` (default case): Throw an exception if data already exists.
+            * ``error`` or ``errorifexists`` (default case): Throw an exception if data already \
+                exists.
         :param partitionBy: names of partitioning columns
         :param compression: compression codec to use when saving to file. This can be one of the
                             known case-insensitive shorten names (none, snappy, zlib, and lzo).
@@ -905,7 +911,8 @@ class DataFrameWriter(OptionUtils):
             * ``append``: Append contents of this :class:`DataFrame` to existing data.
             * ``overwrite``: Overwrite existing data.
             * ``ignore``: Silently ignore this operation if data already exists.
-            * ``error`` (default case): Throw an exception if data already exists.
+            * ``error`` or ``errorifexists`` (default case): Throw an exception if data already \
+                exists.
         :param properties: a dictionary of JDBC database connection arguments. Normally at
                            least properties "user" and "password" with their corresponding values.
                            For example { 'user' : 'SYSTEM', 'password' : 'mypassword' }

http://git-wip-us.apache.org/repos/asf/spark/blob/695647bf/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 8d95b24..e3fa2ce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -65,7 +65,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    *   - `overwrite`: overwrite the existing data.
    *   - `append`: append the data.
    *   - `ignore`: ignore the operation (i.e. no-op).
-   *   - `error`: default option, throw an exception at runtime.
+   *   - `error` or `errorifexists`: default option, throw an exception at runtime.
    *
    * @since 1.4.0
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/695647bf/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
index 872ef77..af20764 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
@@ -184,15 +184,6 @@ private[sql] object SQLUtils extends Logging {
     colArray
   }
 
-  def saveMode(mode: String): SaveMode = {
-    mode match {
-      case "append" => SaveMode.Append
-      case "overwrite" => SaveMode.Overwrite
-      case "error" => SaveMode.ErrorIfExists
-      case "ignore" => SaveMode.Ignore
-    }
-  }
-
   def readSqlObject(dis: DataInputStream, dataType: Char): Object = {
     dataType match {
       case 's' =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message