spark-commits mailing list archives

From wenc...@apache.org
Subject spark git commit: [SPARK-19305][SQL] partitioned table should always put partition columns at the end of table schema
Date Sat, 21 Jan 2017 05:57:58 GMT
Repository: spark
Updated Branches:
  refs/heads/master f174cdc74 -> 3c2ba9fcc


[SPARK-19305][SQL] partitioned table should always put partition columns at the end of table schema

## What changes were proposed in this pull request?

For data source tables, we always reorder the specified table schema, or the output of the query in a CTAS statement, to put partition columns at the end. For example, `CREATE TABLE t(a int, b int, c int, d int) USING parquet PARTITIONED BY (d, b)` creates a table with schema `<a, c, d, b>`.
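
As a minimal illustration of the behavior (assuming an existing Hive-enabled `SparkSession` named `spark`; the table name `t` is arbitrary):

```scala
// Minimal sketch, assuming a SparkSession `spark` is already available.
spark.sql("CREATE TABLE t(a int, b int, c int, d int) USING parquet PARTITIONED BY (d, b)")

// The table schema in the catalog ends with the partition columns: a, c, d, b.
spark.table("t").schema.fieldNames.foreach(println)
```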

Hive serde tables didn't have this problem before, because their CREATE TABLE syntax specifies the data schema and the partition schema separately.
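
By contrast, the classic Hive DDL below (illustration only, not part of this patch; `hive_t` is a made-up name) declares the two schemas separately, so no reordering question arises:

```scala
// Illustration only, assuming an existing Hive-enabled SparkSession named `spark`.
// Hive-style DDL keeps data columns and partition columns in separate clauses.
spark.sql("""
  CREATE TABLE hive_t (a INT, c INT)
  PARTITIONED BY (d INT, b INT)
  STORED AS PARQUET
""")
```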

However, after we unified the CREATE TABLE syntax, Hive serde tables also need this reordering. This PR puts the reordering logic in an analyzer rule, which works for both data source tables and Hive serde tables.
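
The reordering itself is a small transformation; the following standalone sketch (a hypothetical helper named `reorder`, not the exact rule code from this patch) captures the idea:

```scala
import org.apache.spark.sql.types.StructType

// Sketch of the reordering idea: keep non-partition columns in their original
// order and append the partition columns at the end. Hypothetical helper,
// shown only to illustrate the transformation applied by the analyzer rule.
def reorder(schema: StructType, partitionColumnNames: Seq[String]): StructType = {
  val partitionFields = partitionColumnNames.map(name => schema.find(_.name == name).get)
  StructType(schema.filterNot(partitionFields.contains) ++ partitionFields)
}
```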

## How was this patch tested?

new regression test

Author: Wenchen Fan <wenchen@databricks.com>

Closes #16655 from cloud-fan/schema.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3c2ba9fc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3c2ba9fc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3c2ba9fc

Branch: refs/heads/master
Commit: 3c2ba9fcc493504c9e7d3caf0b93256ca299cbfe
Parents: f174cdc
Author: Wenchen Fan <wenchen@databricks.com>
Authored: Sat Jan 21 13:57:50 2017 +0800
Committer: Wenchen Fan <wenchen@databricks.com>
Committed: Sat Jan 21 13:57:50 2017 +0800

----------------------------------------------------------------------
 .../spark/sql/execution/datasources/rules.scala | 60 ++++++++++++++------
 .../spark/sql/hive/execution/HiveDDLSuite.scala | 30 ++++++++++
 2 files changed, 72 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3c2ba9fc/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 5ca8226..c845337 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -199,31 +199,55 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl
     //   * can't use all table columns as partition columns.
     //   * partition columns' type must be AtomicType.
     //   * sort columns' type must be orderable.
+    //   * reorder table schema or output of query plan, to put partition columns at the end.
     case c @ CreateTable(tableDesc, _, query) =>
-      val analyzedQuery = query.map { q =>
-        // Analyze the query in CTAS and then we can do the normalization and checking.
-        val qe = sparkSession.sessionState.executePlan(q)
+      if (query.isDefined) {
+        assert(tableDesc.schema.isEmpty,
+          "Schema may not be specified in a Create Table As Select (CTAS) statement")
+
+        val qe = sparkSession.sessionState.executePlan(query.get)
         qe.assertAnalyzed()
-        qe.analyzed
-      }
-      val schema = if (analyzedQuery.isDefined) {
-        analyzedQuery.get.schema
-      } else {
-        tableDesc.schema
-      }
+        val analyzedQuery = qe.analyzed
+
+        val normalizedTable = normalizeCatalogTable(analyzedQuery.schema, tableDesc)
+
+        val output = analyzedQuery.output
+        val partitionAttrs = normalizedTable.partitionColumnNames.map { partCol =>
+          output.find(_.name == partCol).get
+        }
+        val newOutput = output.filterNot(partitionAttrs.contains) ++ partitionAttrs
+        val reorderedQuery = if (newOutput == output) {
+          analyzedQuery
+        } else {
+          Project(newOutput, analyzedQuery)
+        }
 
-      val columnNames = if (sparkSession.sessionState.conf.caseSensitiveAnalysis) {
-        schema.map(_.name)
+        c.copy(tableDesc = normalizedTable, query = Some(reorderedQuery))
       } else {
-        schema.map(_.name.toLowerCase)
+        val normalizedTable = normalizeCatalogTable(tableDesc.schema, tableDesc)
+
+        val partitionSchema = normalizedTable.partitionColumnNames.map { partCol =>
+          normalizedTable.schema.find(_.name == partCol).get
+        }
+
+        val reorderedSchema =
+          StructType(normalizedTable.schema.filterNot(partitionSchema.contains) ++ partitionSchema)
+
+        c.copy(tableDesc = normalizedTable.copy(schema = reorderedSchema))
       }
-      checkDuplication(columnNames, "table definition of " + tableDesc.identifier)
+  }
 
-      val normalizedTable = tableDesc.copy(
-        partitionColumnNames = normalizePartitionColumns(schema, tableDesc),
-        bucketSpec = normalizeBucketSpec(schema, tableDesc))
+  private def normalizeCatalogTable(schema: StructType, table: CatalogTable): CatalogTable = {
+    val columnNames = if (sparkSession.sessionState.conf.caseSensitiveAnalysis) {
+      schema.map(_.name)
+    } else {
+      schema.map(_.name.toLowerCase)
+    }
+    checkDuplication(columnNames, "table definition of " + table.identifier)
 
-      c.copy(tableDesc = normalizedTable, query = analyzedQuery)
+    table.copy(
+      partitionColumnNames = normalizePartitionColumns(schema, table),
+      bucketSpec = normalizeBucketSpec(schema, table))
   }
 
   private def normalizePartitionColumns(schema: StructType, table: CatalogTable): Seq[String] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/3c2ba9fc/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index edef308..7f58603 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -1384,4 +1384,34 @@ class HiveDDLSuite
       assert(e2.message.contains("Hive data source can only be used with tables"))
     }
   }
+
+  test("partitioned table should always put partition columns at the end of table schema")
{
+    def getTableColumns(tblName: String): Seq[String] = {
+      spark.sessionState.catalog.getTableMetadata(TableIdentifier(tblName)).schema.map(_.name)
+    }
+
+    withTable("t", "t1", "t2", "t3", "t4") {
+      sql("CREATE TABLE t(a int, b int, c int, d int) USING parquet PARTITIONED BY (d, b)")
+      assert(getTableColumns("t") == Seq("a", "c", "d", "b"))
+
+      sql("CREATE TABLE t1 USING parquet PARTITIONED BY (d, b) AS SELECT 1 a, 1 b, 1 c, 1
d")
+      assert(getTableColumns("t1") == Seq("a", "c", "d", "b"))
+
+      Seq((1, 1, 1, 1)).toDF("a", "b", "c", "d").write.partitionBy("d", "b").saveAsTable("t2")
+      assert(getTableColumns("t2") == Seq("a", "c", "d", "b"))
+
+      withTempPath { path =>
+        val dataPath = new File(new File(path, "d=1"), "b=1").getCanonicalPath
+        Seq(1 -> 1).toDF("a", "c").write.save(dataPath)
+
+        sql(s"CREATE TABLE t3 USING parquet LOCATION '${path.getCanonicalPath}'")
+        assert(getTableColumns("t3") == Seq("a", "c", "d", "b"))
+      }
+
+      sql("CREATE TABLE t4(a int, b int, c int, d int) USING hive PARTITIONED BY (d, b)")
+      assert(getTableColumns("t4") == Seq("a", "c", "d", "b"))
+
+      // TODO: add test for creating partitioned hive serde table as select, once we support it.
+    }
+  }
 }



