spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yh...@apache.org
Subject spark git commit: [SPARK-14997][SQL] Fixed FileCatalog to return correct set of files when there is no partitioning scheme in the given paths
Date Fri, 06 May 2016 22:04:20 GMT
Repository: spark
Updated Branches:
  refs/heads/master e20cd9f4c -> f7b7ef416


[SPARK-14997][SQL] Fixed FileCatalog to return correct set of files when there is no partitioning
scheme in the given paths

## What changes were proposed in this pull request?
Let's say there are JSON files in the following directory structure:
```
xyz/file0.json
xyz/subdir1/file1.json
xyz/subdir2/file2.json
xyz/subdir1/subsubdir1/file3.json
```
`sqlContext.read.json("xyz")` should read only file0.json, according to the behavior in Spark 1.6.1.
However, in current master, all 4 files are read.

The fix is to make FileCatalog return only the children files of the given path if no
partitioning is detected (instead of the full recursive list of files).

Closes #12774

## How was this patch tested?

unit tests

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #12856 from tdas/SPARK-14997.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f7b7ef41
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f7b7ef41
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f7b7ef41

Branch: refs/heads/master
Commit: f7b7ef41662d7d02fc4f834f3c6c4ee8802e949c
Parents: e20cd9f
Author: Tathagata Das <tathagata.das1565@gmail.com>
Authored: Fri May 6 15:04:16 2016 -0700
Committer: Yin Huai <yhuai@databricks.com>
Committed: Fri May 6 15:04:16 2016 -0700

----------------------------------------------------------------------
 .../PartitioningAwareFileCatalog.scala          |  24 +-
 .../datasources/FileCatalogSuite.scala          |  68 ++++++
 .../ParquetPartitionDiscoverySuite.scala        |  47 ++++
 .../sql/streaming/FileStreamSourceSuite.scala   |  15 +-
 .../sql/sources/HadoopFsRelationTest.scala      | 232 +++++++++++++++++--
 5 files changed, 356 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f7b7ef41/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
index 2c44b39..5f04a6c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
@@ -61,7 +61,29 @@ abstract class PartitioningAwareFileCatalog(
     }
   }
 
