spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wenc...@apache.org
Subject spark git commit: [SPARK-19148][SQL] do not expose the external table concept in Catalog
Date Tue, 17 Jan 2017 04:54:58 GMT
Repository: spark
Updated Branches:
  refs/heads/master f8db8945f -> 18ee55dd5


[SPARK-19148][SQL] do not expose the external table concept in Catalog

## What changes were proposed in this pull request?

In https://github.com/apache/spark/pull/16296 , we reached a consensus that we should hide
the external/managed table concept to users and only expose custom table path.

This PR renames `Catalog.createExternalTable` to `createTable`(still keep the old versions
for backward compatibility), and only set the table type to EXTERNAL if `path` is specified
in options.

## How was this patch tested?

new tests in `CatalogSuite`

Author: Wenchen Fan <wenchen@databricks.com>

Closes #16528 from cloud-fan/create-table.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/18ee55dd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/18ee55dd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/18ee55dd

Branch: refs/heads/master
Commit: 18ee55dd5de0597d7fb69e8e16ac3744356a6918
Parents: f8db894
Author: Wenchen Fan <wenchen@databricks.com>
Authored: Tue Jan 17 12:54:50 2017 +0800
Committer: Wenchen Fan <wenchen@databricks.com>
Committed: Tue Jan 17 12:54:50 2017 +0800

----------------------------------------------------------------------
 project/MimaExcludes.scala                      |   5 +-
 python/pyspark/sql/catalog.py                   |  27 +++-
 .../org/apache/spark/sql/catalog/Catalog.scala  | 129 ++++++++++++++++---
 .../command/createDataSourceTables.scala        |   9 --
 .../apache/spark/sql/internal/CatalogImpl.scala |  78 ++++-------
 .../spark/sql/internal/CatalogSuite.scala       |  66 +++++++---
 6 files changed, 211 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/18ee55dd/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 2314d7f..e0ee00e 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -43,7 +43,10 @@ object MimaExcludes {
     ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.recoverPartitions"),
 
     // [SPARK-18537] Add a REST api to spark streaming
-    ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.streaming.scheduler.StreamingListener.onStreamingStarted")
+    ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.streaming.scheduler.StreamingListener.onStreamingStarted"),
+
+    // [SPARK-19148][SQL] do not expose the external table concept in Catalog
+    ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.createTable")
   )
 
   // Exclude rules for 2.1.x

http://git-wip-us.apache.org/repos/asf/spark/blob/18ee55dd/python/pyspark/sql/catalog.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py
index 30c7a3f..253a750 100644
--- a/python/pyspark/sql/catalog.py
+++ b/python/pyspark/sql/catalog.py
@@ -15,6 +15,7 @@
 # limitations under the License.
 #
 
+import warnings
 from collections import namedtuple
 
 from pyspark import since
@@ -138,7 +139,27 @@ class Catalog(object):
 
     @since(2.0)
     def createExternalTable(self, tableName, path=None, source=None, schema=None, **options):
-        """Creates an external table based on the dataset in a data source.
+        """Creates a table based on the dataset in a data source.
+
+        It returns the DataFrame associated with the external table.
+
+        The data source is specified by the ``source`` and a set of ``options``.
+        If ``source`` is not specified, the default data source configured by
+        ``spark.sql.sources.default`` will be used.
+
+        Optionally, a schema can be provided as the schema of the returned :class:`DataFrame`
and
+        created external table.
+
+        :return: :class:`DataFrame`
+        """
+        warnings.warn(
+            "createExternalTable is deprecated since Spark 2.2, please use createTable instead.",
+            DeprecationWarning)
+        return self.createTable(tableName, path, source, schema, **options)
+
+    @since(2.2)
+    def createTable(self, tableName, path=None, source=None, schema=None, **options):
+        """Creates a table based on the dataset in a data source.
 
         It returns the DataFrame associated with the external table.
 
@@ -157,12 +178,12 @@ class Catalog(object):
             source = self._sparkSession.conf.get(
                 "spark.sql.sources.default", "org.apache.spark.sql.parquet")
         if schema is None:
-            df = self._jcatalog.createExternalTable(tableName, source, options)
+            df = self._jcatalog.createTable(tableName, source, options)
         else:
             if not isinstance(schema, StructType):
                 raise TypeError("schema should be StructType")
             scala_datatype = self._jsparkSession.parseDataType(schema.json())
-            df = self._jcatalog.createExternalTable(tableName, source, scala_datatype, options)
+            df = self._jcatalog.createTable(tableName, source, scala_datatype, options)
         return DataFrame(df, self._sparkSession._wrapped)
 
     @since(2.0)

http://git-wip-us.apache.org/repos/asf/spark/blob/18ee55dd/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
index 6b061f8..41e781e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.catalog
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark.annotation.{Experimental, InterfaceStability}
 import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset}
 import org.apache.spark.sql.types.StructType
