From: rxin@apache.org
To: commits@spark.apache.org
Subject: spark git commit: [SPARK-8887] [SQL] Explicitly define which data types can be used as dynamic partition columns
Date: Sat, 15 Aug 2015 04:03:18 +0000 (UTC)

Repository: spark
Updated Branches:
  refs/heads/master ec29f2034 -> 6c4fdbec3

[SPARK-8887] [SQL] Explicitly define which data types can be used as dynamic partition columns

This PR enforces the dynamic partition column data type requirements by adding analysis rules.

JIRA: https://issues.apache.org/jira/browse/SPARK-8887

Author: Yijie Shen

Closes #8201 from yjshen/dynamic_partition_columns.
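For context, a minimal sketch of the user-visible behavior this change enforces. It assumes a Spark 1.5-era SQLContext named `sqlContext`; the column names, data source format, and output path are illustrative, not taken from the patch:

import org.apache.spark.sql.AnalysisException
import sqlContext.implicits._

val df = Seq(
  (1, "v1", Array(1, 2, 3)),
  (2, "v2", Array(4, 5, 6))).toDF("a", "b", "c")

// Column "c" is ArrayType(IntegerType), not an atomic type, so the write is
// now rejected at analysis time instead of producing unusable partition
// directories.
try {
  df.write.format("parquet").partitionBy("c").save("/tmp/spark-8887-demo")
} catch {
  case e: AnalysisException => println(s"Rejected: ${e.getMessage}")
}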
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6c4fdbec
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6c4fdbec
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6c4fdbec

Branch: refs/heads/master
Commit: 6c4fdbec33af287d24cd0995ecbd7191545d05c9
Parents: ec29f20
Author: Yijie Shen
Authored: Fri Aug 14 21:03:14 2015 -0700
Committer: Reynold Xin
Committed: Fri Aug 14 21:03:14 2015 -0700

----------------------------------------------------------------------
 .../execution/datasources/PartitioningUtils.scala  | 13 +++++++++++++
 .../execution/datasources/ResolvedDataSource.scala |  5 ++++-
 .../execution/datasources/WriterContainer.scala    |  2 +-
 .../spark/sql/execution/datasources/rules.scala    |  8 ++++++--
 .../spark/sql/sources/hadoopFsRelationSuites.scala | 17 +++++++++++++++++
 5 files changed, 41 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/6c4fdbec/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 66dfcc3..0a2007e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -26,6 +26,7 @@ import scala.util.Try
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.util.Shell
 
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
 import org.apache.spark.sql.types._
@@ -270,6 +271,18 @@ private[sql] object PartitioningUtils {
   private val upCastingOrder: Seq[DataType] =
     Seq(NullType, IntegerType, LongType, FloatType, DoubleType, StringType)
 
+  def validatePartitionColumnDataTypes(
+      schema: StructType,
+      partitionColumns: Array[String]): Unit = {
+
+    ResolvedDataSource.partitionColumnsSchema(schema, partitionColumns).foreach { field =>
+      field.dataType match {
+        case _: AtomicType => // OK
+        case _ => throw new AnalysisException(s"Cannot use ${field.dataType} for partition column")
+      }
+    }
+  }
+
   /**
    * Given a collection of [[Literal]]s, resolves possible type conflicts by up-casting "lower"
    * types.
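For readers skimming the diff, here is a self-contained sketch of the validation above. It is simplified: the schema lookup that the real code delegates to ResolvedDataSource.partitionColumnsSchema is inlined, the AtomicType check is approximated by rejecting the complex container types, and IllegalArgumentException stands in for AnalysisException (whose constructor is not visible outside the org.apache.spark.sql package):

import org.apache.spark.sql.types._

// Partition columns must carry an atomic type (IntegerType, StringType,
// DateType, ...); complex types such as arrays, maps, and structs are rejected.
def checkPartitionColumns(schema: StructType, partitionColumns: Seq[String]): Unit = {
  partitionColumns.foreach { col =>
    val field = schema(col) // throws IllegalArgumentException if the column is missing
    field.dataType match {
      case _: ArrayType | _: MapType | _: StructType =>
        // The real rule throws AnalysisException here.
        throw new IllegalArgumentException(s"Cannot use ${field.dataType} for partition column")
      case _ => // atomic types: OK
    }
  }
}

val schema = StructType(Seq(
  StructField("a", IntegerType),
  StructField("c", ArrayType(IntegerType))))

checkPartitionColumns(schema, Seq("a"))    // passes
// checkPartitionColumns(schema, Seq("c")) // would throw: ArrayType is not atomic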
http://git-wip-us.apache.org/repos/asf/spark/blob/6c4fdbec/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
index 7770bbd..8fbaf3a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
@@ -143,7 +143,7 @@ object ResolvedDataSource extends Logging {
     new ResolvedDataSource(clazz, relation)
   }
 
-  private def partitionColumnsSchema(
+  def partitionColumnsSchema(
       schema: StructType,
       partitionColumns: Array[String]): StructType = {
     StructType(partitionColumns.map { col =>
@@ -179,6 +179,9 @@ object ResolvedDataSource extends Logging {
           val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
           path.makeQualified(fs.getUri, fs.getWorkingDirectory)
         }
+
+        PartitioningUtils.validatePartitionColumnDataTypes(data.schema, partitionColumns)
+
         val dataSchema = StructType(data.schema.filterNot(f => partitionColumns.contains(f.name)))
         val r = dataSource.createRelation(
           sqlContext,

http://git-wip-us.apache.org/repos/asf/spark/blob/6c4fdbec/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index 2f11f40..d36197e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -287,7 +287,7 @@ private[sql] class DynamicPartitionWriterContainer(
         PartitioningUtils.escapePathName _, StringType, Seq(Cast(c, StringType)), Seq(StringType))
       val str = If(IsNull(c), Literal(defaultPartitionName), escaped)
       val partitionName = Literal(c.name + "=") :: str :: Nil
-      if (i == 0) partitionName else Literal(Path.SEPARATOR_CHAR.toString) :: partitionName
+      if (i == 0) partitionName else Literal(Path.SEPARATOR) :: partitionName
     }
 
     // Returns the partition path given a partition key.
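Aside on the one-line WriterContainer change: Path.SEPARATOR is Hadoop's String constant "/", so switching from Path.SEPARATOR_CHAR.toString merely drops a char-to-String conversion; behavior is unchanged. For orientation, a sketch of the partition path layout this code produces (illustrative names; the real code builds Catalyst expressions and escapes values with PartitioningUtils.escapePathName, which this sketch omits):

import org.apache.hadoop.fs.Path

// Each partition column becomes a "name=value" path segment; null values fall
// back to the default partition name; segments are joined with Path.SEPARATOR.
def partitionPath(columns: Seq[(String, Any)], defaultPartitionName: String): String =
  columns.map { case (name, value) =>
    val str = Option(value).map(_.toString).getOrElse(defaultPartitionName)
    s"$name=$str"
  }.mkString(Path.SEPARATOR)

// partitionPath(Seq("year" -> 2015, "month" -> 8), "__HIVE_DEFAULT_PARTITION__")
// returns "year=2015/month=8"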
http://git-wip-us.apache.org/repos/asf/spark/blob/6c4fdbec/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 40ca8bf..9d3d356 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -116,6 +116,8 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
             // OK
         }
 
+        PartitioningUtils.validatePartitionColumnDataTypes(r.schema, part.keySet.toArray)
+
         // Get all input data source relations of the query.
         val srcRelations = query.collect {
           case LogicalRelation(src: BaseRelation) => src
@@ -138,10 +140,10 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
             // OK
         }
 
-      case CreateTableUsingAsSelect(tableName, _, _, _, SaveMode.Overwrite, _, query) =>
+      case CreateTableUsingAsSelect(tableName, _, _, partitionColumns, mode, _, query) =>
         // When the SaveMode is Overwrite, we need to check if the table is an input table of
         // the query. If so, we will throw an AnalysisException to let users know it is not allowed.
-        if (catalog.tableExists(Seq(tableName))) {
+        if (mode == SaveMode.Overwrite && catalog.tableExists(Seq(tableName))) {
          // Need to remove SubQuery operator.
          EliminateSubQueries(catalog.lookupRelation(Seq(tableName))) match {
            // Only do the check if the table is a data source table
@@ -164,6 +166,8 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
             // OK
         }
 
+        PartitioningUtils.validatePartitionColumnDataTypes(query.schema, partitionColumns)
+
       case _ => // OK
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/6c4fdbec/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index af44562..8d0d921 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.sources
 
+import java.sql.Date
+
 import scala.collection.JavaConversions._
 
 import org.apache.hadoop.conf.Configuration
@@ -553,6 +555,21 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
       clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue))
     }
   }
+
+  test("SPARK-8887: Explicitly define which data types can be used as dynamic partition columns") {
+    val df = Seq(
+      (1, "v1", Array(1, 2, 3), Map("k1" -> "v1"), Tuple2(1, "4")),
+      (2, "v2", Array(4, 5, 6), Map("k2" -> "v2"), Tuple2(2, "5")),
+      (3, "v3", Array(7, 8, 9), Map("k3" -> "v3"), Tuple2(3, "6"))).toDF("a", "b", "c", "d", "e")
+    withTempDir { file =>
+      intercept[AnalysisException] {
+        df.write.format(dataSourceName).partitionBy("c", "d", "e").save(file.getCanonicalPath)
+      }
+    }
+    intercept[AnalysisException] {
+      df.write.format(dataSourceName).partitionBy("c", "d", "e").saveAsTable("t")
+    }
+  }
 }
 
 // This class is used to test SPARK-8578. We should not use any custom output committer when
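For contrast with the negative test above, a hedged example of a write the new check still permits (again assuming a SQLContext named `sqlContext`; the format and output path are illustrative). Atomic types such as Int, String, and java.sql.Date remain valid dynamic partition columns:

import java.sql.Date
import sqlContext.implicits._

// Only complex-typed columns are rejected; atomic types pass validation.
val ok = Seq(
  (1, "v1", Date.valueOf("2015-08-14")),
  (2, "v2", Date.valueOf("2015-08-15"))).toDF("a", "b", "day")

// Lays out directories like .../b=v1/day=2015-08-14/
ok.write.format("parquet").partitionBy("b", "day").save("/tmp/spark-8887-ok")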