spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andrewo...@apache.org
Subject spark git commit: [SPARK-15646][SQL] When spark.sql.hive.convertCTAS is true, the conversion rule needs to respect TEXTFILE/SEQUENCEFILE format and the user-defined location
Date Thu, 02 Jun 2016 00:55:57 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 35195f6ce -> 5a835b99f


[SPARK-15646][SQL] When spark.sql.hive.convertCTAS is true, the conversion rule needs to respect
TEXTFILE/SEQUENCEFILE format and the user-defined location

## What changes were proposed in this pull request?
When `spark.sql.hive.convertCTAS` is true, for a CTAS statement, we will create a data source
table using the default source (i.e. parquet) if the CTAS does not specify any Hive storage
format. However, there are two issues with this conversion logic.
1. First, we determine if a CTAS statement defines storage format by checking the serde. However,
TEXTFILE/SEQUENCEFILE does not have a default serde. When we do the check, we have not set
the default serde. So, a query like `CREATE TABLE abc STORED AS TEXTFILE AS SELECT ...` actually
creates a data source parquet table.
2. In the conversion logic, we are ignoring the user-specified location.

This PR fixes the above two issues.

Also, this PR makes the parser throws an exception when a CTAS statement has a PARTITIONED
BY clause. This change is made because Hive's syntax does not allow it and our current implementation
actually does not work for this case (the insert operation always throws an exception because
the insertion does not pick up the partitioning info).

## How was this patch tested?
I am adding new tests in SQLQuerySuite and HiveDDLCommandSuite.

Author: Yin Huai <yhuai@databricks.com>

Closes #13386 from yhuai/SPARK-14507.

(cherry picked from commit 6dddb70c387ed1f002d2602b2b1f919ef021de91)
Signed-off-by: Andrew Or <andrew@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5a835b99
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5a835b99
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5a835b99

Branch: refs/heads/branch-2.0
Commit: 5a835b99f9852b0c2a35f9c75a51d493474994ea
Parents: 35195f6
Author: Yin Huai <yhuai@databricks.com>
Authored: Wed Jun 1 17:55:37 2016 -0700
Committer: Andrew Or <andrew@databricks.com>
Committed: Wed Jun 1 17:55:49 2016 -0700

