spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From felixche...@apache.org
Subject spark git commit: [SPARK-20195][SPARKR][SQL] add createTable catalog API and deprecate createExternalTable
Date Thu, 06 Apr 2017 16:15:17 GMT
Repository: spark
Updated Branches:
  refs/heads/master bccc33019 -> 5a693b413


[SPARK-20195][SPARKR][SQL] add createTable catalog API and deprecate createExternalTable

## What changes were proposed in this pull request?

Following up on #17483, add createTable (which is new in 2.2.0) and deprecate createExternalTable,
plus a number of minor fixes

## How was this patch tested?

manual, unit tests

Author: Felix Cheung <felixcheung_m@hotmail.com>

Closes #17511 from felixcheung/rceatetable.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5a693b41
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5a693b41
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5a693b41

Branch: refs/heads/master
Commit: 5a693b4138d4ce948e3bcdbe28d5c01d5deb8fa9
Parents: bccc330
Author: Felix Cheung <felixcheung_m@hotmail.com>
Authored: Thu Apr 6 09:15:13 2017 -0700
Committer: Felix Cheung <felixcheung@apache.org>
Committed: Thu Apr 6 09:15:13 2017 -0700

----------------------------------------------------------------------
 R/pkg/NAMESPACE                           |  1 +
 R/pkg/R/DataFrame.R                       |  4 +-
 R/pkg/R/catalog.R                         | 59 ++++++++++++++++++++++----
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 20 ++++++---
 4 files changed, 68 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5a693b41/R/pkg/NAMESPACE
----------------------------------------------------------------------
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 9b7e95c..ca45c6f 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -361,6 +361,7 @@ export("as.DataFrame",
        "clearCache",
        "createDataFrame",
        "createExternalTable",
+       "createTable",
        "currentDatabase",
        "dropTempTable",
        "dropTempView",

http://git-wip-us.apache.org/repos/asf/spark/blob/5a693b41/R/pkg/R/DataFrame.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 97786df..ec85f72 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -557,7 +557,7 @@ setMethod("insertInto",
             jmode <- convertToJSaveMode(ifelse(overwrite, "overwrite", "append"))
             write <- callJMethod(x@sdf, "write")
             write <- callJMethod(write, "mode", jmode)
-            callJMethod(write, "insertInto", tableName)
+            invisible(callJMethod(write, "insertInto", tableName))
           })
 
 #' Cache
@@ -2894,7 +2894,7 @@ setMethod("saveAsTable",
             write <- callJMethod(write, "format", source)
             write <- callJMethod(write, "mode", jmode)
             write <- callJMethod(write, "options", options)
-            callJMethod(write, "saveAsTable", tableName)
+            invisible(callJMethod(write, "saveAsTable", tableName))
           })
 
 #' summary

http://git-wip-us.apache.org/repos/asf/spark/blob/5a693b41/R/pkg/R/catalog.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/catalog.R b/R/pkg/R/catalog.R
index 4b7f841..e59a702 100644
--- a/R/pkg/R/catalog.R
+++ b/R/pkg/R/catalog.R
@@ -17,7 +17,7 @@
 
 # catalog.R: SparkSession catalog functions
 
-#' Create an external table
+#' (Deprecated) Create an external table
 #'
 #' Creates an external table based on the dataset in a data source,
 #' Returns a SparkDataFrame associated with the external table.
@@ -29,10 +29,11 @@
 #' @param tableName a name of the table.
 #' @param path the path of files to load.
 #' @param source the name of external data source.
-#' @param schema the schema of the data for certain data source.
+#' @param schema the schema of the data required for some data sources.
 #' @param ... additional argument(s) passed to the method.
 #' @return A SparkDataFrame.
-#' @rdname createExternalTable
+#' @rdname createExternalTable-deprecated
+#' @seealso \link{createTable}
 #' @export
 #' @examples
 #'\dontrun{
