spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marmb...@apache.org
Subject git commit: [SPARK-3810][SQL] Makes PreInsertionCasts handle partitions properly
Date Thu, 09 Oct 2014 01:11:19 GMT
Repository: spark
Updated Branches:
  refs/heads/master 4ec931951 -> e70335723


[SPARK-3810][SQL] Makes PreInsertionCasts handle partitions properly

Takes partition keys into account when applying the `PreInsertionCasts` rule.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #2672 from liancheng/fix-pre-insert-casts and squashes the following commits:

def1a1a [Cheng Lian] Makes PreInsertionCasts handle partitions properly


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e7033572
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e7033572
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e7033572

Branch: refs/heads/master
Commit: e7033572330bd48b2438f218b0d2cd3fccdeb362
Parents: 4ec9319
Author: Cheng Lian <lian.cs.zju@gmail.com>
Authored: Wed Oct 8 18:11:18 2014 -0700
Committer: Michael Armbrust <michael@databricks.com>
Committed: Wed Oct 8 18:11:18 2014 -0700

----------------------------------------------------------------------
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 15 +++-----
 .../sql/hive/execution/HiveQuerySuite.scala     | 36 ++++++++++++++++++++
 2 files changed, 41 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e7033572/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index cc0605b..addd5be 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -19,31 +19,28 @@ package org.apache.spark.sql.hive
 
 import scala.util.parsing.combinator.RegexParsers
 
-import org.apache.hadoop.hive.metastore.api.{FieldSchema, StorageDescriptor, SerDeInfo}
-import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition}
+import org.apache.hadoop.hive.metastore.api.{FieldSchema, SerDeInfo, StorageDescriptor, Partition => TPartition, Table => TTable}
 import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table}
 import org.apache.hadoop.hive.ql.plan.TableDesc
 import org.apache.hadoop.hive.ql.stats.StatsSetupConst
 import org.apache.hadoop.hive.serde2.Deserializer
 
-import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.Logging
+import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.catalyst.analysis.{EliminateAnalysisOperators, Catalog}
+import org.apache.spark.sql.catalyst.analysis.Catalog
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.catalyst.types._
-import org.apache.spark.sql.columnar.InMemoryRelation
-import org.apache.spark.sql.hive.execution.HiveTableScan
 import org.apache.spark.util.Utils
 
 /* Implicit conversions */
 import scala.collection.JavaConversions._
 
 private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Logging {
-  import HiveMetastoreTypes._
+  import org.apache.spark.sql.hive.HiveMetastoreTypes._
 
   /** Connection to hive metastore.  Usages should lock on `this`. */
   protected[hive] val client = Hive.get(hive.hiveconf)
@@ -137,10 +134,8 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
 
     def castChildOutput(p: InsertIntoTable, table: MetastoreRelation, child: LogicalPlan) = {
       val childOutputDataTypes = child.output.map(_.dataType)
-      // Only check attributes, not partitionKeys since they are always strings.
-      // TODO: Fully support inserting into partitioned tables.
       val tableOutputDataTypes =
-        table.attributes.map(_.dataType) ++ table.partitionKeys.map(_.dataType)
+        (table.attributes ++ table.partitionKeys).take(child.output.length).map(_.dataType)
 
       if (childOutputDataTypes == tableOutputDataTypes) {
         p

http://git-wip-us.apache.org/repos/asf/spark/blob/e7033572/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 2e282a9..2829105 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -22,6 +22,7 @@ import scala.util.Try
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 
 import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
@@ -675,6 +676,41 @@ class HiveQuerySuite extends HiveComparisonTest {
     sql("SELECT * FROM boom").queryExecution.analyzed
   }
 
+  test("SPARK-3810: PreInsertionCasts static partitioning support") {
+    val analyzedPlan = {
+      loadTestTable("srcpart")
+      sql("DROP TABLE IF EXISTS withparts")
+      sql("CREATE TABLE withparts LIKE srcpart")
+      sql("INSERT INTO TABLE withparts PARTITION(ds='1', hr='2') SELECT key, value FROM src")
+        .queryExecution.analyzed
+    }
+
+    assertResult(1, "Duplicated project detected\n" + analyzedPlan) {
+      analyzedPlan.collect {
+        case _: Project => ()
+      }.size
+    }
+  }
+
+  test("SPARK-3810: PreInsertionCasts dynamic partitioning support") {
+    val analyzedPlan = {
+      loadTestTable("srcpart")
+      sql("DROP TABLE IF EXISTS withparts")
+      sql("CREATE TABLE withparts LIKE srcpart")
+      sql("SET hive.exec.dynamic.partition.mode=nonstrict")
+
+      sql("CREATE TABLE IF NOT EXISTS withparts LIKE srcpart")
+      sql("INSERT INTO TABLE withparts PARTITION(ds, hr) SELECT key, value FROM src")
+        .queryExecution.analyzed
+    }
+
+    assertResult(1, "Duplicated project detected\n" + analyzedPlan) {
+      analyzedPlan.collect {
+        case _: Project => ()
+      }.size
+    }
+  }
+
   test("parse HQL set commands") {
     // Adapted from its SQL counterpart.
     val testKey = "spark.sql.key.usedfortestonly"


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message