spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andrewo...@apache.org
Subject spark git commit: [SPARK-15711][SQL] Ban CREATE TEMPORARY TABLE USING AS SELECT
Date Thu, 02 Jun 2016 21:11:06 GMT
Repository: spark
Updated Branches:
  refs/heads/master 9aff6f3b1 -> d109a1bee


[SPARK-15711][SQL] Ban CREATE TEMPORARY TABLE USING AS SELECT

## What changes were proposed in this pull request?

This PR bans syntax like `CREATE TEMPORARY TABLE USING AS SELECT`

`CREATE TEMPORARY TABLE ... USING ... AS ...` is not properly implemented, the temporary data
is not cleaned up when the session exits. Before a full fix, we probably should ban this syntax.

This PR only impacts syntax like `CREATE TEMPORARY TABLE ... USING ... AS ...`.
Other syntax like `CREATE TEMPORARY TABLE ... USING ...` and `CREATE TABLE ... USING ...` is
not impacted.

## How was this patch tested?

Unit test.

Author: Sean Zhong <seanzhong@databricks.com>

Closes #13451 from clockfly/ban_create_temp_table_using_as.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d109a1be
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d109a1be
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d109a1be

Branch: refs/heads/master
Commit: d109a1beeef5bca1e683247e0a5db4ec841bf3ba
Parents: 9aff6f3
Author: Sean Zhong <seanzhong@databricks.com>
Authored: Thu Jun 2 14:11:01 2016 -0700
Committer: Andrew Or <andrew@databricks.com>
Committed: Thu Jun 2 14:11:01 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/DataFrameWriter.scala  |   1 -
 .../spark/sql/execution/SparkSqlParser.scala    |   9 +-
 .../spark/sql/execution/SparkStrategies.scala   |  10 +-
 .../spark/sql/execution/datasources/ddl.scala   |  32 ---
 .../sql/sources/CreateTableAsSelectSuite.scala  | 265 ++++++++++---------
 .../sql/hive/execution/SQLQuerySuite.scala      |  46 ----
 6 files changed, 142 insertions(+), 221 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d109a1be/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 25678e9..50ae966 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -561,7 +561,6 @@ final class DataFrameWriter private[sql](df: DataFrame) {
           CreateTableUsingAsSelect(
             tableIdent,
             source,
-            temporary = false,
             partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),
             getBucketSpec,
             mode,

http://git-wip-us.apache.org/repos/asf/spark/blob/d109a1be/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 01409c6..8ffc556 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -317,17 +317,19 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       // Get the backing query.
       val query = plan(ctx.query)
 
+      if (temp) {
+        throw operationNotAllowed("CREATE TEMPORARY TABLE ... USING ... AS query", ctx)
+      }
+
       // Determine the storage mode.
       val mode = if (ifNotExists) {
         SaveMode.Ignore
-      } else if (temp) {
-        SaveMode.Overwrite
       } else {
         SaveMode.ErrorIfExists
       }
 
       CreateTableUsingAsSelect(
-        table, provider, temp, partitionColumnNames, bucketSpec, mode, options, query)
+        table, provider, partitionColumnNames, bucketSpec, mode, options, query)
     } else {
       val struct = Option(ctx.colTypeList()).map(createStructType)
       CreateTableUsing(
@@ -960,7 +962,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
           CreateTableUsingAsSelect(
             tableIdent = tableDesc.identifier,
             provider = conf.defaultDataSourceName,
-            temporary = false,
             partitionColumns = tableDesc.partitionColumnNames.toArray,
             bucketSpec = None,
             mode = mode,

http://git-wip-us.apache.org/repos/asf/spark/blob/d109a1be/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 9610506..b20897e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -397,15 +397,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan]
{
         throw new AnalysisException(
           "allowExisting should be set to false when creating a temporary table.")
 
-      case c: CreateTableUsingAsSelect if c.temporary && c.partitionColumns.nonEmpty
=>
-        sys.error("Cannot create temporary partitioned table.")
-
-      case c: CreateTableUsingAsSelect if c.temporary =>
-        val cmd = CreateTempTableUsingAsSelectCommand(
-          c.tableIdent, c.provider, Array.empty[String], c.mode, c.options, c.child)
-        ExecutedCommandExec(cmd) :: Nil
-
-      case c: CreateTableUsingAsSelect if !c.temporary =>
+      case c: CreateTableUsingAsSelect =>
         val cmd =
           CreateDataSourceTableAsSelectCommand(
             c.tableIdent,

http://git-wip-us.apache.org/repos/asf/spark/blob/d109a1be/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index edbccde..bf272e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -56,7 +56,6 @@ case class CreateTableUsing(
 case class CreateTableUsingAsSelect(
     tableIdent: TableIdentifier,
     provider: String,
-    temporary: Boolean,
     partitionColumns: Array[String],
     bucketSpec: Option[BucketSpec],
     mode: SaveMode,
@@ -91,37 +90,6 @@ case class CreateTempTableUsing(
   }
 }
 
-case class CreateTempTableUsingAsSelectCommand(
-    tableIdent: TableIdentifier,
-    provider: String,
-    partitionColumns: Array[String],
-    mode: SaveMode,
-    options: Map[String, String],
-    query: LogicalPlan) extends RunnableCommand {
-
-  if (tableIdent.database.isDefined) {
-    throw new AnalysisException(
-      s"Temporary table '$tableIdent' should not have specified a database")
-  }
-
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    val df = Dataset.ofRows(sparkSession, query)
-    val dataSource = DataSource(
-      sparkSession,
-      className = provider,
-      partitionColumns = partitionColumns,
-      bucketSpec = None,
-      options = options)
-    val result = dataSource.write(mode, df)
-    sparkSession.sessionState.catalog.createTempView(
-      tableIdent.table,
-      Dataset.ofRows(sparkSession, LogicalRelation(result)).logicalPlan,
-      overrideIfExists = true)
-
-    Seq.empty[Row]
-  }
-}
-
 case class RefreshTable(tableIdent: TableIdentifier)
   extends RunnableCommand {
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d109a1be/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
index cbddb06..f9a07db 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
@@ -17,16 +17,20 @@
 
 package org.apache.spark.sql.sources
 
-import java.io.{File, IOException}
+import java.io.File
 
 import org.scalatest.BeforeAndAfter
 
-import org.apache.spark.sql.AnalysisException
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.execution.datasources.BucketSpec
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.util.Utils
 
 class CreateTableAsSelectSuite extends DataSourceTest with SharedSQLContext with BeforeAndAfter
{
+
   protected override lazy val sql = spark.sql _
   private var path: File = null
 
@@ -40,172 +44,175 @@ class CreateTableAsSelectSuite extends DataSourceTest with SharedSQLContext
with
   override def afterAll(): Unit = {
     try {
       spark.catalog.dropTempView("jt")
+      if (path.exists()) {
+        Utils.deleteRecursively(path)
+      }
     } finally {
       super.afterAll()
     }
   }
 
-  after {
-    Utils.deleteRecursively(path)
+  before {
+    if (path.exists()) {
+      Utils.deleteRecursively(path)
+    }
   }
 
-  test("CREATE TEMPORARY TABLE AS SELECT") {
-    sql(
-      s"""
-        |CREATE TEMPORARY TABLE jsonTable
-        |USING json
-        |OPTIONS (
-        |  path '${path.toString}'
-        |) AS
-        |SELECT a, b FROM jt
-      """.stripMargin)
-
-    checkAnswer(
-      sql("SELECT a, b FROM jsonTable"),
-      sql("SELECT a, b FROM jt").collect())
-
-    spark.catalog.dropTempView("jsonTable")
+  test("CREATE TABLE USING AS SELECT") {
+    withTable("jsonTable") {
+      sql(
+        s"""
+           |CREATE TABLE jsonTable
+           |USING json
+           |OPTIONS (
+           |  path '${path.toString}'
+           |) AS
+           |SELECT a, b FROM jt
+         """.stripMargin)
+
+      checkAnswer(
+        sql("SELECT a, b FROM jsonTable"),
+        sql("SELECT a, b FROM jt"))
+    }
   }
 
-  test("CREATE TEMPORARY TABLE AS SELECT based on the file without write permission") {
+  test("CREATE TABLE USING AS SELECT based on the file without write permission") {
     val childPath = new File(path.toString, "child")
     path.mkdir()
-    childPath.createNewFile()
     path.setWritable(false)
 
-    val e = intercept[IOException] {
+    val e = intercept[SparkException] {
       sql(
         s"""
-           |CREATE TEMPORARY TABLE jsonTable
+           |CREATE TABLE jsonTable
            |USING json
            |OPTIONS (
-           |  path '${path.toString}'
+           |  path '${childPath.toString}'
            |) AS
            |SELECT a, b FROM jt
-        """.stripMargin)
+         """.stripMargin)
       sql("SELECT a, b FROM jsonTable").collect()
     }
-    assert(e.getMessage().contains("Unable to clear output directory"))
 
+    assert(e.getMessage().contains("Job aborted"))
     path.setWritable(true)
   }
 
   test("create a table, drop it and create another one with the same name") {
-    sql(
-      s"""
-        |CREATE TEMPORARY TABLE jsonTable
-        |USING json
-        |OPTIONS (
-        |  path '${path.toString}'
-        |) AS
-        |SELECT a, b FROM jt
-      """.stripMargin)
-
-    checkAnswer(
-      sql("SELECT a, b FROM jsonTable"),
-      sql("SELECT a, b FROM jt").collect())
-
-    val message = intercept[ParseException]{
+    withTable("jsonTable") {
       sql(
         s"""
-        |CREATE TEMPORARY TABLE IF NOT EXISTS jsonTable
-        |USING json
-        |OPTIONS (
-        |  path '${path.toString}'
-        |) AS
-        |SELECT a * 4 FROM jt
-      """.stripMargin)
-    }.getMessage
-    assert(message.toLowerCase.contains("operation not allowed"))
-
-    // Overwrite the temporary table.
-    sql(
-      s"""
-        |CREATE TEMPORARY TABLE jsonTable
-        |USING json
-        |OPTIONS (
-        |  path '${path.toString}'
-        |) AS
-        |SELECT a * 4 FROM jt
-      """.stripMargin)
-    checkAnswer(
-      sql("SELECT * FROM jsonTable"),
-      sql("SELECT a * 4 FROM jt").collect())
-
-    spark.catalog.dropTempView("jsonTable")
-    // Explicitly delete the data.
-    if (path.exists()) Utils.deleteRecursively(path)
-
-    sql(
-      s"""
-        |CREATE TEMPORARY TABLE jsonTable
-        |USING json
-        |OPTIONS (
-        |  path '${path.toString}'
-        |) AS
-        |SELECT b FROM jt
-      """.stripMargin)
-
-    checkAnswer(
-      sql("SELECT * FROM jsonTable"),
-      sql("SELECT b FROM jt").collect())
-
-    spark.catalog.dropTempView("jsonTable")
-  }
+           |CREATE TABLE jsonTable
+           |USING json
+           |OPTIONS (
+           |  path '${path.toString}'
+           |) AS
+           |SELECT a, b FROM jt
+         """.stripMargin)
+
+      checkAnswer(
+        sql("SELECT a, b FROM jsonTable"),
+        sql("SELECT a, b FROM jt"))
+
+      // Creates a table of the same name with flag "if not exists", nothing happens
+      sql(
+        s"""
+           |CREATE TABLE IF NOT EXISTS jsonTable
+           |USING json
+           |OPTIONS (
+           |  path '${path.toString}'
+           |) AS
+           |SELECT a * 4 FROM jt
+         """.stripMargin)
+      checkAnswer(
+        sql("SELECT * FROM jsonTable"),
+        sql("SELECT a, b FROM jt"))
+
+      // Explicitly drops the table and deletes the underlying data.
+      sql("DROP TABLE jsonTable")
+      if (path.exists()) Utils.deleteRecursively(path)
 
-  test("CREATE TEMPORARY TABLE AS SELECT with IF NOT EXISTS is not allowed") {
-    val message = intercept[ParseException]{
+      // Creates a table of the same name again, this time we succeed.
       sql(
         s"""
-        |CREATE TEMPORARY TABLE IF NOT EXISTS jsonTable
-        |USING json
-        |OPTIONS (
-        |  path '${path.toString}'
-        |) AS
-        |SELECT b FROM jt
-      """.stripMargin)
-    }.getMessage
-    assert(message.toLowerCase.contains("operation not allowed"))
+           |CREATE TABLE jsonTable
+           |USING json
+           |OPTIONS (
+           |  path '${path.toString}'
+           |) AS
+           |SELECT b FROM jt
+         """.stripMargin)
+
+      checkAnswer(
+        sql("SELECT * FROM jsonTable"),
+        sql("SELECT b FROM jt"))
+    }
+  }
+
+  test("disallows CREATE TEMPORARY TABLE ... USING ... AS query") {
+    withTable("t") {
+      val error = intercept[ParseException] {
+        sql(
+          s"""
+             |CREATE TEMPORARY TABLE t USING PARQUET
+             |OPTIONS (PATH '${path.toString}')
+             |PARTITIONED BY (a)
+             |AS SELECT 1 AS a, 2 AS b
+           """.stripMargin
+        )
+      }.getMessage
+      assert(error.contains("Operation not allowed") &&
+        error.contains("CREATE TEMPORARY TABLE ... USING ... AS query"))
+    }
   }
 
-  test("a CTAS statement with column definitions is not allowed") {
-    intercept[AnalysisException]{
+  test("disallows CREATE EXTERNAL TABLE ... USING ... AS query") {
+    withTable("t") {
+      val error = intercept[ParseException] {
+        sql(
+          s"""
+             |CREATE EXTERNAL TABLE t USING PARQUET
+             |OPTIONS (PATH '${path.toString}')
+             |AS SELECT 1 AS a, 2 AS b
+           """.stripMargin
+        )
+      }.getMessage
+
+      assert(error.contains("Operation not allowed") &&
+        error.contains("CREATE EXTERNAL TABLE ... USING"))
+    }
+  }
+
+  test("create table using as select - with partitioned by") {
+    val catalog = spark.sessionState.catalog
+    withTable("t") {
       sql(
         s"""
-        |CREATE TEMPORARY TABLE jsonTable (a int, b string)
-        |USING json
-        |OPTIONS (
-        |  path '${path.toString}'
-        |) AS
-        |SELECT a, b FROM jt
-      """.stripMargin)
+           |CREATE TABLE t USING PARQUET
+           |OPTIONS (PATH '${path.toString}')
+           |PARTITIONED BY (a)
+           |AS SELECT 1 AS a, 2 AS b
+         """.stripMargin
+      )
+      val table = catalog.getTableMetadata(TableIdentifier("t"))
+      assert(DDLUtils.getPartitionColumnsFromTableProperties(table) == Seq("a"))
     }
   }
 
-  test("it is not allowed to write to a table while querying it.") {
-    sql(
-      s"""
-        |CREATE TEMPORARY TABLE jsonTable
-        |USING json
-        |OPTIONS (
-        |  path '${path.toString}'
-        |) AS
-        |SELECT a, b FROM jt
-      """.stripMargin)
-
-    val message = intercept[AnalysisException] {
+  test("create table using as select - with bucket") {
+    val catalog = spark.sessionState.catalog
+    withTable("t") {
       sql(
         s"""
-        |CREATE TEMPORARY TABLE jsonTable
-        |USING json
-        |OPTIONS (
-        |  path '${path.toString}'
-        |) AS
-        |SELECT a, b FROM jsonTable
-      """.stripMargin)
-    }.getMessage
-    assert(
-      message.contains("Cannot overwrite table "),
-      "Writing to a table while querying it should not be allowed.")
+           |CREATE TABLE t USING PARQUET
+           |OPTIONS (PATH '${path.toString}')
+           |CLUSTERED BY (a) SORTED BY (b) INTO 5 BUCKETS
+           |AS SELECT 1 AS a, 2 AS b
+         """.stripMargin
+      )
+      val table = catalog.getTableMetadata(TableIdentifier("t"))
+      assert(DDLUtils.getBucketSpecFromTableProperties(table) ==
+        Some(BucketSpec(5, Seq("a"), Seq("b"))))
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d109a1be/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 24de223..499819f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1506,52 +1506,6 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton
{
     }
   }
 
-  test(
-    "SPARK-14488 \"CREATE TEMPORARY TABLE ... USING ... AS SELECT ...\" " +
-    "shouldn't create persisted table"
-  ) {
-    withTempPath { dir =>
-      withTempTable("t1", "t2") {
-        val path = dir.getCanonicalPath
-        val ds = spark.range(10)
-        ds.createOrReplaceTempView("t1")
-
-        sql(
-          s"""CREATE TEMPORARY TABLE t2
-             |USING PARQUET
-             |OPTIONS (PATH '$path')
-             |AS SELECT * FROM t1
-           """.stripMargin)
-
-        checkAnswer(
-          spark.sql("SHOW TABLES").select('isTemporary).filter('tableName === "t2"),
-          Row(true)
-        )
-
-        checkAnswer(table("t2"), table("t1"))
-      }
-    }
-  }
-
-  test(
-    "SPARK-14493 \"CREATE TEMPORARY TABLE ... USING ... AS SELECT ...\" " +
-    "shouldn always be used together with PATH data source option"
-  ) {
-    withTempTable("t") {
-      spark.range(10).createOrReplaceTempView("t")
-
-      val message = intercept[IllegalArgumentException] {
-        sql(
-          s"""CREATE TEMPORARY TABLE t1
-             |USING PARQUET
-             |AS SELECT * FROM t
-           """.stripMargin)
-      }.getMessage
-
-      assert(message == "'path' is not specified")
-    }
-  }
-
   test("derived from Hive query file: drop_database_removes_partition_dirs.q") {
     // This test verifies that if a partition exists outside a table's current location when
the
     // database is dropped the partition's location is dropped as well.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message