spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject spark git commit: [SPARK-8887] [SQL] Explicit define which data types can be used as dynamic partition columns
Date Sat, 15 Aug 2015 04:03:18 GMT
Repository: spark
Updated Branches:
  refs/heads/master ec29f2034 -> 6c4fdbec3


[SPARK-8887] [SQL] Explicit define which data types can be used as dynamic partition columns

This PR enforces dynamic partition column data type requirements by adding analysis rules.

JIRA: https://issues.apache.org/jira/browse/SPARK-8887

Author: Yijie Shen <henry.yijieshen@gmail.com>

Closes #8201 from yjshen/dynamic_partition_columns.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6c4fdbec
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6c4fdbec
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6c4fdbec

Branch: refs/heads/master
Commit: 6c4fdbec33af287d24cd0995ecbd7191545d05c9
Parents: ec29f20
Author: Yijie Shen <henry.yijieshen@gmail.com>
Authored: Fri Aug 14 21:03:14 2015 -0700
Committer: Reynold Xin <rxin@databricks.com>
Committed: Fri Aug 14 21:03:14 2015 -0700

----------------------------------------------------------------------
 .../execution/datasources/PartitioningUtils.scala  | 13 +++++++++++++
 .../execution/datasources/ResolvedDataSource.scala |  5 ++++-
 .../execution/datasources/WriterContainer.scala    |  2 +-
 .../spark/sql/execution/datasources/rules.scala    |  8 ++++++--
 .../spark/sql/sources/hadoopFsRelationSuites.scala | 17 +++++++++++++++++
 5 files changed, 41 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6c4fdbec/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 66dfcc3..0a2007e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -26,6 +26,7 @@ import scala.util.Try
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.util.Shell
 
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
 import org.apache.spark.sql.types._
@@ -270,6 +271,18 @@ private[sql] object PartitioningUtils {
   private val upCastingOrder: Seq[DataType] =
     Seq(NullType, IntegerType, LongType, FloatType, DoubleType, StringType)
 
+  def validatePartitionColumnDataTypes(
+      schema: StructType,
+      partitionColumns: Array[String]): Unit = {
+
+    ResolvedDataSource.partitionColumnsSchema(schema, partitionColumns).foreach { field =>
+      field.dataType match {
+        case _: AtomicType => // OK
+        case _ => throw new AnalysisException(s"Cannot use ${field.dataType} for partition column")
+      }
+    }
+  }
+
   /**
    * Given a collection of [[Literal]]s, resolves possible type conflicts by up-casting "lower"
    * types.

http://git-wip-us.apache.org/repos/asf/spark/blob/6c4fdbec/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
index 7770bbd..8fbaf3a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
@@ -143,7 +143,7 @@ object ResolvedDataSource extends Logging {
     new ResolvedDataSource(clazz, relation)
   }
 
-  private def partitionColumnsSchema(
+  def partitionColumnsSchema(
       schema: StructType,
       partitionColumns: Array[String]): StructType = {
     StructType(partitionColumns.map { col =>
@@ -179,6 +179,9 @@ object ResolvedDataSource extends Logging {
           val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
           path.makeQualified(fs.getUri, fs.getWorkingDirectory)
         }
+
+        PartitioningUtils.validatePartitionColumnDataTypes(data.schema, partitionColumns)
+
         val dataSchema = StructType(data.schema.filterNot(f => partitionColumns.contains(f.name)))
         val r = dataSource.createRelation(
           sqlContext,

http://git-wip-us.apache.org/repos/asf/spark/blob/6c4fdbec/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index 2f11f40..d36197e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -287,7 +287,7 @@ private[sql] class DynamicPartitionWriterContainer(
           PartitioningUtils.escapePathName _, StringType, Seq(Cast(c, StringType)), Seq(StringType))
       val str = If(IsNull(c), Literal(defaultPartitionName), escaped)
       val partitionName = Literal(c.name + "=") :: str :: Nil
-      if (i == 0) partitionName else Literal(Path.SEPARATOR_CHAR.toString) :: partitionName
+      if (i == 0) partitionName else Literal(Path.SEPARATOR) :: partitionName
     }
 
     // Returns the partition path given a partition key.

http://git-wip-us.apache.org/repos/asf/spark/blob/6c4fdbec/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 40ca8bf..9d3d356 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -116,6 +116,8 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
           // OK
         }
 
+        PartitioningUtils.validatePartitionColumnDataTypes(r.schema, part.keySet.toArray)
+
         // Get all input data source relations of the query.
         val srcRelations = query.collect {
           case LogicalRelation(src: BaseRelation) => src
@@ -138,10 +140,10 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
           // OK
         }
 
-      case CreateTableUsingAsSelect(tableName, _, _, _, SaveMode.Overwrite, _, query) =>
+      case CreateTableUsingAsSelect(tableName, _, _, partitionColumns, mode, _, query) =>
         // When the SaveMode is Overwrite, we need to check if the table is an input table of
         // the query. If so, we will throw an AnalysisException to let users know it is not allowed.
-        if (catalog.tableExists(Seq(tableName))) {
+        if (mode == SaveMode.Overwrite && catalog.tableExists(Seq(tableName))) {
           // Need to remove SubQuery operator.
           EliminateSubQueries(catalog.lookupRelation(Seq(tableName))) match {
             // Only do the check if the table is a data source table
@@ -164,6 +166,8 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
           // OK
         }
 
+        PartitioningUtils.validatePartitionColumnDataTypes(query.schema, partitionColumns)
+
       case _ => // OK
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/6c4fdbec/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index af44562..8d0d921 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.sources
 
+import java.sql.Date
+
 import scala.collection.JavaConversions._
 
 import org.apache.hadoop.conf.Configuration
@@ -553,6 +555,21 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
       clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue))
     }
   }
+
+  test("SPARK-8887: Explicitly define which data types can be used as dynamic partition columns") {
+    val df = Seq(
+      (1, "v1", Array(1, 2, 3), Map("k1" -> "v1"), Tuple2(1, "4")),
+      (2, "v2", Array(4, 5, 6), Map("k2" -> "v2"), Tuple2(2, "5")),
+      (3, "v3", Array(7, 8, 9), Map("k3" -> "v3"), Tuple2(3, "6"))).toDF("a", "b", "c", "d", "e")
+    withTempDir { file =>
+      intercept[AnalysisException] {
+        df.write.format(dataSourceName).partitionBy("c", "d", "e").save(file.getCanonicalPath)
+      }
+    }
+    intercept[AnalysisException] {
+      df.write.format(dataSourceName).partitionBy("c", "d", "e").saveAsTable("t")
+    }
+  }
 }
 
 // This class is used to test SPARK-8578. We should not use any custom output committer when


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message