-  override def allFiles(): Seq[FileStatus] = leafFiles.values.toSeq
+  override def allFiles(): Seq[FileStatus] = {
+    if (partitionSpec().partitionColumns.isEmpty) {
+      // For each of the input paths, get the list of files inside them
+      paths.flatMap { path =>
+        // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel).
+        val fs = path.getFileSystem(hadoopConf)
+        val qualifiedPath = fs.makeQualified(path)
+
+        // There are three cases possible with each path
+        // 1. The path is a directory and has children files in it. Then it must be present
in
+        //    leafDirToChildrenFiles as those children files will have been found as leaf
files.
+        //    Find its children files from leafDirToChildrenFiles and include them.
+        // 2. The path is a file, then it will be present in leafFiles. Include this path.
+        // 3. The path is a directory, but has no children files. Do not include this path.
+
+        leafDirToChildrenFiles.get(qualifiedPath)
+          .orElse { leafFiles.get(qualifiedPath).map(Array(_)) }
+          .getOrElse(Array.empty)
+      }
+    } else {
+      leafFiles.values.toSeq
+    }
+  }
 
   protected def inferPartitioning(): PartitionSpec = {
     // We use leaf dirs containing data files to discover the schema.

http://git-wip-us.apache.org/repos/asf/spark/blob/f7b7ef41/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
new file mode 100644
index 0000000..dab5c76
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.File
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.test.SharedSQLContext
+
+class FileCatalogSuite extends SharedSQLContext {
+
+  test("ListingFileCatalog: leaf files are qualified paths") {
+    withTempDir { dir =>
+      val file = new File(dir, "text.txt")
+      stringToFile(file, "text")
+
+      val path = new Path(file.getCanonicalPath)
+      val catalog = new ListingFileCatalog(sqlContext.sparkSession, Seq(path), Map.empty,
None) {
+        def leafFilePaths: Seq[Path] = leafFiles.keys.toSeq
+        def leafDirPaths: Seq[Path] = leafDirToChildrenFiles.keys.toSeq
+      }
+      assert(catalog.leafFilePaths.forall(p => p.toString.startsWith("file:/")))
+      assert(catalog.leafDirPaths.forall(p => p.toString.startsWith("file:/")))
+    }
+  }
+
+  test("ListingFileCatalog: input paths are converted to qualified paths") {
+    withTempDir { dir =>
+      val file = new File(dir, "text.txt")
+      stringToFile(file, "text")
+
+      val unqualifiedDirPath = new Path(dir.getCanonicalPath)
+      val unqualifiedFilePath = new Path(file.getCanonicalPath)
+      require(!unqualifiedDirPath.toString.contains("file:"))
+      require(!unqualifiedFilePath.toString.contains("file:"))
+
+      val fs = unqualifiedDirPath.getFileSystem(sparkContext.hadoopConfiguration)
+      val qualifiedFilePath = fs.makeQualified(new Path(file.getCanonicalPath))
+      require(qualifiedFilePath.toString.startsWith("file:"))
+
+      val catalog1 = new ListingFileCatalog(
+        sqlContext.sparkSession, Seq(unqualifiedDirPath), Map.empty, None)
+      assert(catalog1.allFiles.map(_.getPath) === Seq(qualifiedFilePath))
+
+      val catalog2 = new ListingFileCatalog(
+        sqlContext.sparkSession, Seq(unqualifiedFilePath), Map.empty, None)
+      assert(catalog2.allFiles.map(_.getPath) === Seq(qualifiedFilePath))
+
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f7b7ef41/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index cb2c252..b4d35be 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -765,6 +765,53 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest
with Sha
     }
   }
 
+  test("use basePath and file globbing to selectively load partitioned table") {
+    withTempPath { dir =>
+
+      val df = Seq(
+        (1, "foo", 100),
+        (1, "bar", 200),
+        (2, "foo", 300),
+        (2, "bar", 400)
+      ).toDF("p1", "p2", "v")
+      df.write
+        .mode(SaveMode.Overwrite)
+        .partitionBy("p1", "p2")
+        .parquet(dir.getCanonicalPath)
+
+      def check(path: String, basePath: String, expectedDf: DataFrame): Unit = {
+        val testDf = sqlContext.read
+          .option("basePath", basePath)
+          .parquet(path)
+        checkAnswer(testDf, expectedDf)
+      }
+
+      // Should find all the data with partitioning columns when base path is set to the
root
+      val resultDf = df.select("v", "p1", "p2")
+      check(path = s"$dir", basePath = s"$dir", resultDf)
+      check(path = s"$dir/*", basePath = s"$dir", resultDf)
+      check(path = s"$dir/*/*", basePath = s"$dir", resultDf)
+      check(path = s"$dir/*/*/*", basePath = s"$dir", resultDf)
+
+      // Should find selective partitions of the data if the base path is not set to root
+
+      check(          // read from ../p1=1 with base ../p1=1, should not infer p1 col
+        path = s"$dir/p1=1/*",
+        basePath = s"$dir/p1=1/",
+        resultDf.filter("p1 = 1").drop("p1"))
+
+      check(          // red from ../p1=1/p2=foo with base ../p1=1/ should not infer p1
+        path = s"$dir/p1=1/p2=foo/*",
+        basePath = s"$dir/p1=1/",
+        resultDf.filter("p1 = 1").filter("p2 = 'foo'").drop("p1"))
+
+      check(          // red from ../p1=1/p2=foo with base ../p1=1/p2=foo, should not infer
p1, p2
+        path = s"$dir/p1=1/p2=foo/*",
+        basePath = s"$dir/p1=1/p2=foo/",
+        resultDf.filter("p1 = 1").filter("p2 = 'foo'").drop("p1", "p2"))
+    }
+  }
+
   test("_SUCCESS should not break partitioning discovery") {
     Seq(1, 32).foreach { threshold =>
       // We have two paths to list files, one at driver side, another one that we use

http://git-wip-us.apache.org/repos/asf/spark/blob/f7b7ef41/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index bc5c0c1..a62852b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.streaming
 
 import java.io.File
+import java.util.UUID
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.util._
@@ -84,10 +85,13 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext {
       AddParquetFileData(seq.toDS().toDF(), src, tmp)
     }
 
+    /** Write parquet files in a temp dir, and move the individual files to the 'src' dir
*/
     def writeToFile(df: DataFrame, src: File, tmp: File): Unit = {
-      val file = Utils.tempFileWith(new File(tmp, "parquet"))
-      df.write.parquet(file.getCanonicalPath)
-      file.renameTo(new File(src, file.getName))
+      val tmpDir = Utils.tempFileWith(new File(tmp, "parquet"))
+      df.write.parquet(tmpDir.getCanonicalPath)
+      tmpDir.listFiles().foreach { f =>
+        f.renameTo(new File(src, s"${f.getName}"))
+      }
     }
   }
 
@@ -210,8 +214,9 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext
{
 
   test("FileStreamSource schema: parquet, existing files, no schema") {
     withTempDir { src =>
-      Seq("a", "b", "c").toDS().as("userColumn").toDF()
-        .write.parquet(new File(src, "1").getCanonicalPath)
+      Seq("a", "b", "c").toDS().as("userColumn").toDF().write
+        .mode(org.apache.spark.sql.SaveMode.Overwrite)
+        .parquet(src.getCanonicalPath)
       val schema = createFileStreamSourceAndGetSchema(
         format = Some("parquet"), path = Some(src.getCanonicalPath), schema = None)
       assert(schema === new StructType().add("value", StringType))

http://git-wip-us.apache.org/repos/asf/spark/blob/f7b7ef41/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
index 67b403a..20c5f72 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.sources
 
+import java.io.File
+
 import scala.util.Random
 
 import org.apache.hadoop.fs.Path
@@ -486,40 +488,222 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils
with Tes
     }
   }
 
-  test("Hadoop style globbing") {
+  test("load() - with directory of unpartitioned data in nested subdirs") {
+    withTempPath { dir =>
+      val subdir = new File(dir, "subdir")
+
+      val dataInDir = Seq(1, 2, 3).toDF("value")
+      val dataInSubdir = Seq(4, 5, 6).toDF("value")
+
+      /*
+
+        Directory structure to be generated
+
+        dir
+          |
+          |___ [ files of dataInDir ]
+          |
+          |___ subsubdir
+                    |
+                    |___ [ files of dataInSubdir ]
+      */
+
+      // Generated dataInSubdir, not data in dir
+      dataInSubdir.write
+        .format(dataSourceName)
+        .mode(SaveMode.Overwrite)
+        .save(subdir.getCanonicalPath)
+
+      // Inferring schema should throw error as it should not find any file to infer
+      val e = intercept[Exception] {
+        sqlContext.read.format(dataSourceName).load(dir.getCanonicalPath)
+      }
+
+      e match {
+        case _: AnalysisException =>
+          assert(e.getMessage.contains("infer"))
+
+        case _: java.util.NoSuchElementException if e.getMessage.contains("dataSchema") =>
+          // Ignore error, the source format requires schema to be provided by user
+          // This is needed for SimpleTextHadoopFsRelationSuite as SimpleTextSource needs
schema
+
+        case _ =>
+          fail("Unexpected error trying to infer schema from empty dir", e)
+      }
+
+      /** Test whether data is read with the given path matches the expected answer */
+      def testWithPath(path: File, expectedAnswer: Seq[Row]): Unit = {
+        val df = sqlContext.read
+          .format(dataSourceName)
+          .schema(dataInDir.schema) // avoid schema inference for any format
+          .load(path.getCanonicalPath)
+        checkAnswer(df, expectedAnswer)
+      }
+
+      // Verify that reading by path 'dir/' gives empty results as there are no files in
'file'
+      // and it should not pick up files in 'dir/subdir'
+      require(subdir.exists)
+      require(subdir.listFiles().exists(!_.isDirectory))
+      testWithPath(dir, Seq.empty)
+
+      // Verify that if there is data in dir, then reading by path 'dir/' reads only dataInDir
+      dataInDir.write
+        .format(dataSourceName)
+        .mode(SaveMode.Append)   // append to prevent subdir from being deleted
+        .save(dir.getCanonicalPath)
+      require(dir.listFiles().exists(!_.isDirectory))
+      require(subdir.exists())
+      require(subdir.listFiles().exists(!_.isDirectory))
+      testWithPath(dir, dataInDir.collect())
+    }
+  }
+
+  test("Hadoop style globbing - unpartitioned data") {
     withTempPath { file =>
+
+      val dir = file.getCanonicalPath
+      val subdir = new File(dir, "subdir")
+      val subsubdir = new File(subdir, "subsubdir")
+      val anotherSubsubdir =
+        new File(new File(dir, "another-subdir"), "another-subsubdir")
+
+      val dataInSubdir = Seq(1, 2, 3).toDF("value")
+      val dataInSubsubdir = Seq(4, 5, 6).toDF("value")
+      val dataInAnotherSubsubdir = Seq(7, 8, 9).toDF("value")
+
+      dataInSubdir.write
+        .format (dataSourceName)
+        .mode (SaveMode.Overwrite)
+        .save (subdir.getCanonicalPath)
+
+      dataInSubsubdir.write
+        .format (dataSourceName)
+        .mode (SaveMode.Overwrite)
+        .save (subsubdir.getCanonicalPath)
+
+      dataInAnotherSubsubdir.write
+        .format (dataSourceName)
+        .mode (SaveMode.Overwrite)
+        .save (anotherSubsubdir.getCanonicalPath)
+
+      require(subdir.exists)
+      require(subdir.listFiles().exists(!_.isDirectory))
+      require(subsubdir.exists)
+      require(subsubdir.listFiles().exists(!_.isDirectory))
+      require(anotherSubsubdir.exists)
+      require(anotherSubsubdir.listFiles().exists(!_.isDirectory))
+
+      /*
+        Directory structure generated
+
+        dir
+          |
+          |___ subdir
+          |     |
+          |     |___ [ files of dataInSubdir ]
+          |     |
+          |     |___ subsubdir
+          |               |
+          |               |___ [ files of dataInSubsubdir ]
+          |
+          |
+          |___ anotherSubdir
+                |
+                |___ anotherSubsubdir
+                          |
+                          |___ [ files of dataInAnotherSubsubdir ]
+       */
+
+      val schema = dataInSubdir.schema
+
+      /** Check whether data is read with the given path matches the expected answer */
+      def check(path: String, expectedDf: DataFrame): Unit = {
+        val df = sqlContext.read
+          .format(dataSourceName)
+          .schema(schema) // avoid schema inference for any format, expected to be same format
+          .load(path)
+        checkAnswer(df, expectedDf)
+      }
+
+      check(s"$dir/*/", dataInSubdir)
+      check(s"$dir/sub*/*", dataInSubdir.union(dataInSubsubdir))
+      check(s"$dir/another*/*", dataInAnotherSubsubdir)
+      check(s"$dir/*/another*", dataInAnotherSubsubdir)
+      check(s"$dir/*/*", dataInSubdir.union(dataInSubsubdir).union(dataInAnotherSubsubdir))
+    }
+  }
+
+  test("Hadoop style globbing - partitioned data with schema inference") {
+
+    // Tests the following on partition data
+    // - partitions are not discovered with globbing and without base path set.
+    // - partitions are discovered with globbing and base path set, though more detailed
+    //   tests for this is in ParquetPartitionDiscoverySuite
+
+    withTempPath { path =>
+      val dir = path.getCanonicalPath
       partitionedTestDF.write
         .format(dataSourceName)
         .mode(SaveMode.Overwrite)
         .partitionBy("p1", "p2")
-        .save(file.getCanonicalPath)
+        .save(dir)
+
+      def check(
+          path: String,
+          expectedResult: Either[DataFrame, String],
+          basePath: Option[String] = None
+        ): Unit = {
+        try {
+          val reader = sqlContext.read
+          basePath.foreach(reader.option("basePath", _))
+          val testDf = reader
+            .format(dataSourceName)
+            .load(path)
+          assert(expectedResult.isLeft, s"Error was expected with $path but result found")
+          checkAnswer(testDf, expectedResult.left.get)
+        } catch {
+          case e: java.util.NoSuchElementException if e.getMessage.contains("dataSchema")
=>
+            // Ignore error, the source format requires schema to be provided by user
+            // This is needed for SimpleTextHadoopFsRelationSuite as SimpleTextSource needs
schema
+
+          case e: Throwable =>
+            assert(expectedResult.isRight, s"Was not expecting error with $path: " + e)
+            assert(
+              e.getMessage.contains(expectedResult.right.get),
+              s"Did not find expected error message wiht $path")
+        }
+      }
 
-      val df = sqlContext.read
-        .format(dataSourceName)
-        .option("dataSchema", dataSchema.json)
-        .option("basePath", file.getCanonicalPath)
-        .load(s"${file.getCanonicalPath}/p1=*/p2=???")
-
-      val expectedPaths = Set(
-        s"${file.getCanonicalFile}/p1=1/p2=foo",
-        s"${file.getCanonicalFile}/p1=2/p2=foo",
-        s"${file.getCanonicalFile}/p1=1/p2=bar",
-        s"${file.getCanonicalFile}/p1=2/p2=bar"
-      ).map { p =>
-        val path = new Path(p)
-        val fs = path.getFileSystem(sqlContext.sessionState.newHadoopConf())
-        path.makeQualified(fs.getUri, fs.getWorkingDirectory).toString
+      object Error {
+        def apply(msg: String): Either[DataFrame, String] = Right(msg)
       }
 
-      val actualPaths = df.queryExecution.analyzed.collectFirst {
-        case LogicalRelation(relation: HadoopFsRelation, _, _) =>
-          relation.location.paths.map(_.toString).toSet
-      }.getOrElse {
-        fail("Expect an FSBasedRelation, but none could be found")
+      object Result {
+        def apply(df: DataFrame): Either[DataFrame, String] = Left(df)
       }
 
-      assert(actualPaths === expectedPaths)
-      checkAnswer(df, partitionedTestDF.collect())
+      // ---- Without base path set ----
+      // Should find all the data with partitioning columns
+      check(s"$dir", Result(partitionedTestDF))
+
+      // Should fail as globbing finds dirs without files, only subdirs in them.
+      check(s"$dir/*/", Error("please set \"basePath\""))
+      check(s"$dir/p1=*/", Error("please set \"basePath\""))
+
+      // Should not find partition columns as the globs resolve to p2 dirs
+      // with files in them
+      check(s"$dir/*/*", Result(partitionedTestDF.drop("p1", "p2")))
+      check(s"$dir/p1=*/p2=foo", Result(partitionedTestDF.filter("p2 = 'foo'").drop("p1",
"p2")))
+      check(s"$dir/p1=1/p2=???", Result(partitionedTestDF.filter("p1 = 1").drop("p1", "p2")))
+
+      // Should find all data without the partitioning columns as the globs resolve to the
files
+      check(s"$dir/*/*/*", Result(partitionedTestDF.drop("p1", "p2")))
+
+      // ---- With base path set ----
+      val resultDf = partitionedTestDF.select("a", "b", "p1", "p2")
+      check(path = s"$dir/*", Result(resultDf), basePath = Some(dir))
+      check(path = s"$dir/*/*", Result(resultDf), basePath = Some(dir))
+      check(path = s"$dir/*/*/*", Result(resultDf), basePath = Some(dir))
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message