spark-commits mailing list archives

From marmb...@apache.org
Subject spark git commit: [SPARK-3928][SQL] Support wildcard matches on Parquet files.
Date Fri, 19 Dec 2014 04:08:39 GMT
Repository: spark
Updated Branches:
  refs/heads/master f728e0fe7 -> b68bc6d26


[SPARK-3928][SQL] Support wildcard matches on Parquet files.

parquetFile accepts Hadoop glob patterns in the path.
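
In practice this lets a single parquetFile call pick up many part-files
at once. A minimal usage sketch (the /data/logs path is hypothetical;
sqlContext is an existing SQLContext):

    // Load every Parquet part-file one level below /data/logs. The
    // pattern is expanded with Hadoop's FileSystem.globStatus, so the
    // usual wildcards (*, ?, [abc]) are understood.
    val logs = sqlContext.parquetFile("/data/logs/*/part-r-*.parquet")
    logs.registerTempTable("logs")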

Author: Thu Kyaw <trk007@gmail.com>

Closes #3407 from tkyaw/master and squashes the following commits:

19115ad [Thu Kyaw] Merge https://github.com/apache/spark
ceded32 [Thu Kyaw] [SPARK-3928][SQL] Support wildcard matches on Parquet files.
d322c28 [Thu Kyaw] [SPARK-3928][SQL] Support wildcard matches on Parquet files.
ce677c6 [Thu Kyaw] [SPARK-3928][SQL] Support wildcard matches on Parquet files.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b68bc6d2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b68bc6d2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b68bc6d2

Branch: refs/heads/master
Commit: b68bc6d2647f8a5caf8aa558e4115f9cc254f67c
Parents: f728e0f
Author: Thu Kyaw <trk007@gmail.com>
Authored: Thu Dec 18 20:08:32 2014 -0800
Committer: Michael Armbrust <michael@databricks.com>
Committed: Thu Dec 18 20:08:32 2014 -0800

----------------------------------------------------------------------
 .../spark/sql/api/java/JavaSQLContext.scala     |  4 ++-
 .../sql/parquet/ParquetTableOperations.scala    |  4 ++-
 .../spark/sql/parquet/ParquetTestData.scala     | 36 ++++++++++++++++++++
 .../apache/spark/sql/parquet/ParquetTypes.scala | 12 ++++---
 .../spark/sql/parquet/ParquetQuerySuite.scala   | 26 ++++++++++++++
 5 files changed, 76 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b68bc6d2/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
index 4c0869e..8884204 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
@@ -133,7 +133,9 @@ class JavaSQLContext(val sqlContext: SQLContext) extends UDFRegistration {
   }
 
   /**
-   * Loads a parquet file, returning the result as a [[JavaSchemaRDD]].
+   * Loads a Parquet file from a regular path, or from files matching a glob
+   * pattern in the path, returning the result as a [[JavaSchemaRDD]].
+   * Supported glob patterns are described at ([[http://tinyurl.com/kcqrzn8]]).
    */
   def parquetFile(path: String): JavaSchemaRDD =
     new JavaSchemaRDD(

http://git-wip-us.apache.org/repos/asf/spark/blob/b68bc6d2/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 5a49384..96bace1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -621,7 +621,9 @@ private[parquet] object FileSystemHelper {
       throw new IllegalArgumentException(
         s"ParquetTableOperations: path $path does not exist or is not a directory")
     }
-    fs.listStatus(path).map(_.getPath)
+    fs.globStatus(path)
+      .flatMap { status => if (status.isDir) fs.listStatus(status.getPath) else List(status) }
+      .map(_.getPath)
   }
 
     /**

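For reference, the same expansion can be reproduced directly against the
Hadoop API. A standalone sketch (the path is hypothetical; isDir is the
pre-Hadoop-2 spelling of isDirectory, as used in the diff above):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    val fs = FileSystem.get(new Configuration())
    // globStatus expands the wildcard; a directory that matches is then
    // listed one level deep, so a glob may name files or directories.
    val leaves = fs.globStatus(new Path("/data/logs/*"))
      .flatMap(s => if (s.isDir) fs.listStatus(s.getPath) else Array(s))
      .map(_.getPath)
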
http://git-wip-us.apache.org/repos/asf/spark/blob/b68bc6d2/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala
index c0918a4..d599365 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala
@@ -422,5 +422,41 @@ private[sql] object ParquetTestData {
     val first = reader.read()
     assert(first != null)
   } */
+
+  // To test glob (wildcard) pattern matching for parquetFile input.
+  val testGlobDir = Utils.createTempDir()
+  val testGlobSubDir1 = Utils.createTempDir(testGlobDir.getPath)
+  val testGlobSubDir2 = Utils.createTempDir(testGlobDir.getPath)
+  val testGlobSubDir3 = Utils.createTempDir(testGlobDir.getPath)
+
+  def writeGlobFiles() = {
+    val subDirs = Array(testGlobSubDir1, testGlobSubDir2, testGlobSubDir3)
+
+    subDirs.foreach { dir =>
+      val path: Path = new Path(new Path(dir.toURI), new Path("part-r-0.parquet"))
+      val job = new Job()
+      val schema: MessageType = MessageTypeParser.parseMessageType(testSchema)
+      val writeSupport = new TestGroupWriteSupport(schema)
+      val writer = new ParquetWriter[Group](path, writeSupport)
+
+      for(i <- 0 until 15) {
+        val record = new SimpleGroup(schema)
+        if(i % 3 == 0) {
+          record.add(0, true)
+        } else {
+          record.add(0, false)
+        }
+        if(i % 5 == 0) {
+          record.add(1, 5)
+        }
+        record.add(2, "abc")
+        record.add(3, i.toLong << 33)
+        record.add(4, 2.5F)
+        record.add(5, 4.5D)
+        writer.write(record)
+      }
+      writer.close()
+    }
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b68bc6d2/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
index fa37d1f..0e6fb57 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
@@ -437,10 +437,14 @@ private[parquet] object ParquetTypesConverter extends Logging {
     }
     val path = origPath.makeQualified(fs)
 
-    val children = fs.listStatus(path).filterNot { status =>
-      val name = status.getPath.getName
-      (name(0) == '.' || name(0) == '_') && name != ParquetFileWriter.PARQUET_METADATA_FILE
-    }
+    val children =
+      fs
+        .globStatus(path)
+        .flatMap { status => if (status.isDir) fs.listStatus(status.getPath) else List(status) }
+        .filterNot { status =>
+          val name = status.getPath.getName
+          (name(0) == '.' || name(0) == '_') && name != ParquetFileWriter.PARQUET_METADATA_FILE
+        }
 
     ParquetRelation.enableLogForwarding()
 
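The filterNot above keeps Parquet's summary file while still skipping
hidden entries. Pulled out on its own (assuming parquet-mr's
ParquetFileWriter.PARQUET_METADATA_FILE equals "_metadata"), the
predicate amounts to this sketch:

    // A file is dropped when it is hidden ('.' or '_' prefix) *and* is
    // not the _metadata summary file that carries the schema.
    def shouldSkip(name: String): Boolean =
      (name.startsWith(".") || name.startsWith("_")) && name != "_metadata"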

http://git-wip-us.apache.org/repos/asf/spark/blob/b68bc6d2/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index 0e5635d..0748553 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -95,6 +95,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
     ParquetTestData.writeNestedFile2()
     ParquetTestData.writeNestedFile3()
     ParquetTestData.writeNestedFile4()
+    ParquetTestData.writeGlobFiles()
     testRDD = parquetFile(ParquetTestData.testDir.toString)
     testRDD.registerTempTable("testsource")
     parquetFile(ParquetTestData.testFilterDir.toString)
@@ -110,6 +111,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
     Utils.deleteRecursively(ParquetTestData.testNestedDir2)
     Utils.deleteRecursively(ParquetTestData.testNestedDir3)
     Utils.deleteRecursively(ParquetTestData.testNestedDir4)
+    Utils.deleteRecursively(ParquetTestData.testGlobDir)
     // here we should also unregister the table??
 
     setConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED, originalParquetFilterPushdownEnabled.toString)
@@ -1049,4 +1051,28 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
         "Comparison predicate with null shouldn't be pushed down")
     }
   }
+
+  test("Import of simple Parquet files using glob wildcard pattern") {
+    val testGlobDir = ParquetTestData.testGlobDir.toString
+    val globPatterns = Array(testGlobDir + "/*/*", testGlobDir + "/spark-*/*", testGlobDir + "/?pa?k-*/*")
+    globPatterns.foreach { path =>
+      val result = parquetFile(path).collect()
+      assert(result.size === 45)
+      result.zipWithIndex.foreach {
+        case (row, index) => {
+          val checkBoolean =
+            if ((index % 15) % 3 == 0)
+              row(0) == true
+            else
+              row(0) == false
+          assert(checkBoolean === true, s"boolean field value in line $index did not match")
+          if ((index % 15) % 5 == 0) assert(row(1) === 5, s"int field value in line $index did not match")
+          assert(row(2) === "abc", s"string field value in line $index did not match")
+          assert(row(3) === ((index.toLong % 15) << 33), s"long value in line $index did not match")
+          assert(row(4) === 2.5F, s"float field value in line $index did not match")
+          assert(row(5) === 4.5D, s"double field value in line $index did not match")
+        }
+      }
+    }
+  }
 }

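Note that the two prefixed patterns only match because Utils.createTempDir
names the directories it creates with a spark- prefix: */* matches every
file in every subdirectory, spark-*/* narrows on that prefix, and
?pa?k-*/* exercises the single-character wildcard. A sketch of the shared
expectation (testGlobDir from ParquetTestData; 3 directories x 15 records
each):

    // Each pattern should resolve to the same three part files.
    for (pattern <- Seq("/*/*", "/spark-*/*", "/?pa?k-*/*")) {
      val rows = parquetFile(ParquetTestData.testGlobDir.toString + pattern).collect()
      assert(rows.size === 45)
    }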

