spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l...@apache.org
Subject spark git commit: [SPARK-7737] [SQL] Use leaf dirs having data files to discover partitions.
Date Thu, 21 May 2015 23:12:20 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 2be72c99a -> 11a0640db


[SPARK-7737] [SQL] Use leaf dirs having data files to discover partitions.

https://issues.apache.org/jira/browse/SPARK-7737

cc liancheng

Author: Yin Huai <yhuai@databricks.com>

Closes #6329 from yhuai/spark-7737 and squashes the following commits:

7e0dfc7 [Yin Huai] Use leaf dirs having data files to discover partitions.

(cherry picked from commit 347b50106bd1bcd40049f1ca29cefbb0baf53413)
Signed-off-by: Cheng Lian <lian@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/11a0640d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/11a0640d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/11a0640d

Branch: refs/heads/branch-1.4
Commit: 11a0640db916f4f3a585809b3362b0540bedfb09
Parents: 2be72c9
Author: Yin Huai <yhuai@databricks.com>
Authored: Fri May 22 07:10:26 2015 +0800
Committer: Cheng Lian <lian@databricks.com>
Committed: Fri May 22 07:12:12 2015 +0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/sources/interfaces.scala   |  7 ++-----
 .../sql/parquet/ParquetPartitionDiscoverySuite.scala      | 10 +++++++++-
 2 files changed, 11 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/11a0640d/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 61fc4e5..aaabbad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -377,8 +377,6 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
 
     var leafDirToChildrenFiles = mutable.Map.empty[Path, Array[FileStatus]]
 
-    var leafDirs = mutable.Map.empty[Path, FileStatus]
-
     def refresh(): Unit = {
       def listLeafFilesAndDirs(fs: FileSystem, status: FileStatus): Set[FileStatus] = {
         val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir)
@@ -386,7 +384,6 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
         files.toSet ++ leafDirs ++ dirs.flatMap(dir => listLeafFilesAndDirs(fs, dir))
       }
 
-      leafDirs.clear()
       leafFiles.clear()
 
       // We don't filter files/directories like _temporary/_SUCCESS here, as specific data
sources
@@ -399,7 +396,6 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
       }
 
       val (dirs, files) = statuses.partition(_.isDir)
-      leafDirs ++= dirs.map(d => d.getPath -> d).toMap
       leafFiles ++= files.map(f => f.getPath -> f).toMap
       leafDirToChildrenFiles ++= files.groupBy(_.getPath.getParent)
     }
@@ -484,7 +480,8 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
   }
 
   private def discoverPartitions(): PartitionSpec = {
-    val leafDirs = fileStatusCache.leafDirs.keys.toSeq
+    // We use leaf dirs containing data files to discover the schema.
+    val leafDirs = fileStatusCache.leafDirToChildrenFiles.keys.toSeq
     PartitioningUtils.parsePartitions(leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/11a0640d/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
index 907dbb0..90d4528 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.sql.parquet
 
+import java.io.File
+
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.hadoop.fs.Path
@@ -175,11 +177,17 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest
{
         pi <- Seq(1, 2)
         ps <- Seq("foo", "bar")
       } {
+        val dir = makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" ->
ps)
         makeParquetFile(
           (1 to 10).map(i => ParquetData(i, i.toString)),
-          makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
+          dir)
+        // Introduce _temporary dir to test the robustness of the schema discovery process.
+        new File(dir.toString, "_temporary").mkdir()
       }
+      // Introduce _temporary dir to the base dir the robustness of the schema discovery
process.
+      new File(base.getCanonicalPath, "_temporary").mkdir()
 
+      println("load the partitioned table")
       read.parquet(base.getCanonicalPath).registerTempTable("t")
 
       withTempTable("t") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message