@@ -187,82 +189,169 @@ abstract class Catalog {
   def functionExists(dbName: String, functionName: String): Boolean
 
   /**
-   * :: Experimental ::
-   * Creates an external table from the given path and returns the corresponding DataFrame.
+   * Creates a table from the given path and returns the corresponding DataFrame.
    * It will use the default data source configured by spark.sql.sources.default.
    *
    * @since 2.0.0
    */
+  @deprecated("use createTable instead.", "2.2.0")
+  def createExternalTable(tableName: String, path: String): DataFrame = {
+    createTable(tableName, path)
+  }
+
+  /**
+   * :: Experimental ::
+   * Creates a table from the given path and returns the corresponding DataFrame.
+   * It will use the default data source configured by spark.sql.sources.default.
+   *
+   * @since 2.2.0
+   */
   @Experimental
   @InterfaceStability.Evolving
-  def createExternalTable(tableName: String, path: String): DataFrame
+  def createTable(tableName: String, path: String): DataFrame
 
   /**
-   * :: Experimental ::
-   * Creates an external table from the given path based on a data source
-   * and returns the corresponding DataFrame.
+   * Creates a table from the given path based on a data source and returns the corresponding
+   * DataFrame.
    *
    * @since 2.0.0
    */
+  @deprecated("use createTable instead.", "2.2.0")
+  def createExternalTable(tableName: String, path: String, source: String): DataFrame = {
+    createTable(tableName, path, source)
+  }
+
+  /**
+   * :: Experimental ::
+   * Creates a table from the given path based on a data source and returns the corresponding
+   * DataFrame.
+   *
+   * @since 2.2.0
+   */
   @Experimental
   @InterfaceStability.Evolving
-  def createExternalTable(tableName: String, path: String, source: String): DataFrame
+  def createTable(tableName: String, path: String, source: String): DataFrame
 
   /**
-   * :: Experimental ::
-   * Creates an external table from the given path based on a data source and a set of options.
+   * Creates a table from the given path based on a data source and a set of options.
    * Then, returns the corresponding DataFrame.
    *
    * @since 2.0.0
    */
+  @deprecated("use createTable instead.", "2.2.0")
+  def createExternalTable(
+      tableName: String,
+      source: String,
+      options: java.util.Map[String, String]): DataFrame = {
+    createTable(tableName, source, options)
+  }
+
+  /**
+   * :: Experimental ::
+   * Creates a table from the given path based on a data source and a set of options.
+   * Then, returns the corresponding DataFrame.
+   *
+   * @since 2.2.0
+   */
   @Experimental
   @InterfaceStability.Evolving
+  def createTable(
+      tableName: String,
+      source: String,
+      options: java.util.Map[String, String]): DataFrame = {
+    createTable(tableName, source, options.asScala.toMap)
+  }
+
+  /**
+   * (Scala-specific)
+   * Creates a table from the given path based on a data source and a set of options.
+   * Then, returns the corresponding DataFrame.
+   *
+   * @since 2.0.0
+   */
+  @deprecated("use createTable instead.", "2.2.0")
   def createExternalTable(
       tableName: String,
       source: String,
-      options: java.util.Map[String, String]): DataFrame
+      options: Map[String, String]): DataFrame = {
+    createTable(tableName, source, options)
+  }
 
   /**
    * :: Experimental ::
    * (Scala-specific)
-   * Creates an external table from the given path based on a data source and a set of options.
+   * Creates a table from the given path based on a data source and a set of options.
    * Then, returns the corresponding DataFrame.
    *
-   * @since 2.0.0
+   * @since 2.2.0
    */
   @Experimental
   @InterfaceStability.Evolving
