spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marmb...@apache.org
Subject spark git commit: [SPARK-15743][SQL] Prevent saving with all-column partitioning
Date Fri, 10 Jun 2016 19:43:29 GMT
Repository: spark
Updated Branches:
  refs/heads/master 7d7a0a5e0 -> 2413fce9d


[SPARK-15743][SQL] Prevent saving with all-column partitioning

## What changes were proposed in this pull request?

When saving datasets on storage, `partitionBy` provides an easy way to construct the directory
structure. However, if a user choose all columns as partition columns, some exceptions occurs.

- **ORC with all column partitioning**: `AnalysisException` on **future read** due to schema
inference failure.
 ```scala
scala> spark.range(10).write.format("orc").mode("overwrite").partitionBy("id").save("/tmp/data")

scala> spark.read.format("orc").load("/tmp/data").collect()
org.apache.spark.sql.AnalysisException: Unable to infer schema for ORC at /tmp/data. It must
be specified manually;
```

- **Parquet with all-column partitioning**: `InvalidSchemaException` on **write execution**
due to Parquet limitation.
 ```scala
scala> spark.range(100).write.format("parquet").mode("overwrite").partitionBy("id").save("/tmp/data")
[Stage 0:>                                                          (0 + 8) / 8]16/06/02
16:51:17
ERROR Utils: Aborting task
org.apache.parquet.schema.InvalidSchemaException: A group type can not be empty. Parquet does
not support empty group without leaves. Empty group: spark_schema
... (lots of error messages)
```

Although some formats like JSON support all-column partitioning without any problem, it seems
not a good idea to make lots of empty directories.

This PR prevents saving with all-column partitioning by consistently raising `AnalysisException`
before executing save operation.

## How was this patch tested?

Newly added `PartitioningUtilsSuite`.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #13486 from dongjoon-hyun/SPARK-15743.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2413fce9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2413fce9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2413fce9

Branch: refs/heads/master
Commit: 2413fce9d6812a91eeffb4435c2b5b361d23214b
Parents: 7d7a0a5
Author: Dongjoon Hyun <dongjoon@apache.org>
Authored: Fri Jun 10 12:43:27 2016 -0700
Committer: Michael Armbrust <michael@databricks.com>
Committed: Fri Jun 10 12:43:27 2016 -0700

----------------------------------------------------------------------
 .../sql/execution/datasources/DataSource.scala  | 32 ++++++++++----------
 .../datasources/PartitioningUtils.scala         |  8 +++--
 .../spark/sql/execution/datasources/rules.scala |  4 +--
 .../execution/streaming/FileStreamSink.scala    |  2 +-
 .../test/DataFrameReaderWriterSuite.scala       | 12 ++++++++
 5 files changed, 37 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2413fce9/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 5f17fdf..d327302 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -1,19 +1,19 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package org.apache.spark.sql.execution.datasources
 
@@ -432,7 +432,7 @@ case class DataSource(
         }
 
         val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
-        PartitioningUtils.validatePartitionColumnDataTypes(
+        PartitioningUtils.validatePartitionColumn(
           data.schema, partitionColumns, caseSensitive)
 
         // If we are appending to a table that already exists, make sure the partitioning
matches

http://git-wip-us.apache.org/repos/asf/spark/blob/2413fce9/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 74f2993..2340ff0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -339,7 +339,7 @@ private[sql] object PartitioningUtils {
   private val upCastingOrder: Seq[DataType] =
     Seq(NullType, IntegerType, LongType, FloatType, DoubleType, StringType)
 
-  def validatePartitionColumnDataTypes(
+  def validatePartitionColumn(
       schema: StructType,
       partitionColumns: Seq[String],
       caseSensitive: Boolean): Unit = {
@@ -350,6 +350,10 @@ private[sql] object PartitioningUtils {
         case _ => throw new AnalysisException(s"Cannot use ${field.dataType} for partition
column")
       }
     }
+
+    if (partitionColumns.size == schema.fields.size) {
+      throw new AnalysisException(s"Cannot use all columns for partition columns")
+    }
   }
 
   def partitionColumnsSchema(
@@ -359,7 +363,7 @@ private[sql] object PartitioningUtils {
     val equality = columnNameEquality(caseSensitive)
     StructType(partitionColumns.map { col =>
       schema.find(f => equality(f.name, col)).getOrElse {
-        throw new RuntimeException(s"Partition column $col not found in schema $schema")
+        throw new AnalysisException(s"Partition column $col not found in schema $schema")
       }
     }).asNullable
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/2413fce9/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 9afd715..7ac62fb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -154,7 +154,7 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
           // OK
         }
 
-        PartitioningUtils.validatePartitionColumnDataTypes(
+        PartitioningUtils.validatePartitionColumn(
           r.schema, part.keySet.toSeq, conf.caseSensitiveAnalysis)
 
         // Get all input data source relations of the query.
@@ -205,7 +205,7 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
           // OK
         }
 
-        PartitioningUtils.validatePartitionColumnDataTypes(
+        PartitioningUtils.validatePartitionColumn(
           c.child.schema, c.partitionColumns, conf.caseSensitiveAnalysis)
 
         for {

http://git-wip-us.apache.org/repos/asf/spark/blob/2413fce9/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index e191010..efb0491 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -91,7 +91,7 @@ class FileStreamSinkWriter(
     hadoopConf: Configuration,
     options: Map[String, String]) extends Serializable with Logging {
 
-  PartitioningUtils.validatePartitionColumnDataTypes(
+  PartitioningUtils.validatePartitionColumn(
     data.schema, partitionColumnNames, data.sqlContext.conf.caseSensitiveAnalysis)
 
   private val serializableConf = new SerializableConfiguration(hadoopConf)

http://git-wip-us.apache.org/repos/asf/spark/blob/2413fce9/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
index 431a943..bf6063a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
@@ -572,4 +572,16 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter
{
 
     cq.awaitTermination(2000L)
   }
+
+  test("prevent all column partitioning") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath
+      intercept[AnalysisException] {
+        spark.range(10).write.format("parquet").mode("overwrite").partitionBy("id").save(path)
+      }
+      intercept[AnalysisException] {
+        spark.range(10).write.format("orc").mode("overwrite").partitionBy("id").save(path)
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message