----------------------------------------------------------------------
 .../spark/sql/execution/SparkSqlParser.scala    |  37 ++++-
 .../spark/sql/execution/command/tables.scala    |   2 +-
 .../org/apache/spark/sql/internal/SQLConf.scala |  10 ++
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  57 ++------
 .../spark/sql/hive/HiveSessionState.scala       |  16 ---
 .../org/apache/spark/sql/hive/HiveUtils.scala   |   6 -
 .../CreateHiveTableAsSelectCommand.scala        | 102 ++++++++++++++
 .../execution/CreateTableAsSelectCommand.scala  | 101 --------------
 .../spark/sql/hive/HiveDDLCommandSuite.scala    |  25 ++--
 .../sql/hive/execution/HiveExplainSuite.scala   |   6 +-
 .../sql/hive/execution/SQLQuerySuite.scala      | 135 ++++++++++++++-----
 11 files changed, 273 insertions(+), 224 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5a835b99/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 6c19bf0..01409c6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -839,7 +839,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
 
   /**
    * Create a table, returning either a [[CreateTableCommand]] or a
-   * [[CreateTableAsSelectLogicalPlan]].
+   * [[CreateHiveTableAsSelectLogicalPlan]].
    *
    * This is not used to create datasource tables, which is handled through
    * "CREATE TABLE ... USING ...".
@@ -936,7 +936,40 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       comment = comment)
 
     selectQuery match {
-      case Some(q) => CreateTableAsSelectLogicalPlan(tableDesc, q, ifNotExists)
+      case Some(q) =>
+        // Hive does not allow to use a CTAS statement to create a partitioned table.
+        if (tableDesc.partitionColumnNames.nonEmpty) {
+          val errorMessage = "A Create Table As Select (CTAS) statement is not allowed to
" +
+            "create a partitioned table using Hive's file formats. " +
+            "Please use the syntax of \"CREATE TABLE tableName USING dataSource " +
+            "OPTIONS (...) PARTITIONED BY ...\" to create a partitioned table through a "
+
+            "CTAS statement."
+          throw operationNotAllowed(errorMessage, ctx)
+        }
+
+        val hasStorageProperties = (ctx.createFileFormat != null) || (ctx.rowFormat != null)
+        if (conf.convertCTAS && !hasStorageProperties) {
+          val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
+          // At here, both rowStorage.serdeProperties and fileStorage.serdeProperties
+          // are empty Maps.
+          val optionsWithPath = if (location.isDefined) {
+            Map("path" -> location.get)
+          } else {
+            Map.empty[String, String]
+          }
+          CreateTableUsingAsSelect(
+            tableIdent = tableDesc.identifier,
+            provider = conf.defaultDataSourceName,
+            temporary = false,
+            partitionColumns = tableDesc.partitionColumnNames.toArray,
+            bucketSpec = None,
+            mode = mode,
+            options = optionsWithPath,
+            q
+          )
+        } else {
+          CreateHiveTableAsSelectLogicalPlan(tableDesc, q, ifNotExists)
+        }
       case None => CreateTableCommand(tableDesc, ifNotExists)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/5a835b99/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 1b89c6b..90db785 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.execution.datasources.PartitioningUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
-case class CreateTableAsSelectLogicalPlan(
+case class CreateHiveTableAsSelectLogicalPlan(
     tableDesc: CatalogTable,
     child: LogicalPlan,
     allowExisting: Boolean) extends UnaryNode with Command {

http://git-wip-us.apache.org/repos/asf/spark/blob/5a835b99/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index d1db0dd..437e093 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -310,6 +310,14 @@ object SQLConf {
     .stringConf
     .createWithDefault("parquet")
 
+  val CONVERT_CTAS = SQLConfigBuilder("spark.sql.hive.convertCTAS")
+    .internal()
+    .doc("When true, a table created by a Hive CTAS statement (no USING clause) " +
+      "without specifying any storage property will be converted to a data source table,
" +
+      "using the data source set by spark.sql.sources.default.")
+    .booleanConf
+    .createWithDefault(false)
+
   // This is used to control the when we will split a schema's JSON string to multiple pieces
   // in order to fit the JSON string in metastore's table property (by default, the value
has
   // a length restriction of 4000 characters). We will split the JSON string of a schema
@@ -632,6 +640,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with
Logging {
 
   def defaultDataSourceName: String = getConf(DEFAULT_DATA_SOURCE_NAME)
 
+  def convertCTAS: Boolean = getConf(CONVERT_CTAS)
+
   def partitionDiscoveryEnabled(): Boolean =
     getConf(SQLConf.PARTITION_DISCOVERY_ENABLED)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5a835b99/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index ff395f3..f10afa7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
-import org.apache.spark.sql.execution.command.CreateTableAsSelectLogicalPlan
+import org.apache.spark.sql.execution.command.CreateHiveTableAsSelectLogicalPlan
 import org.apache.spark.sql.execution.datasources.{Partition => _, _}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.hive.orc.OrcFileFormat
@@ -446,53 +446,21 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession)
extends Log
       case p: LogicalPlan if !p.childrenResolved => p
       case p: LogicalPlan if p.resolved => p
 
-      case p @ CreateTableAsSelectLogicalPlan(table, child, allowExisting) =>
-        val schema = if (table.schema.nonEmpty) {
-          table.schema
+      case p @ CreateHiveTableAsSelectLogicalPlan(table, child, allowExisting) =>
+        val desc = if (table.storage.serde.isEmpty) {
+          // add default serde
+          table.withNewStorage(
+            serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
         } else {
-          child.output.map { a =>
-            CatalogColumn(a.name, a.dataType.catalogString, a.nullable)
-          }
+          table
         }
 
-        val desc = table.copy(schema = schema)
-
-        if (sessionState.convertCTAS && table.storage.serde.isEmpty) {
-          // Do the conversion when spark.sql.hive.convertCTAS is true and the query
-          // does not specify any storage format (file format and storage handler).
-          if (table.identifier.database.isDefined) {
-            throw new AnalysisException(
-              "Cannot specify database name in a CTAS statement " +
-                "when spark.sql.hive.convertCTAS is set to true.")
-          }
+        val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table)
 
-          val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
-          CreateTableUsingAsSelect(
-            TableIdentifier(desc.identifier.table),
-            sessionState.conf.defaultDataSourceName,
-            temporary = false,
-            Array.empty[String],
-            bucketSpec = None,
-            mode,
-            options = Map.empty[String, String],
-            child
-          )
-        } else {
-          val desc = if (table.storage.serde.isEmpty) {
-            // add default serde
-            table.withNewStorage(
-              serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
-          } else {
-            table
-          }
-
-          val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table)
-
-          execution.CreateTableAsSelectCommand(
-            desc.copy(identifier = TableIdentifier(tblName, Some(dbName))),
-            child,
-            allowExisting)
-        }
+        execution.CreateHiveTableAsSelectCommand(
+          desc.copy(identifier = TableIdentifier(tblName, Some(dbName))),
+          child,
+          allowExisting)
     }
   }
 
@@ -543,6 +511,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends
Log
 /**
  * An override of the standard HDFS listing based catalog, that overrides the partition spec
with
  * the information from the metastore.
+ *
  * @param tableBasePath The default base path of the Hive metastore table
  * @param partitionSpec The partition specifications from Hive metastore
  */

