spark-commits mailing list archives

From r...@apache.org
Subject spark git commit: [SPARK-16975][SQL][FOLLOWUP] Do not duplicately check file paths in data sources implementing FileFormat
Date Thu, 22 Dec 2016 18:00:24 GMT
Repository: spark
Updated Branches:
  refs/heads/master 4186aba63 -> 76622c661


[SPARK-16975][SQL][FOLLOWUP] Do not duplicately check file paths in data sources implementing FileFormat

## What changes were proposed in this pull request?

This PR cleans up the duplicated file-path checking in the data sources implementing `FileFormat` and prevents the ORC data source from attempting to list files twice.

https://github.com/apache/spark/pull/14585 fixed a problem with partition column names starting with `_`, and that issue itself is resolved correctly. However, the data sources implementing `FileFormat` still validate the paths redundantly. Judging from the comment in `CSVFileFormat`, `// TODO: Move filtering.`, this per-source check does not need to be repeated.

Currently, this filtering is already done in `PartitioningAwareFileIndex.shouldFilterOut` and `PartitioningAwareFileIndex.isDataPath`, so `FileFormat.inferSchema` always receives only leaf files. For example, running the code below:

``` scala
spark.range(10).withColumn("_locality_code", $"id").write.partitionBy("_locality_code").save("/tmp/parquet")
spark.read.parquet("/tmp/parquet")
```

gives `FileFormat.inferSchema` only the paths below, with no directories, just valid data files:

``` bash
/tmp/parquet/_col=0/part-r-00000-094a8efa-bece-4b50-b54c-7918d1f7b3f8.snappy.parquet
/tmp/parquet/_col=1/part-r-00000-094a8efa-bece-4b50-b54c-7918d1f7b3f8.snappy.parquet
/tmp/parquet/_col=2/part-r-00000-25de2b50-225a-4bcf-a2bc-9eb9ed407ef6.snappy.parquet
...
```

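For context, that upstream filtering is roughly of the following shape. This is a minimal sketch (not the actual `PartitioningAwareFileIndex` code) of a predicate that drops hidden and metadata files while keeping partition directories whose names contain `=`, such as `_col=0`:

``` scala
// Hypothetical sketch of the kind of path filtering applied before
// FileFormat.inferSchema is called; not the actual Spark implementation.
// Names starting with "." or "_" are treated as hidden/metadata, unless they
// contain "=", which marks a partition directory (e.g. "_col=0").
def shouldFilterOut(pathName: String): Boolean =
  pathName.startsWith(".") ||
    (pathName.startsWith("_") && !pathName.contains("="))

Seq("_SUCCESS", "_common_metadata", ".part-00000.crc", "_col=0", "part-r-00000.snappy.parquet")
  .filterNot(shouldFilterOut)
// keeps: _col=0, part-r-00000.snappy.parquet
```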

## How was this patch tested?

A unit test was added in `HadoopFsRelationTest`, and the related existing tests were run.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #14627 from HyukjinKwon/SPARK-16975.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/76622c66
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/76622c66
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/76622c66

Branch: refs/heads/master
Commit: 76622c661fcae81eb0352c61f54a2e9e21a4fb98
Parents: 4186aba
Author: hyukjinkwon <gurwls223@gmail.com>
Authored: Thu Dec 22 10:00:20 2016 -0800
Committer: Reynold Xin <rxin@databricks.com>
Committed: Thu Dec 22 10:00:20 2016 -0800

----------------------------------------------------------------------
 .../execution/datasources/csv/CSVFileFormat.scala  |  5 ++---
 .../datasources/json/JsonFileFormat.scala          |  7 +------
 .../datasources/parquet/ParquetFileFormat.scala    |  7 +------
 .../spark/sql/sources/HadoopFsRelationTest.scala   | 17 +++++++++++++++++
 4 files changed, 21 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/76622c66/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index e627f04..b0feaeb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -55,10 +55,9 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
       options: Map[String, String],
       files: Seq[FileStatus]): Option[StructType] = {
     require(files.nonEmpty, "Cannot infer schema from an empty set of files")
-    val csvOptions = new CSVOptions(options)
 
-    // TODO: Move filtering.
-    val paths = files.filterNot(_.getPath.getName startsWith "_").map(_.getPath.toString)
+    val csvOptions = new CSVOptions(options)
+    val paths = files.map(_.getPath.toString)
     val lines: Dataset[String] = readText(sparkSession, csvOptions, paths)
     val firstLine: String = findFirstLine(csvOptions, lines)
     val firstRow = new CsvReader(csvOptions).parseLine(firstLine)

http://git-wip-us.apache.org/repos/asf/spark/blob/76622c66/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index c957914..a9d8ddf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -51,13 +51,8 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
       val columnNameOfCorruptRecord =
         parsedOptions.columnNameOfCorruptRecord
           .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord)
-      val jsonFiles = files.filterNot { status =>
-        val name = status.getPath.getName
-        (name.startsWith("_") && !name.contains("=")) || name.startsWith(".")
-      }.toArray
-
       val jsonSchema = InferSchema.infer(
-        createBaseRdd(sparkSession, jsonFiles),
+        createBaseRdd(sparkSession, files),
         columnNameOfCorruptRecord,
         parsedOptions)
       checkConstraints(jsonSchema)

http://git-wip-us.apache.org/repos/asf/spark/blob/76622c66/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 0965ffe..0efe6da 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -241,12 +241,7 @@ class ParquetFileFormat
       commonMetadata: Seq[FileStatus])
 
   private def splitFiles(allFiles: Seq[FileStatus]): FileTypes = {
-    // Lists `FileStatus`es of all leaf nodes (files) under all base directories.
-    val leaves = allFiles.filter { f =>
-      isSummaryFile(f.getPath) ||
-        !((f.getPath.getName.startsWith("_") && !f.getPath.getName.contains("=")) ||
-          f.getPath.getName.startsWith("."))
-    }.toArray.sortBy(_.getPath.toString)
+    val leaves = allFiles.toArray.sortBy(_.getPath.toString)
 
     FileTypes(
       data = leaves.filterNot(f => isSummaryFile(f.getPath)),

http://git-wip-us.apache.org/repos/asf/spark/blob/76622c66/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
index 224b2c6..06566a9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
@@ -877,6 +877,23 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
       }
     }
   }
+
+  test("SPARK-16975: Partitioned table with the column having '_' should be read correctly")
{
+    withTempDir { dir =>
+      val childDir = new File(dir, dataSourceName).getCanonicalPath
+      val dataDf = spark.range(10).toDF()
+      val df = dataDf.withColumn("_col", $"id")
+      df.write.format(dataSourceName).partitionBy("_col").save(childDir)
+      val reader = spark.read.format(dataSourceName)
+
+      // This is needed for SimpleTextHadoopFsRelationSuite as SimpleTextSource needs schema.
+      if (dataSourceName == classOf[SimpleTextSource].getCanonicalName) {
+        reader.option("dataSchema", dataDf.schema.json)
+      }
+      val readBack = reader.load(childDir)
+      checkAnswer(df, readBack)
+    }
+  }
 }
 
 // This class is used to test SPARK-8578. We should not use any custom output committer when



