spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yh...@apache.org
Subject spark git commit: [SPARK-11544][SQL][TEST-HADOOP1.0] sqlContext doesn't use PathFilter
Date Fri, 20 Nov 2015 03:46:14 GMT
Repository: spark
Updated Branches:
  refs/heads/master ee2140774 -> 7ee7d5a3c


[SPARK-11544][SQL][TEST-HADOOP1.0] sqlContext doesn't use PathFilter

Apply the user supplied pathfilter while retrieving the files from fs.

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #9830 from dilipbiswal/spark-11544.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7ee7d5a3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7ee7d5a3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7ee7d5a3

Branch: refs/heads/master
Commit: 7ee7d5a3c4ff77d2cee2afce36ff41f6302e6315
Parents: ee21407
Author: Dilip Biswal <dbiswal@us.ibm.com>
Authored: Thu Nov 19 19:46:10 2015 -0800
Committer: Yin Huai <yhuai@databricks.com>
Committed: Thu Nov 19 19:46:10 2015 -0800

----------------------------------------------------------------------
 .../apache/spark/sql/sources/interfaces.scala   | 25 +++++++++---
 .../execution/datasources/json/JsonSuite.scala  | 41 +++++++++++++++++++-
 2 files changed, 59 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7ee7d5a3/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index b3d3bdf..f946515 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -21,7 +21,8 @@ import scala.collection.mutable
 import scala.util.Try
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.fs.{PathFilter, FileStatus, FileSystem, Path}
+import org.apache.hadoop.mapred.{JobConf, FileInputFormat}
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
 
 import org.apache.spark.{Logging, SparkContext}
@@ -447,9 +448,15 @@ abstract class HadoopFsRelation private[sql](
           val hdfsPath = new Path(path)
           val fs = hdfsPath.getFileSystem(hadoopConf)
           val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-
           logInfo(s"Listing $qualified on driver")
-          Try(fs.listStatus(qualified)).getOrElse(Array.empty)
+          // Dummy jobconf to get to the pathFilter defined in configuration
+          val jobConf = new JobConf(hadoopConf, this.getClass())
+          val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+          if (pathFilter != null) {
+            Try(fs.listStatus(qualified, pathFilter)).getOrElse(Array.empty)
+          } else {
+            Try(fs.listStatus(qualified)).getOrElse(Array.empty)
+          }
         }.filterNot { status =>
           val name = status.getPath.getName
           name.toLowerCase == "_temporary" || name.startsWith(".")
@@ -847,8 +854,16 @@ private[sql] object HadoopFsRelation extends Logging {
     if (name == "_temporary" || name.startsWith(".")) {
       Array.empty
     } else {
-      val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir)
-      files ++ dirs.flatMap(dir => listLeafFiles(fs, dir))
+      // Dummy jobconf to get to the pathFilter defined in configuration
+      val jobConf = new JobConf(fs.getConf, this.getClass())
+      val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+      if (pathFilter != null) {
+        val (dirs, files) = fs.listStatus(status.getPath, pathFilter).partition(_.isDir)
+        files ++ dirs.flatMap(dir => listLeafFiles(fs, dir))
+      } else {
+        val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir)
+        files ++ dirs.flatMap(dir => listLeafFiles(fs, dir))
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7ee7d5a3/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 6042b11..ba7718c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -19,19 +19,27 @@ package org.apache.spark.sql.execution.datasources.json
 
 import java.io.{File, StringWriter}
 import java.sql.{Date, Timestamp}
+import scala.collection.JavaConverters._
 
 import com.fasterxml.jackson.core.JsonFactory
-import org.apache.spark.rdd.RDD
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{Path, PathFilter}
 import org.scalactic.Tolerance._
 
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.datasources.{ResolvedDataSource, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{LogicalRelation, ResolvedDataSource}
 import org.apache.spark.sql.execution.datasources.json.InferSchema.compatibleType
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
+class TestFileFilter extends PathFilter {
+  override def accept(path: Path): Boolean = path.getParent.getName != "p=2"
+}
+
 class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
   import testImplicits._
 
@@ -1390,4 +1398,33 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       )
     }
   }
+
+  test("SPARK-11544 test pathfilter") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+
+      val df = sqlContext.range(2)
+      df.write.json(path + "/p=1")
+      df.write.json(path + "/p=2")
+      assert(sqlContext.read.json(path).count() === 4)
+
+      val clonedConf = new Configuration(hadoopConfiguration)
+      try {
+        // Setting it twice as the name of the property has changed between hadoop versions.
+        hadoopConfiguration.setClass(
+          "mapred.input.pathFilter.class",
+          classOf[TestFileFilter],
+          classOf[PathFilter])
+        hadoopConfiguration.setClass(
+          "mapreduce.input.pathFilter.class",
+          classOf[TestFileFilter],
+          classOf[PathFilter])
+        assert(sqlContext.read.json(path).count() === 2)
+      } finally {
+        // Hadoop 1 doesn't have `Configuration.unset`
+        hadoopConfiguration.clear()
+        clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message