-  def createExternalTable(
+  def createTable(
       tableName: String,
       source: String,
       options: Map[String, String]): DataFrame
 
   /**
    * :: Experimental ::
-   * Create an external table from the given path based on a data source, a schema and
-   * a set of options. Then, returns the corresponding DataFrame.
+   * Create a table from the given path based on a data source, a schema and a set of options.
+   * Then, returns the corresponding DataFrame.
    *
    * @since 2.0.0
    */
+  @deprecated("use createTable instead.", "2.2.0")
+  def createExternalTable(
+      tableName: String,
+      source: String,
+      schema: StructType,
+      options: java.util.Map[String, String]): DataFrame = {
+    createTable(tableName, source, schema, options)
+  }
+
+  /**
+   * :: Experimental ::
+   * Create a table from the given path based on a data source, a schema and a set of options.
+   * Then, returns the corresponding DataFrame.
+   *
+   * @since 2.2.0
+   */
   @Experimental
   @InterfaceStability.Evolving
+  def createTable(
+      tableName: String,
+      source: String,
+      schema: StructType,
+      options: java.util.Map[String, String]): DataFrame = {
+    createTable(tableName, source, schema, options.asScala.toMap)
+  }
+
+  /**
+   * (Scala-specific)
+   * Create a table from the given path based on a data source, a schema and a set of options.
+   * Then, returns the corresponding DataFrame.
+   *
+   * @since 2.0.0
+   */
+  @deprecated("use createTable instead.", "2.2.0")
   def createExternalTable(
       tableName: String,
       source: String,
       schema: StructType,
-      options: java.util.Map[String, String]): DataFrame
+      options: Map[String, String]): DataFrame = {
+    createTable(tableName, source, schema, options)
+  }
 
   /**
    * :: Experimental ::
    * (Scala-specific)
-   * Create an external table from the given path based on a data source, a schema and
-   * a set of options. Then, returns the corresponding DataFrame.
+   * Create a table from the given path based on a data source, a schema and a set of options.
+   * Then, returns the corresponding DataFrame.
    *
-   * @since 2.0.0
+   * @since 2.2.0
    */
   @Experimental
   @InterfaceStability.Evolving
