spark-commits mailing list archives

From wenc...@apache.org
Subject spark git commit: [SPARK-17353][SPARK-16943][SPARK-16942][BACKPORT-2.0][SQL] Fix multiple bugs in CREATE TABLE LIKE command
Date Tue, 06 Sep 2016 02:46:06 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 e387c8ba8 -> f92d87455


[SPARK-17353][SPARK-16943][SPARK-16942][BACKPORT-2.0][SQL] Fix multiple bugs in CREATE TABLE
LIKE command

### What changes were proposed in this pull request?
This PR backports https://github.com/apache/spark/pull/14531 to branch-2.0.

The existing `CREATE TABLE LIKE` command has multiple issues:

- The generated table is non-empty when the source table is a data source table. The main
reason is that a data source table stores the location of its contents in the table property
`path`, which we currently keep unchanged. The new table therefore points at the same
location as the source table.

- The table type of the generated table is `EXTERNAL` when the source table is an external
Hive serde table. We explicitly set the type to `MANAGED`, but Hive checks the table
property `EXTERNAL` to decide whether the table is `EXTERNAL` or not. (See https://github.com/apache/hive/blob/master/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java#L1407-L1408)
Because that property is carried over from the source, the created table is still `EXTERNAL`.

- When the source table is a `VIEW`, the metadata of the generated table contains the view
text and the view original text of the source. So far this has not broken anything, but it
could cause problems in Hive. (For example, https://github.com/apache/hive/blob/master/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java#L1405-L1406)

- The table `comment` is copied from the source table. To follow what Hive does, the table
comment should be cleared, while the column comments should be kept.

- `INDEX` tables are not supported, so we should throw an exception in this case.

- `owner` should not be retained. `toHiveTable` sets it [here](https://github.com/apache/spark/blob/e679bc3c1cd418ef0025d2ecbc547c9660cac433/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala#L793)
no matter which value we set in `CatalogTable`. We set it to an empty string to avoid
confusing output in Explain.

- Add support for temp tables as the source.

- Like Hive, we should not copy the table properties from the source table to the created
table. This matters most for the statistics-related properties, which could be wrong in the
created table.

- `unsupportedFeatures` should not be copied from the source table. The created table does
not have these unsupported features.

- When the source table is a view, the target table uses the default format of data source
tables, as configured by `spark.sql.sources.default`.

This PR fixes the above issues.
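
The intent of the list above can be sketched with a small, self-contained model. Everything in this block (`TableMeta`, `Column`, `createLike`, the paths) is a hypothetical simplification for illustration, not Spark's `CatalogTable` API or the actual implementation in this patch. The idea it shows is that the target definition is built from scratch, so only the schema carries over from the source.

```scala
// Simplified, hypothetical stand-ins for Spark's catalog classes; not the real API.
object CreateTableLikeSketch {
  sealed trait TableType
  case object Managed extends TableType
  case object External extends TableType
  case object View extends TableType

  final case class Column(name: String, dataType: String, comment: Option[String] = None)

  final case class TableMeta(
      name: String,
      tableType: TableType,
      columns: Seq[Column],
      location: Option[String] = None,
      comment: Option[String] = None,
      owner: String = "",
      properties: Map[String, String] = Map.empty,
      viewText: Option[String] = None)

  // Build the target definition from scratch instead of copying the source,
  // so that only the schema (including column comments) carries over.
  def createLike(source: TableMeta, targetName: String, defaultLocation: String): TableMeta =
    TableMeta(
      name = targetName,
      tableType = Managed,               // never EXTERNAL or VIEW
      columns = source.columns,          // column comments are kept
      location = Some(defaultLocation))  // fresh location, so the table starts empty
      // comment, owner, properties and viewText keep their empty defaults
}
```

Under these assumptions, calling `createLike` on an external source table with a comment, an owner, and statistics properties yields a managed table with the same columns, a different location, and none of the other metadata.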

### How was this patch tested?
Improved the test coverage by adding more test cases.
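
One of the checks in the new tests is that no user-defined table property leaks into the created table. A minimal sketch of that check on a plain `Map` follows. The object and method names are hypothetical, the key list is copied from `HiveDDLSuite` in this commit, and the prefix value `spark.sql.sources.` is an assumption about what `DATASOURCE_PREFIX` resolves to.

```scala
object TargetPropertiesCheck {
  // Keys the metastore may write on its own (list taken from HiveDDLSuite in this commit).
  val metastoreGenerated: Set[String] = Set(
    "CreateTime", "transient_lastDdlTime", "grantTime", "lastUpdateTime",
    "last_modified_by", "last_modified_time", "Owner:", "COLUMN_STATS_ACCURATE",
    "numFiles", "numRows", "rawDataSize", "totalSize", "totalNumberFiles",
    "maxFileSize", "minFileSize")

  // Assumed value of CreateDataSourceTableUtils.DATASOURCE_PREFIX.
  val dataSourcePrefix: String = "spark.sql.sources."

  // Properties that are neither metastore-generated nor data source bookkeeping.
  // For a table created by CREATE TABLE LIKE, this should be empty.
  def copiedFromSource(targetProps: Map[String, String]): Map[String, String] =
    targetProps.filter { case (key, _) =>
      !metastoreGenerated.contains(key) && !key.startsWith(dataSourcePrefix)
    }
}
```

A target table whose properties contain only entries such as `numRows` or `spark.sql.sources.provider` passes the check, while a leftover `prop1` from the source would be reported.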

Author: gatorsmile <gatorsmile@gmail.com>

Closes #14946 from gatorsmile/createTableLike20.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f92d8745
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f92d8745
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f92d8745

Branch: refs/heads/branch-2.0
Commit: f92d87455214005e60b2d58aa814aaabd2ac9495
Parents: e387c8b
Author: gatorsmile <gatorsmile@gmail.com>
Authored: Tue Sep 6 10:45:54 2016 +0800
Committer: Wenchen Fan <wenchen@databricks.com>
Committed: Tue Sep 6 10:45:54 2016 +0800

----------------------------------------------------------------------
 .../sql/catalyst/catalog/SessionCatalog.scala   |   3 +-
 .../spark/sql/execution/command/tables.scala    |  65 +++++-
 .../spark/sql/hive/client/HiveClientImpl.scala  |   4 +-
 .../spark/sql/hive/execution/HiveDDLSuite.scala | 232 ++++++++++++++++++-
 4 files changed, 289 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f92d8745/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 8db817c..2448513 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -263,8 +263,7 @@ class SessionCatalog(
           CatalogColumn(
             name = c.name,
             dataType = c.dataType.catalogString,
-            nullable = c.nullable,
-            comment = Option(c.name)
+            nullable = c.nullable
           )
         },
         properties = Map(),

http://git-wip-us.apache.org/repos/asf/spark/blob/f92d8745/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 7e6a352..615d9dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -33,8 +33,10 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable,
Catal
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, UnaryNode}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
+import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
 import org.apache.spark.sql.execution.datasources.PartitioningUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -56,7 +58,12 @@ case class CreateHiveTableAsSelectLogicalPlan(
 }
 
 /**
- * A command to create a table with the same definition of the given existing table.
+ * A command to create a MANAGED table with the same definition of the given existing table.
+ * In the target table definition, the table comment is always empty but the column comments
+ * are identical to the ones defined in the source table.
+ *
+ * The CatalogTable attributes copied from the source table are storage(inputFormat, outputFormat,
+ * serde, compressed, properties), schema, provider, partitionColumnNames, bucketSpec.
  *
  * The syntax of using this command in SQL is:
  * {{{
@@ -75,18 +82,54 @@ case class CreateTableLikeCommand(
       throw new AnalysisException(
         s"Source table in CREATE TABLE LIKE does not exist: '$sourceTable'")
     }
-    if (catalog.isTemporaryTable(sourceTable)) {
-      throw new AnalysisException(
-        s"Source table in CREATE TABLE LIKE cannot be temporary: '$sourceTable'")
+    val sourceTableDesc = catalog.getTableMetadata(sourceTable)
+
+    if (DDLUtils.isDatasourceTable(sourceTableDesc) ||
+        sourceTableDesc.tableType == CatalogTableType.VIEW) {
+      val outputSchema =
+        StructType(sourceTableDesc.schema.map { c =>
+          val builder = new MetadataBuilder
+          c.comment.map(comment => builder.putString("comment", comment))
+          StructField(
+            c.name,
+            CatalystSqlParser.parseDataType(c.dataType),
+            c.nullable,
+            metadata = builder.build())
+        })
+      val (schema, provider) = if (DDLUtils.isDatasourceTable(sourceTableDesc)) {
+        (DDLUtils.getSchemaFromTableProperties(sourceTableDesc).getOrElse(outputSchema),
+          sourceTableDesc.properties(CreateDataSourceTableUtils.DATASOURCE_PROVIDER))
+      } else { // VIEW
+        (outputSchema, sparkSession.sessionState.conf.defaultDataSourceName)
+      }
+      createDataSourceTable(
+        sparkSession = sparkSession,
+        tableIdent = targetTable,
+        userSpecifiedSchema = Some(schema),
+        partitionColumns = Array.empty[String],
+        bucketSpec = None,
+        provider = provider,
+        options = Map("path" -> catalog.defaultTablePath(targetTable)),
+        isExternal = false)
+    } else {
+      val newStorage =
+        sourceTableDesc.storage.copy(
+          locationUri = None,
+          serdeProperties = sourceTableDesc.storage.serdeProperties)
+      val newTableDesc =
+        CatalogTable(
+          identifier = targetTable,
+          tableType = CatalogTableType.MANAGED,
+          storage = newStorage,
+          schema = sourceTableDesc.schema,
+          partitionColumnNames = sourceTableDesc.partitionColumnNames,
+          sortColumnNames = sourceTableDesc.sortColumnNames,
+          bucketColumnNames = sourceTableDesc.bucketColumnNames,
+          numBuckets = sourceTableDesc.numBuckets)
+
+      catalog.createTable(newTableDesc, ifNotExists)
     }
 
-    val tableToCreate = catalog.getTableMetadata(sourceTable).copy(
-      identifier = targetTable,
-      tableType = CatalogTableType.MANAGED,
-      createTime = System.currentTimeMillis,
-      lastAccessTime = -1).withNewStorage(locationUri = None)
-
-    catalog.createTable(tableToCreate, ifNotExists)
     Seq.empty[Row]
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f92d8745/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index ac78676..7db51d4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -412,7 +412,9 @@ private[hive] class HiveClientImpl(
           serdeProperties = Option(h.getTTable.getSd.getSerdeInfo.getParameters)
             .map(_.asScala.toMap).orNull
         ),
-        properties = properties.filter(kv => kv._1 != "comment"),
+        // For EXTERNAL_TABLE, the table properties has a particular field "EXTERNAL". This is added
+        // in the function toHiveTable.
+        properties = properties.filter(kv => kv._1 != "comment" && kv._1 != "EXTERNAL"),
         comment = properties.get("comment"),
         viewOriginalText = Option(h.getViewOriginalText),
         viewText = Option(h.getViewExpandedText),

http://git-wip-us.apache.org/repos/asf/spark/blob/f92d8745/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 3cf3c6a..676c08b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -24,8 +24,11 @@ import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.internal.config._
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
-import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.{CreateDataSourceTableUtils, DDLUtils}
+import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
+import org.apache.spark.sql.execution.datasources.CaseInsensitiveMap
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
@@ -651,6 +654,233 @@ class HiveDDLSuite
     }
   }
 
+
+  test("CREATE TABLE LIKE a temporary view") {
+    val sourceViewName = "tab1"
+    val targetTabName = "tab2"
+    withTempView(sourceViewName) {
+      withTable(targetTabName) {
+        spark.range(10).select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd)
+          .createTempView(sourceViewName)
+        sql(s"CREATE TABLE $targetTabName LIKE $sourceViewName")
+
+        val sourceTable = spark.sessionState.catalog.getTableMetadata(
+          TableIdentifier(sourceViewName, None))
+        val targetTable = spark.sessionState.catalog.getTableMetadata(
+          TableIdentifier(targetTabName, Some("default")))
+
+        checkCreateTableLike(sourceTable, targetTable)
+      }
+    }
+  }
+
+  test("CREATE TABLE LIKE a data source table") {
+    val sourceTabName = "tab1"
+    val targetTabName = "tab2"
+    withTable(sourceTabName, targetTabName) {
+      spark.range(10).select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd)
+        .write.format("json").saveAsTable(sourceTabName)
+      sql(s"CREATE TABLE $targetTabName LIKE $sourceTabName")
+
+      val sourceTable =
+        spark.sessionState.catalog.getTableMetadata(TableIdentifier(sourceTabName, Some("default")))
+      val targetTable =
+        spark.sessionState.catalog.getTableMetadata(TableIdentifier(targetTabName, Some("default")))
+      // The table type of the source table should be a Hive-managed data source table
+      assert(DDLUtils.isDatasourceTable(sourceTable))
+      assert(sourceTable.tableType == CatalogTableType.MANAGED)
+
+      checkCreateTableLike(sourceTable, targetTable)
+    }
+  }
+
+  test("CREATE TABLE LIKE an external data source table") {
+    val sourceTabName = "tab1"
+    val targetTabName = "tab2"
+    withTable(sourceTabName, targetTabName) {
+      withTempPath { dir =>
+        val path = dir.getCanonicalPath
+        spark.range(10).select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd)
+          .write.format("parquet").save(path)
+        sql(s"CREATE TABLE $sourceTabName USING parquet OPTIONS (PATH '$path')")
+        sql(s"CREATE TABLE $targetTabName LIKE $sourceTabName")
+
+        // The source table should be an external data source table
+        val sourceTable = spark.sessionState.catalog.getTableMetadata(
+          TableIdentifier(sourceTabName, Some("default")))
+        val targetTable = spark.sessionState.catalog.getTableMetadata(
+          TableIdentifier(targetTabName, Some("default")))
+        // The table type of the source table should be an external data source table
+        assert(DDLUtils.isDatasourceTable(sourceTable))
+        assert(sourceTable.tableType == CatalogTableType.EXTERNAL)
+
+        checkCreateTableLike(sourceTable, targetTable)
+      }
+    }
+  }
+
+  test("CREATE TABLE LIKE a managed Hive serde table") {
+    val catalog = spark.sessionState.catalog
+    val sourceTabName = "tab1"
+    val targetTabName = "tab2"
+    withTable(sourceTabName, targetTabName) {
+      sql(s"CREATE TABLE $sourceTabName TBLPROPERTIES('prop1'='value1') AS SELECT 1 key, 'a'")
+      sql(s"CREATE TABLE $targetTabName LIKE $sourceTabName")
+
+      val sourceTable = catalog.getTableMetadata(TableIdentifier(sourceTabName, Some("default")))
+      assert(sourceTable.tableType == CatalogTableType.MANAGED)
+      assert(sourceTable.properties.get("prop1").nonEmpty)
+      val targetTable = catalog.getTableMetadata(TableIdentifier(targetTabName, Some("default")))
+
+      checkCreateTableLike(sourceTable, targetTable)
+    }
+  }
+
+  test("CREATE TABLE LIKE an external Hive serde table") {
+    val catalog = spark.sessionState.catalog
+    withTempDir { tmpDir =>
+      val basePath = tmpDir.getCanonicalPath
+      val sourceTabName = "tab1"
+      val targetTabName = "tab2"
+      withTable(sourceTabName, targetTabName) {
+        assert(tmpDir.listFiles.isEmpty)
+        sql(
+          s"""
+             |CREATE EXTERNAL TABLE $sourceTabName (key INT comment 'test', value STRING)
+             |COMMENT 'Apache Spark'
+             |PARTITIONED BY (ds STRING, hr STRING)
+             |LOCATION '$basePath'
+           """.stripMargin)
+        for (ds <- Seq("2008-04-08", "2008-04-09"); hr <- Seq("11", "12")) {
+          sql(
+            s"""
+               |INSERT OVERWRITE TABLE $sourceTabName
+               |partition (ds='$ds',hr='$hr')
+               |SELECT 1, 'a'
+             """.stripMargin)
+        }
+        sql(s"CREATE TABLE $targetTabName LIKE $sourceTabName")
+
+        val sourceTable = catalog.getTableMetadata(TableIdentifier(sourceTabName, Some("default")))
+        assert(sourceTable.tableType == CatalogTableType.EXTERNAL)
+        assert(sourceTable.comment == Option("Apache Spark"))
+        val targetTable = catalog.getTableMetadata(TableIdentifier(targetTabName, Some("default")))
+
+        checkCreateTableLike(sourceTable, targetTable)
+      }
+    }
+  }
+
+  test("CREATE TABLE LIKE a view") {
+    val sourceTabName = "tab1"
+    val sourceViewName = "view"
+    val targetTabName = "tab2"
+    withTable(sourceTabName, targetTabName) {
+      withView(sourceViewName) {
+        spark.range(10).select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd)
+          .write.format("json").saveAsTable(sourceTabName)
+        sql(s"CREATE VIEW $sourceViewName AS SELECT * FROM $sourceTabName")
+        sql(s"CREATE TABLE $targetTabName LIKE $sourceViewName")
+
+        val sourceView = spark.sessionState.catalog.getTableMetadata(
+          TableIdentifier(sourceViewName, Some("default")))
+        // The original source should be a VIEW with an empty path
+        assert(sourceView.tableType == CatalogTableType.VIEW)
+        assert(sourceView.viewText.nonEmpty && sourceView.viewOriginalText.nonEmpty)
+        val targetTable = spark.sessionState.catalog.getTableMetadata(
+          TableIdentifier(targetTabName, Some("default")))
+
+        checkCreateTableLike(sourceView, targetTable)
+      }
+    }
+  }
+
+  private def getTablePath(table: CatalogTable): Option[String] = {
+    if (DDLUtils.isDatasourceTable(table)) {
+      new CaseInsensitiveMap(table.storage.serdeProperties).get("path")
+    } else {
+      table.storage.locationUri
+    }
+  }
+
+  private def checkCreateTableLike(sourceTable: CatalogTable, targetTable: CatalogTable): Unit = {
+    // The created table should be a MANAGED table with empty view text and original text.
+    assert(targetTable.tableType == CatalogTableType.MANAGED,
+      "the created table must be a Hive managed table")
+    assert(targetTable.viewText.isEmpty && targetTable.viewOriginalText.isEmpty,
+      "the view text and original text in the created table must be empty")
+    assert(targetTable.comment.isEmpty,
+      "the comment in the created table must be empty")
+    assert(targetTable.unsupportedFeatures.isEmpty,
+      "the unsupportedFeatures in the create table must be empty")
+
+    val metastoreGeneratedProperties = Seq(
+      "CreateTime",
+      "transient_lastDdlTime",
+      "grantTime",
+      "lastUpdateTime",
+      "last_modified_by",
+      "last_modified_time",
+      "Owner:",
+      "COLUMN_STATS_ACCURATE",
+      "numFiles",
+      "numRows",
+      "rawDataSize",
+      "totalSize",
+      "totalNumberFiles",
+      "maxFileSize",
+      "minFileSize"
+    )
+    assert(targetTable.properties.filterKeys { key =>
+        !metastoreGeneratedProperties.contains(key) && !key.startsWith(DATASOURCE_PREFIX)
+      }.isEmpty,
+      "the table properties of source tables should not be copied in the created table")
+
+    if (DDLUtils.isDatasourceTable(sourceTable) ||
+      sourceTable.tableType == CatalogTableType.VIEW) {
+      assert(DDLUtils.isDatasourceTable(targetTable),
+        "the target table should be a data source table")
+    } else {
+      assert(!DDLUtils.isDatasourceTable(targetTable),
+        "the target table should be a Hive serde table")
+    }
+
+    if (sourceTable.tableType == CatalogTableType.VIEW) {
+      // Source table is a temporary/permanent view, which does not have a provider. The created
+      // target table uses the default data source format
+      assert(targetTable.properties(CreateDataSourceTableUtils.DATASOURCE_PROVIDER) ==
+        spark.sessionState.conf.defaultDataSourceName)
+    } else if (DDLUtils.isDatasourceTable(sourceTable)) {
+      assert(targetTable.properties(CreateDataSourceTableUtils.DATASOURCE_PROVIDER) ==
+        sourceTable.properties(CreateDataSourceTableUtils.DATASOURCE_PROVIDER))
+    }
+
+    val sourceTablePath = getTablePath(sourceTable)
+    val targetTablePath = getTablePath(targetTable)
+    assert(targetTablePath.nonEmpty, "target table path should not be empty")
+    assert(sourceTablePath != targetTablePath,
+      "source table/view path should be different from target table path")
+
+    // The source table contents should not been seen in the target table.
+    assert(spark.table(sourceTable.identifier).count() != 0, "the source table should be nonempty")
+    assert(spark.table(targetTable.identifier).count() == 0, "the target table should be empty")
+
+    // Their schema should be identical
+    checkAnswer(
+      sql(s"DESC ${sourceTable.identifier}").select("col_name", "data_type"),
+      sql(s"DESC ${targetTable.identifier}").select("col_name", "data_type"))
+
+    withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+      // Check whether the new table can be inserted using the data from the original table
+      sql(s"INSERT INTO TABLE ${targetTable.identifier} SELECT * FROM ${sourceTable.identifier}")
+    }
+
+    // After insertion, the data should be identical
+    checkAnswer(
+      sql(s"SELECT * FROM ${sourceTable.identifier}"),
+      sql(s"SELECT * FROM ${targetTable.identifier}"))
+  }
+
   test("Analyze data source tables(LogicalRelation)") {
     withTable("t1") {
       withTempPath { dir =>