@@ -43,24 +44,64 @@
 #' @method createExternalTable default
 #' @note createExternalTable since 1.4.0
 createExternalTable.default <- function(tableName, path = NULL, source = NULL, schema = NULL, ...) {
+  .Deprecated("createTable", old = "createExternalTable")
+  createTable(tableName, path, source, schema, ...)
+}
+
+createExternalTable <- function(x, ...) {
+  dispatchFunc("createExternalTable(tableName, path = NULL, source = NULL, ...)", x, ...)
+}
+
+#' Creates a table based on the dataset in a data source
+#'
+#' Creates a table based on the dataset in a data source. Returns a SparkDataFrame associated with
+#' the table.
+#'
+#' The data source is specified by the \code{source} and a set of options(...).
+#' If \code{source} is not specified, the default data source configured by
+#' "spark.sql.sources.default" will be used. When a \code{path} is specified, an external table is
+#' created from the data at the given path. Otherwise a managed table is created.
+#'
+#' @param tableName the qualified or unqualified name that designates a table. If no database
+#'                  identifier is provided, it refers to a table in the current database.
+#' @param path (optional) the path of files to load.
+#' @param source (optional) the name of the data source.
+#' @param schema (optional) the schema of the data required for some data sources.
+#' @param ... additional named parameters as options for the data source.
+#' @return A SparkDataFrame.
+#' @rdname createTable
+#' @seealso \link{createExternalTable}
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' df <- createTable("myjson", path="path/to/json", source="json", schema)
+#'
+#' createTable("people", source = "json", schema = schema)
+#' insertInto(df, "people")
+#' }
+#' @name createTable
+#' @note createTable since 2.2.0
+createTable <- function(tableName, path = NULL, source = NULL, schema = NULL, ...) {
   sparkSession <- getSparkSession()
   options <- varargsToStrEnv(...)
   if (!is.null(path)) {
     options[["path"]] <- path
   }
+  if (is.null(source)) {
+    source <- getDefaultSqlSource()
+  }
   catalog <- callJMethod(sparkSession, "catalog")
   if (is.null(schema)) {
-    sdf <- callJMethod(catalog, "createExternalTable", tableName, source, options)
+    sdf <- callJMethod(catalog, "createTable", tableName, source, options)
+  } else if (class(schema) == "structType") {
+    sdf <- callJMethod(catalog, "createTable", tableName, source, schema$jobj, options)
   } else {
-    sdf <- callJMethod(catalog, "createExternalTable", tableName, source, schema$jobj, options)
+    stop("schema must be a structType.")
   }
   dataFrame(sdf)
 }
 
-createExternalTable <- function(x, ...) {
-  dispatchFunc("createExternalTable(tableName, path = NULL, source = NULL, ...)", x, ...)
-}
-
 #' Cache Table
 #'
 #' Caches the specified table in-memory.

http://git-wip-us.apache.org/repos/asf/spark/blob/5a693b41/R/pkg/inst/tests/testthat/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index ad06711..58cf2425 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -281,7 +281,7 @@ test_that("create DataFrame from RDD", {
   setHiveContext(sc)
   sql("CREATE TABLE people (name string, age double, height float)")
   df <- read.df(jsonPathNa, "json", schema)
-  invisible(insertInto(df, "people"))
+  insertInto(df, "people")
   expect_equal(collect(sql("SELECT age from people WHERE name = 'Bob'"))$age,
                c(16))
   expect_equal(collect(sql("SELECT height from people WHERE name ='Bob'"))$height,
@@ -1268,7 +1268,16 @@ test_that("column calculation", {
 
 test_that("test HiveContext", {
   setHiveContext(sc)
-  df <- createExternalTable("json", jsonPath, "json")
+
+  schema <- structType(structField("name", "string"), structField("age", "integer"),
+                       structField("height", "float"))
+  createTable("people", source = "json", schema = schema)
+  df <- read.df(jsonPathNa, "json", schema)
+  insertInto(df, "people")
+  expect_equal(collect(sql("SELECT age from people WHERE name = 'Bob'"))$age, c(16))
+  sql("DROP TABLE people")
+
+  df <- createTable("json", jsonPath, "json")
   expect_is(df, "SparkDataFrame")
   expect_equal(count(df), 3)
   df2 <- sql("select * from json")
@@ -1276,25 +1285,26 @@ test_that("test HiveContext", {
   expect_equal(count(df2), 3)
 
   jsonPath2 <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
-  invisible(saveAsTable(df, "json2", "json", "append", path = jsonPath2))
+  saveAsTable(df, "json2", "json", "append", path = jsonPath2)
   df3 <- sql("select * from json2")
   expect_is(df3, "SparkDataFrame")
   expect_equal(count(df3), 3)
   unlink(jsonPath2)
 
   hivetestDataPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
-  invisible(saveAsTable(df, "hivetestbl", path = hivetestDataPath))
+  saveAsTable(df, "hivetestbl", path = hivetestDataPath)
   df4 <- sql("select * from hivetestbl")
   expect_is(df4, "SparkDataFrame")
   expect_equal(count(df4), 3)
   unlink(hivetestDataPath)
 
   parquetDataPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
-  invisible(saveAsTable(df, "parquetest", "parquet", mode = "overwrite", path = parquetDataPath))
+  saveAsTable(df, "parquetest", "parquet", mode = "overwrite", path = parquetDataPath)
   df5 <- sql("select * from parquetest")
   expect_is(df5, "SparkDataFrame")
   expect_equal(count(df5), 3)
   unlink(parquetDataPath)
+
   unsetHiveContext()
 })
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message