-  def createExternalTable(
+  def createTable(
       tableName: String,
       source: String,
       schema: StructType,

http://git-wip-us.apache.org/repos/asf/spark/blob/18ee55dd/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 90aeebd..beeba05 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -71,15 +71,6 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists:
Boo
         options = table.storage.properties ++ pathOption,
         catalogTable = Some(tableWithDefaultOptions)).resolveRelation()
 
-    dataSource match {
-      case fs: HadoopFsRelation =>
-        if (table.tableType == CatalogTableType.EXTERNAL && fs.location.rootPaths.isEmpty)
{
-          throw new AnalysisException(
-            "Cannot create a file-based external data source table without path")
-        }
-      case _ =>
-    }
-
     val partitionColumnNames = if (table.schema.nonEmpty) {
       table.partitionColumnNames
     } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/18ee55dd/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 8244b21..9136a83 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.internal
 
-import scala.collection.JavaConverters._
 import scala.reflect.runtime.universe.TypeTag
 
 import org.apache.spark.annotation.Experimental
@@ -257,101 +256,74 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
 
   /**
    * :: Experimental ::
-   * Creates an external table from the given path and returns the corresponding DataFrame.
+   * Creates a table from the given path and returns the corresponding DataFrame.
    * It will use the default data source configured by spark.sql.sources.default.
    *
    * @group ddl_ops
-   * @since 2.0.0
+   * @since 2.2.0
    */
   @Experimental
-  override def createExternalTable(tableName: String, path: String): DataFrame = {
+  override def createTable(tableName: String, path: String): DataFrame = {
     val dataSourceName = sparkSession.sessionState.conf.defaultDataSourceName
-    createExternalTable(tableName, path, dataSourceName)
+    createTable(tableName, path, dataSourceName)
   }
 
   /**
    * :: Experimental ::
-   * Creates an external table from the given path based on a data source
-   * and returns the corresponding DataFrame.
+   * Creates a table from the given path based on a data source and returns the corresponding
+   * DataFrame.
    *
    * @group ddl_ops
-   * @since 2.0.0
+   * @since 2.2.0
    */
   @Experimental
-  override def createExternalTable(tableName: String, path: String, source: String): DataFrame
= {
-    createExternalTable(tableName, source, Map("path" -> path))
-  }
-
-  /**
-   * :: Experimental ::
-   * Creates an external table from the given path based on a data source and a set of options.
-   * Then, returns the corresponding DataFrame.
-   *
-   * @group ddl_ops
-   * @since 2.0.0
-   */
-  @Experimental
-  override def createExternalTable(
-      tableName: String,
-      source: String,
-      options: java.util.Map[String, String]): DataFrame = {
-    createExternalTable(tableName, source, options.asScala.toMap)
+  override def createTable(tableName: String, path: String, source: String): DataFrame =
{
+    createTable(tableName, source, Map("path" -> path))
   }
 
   /**
    * :: Experimental ::
    * (Scala-specific)
-   * Creates an external table from the given path based on a data source and a set of options.
+   * Creates a table from the given path based on a data source and a set of options.
    * Then, returns the corresponding DataFrame.
    *
    * @group ddl_ops
-   * @since 2.0.0
+   * @since 2.2.0
    */
   @Experimental
-  override def createExternalTable(
+  override def createTable(
       tableName: String,
       source: String,
       options: Map[String, String]): DataFrame = {
-    createExternalTable(tableName, source, new StructType, options)
-  }
-
-  /**
-   * :: Experimental ::
-   * Create an external table from the given path based on a data source, a schema and
-   * a set of options. Then, returns the corresponding DataFrame.
-   *
-   * @group ddl_ops
-   * @since 2.0.0
-   */
-  @Experimental
-  override def createExternalTable(
-      tableName: String,
-      source: String,
-      schema: StructType,
-      options: java.util.Map[String, String]): DataFrame = {
-    createExternalTable(tableName, source, schema, options.asScala.toMap)
+    createTable(tableName, source, new StructType, options)
   }
 
   /**
    * :: Experimental ::
    * (Scala-specific)
-   * Create an external table from the given path based on a data source, a schema and
-   * a set of options. Then, returns the corresponding DataFrame.
+   * Create a table from the given path based on a data source, a schema and a set of options.
+   * Then, returns the corresponding DataFrame.
    *
    * @group ddl_ops
-   * @since 2.0.0
+   * @since 2.2.0
    */
   @Experimental
-  override def createExternalTable(
+  override def createTable(
       tableName: String,
       source: String,
       schema: StructType,
       options: Map[String, String]): DataFrame = {
     val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
+    val storage = DataSource.buildStorageFormatFromOptions(options)
+    val tableType = if (storage.locationUri.isDefined) {
+      CatalogTableType.EXTERNAL
+    } else {
+      CatalogTableType.MANAGED
+    }
     val tableDesc = CatalogTable(
       identifier = tableIdent,
-      tableType = CatalogTableType.EXTERNAL,
-      storage = DataSource.buildStorageFormatFromOptions(options),
+      tableType = tableType,
+      storage = storage,
       schema = schema,
       provider = Some(source)
     )

http://git-wip-us.apache.org/repos/asf/spark/blob/18ee55dd/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index 5dd0454..801912f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.internal
 
+import java.io.File
+import java.net.URI
+
 import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.SparkFunSuite
@@ -27,7 +30,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
 import org.apache.spark.sql.catalyst.plans.logical.Range
 import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types.{IntegerType, StructType}
+import org.apache.spark.sql.types.StructType
 
 
 /**
@@ -37,6 +40,7 @@ class CatalogSuite
   extends SparkFunSuite
   with BeforeAndAfterEach
   with SharedSQLContext {
+  import testImplicits._
 
   private def sessionCatalog: SessionCatalog = spark.sessionState.catalog
 
@@ -306,22 +310,6 @@ class CatalogSuite
     columnFields.foreach { f => assert(columnString.contains(f.toString)) }
   }
 
-  test("createExternalTable should fail if path is not given for file-based data source")
{
-    val e = intercept[AnalysisException] {
-      spark.catalog.createExternalTable("tbl", "json", Map.empty[String, String])
-    }
-    assert(e.message.contains("Unable to infer schema"))
-
-    val e2 = intercept[AnalysisException] {
-      spark.catalog.createExternalTable(
-        "tbl",
-        "json",
-        new StructType().add("i", IntegerType),
-        Map.empty[String, String])
-    }
-    assert(e2.message == "Cannot create a file-based external data source table without path")
-  }
-
   test("dropTempView should not un-cache and drop metastore table if a same-name table exists")
{
     withTable("same_name") {
       spark.range(10).write.saveAsTable("same_name")
@@ -460,6 +448,50 @@ class CatalogSuite
     }
   }
 
+  test("createTable with 'path' in options") {
+    withTable("t") {
+      withTempDir { dir =>
+        spark.catalog.createTable(
+          tableName = "t",
+          source = "json",
+          schema = new StructType().add("i", "int"),
+          options = Map("path" -> dir.getAbsolutePath))
+        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+        assert(table.tableType == CatalogTableType.EXTERNAL)
+        assert(table.storage.locationUri.get == dir.getAbsolutePath)
+
+        Seq((1)).toDF("i").write.insertInto("t")
+        assert(dir.exists() && dir.listFiles().nonEmpty)
+
+        sql("DROP TABLE t")
+        // the table path and data files are still there after DROP TABLE, if custom table
path is
+        // specified.
+        assert(dir.exists() && dir.listFiles().nonEmpty)
+      }
+    }
+  }
+
+  test("createTable without 'path' in options") {
+    withTable("t") {
+      spark.catalog.createTable(
+        tableName = "t",
+        source = "json",
+        schema = new StructType().add("i", "int"),
+        options = Map.empty[String, String])
+      val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+      assert(table.tableType == CatalogTableType.MANAGED)
+      val tablePath = new File(new URI(table.storage.locationUri.get))
+      assert(tablePath.exists() && tablePath.listFiles().isEmpty)
+
+      Seq((1)).toDF("i").write.insertInto("t")
+      assert(tablePath.listFiles().nonEmpty)
+
+      sql("DROP TABLE t")
+      // the table path is removed after DROP TABLE, if custom table path is not specified.
+      assert(!tablePath.exists())
+    }
+  }
+
   // TODO: add tests for the rest of them
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message