http://git-wip-us.apache.org/repos/asf/spark/blob/5a835b99/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 081d85a..ca8e5f8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -139,22 +139,6 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
   }
 
   /**
-   * When true, a table created by a Hive CTAS statement (no USING clause) will be
-   * converted to a data source table, using the data source set by spark.sql.sources.default.
-   * The table in CTAS statement will be converted when it meets any of the following conditions:
-   *   - The CTAS does not specify any of a SerDe (ROW FORMAT SERDE), a File Format (STORED
AS), or
-   *     a Storage Handler (STORED BY), and the value of hive.default.fileformat in hive-site.xml
-   *     is either TextFile or SequenceFile.
-   *   - The CTAS statement specifies TextFile (STORED AS TEXTFILE) as the file format and
no SerDe
-   *     is specified (no ROW FORMAT SERDE clause).
-   *   - The CTAS statement specifies SequenceFile (STORED AS SEQUENCEFILE) as the file format
-   *     and no SerDe is specified (no ROW FORMAT SERDE clause).
-   */
-  def convertCTAS: Boolean = {
-    conf.getConf(HiveUtils.CONVERT_CTAS)
-  }
-
-  /**
    * When true, Hive Thrift server will execute SQL queries asynchronously using a thread
pool."
    */
   def hiveThriftServerAsync: Boolean = {

http://git-wip-us.apache.org/repos/asf/spark/blob/5a835b99/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 88f4a2d..9ed357c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -96,12 +96,6 @@ private[spark] object HiveUtils extends Logging {
       .booleanConf
       .createWithDefault(false)
 
-  val CONVERT_CTAS = SQLConfigBuilder("spark.sql.hive.convertCTAS")
-    .doc("When true, a table created by a Hive CTAS statement (no USING clause) will be "
+
-      "converted to a data source table, using the data source set by spark.sql.sources.default.")
-    .booleanConf
-    .createWithDefault(false)
-
   val CONVERT_METASTORE_ORC = SQLConfigBuilder("spark.sql.hive.convertMetastoreOrc")
     .doc("When set to false, Spark SQL will use the Hive SerDe for ORC tables instead of
" +
       "the built in support.")

http://git-wip-us.apache.org/repos/asf/spark/blob/5a835b99/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
new file mode 100644
index 0000000..b809938
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable}
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
+import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.hive.MetastoreRelation
+
+
+/**
+ * Create table and insert the query result into it.
+ *
+ * @param tableDesc the Table Describe, which may contains serde, storage handler etc.
+ * @param query the query whose result will be insert into the new relation
+ * @param ignoreIfExists allow continue working if it's already exists, otherwise
+ *                      raise exception
+ */
+private[hive]
+case class CreateHiveTableAsSelectCommand(
+    tableDesc: CatalogTable,
+    query: LogicalPlan,
+    ignoreIfExists: Boolean)
+  extends RunnableCommand {
+
+  private val tableIdentifier = tableDesc.identifier
+
+  override def children: Seq[LogicalPlan] = Seq(query)
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    lazy val metastoreRelation: MetastoreRelation = {
+      import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+      import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
+      import org.apache.hadoop.io.Text
+      import org.apache.hadoop.mapred.TextInputFormat
+
+      val withFormat =
+        tableDesc.withNewStorage(
+          inputFormat =
+            tableDesc.storage.inputFormat.orElse(Some(classOf[TextInputFormat].getName)),
+          outputFormat =
+            tableDesc.storage.outputFormat
+              .orElse(Some(classOf[HiveIgnoreKeyTextOutputFormat[Text, Text]].getName)),
+          serde = tableDesc.storage.serde.orElse(Some(classOf[LazySimpleSerDe].getName)),
+          compressed = tableDesc.storage.compressed)
+
+      val withSchema = if (withFormat.schema.isEmpty) {
+        // Hive doesn't support specifying the column list for target table in CTAS
+        // However we don't think SparkSQL should follow that.
+        tableDesc.copy(schema = query.output.map { c =>
+          CatalogColumn(c.name, c.dataType.catalogString)
+        })
+      } else {
+        withFormat
+      }
+
+      sparkSession.sessionState.catalog.createTable(withSchema, ignoreIfExists = false)
+
+      // Get the Metastore Relation
+      sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match {
+        case r: MetastoreRelation => r
+      }
+    }
+    // TODO ideally, we should get the output data ready first and then
+    // add the relation into catalog, just in case of failure occurs while data
+    // processing.
+    if (sparkSession.sessionState.catalog.tableExists(tableIdentifier)) {
+      if (ignoreIfExists) {
+        // table already exists, will do nothing, to keep consistent with Hive
+      } else {
+        throw new AnalysisException(s"$tableIdentifier already exists.")
+      }
+    } else {
+      sparkSession.sessionState.executePlan(InsertIntoTable(
+        metastoreRelation, Map(), query, overwrite = true, ifNotExists = false)).toRdd
+    }
+
+    Seq.empty[Row]
+  }
+
+  override def argString: String = {
+    s"[Database:${tableDesc.database}}, " +
+    s"TableName: ${tableDesc.identifier.table}, " +
+    s"InsertIntoHiveTable]"
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/5a835b99/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelectCommand.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelectCommand.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelectCommand.scala
deleted file mode 100644
index cfe6149..0000000
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelectCommand.scala
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.execution
-
-import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
-import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable}
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
-import org.apache.spark.sql.execution.command.RunnableCommand
-import org.apache.spark.sql.hive.MetastoreRelation
-
-
-/**
- * Create table and insert the query result into it.
- * @param tableDesc the Table Describe, which may contains serde, storage handler etc.
- * @param query the query whose result will be insert into the new relation
- * @param allowExisting allow continue working if it's already exists, otherwise
- *                      raise exception
- */
-private[hive]
-case class CreateTableAsSelectCommand(
-    tableDesc: CatalogTable,
-    query: LogicalPlan,
-    allowExisting: Boolean)
-  extends RunnableCommand {
-
-  private val tableIdentifier = tableDesc.identifier
-
-  override def children: Seq[LogicalPlan] = Seq(query)
-
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    lazy val metastoreRelation: MetastoreRelation = {
-      import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-      import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
-      import org.apache.hadoop.io.Text
-      import org.apache.hadoop.mapred.TextInputFormat
-
-      val withFormat =
-        tableDesc.withNewStorage(
-          inputFormat =
-            tableDesc.storage.inputFormat.orElse(Some(classOf[TextInputFormat].getName)),
-          outputFormat =
-            tableDesc.storage.outputFormat
-              .orElse(Some(classOf[HiveIgnoreKeyTextOutputFormat[Text, Text]].getName)),
-          serde = tableDesc.storage.serde.orElse(Some(classOf[LazySimpleSerDe].getName)),
-          compressed = tableDesc.storage.compressed)
-
-      val withSchema = if (withFormat.schema.isEmpty) {
-        // Hive doesn't support specifying the column list for target table in CTAS
-        // However we don't think SparkSQL should follow that.
-        tableDesc.copy(schema = query.output.map { c =>
-          CatalogColumn(c.name, c.dataType.catalogString)
-        })
-      } else {
-        withFormat
-      }
-
-      sparkSession.sessionState.catalog.createTable(withSchema, ignoreIfExists = false)
-
-      // Get the Metastore Relation
-      sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match {
-        case r: MetastoreRelation => r
-      }
-    }
-    // TODO ideally, we should get the output data ready first and then
-    // add the relation into catalog, just in case of failure occurs while data
-    // processing.
-    if (sparkSession.sessionState.catalog.tableExists(tableIdentifier)) {
-      if (allowExisting) {
-        // table already exists, will do nothing, to keep consistent with Hive
-      } else {
-        throw new AnalysisException(s"$tableIdentifier already exists.")
-      }
-    } else {
-      sparkSession.sessionState.executePlan(InsertIntoTable(
-        metastoreRelation, Map(), query, overwrite = true, ifNotExists = false)).toRdd
-    }
-
-    Seq.empty[Row]
-  }
-
-  override def argString: String = {
-    s"[Database:${tableDesc.database}}, " +
-    s"TableName: ${tableDesc.identifier.table}, " +
-    s"InsertIntoHiveTable]"
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/5a835b99/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index 3297a6f..ba9fe54 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -36,7 +36,7 @@ class HiveDDLCommandSuite extends PlanTest {
   private def extractTableDesc(sql: String): (CatalogTable, Boolean) = {
     parser.parsePlan(sql).collect {
       case c: CreateTableCommand => (c.table, c.ifNotExists)
-      case c: CreateTableAsSelectLogicalPlan => (c.tableDesc, c.allowExisting)
+      case c: CreateHiveTableAsSelectLogicalPlan => (c.tableDesc, c.allowExisting)
       case c: CreateViewCommand => (c.tableDesc, c.allowExisting)
     }.head
   }
@@ -58,7 +58,6 @@ class HiveDDLCommandSuite extends PlanTest {
         |ip STRING COMMENT 'IP Address of the User',
         |country STRING COMMENT 'country of origination')
         |COMMENT 'This is the staging page view table'
-        |PARTITIONED BY (dt STRING COMMENT 'date type', hour STRING COMMENT 'hour of the
day')
         |STORED AS RCFILE
         |LOCATION '/user/external/page_view'
         |TBLPROPERTIES ('p1'='v1', 'p2'='v2')
@@ -76,16 +75,12 @@ class HiveDDLCommandSuite extends PlanTest {
       CatalogColumn("page_url", "string") ::
       CatalogColumn("referrer_url", "string") ::
       CatalogColumn("ip", "string", comment = Some("IP Address of the User")) ::
-      CatalogColumn("country", "string", comment = Some("country of origination")) ::
-      CatalogColumn("dt", "string", comment = Some("date type")) ::
-      CatalogColumn("hour", "string", comment = Some("hour of the day")) :: Nil)
+      CatalogColumn("country", "string", comment = Some("country of origination")) :: Nil)
     assert(desc.comment == Some("This is the staging page view table"))
     // TODO will be SQLText
     assert(desc.viewText.isEmpty)
     assert(desc.viewOriginalText.isEmpty)
-    assert(desc.partitionColumns ==
-      CatalogColumn("dt", "string", comment = Some("date type")) ::
-      CatalogColumn("hour", "string", comment = Some("hour of the day")) :: Nil)
+    assert(desc.partitionColumns == Seq.empty[CatalogColumn])
     assert(desc.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileInputFormat"))
     assert(desc.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
     assert(desc.storage.serde ==
@@ -103,7 +98,6 @@ class HiveDDLCommandSuite extends PlanTest {
         |ip STRING COMMENT 'IP Address of the User',
         |country STRING COMMENT 'country of origination')
         |COMMENT 'This is the staging page view table'
-        |PARTITIONED BY (dt STRING COMMENT 'date type', hour STRING COMMENT 'hour of the
day')
         |ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'
         | STORED AS
         | INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat'
@@ -124,16 +118,12 @@ class HiveDDLCommandSuite extends PlanTest {
       CatalogColumn("page_url", "string") ::
       CatalogColumn("referrer_url", "string") ::
       CatalogColumn("ip", "string", comment = Some("IP Address of the User")) ::
-      CatalogColumn("country", "string", comment = Some("country of origination")) ::
-      CatalogColumn("dt", "string", comment = Some("date type")) ::
-      CatalogColumn("hour", "string", comment = Some("hour of the day")) :: Nil)
+      CatalogColumn("country", "string", comment = Some("country of origination")) :: Nil)
     // TODO will be SQLText
     assert(desc.comment == Some("This is the staging page view table"))
     assert(desc.viewText.isEmpty)
     assert(desc.viewOriginalText.isEmpty)
-    assert(desc.partitionColumns ==
-      CatalogColumn("dt", "string", comment = Some("date type")) ::
-      CatalogColumn("hour", "string", comment = Some("hour of the day")) :: Nil)
+    assert(desc.partitionColumns == Seq.empty[CatalogColumn])
     assert(desc.storage.serdeProperties == Map())
     assert(desc.storage.inputFormat == Some("parquet.hive.DeprecatedParquetInputFormat"))
     assert(desc.storage.outputFormat == Some("parquet.hive.DeprecatedParquetOutputFormat"))
@@ -195,6 +185,11 @@ class HiveDDLCommandSuite extends PlanTest {
     assert(desc.properties == Map(("tbl_p1" -> "p11"), ("tbl_p2" -> "p22")))
   }
 
+  test("CTAS statement with a PARTITIONED BY clause is not allowed") {
+    assertUnsupported(s"CREATE TABLE ctas1 PARTITIONED BY (k int)" +
+      " AS SELECT key, value FROM (SELECT 1 as key, 2 as value) tmp")
+  }
+
   test("unsupported operations") {
     intercept[ParseException] {
       parser.parsePlan(

http://git-wip-us.apache.org/repos/asf/spark/blob/5a835b99/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
index 0d08f7e..a43eed9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
@@ -53,7 +53,7 @@ class HiveExplainSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
       "== Analyzed Logical Plan ==",
       "== Optimized Logical Plan ==",
       "== Physical Plan ==",
-      "CreateTableAsSelect",
+      "CreateHiveTableAsSelect",
       "InsertIntoHiveTable",
       "Limit",
       "src")
@@ -71,7 +71,7 @@ class HiveExplainSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
       "== Analyzed Logical Plan ==",
       "== Optimized Logical Plan ==",
       "== Physical Plan ==",
-      "CreateTableAsSelect",
+      "CreateHiveTableAsSelect",
       "InsertIntoHiveTable",
       "Limit",
       "src")
@@ -92,7 +92,7 @@ class HiveExplainSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
       val shouldContain =
         "== Parsed Logical Plan ==" :: "== Analyzed Logical Plan ==" :: "Subquery" ::
         "== Optimized Logical Plan ==" :: "== Physical Plan ==" ::
-        "CreateTableAsSelect" :: "InsertIntoHiveTable" :: "jt" :: Nil
+        "CreateHiveTableAsSelect" :: "InsertIntoHiveTable" :: "jt" :: Nil
       for (key <- shouldContain) {
         assert(outputs.contains(key), s"$key doesn't exist in result")
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/5a835b99/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 2a9b06b..b569145 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -24,11 +24,14 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, FunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
 import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.hive.{HiveUtils, MetastoreRelation}
 import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.CalendarInterval
@@ -376,78 +379,138 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton
{
     )
   }
 
-  test("CTAS without serde") {
-    def checkRelation(tableName: String, isDataSourceParquet: Boolean): Unit = {
-      val relation = EliminateSubqueryAliases(
-        sessionState.catalog.lookupRelation(TableIdentifier(tableName)))
-      relation match {
-        case LogicalRelation(r: HadoopFsRelation, _, _) =>
-          if (!isDataSourceParquet) {
-            fail(
-              s"${classOf[MetastoreRelation].getCanonicalName} is expected, but found " +
+  def checkRelation(
+      tableName: String,
+      isDataSourceParquet: Boolean,
+      format: String,
+      userSpecifiedLocation: Option[String] = None): Unit = {
+    val relation = EliminateSubqueryAliases(
+      sessionState.catalog.lookupRelation(TableIdentifier(tableName)))
+    val catalogTable =
+      sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
+    relation match {
+      case LogicalRelation(r: HadoopFsRelation, _, _) =>
+        if (!isDataSourceParquet) {
+          fail(
+            s"${classOf[MetastoreRelation].getCanonicalName} is expected, but found " +
               s"${HadoopFsRelation.getClass.getCanonicalName}.")
-          }
+        }
+        userSpecifiedLocation match {
+          case Some(location) =>
+            assert(r.options("path") === location)
+          case None => // OK.
+        }
+        assert(
+          catalogTable.properties(CreateDataSourceTableUtils.DATASOURCE_PROVIDER) === format)
 
-        case r: MetastoreRelation =>
-          if (isDataSourceParquet) {
-            fail(
-              s"${HadoopFsRelation.getClass.getCanonicalName} is expected, but found " +
+      case r: MetastoreRelation =>
+        if (isDataSourceParquet) {
+          fail(
+            s"${HadoopFsRelation.getClass.getCanonicalName} is expected, but found " +
               s"${classOf[MetastoreRelation].getCanonicalName}.")
-          }
-      }
+        }
+        userSpecifiedLocation match {
+          case Some(location) =>
+            assert(r.catalogTable.storage.locationUri.get === location)
+          case None => // OK.
+        }
+        // Also make sure that the format is the desired format.
+        assert(catalogTable.storage.inputFormat.get.toLowerCase.contains(format))
     }
 
-    val originalConf = sessionState.convertCTAS
+    // When a user-specified location is defined, the table type needs to be EXTERNAL.
+    val actualTableType = catalogTable.tableType
+    userSpecifiedLocation match {
+      case Some(location) =>
+        assert(actualTableType === CatalogTableType.EXTERNAL)
+      case None =>
+        assert(actualTableType === CatalogTableType.MANAGED)
+    }
+  }
 
-    setConf(HiveUtils.CONVERT_CTAS, true)
+  test("CTAS without serde without location") {
+    val originalConf = sessionState.conf.convertCTAS
 
+    setConf(SQLConf.CONVERT_CTAS, true)
+
+    val defaultDataSource = sessionState.conf.defaultDataSourceName
     try {
       sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value")
       sql("CREATE TABLE IF NOT EXISTS ctas1 AS SELECT key k, value FROM src ORDER BY k, value")
-      var message = intercept[AnalysisException] {
+      val message = intercept[AnalysisException] {
         sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value")
       }.getMessage
       assert(message.contains("already exists"))
-      checkRelation("ctas1", true)
+      checkRelation("ctas1", true, defaultDataSource)
       sql("DROP TABLE ctas1")
 
       // Specifying database name for query can be converted to data source write path
       // is not allowed right now.
-      message = intercept[AnalysisException] {
-        sql("CREATE TABLE default.ctas1 AS SELECT key k, value FROM src ORDER BY k, value")
-      }.getMessage
-      assert(
-        message.contains("Cannot specify database name in a CTAS statement"),
-        "When spark.sql.hive.convertCTAS is true, we should not allow " +
-            "database name specified.")
+      sql("CREATE TABLE default.ctas1 AS SELECT key k, value FROM src ORDER BY k, value")
+      checkRelation("ctas1", true, defaultDataSource)
+      sql("DROP TABLE ctas1")
 
       sql("CREATE TABLE ctas1 stored as textfile" +
           " AS SELECT key k, value FROM src ORDER BY k, value")
-      checkRelation("ctas1", true)
+      checkRelation("ctas1", false, "text")
       sql("DROP TABLE ctas1")
 
       sql("CREATE TABLE ctas1 stored as sequencefile" +
             " AS SELECT key k, value FROM src ORDER BY k, value")
-      checkRelation("ctas1", true)
+      checkRelation("ctas1", false, "sequence")
       sql("DROP TABLE ctas1")
 
       sql("CREATE TABLE ctas1 stored as rcfile AS SELECT key k, value FROM src ORDER BY k,
value")
-      checkRelation("ctas1", false)
+      checkRelation("ctas1", false, "rcfile")
       sql("DROP TABLE ctas1")
 
       sql("CREATE TABLE ctas1 stored as orc AS SELECT key k, value FROM src ORDER BY k, value")
-      checkRelation("ctas1", false)
+      checkRelation("ctas1", false, "orc")
       sql("DROP TABLE ctas1")
 
       sql("CREATE TABLE ctas1 stored as parquet AS SELECT key k, value FROM src ORDER BY
k, value")
-      checkRelation("ctas1", false)
+      checkRelation("ctas1", false, "parquet")
       sql("DROP TABLE ctas1")
     } finally {
-      setConf(HiveUtils.CONVERT_CTAS, originalConf)
+      setConf(SQLConf.CONVERT_CTAS, originalConf)
       sql("DROP TABLE IF EXISTS ctas1")
     }
   }
 
+  test("CTAS without serde with location") {
+    withSQLConf(SQLConf.CONVERT_CTAS.key -> "true") {
+      withTempDir { dir =>
+        val defaultDataSource = sessionState.conf.defaultDataSourceName
+
+        val tempLocation = dir.getCanonicalPath
+        sql(s"CREATE TABLE ctas1 LOCATION 'file:$tempLocation/c1'" +
+          " AS SELECT key k, value FROM src ORDER BY k, value")
+        checkRelation("ctas1", true, defaultDataSource, Some(s"file:$tempLocation/c1"))
+        sql("DROP TABLE ctas1")
+
+        sql(s"CREATE TABLE ctas1 LOCATION 'file:$tempLocation/c2'" +
+          " AS SELECT key k, value FROM src ORDER BY k, value")
+        checkRelation("ctas1", true, defaultDataSource, Some(s"file:$tempLocation/c2"))
+        sql("DROP TABLE ctas1")
+
+        sql(s"CREATE TABLE ctas1 stored as textfile LOCATION 'file:$tempLocation/c3'" +
+          " AS SELECT key k, value FROM src ORDER BY k, value")
+        checkRelation("ctas1", false, "text", Some(s"file:$tempLocation/c3"))
+        sql("DROP TABLE ctas1")
+
+        sql(s"CREATE TABLE ctas1 stored as sequenceFile LOCATION 'file:$tempLocation/c4'"
+
+          " AS SELECT key k, value FROM src ORDER BY k, value")
+        checkRelation("ctas1", false, "sequence", Some(s"file:$tempLocation/c4"))
+        sql("DROP TABLE ctas1")
+
+        sql(s"CREATE TABLE ctas1 stored as rcfile LOCATION 'file:$tempLocation/c5'" +
+          " AS SELECT key k, value FROM src ORDER BY k, value")
+        checkRelation("ctas1", false, "rcfile", Some(s"file:$tempLocation/c5"))
+        sql("DROP TABLE ctas1")
+      }
+    }
+  }
+
   test("CTAS with serde") {
     sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value").collect()
     sql(
@@ -785,8 +848,8 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton
{
     // generates an invalid query plan.
     val rdd = sparkContext.makeRDD((1 to 5).map(i => s"""{"a":[$i, ${i + 1}]}"""))
     read.json(rdd).createOrReplaceTempView("data")
-    val originalConf = sessionState.convertCTAS
-    setConf(HiveUtils.CONVERT_CTAS, false)
+    val originalConf = sessionState.conf.convertCTAS
+    setConf(SQLConf.CONVERT_CTAS, false)
 
     try {
       sql("CREATE TABLE explodeTest (key bigInt)")
@@ -805,7 +868,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton
{
       sql("DROP TABLE explodeTest")
       dropTempTable("data")
     } finally {
-      setConf(HiveUtils.CONVERT_CTAS, originalConf)
+      setConf(SQLConf.CONVERT_CTAS, originalConf)
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message