spark-commits mailing list archives

From yh...@apache.org
Subject spark git commit: [SPARK-14837][SQL][STREAMING] Added support in file stream source for reading new files added to subdirs
Date Tue, 10 May 2016 23:43:45 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 f021f3460 -> d8c2da9a4


[SPARK-14837][SQL][STREAMING] Added support in file stream source for reading new files added to subdirs

## What changes were proposed in this pull request?
Currently, the file stream source can only find new files if they appear in the directory given
to the source, but not if they appear in subdirectories. This PR adds support for providing glob
patterns when creating the file stream source, so that it can find new files in nested
directories based on the glob pattern.
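
A minimal sketch of the user-facing effect (not part of the patch; the path and glob below are
hypothetical, and the snippet assumes the DataStreamReader API as released in Spark 2.0, i.e.
spark.readStream — exact method names at the time of this commit may differ):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("glob-stream-sketch").getOrCreate()

    // "/data/logs/*/*" is a glob: files that match it, as well as files inside directories
    // that match it, are picked up by the file stream source as they appear.
    val lines = spark.readStream
      .format("text")
      .load("/data/logs/*/*")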

## How was this patch tested?

Unit tests that verify new files are discovered with globs and in partitioned directories.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #12616 from tdas/SPARK-14837.

(cherry picked from commit d9ca9fd3e582f9d29f8887c095637c93a8b93651)
Signed-off-by: Yin Huai <yhuai@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d8c2da9a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d8c2da9a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d8c2da9a

Branch: refs/heads/branch-2.0
Commit: d8c2da9a46f240438c5350f73a7b3daa07c101a8
Parents: f021f34
Author: Tathagata Das <tathagata.das1565@gmail.com>
Authored: Tue May 10 16:43:32 2016 -0700
Committer: Yin Huai <yhuai@databricks.com>
Committed: Tue May 10 16:43:41 2016 -0700

----------------------------------------------------------------------
 .../sql/execution/datasources/DataSource.scala  | 18 +---
 .../datasources/ListingFileCatalog.scala        |  1 +
 .../PartitioningAwareFileCatalog.scala          |  4 +-
 .../execution/streaming/FileStreamSource.scala  | 40 ++++++---
 .../sql/streaming/FileStreamSourceSuite.scala   | 88 ++++++++++++++++++--
 5 files changed, 114 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d8c2da9a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 0342ec5..ce45168 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -174,25 +174,11 @@ case class DataSource(
         s.createSource(sparkSession.wrapped, metadataPath, userSpecifiedSchema, className, options)
 
       case format: FileFormat =>
-        val caseInsensitiveOptions = new CaseInsensitiveMap(options)
-        val path = caseInsensitiveOptions.getOrElse("path", {
+        val path = new CaseInsensitiveMap(options).getOrElse("path", {
           throw new IllegalArgumentException("'path' is not specified")
         })
-
-        def dataFrameBuilder(files: Array[String]): DataFrame = {
-          val newOptions = options.filterKeys(_ != "path") + ("basePath" -> path)
-          val newDataSource =
-            DataSource(
-              sparkSession,
-              paths = files,
-              userSpecifiedSchema = Some(sourceInfo.schema),
-              className = className,
-              options = new CaseInsensitiveMap(newOptions))
-          Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation()))
-        }
-
         new FileStreamSource(
-          sparkSession, metadataPath, path, sourceInfo.schema, dataFrameBuilder)
+          sparkSession, path, className, sourceInfo.schema, metadataPath, options)
       case _ =>
         throw new UnsupportedOperationException(
           s"Data source $className does not support streamed reading")

http://git-wip-us.apache.org/repos/asf/spark/blob/d8c2da9a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
index bdf43e0..5cee2b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
@@ -53,6 +53,7 @@ class ListingFileCatalog(
     if (cachedPartitionSpec == null) {
       cachedPartitionSpec = inferPartitioning()
     }
+    logTrace(s"Partition spec: $cachedPartitionSpec")
     cachedPartitionSpec
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d8c2da9a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
index 5f04a6c..27f23c8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
@@ -49,7 +49,7 @@ abstract class PartitioningAwareFileCatalog(
   protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]]
 
   override def listFiles(filters: Seq[Expression]): Seq[Partition] = {
-    if (partitionSpec().partitionColumns.isEmpty) {
+    val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) {
       Partition(InternalRow.empty, allFiles().filterNot(_.getPath.getName startsWith "_")) :: Nil
     } else {
       prunePartitions(filters, partitionSpec()).map {
@@ -59,6 +59,8 @@ abstract class PartitioningAwareFileCatalog(
             leafDirToChildrenFiles(path).filterNot(_.getPath.getName startsWith "_"))
       }
     }
+    logTrace("Selected files after partition pruning:\n\t" + selectedPartitions.mkString("\n\t"))
+    selectedPartitions
   }
 
   override def allFiles(): Seq[FileStatus] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/d8c2da9a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 7b4c035..bef5616 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -21,9 +21,11 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.hadoop.fs.Path
 
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{DataFrame, SparkSession}
-import org.apache.spark.sql.types.{StringType, StructType}
+import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.execution.datasources.{CaseInsensitiveMap, DataSource, ListingFileCatalog, LogicalRelation}
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.collection.OpenHashSet
 
 /**
@@ -33,12 +35,14 @@ import org.apache.spark.util.collection.OpenHashSet
  */
 class FileStreamSource(
     sparkSession: SparkSession,
-    metadataPath: String,
     path: String,
+    fileFormatClassName: String,
     override val schema: StructType,
-    dataFrameBuilder: Array[String] => DataFrame) extends Source with Logging {
+    metadataPath: String,
+    options: Map[String, String]) extends Source with Logging {
 
   private val fs = new Path(path).getFileSystem(sparkSession.sessionState.newHadoopConf())
+  private val qualifiedBasePath = fs.makeQualified(new Path(path)) // can contain glob patterns
   private val metadataLog = new HDFSMetadataLog[Seq[String]](sparkSession, metadataPath)
   private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L)
 
@@ -69,6 +73,7 @@ class FileStreamSource(
     if (newFiles.nonEmpty) {
       maxBatchId += 1
       metadataLog.add(maxBatchId, newFiles)
+      logInfo(s"Max batch id increased to $maxBatchId with ${newFiles.size} new files")
     }
 
     new LongOffset(maxBatchId)
@@ -97,21 +102,30 @@ class FileStreamSource(
     assert(startId <= endId)
     val files = metadataLog.get(Some(startId + 1), Some(endId)).flatMap(_._2)
     logInfo(s"Processing ${files.length} files from ${startId + 1}:$endId")
-    logDebug(s"Streaming ${files.mkString(", ")}")
-    dataFrameBuilder(files)
+    logTrace(s"Files are:\n\t" + files.mkString("\n\t"))
+    val newOptions = new CaseInsensitiveMap(options).filterKeys(_ != "path")
+    val newDataSource =
+      DataSource(
+        sparkSession,
+        paths = files,
+        userSpecifiedSchema = Some(schema),
+        className = fileFormatClassName,
+        options = newOptions)
+    Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation()))
   }
 
   private def fetchAllFiles(): Seq[String] = {
-    val startTime = System.nanoTime()
-    val files = fs.listStatus(new Path(path))
-      .filterNot(_.getPath.getName.startsWith("_"))
-      .map(_.getPath.toUri.toString)
-    val endTime = System.nanoTime()
-    logDebug(s"Listed ${files.size} in ${(endTime.toDouble - startTime) / 1000000}ms")
+    val startTime = System.nanoTime
+    val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath)
+    val catalog = new ListingFileCatalog(sparkSession, globbedPaths, options, Some(new StructType))
+    val files = catalog.allFiles().map(_.getPath.toUri.toString)
+    val endTime = System.nanoTime
+    logInfo(s"Listed ${files.size} in ${(endTime.toDouble - startTime) / 1000000}ms")
+    logTrace(s"Files are:\n\t" + files.mkString("\n\t"))
     files
   }
 
   override def getOffset: Option[Offset] = Some(fetchMaxOffset()).filterNot(_.offset == -1)
 
-  override def toString: String = s"FileSource[$path]"
+  override def toString: String = s"FileStreamSource[$qualifiedBasePath]"
 }
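
For orientation, a rough sketch of the new listing behaviour (the actual code path goes through
SparkHadoopUtil.globPathIfNecessary and ListingFileCatalog as in the diff above; this
approximation uses the plain Hadoop FileSystem API, and the paths are hypothetical): expand the
glob first, then list the files under the matched paths, skipping names that start with "_".

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileStatus, Path}

    val pattern = new Path("/data/logs/*/*")
    val fs = pattern.getFileSystem(new Configuration())

    // Step 1: expand the glob (globStatus may return null when nothing matches)
    val matched: Seq[FileStatus] =
      Option(fs.globStatus(pattern)).map(_.toSeq).getOrElse(Seq.empty)

    // Step 2: list files under matched directories; keep plain files matched directly
    val files: Seq[String] = matched
      .flatMap { status =>
        if (status.isDirectory) fs.listStatus(status.getPath).toSeq else Seq(status)
      }
      .filterNot(_.getPath.getName.startsWith("_"))
      .map(_.getPath.toUri.toString)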

http://git-wip-us.apache.org/repos/asf/spark/blob/d8c2da9a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 4b95d65..c97304c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
 class FileStreamSourceTest extends StreamTest with SharedSQLContext {
@@ -58,7 +58,7 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext {
         addData(source)
         source.currentOffset + 1
       }
-      logInfo(s"Added data to $source at offset $newOffset")
+      logInfo(s"Added file to $source at offset $newOffset")
       (source, newOffset)
     }
 
@@ -69,8 +69,11 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext {
     extends AddFileData {
 
     override def addData(source: FileStreamSource): Unit = {
-      val file = Utils.tempFileWith(new File(tmp, "text"))
-      stringToFile(file, content).renameTo(new File(src, file.getName))
+      val tempFile = Utils.tempFileWith(new File(tmp, "text"))
+      val finalFile = new File(src, tempFile.getName)
+      src.mkdirs()
+      require(stringToFile(tempFile, content).renameTo(finalFile))
+      logInfo(s"Written text '$content' to file $finalFile")
     }
   }
 
@@ -89,6 +92,7 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext {
     def writeToFile(df: DataFrame, src: File, tmp: File): Unit = {
       val tmpDir = Utils.tempFileWith(new File(tmp, "parquet"))
       df.write.parquet(tmpDir.getCanonicalPath)
+      src.mkdirs()
       tmpDir.listFiles().foreach { f =>
         f.renameTo(new File(src, s"${f.getName}"))
       }
@@ -100,7 +104,6 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext {
       format: String,
       path: String,
       schema: Option[StructType] = None): DataFrame = {
-
     val reader =
       if (schema.isDefined) {
         spark.read.format(format).schema(schema.get)
@@ -327,7 +330,6 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
     }
   }
 
-
   test("reading from json files inside partitioned directory") {
     withTempDirs { case (baseSrc, tmp) =>
       val src = new File(baseSrc, "type=X")
@@ -348,7 +350,6 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
     }
   }
 
-
   test("reading from json files with changing schema") {
     withTempDirs { case (src, tmp) =>
 
@@ -444,6 +445,79 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
     }
   }
 
+  test("read new files in nested directories with globbing") {
+    withTempDirs { case (dir, tmp) =>
+
+      // src/*/* should consider all the files and directories that match that glob.
+      // So any files that match the glob, as well as any files in directories that match
+      // this glob, should be read.
+      val fileStream = createFileStream("text", s"${dir.getCanonicalPath}/*/*")
+      val filtered = fileStream.filter($"value" contains "keep")
+      val subDir = new File(dir, "subdir")
+      val subSubDir = new File(subDir, "subsubdir")
+      val subSubSubDir = new File(subSubDir, "subsubsubdir")
+
+      require(!subDir.exists())
+      require(!subSubDir.exists())
+
+      testStream(filtered)(
+        // Create new dir/subdir and write to it, should read
+        AddTextFileData("drop1\nkeep2", subDir, tmp),
+        CheckAnswer("keep2"),
+
+        // Add files to dir/subdir, should read
+        AddTextFileData("keep3", subDir, tmp),
+        CheckAnswer("keep2", "keep3"),
+
+        // Create new dir/subdir/subsubdir and write to it, should read
+        AddTextFileData("keep4", subSubDir, tmp),
+        CheckAnswer("keep2", "keep3", "keep4"),
+
+        // Add files to dir/subdir/subsubdir, should read
+        AddTextFileData("keep5", subSubDir, tmp),
+        CheckAnswer("keep2", "keep3", "keep4", "keep5"),
+
+        // 1. Add file to src dir, should not read as globbing src/*/* does not capture files in
+        //    dir, only captures files in dir/subdir/
+        // 2. Add files to dir/subDir/subsubdir/subsubsubdir, should not read as src/*/* should
+        //    not capture those files
+        AddTextFileData("keep6", dir, tmp),
+        AddTextFileData("keep7", subSubSubDir, tmp),
+        AddTextFileData("keep8", subDir, tmp), // needed to make query detect new data
+        CheckAnswer("keep2", "keep3", "keep4", "keep5", "keep8")
+      )
+    }
+  }
+
+  test("read new files in partitioned table with globbing, should not read partition data") {
+    withTempDirs { case (dir, tmp) =>
+      val partitionFooSubDir = new File(dir, "partition=foo")
+      val partitionBarSubDir = new File(dir, "partition=bar")
+
+      val schema = new StructType().add("value", StringType).add("partition", StringType)
+      val fileStream = createFileStream("json", s"${dir.getCanonicalPath}/*/*", Some(schema))
+      val filtered = fileStream.filter($"value" contains "keep")
+      val nullStr = null.asInstanceOf[String]
+      testStream(filtered)(
+        // Create new partition=foo sub dir and write to it, should read only value, not partition
+        AddTextFileData("{'value': 'drop1'}\n{'value': 'keep2'}", partitionFooSubDir, tmp),
+        CheckAnswer(("keep2", nullStr)),
+
+        // Append to same partition=foo sub dir, should read only value, not partition
+        AddTextFileData("{'value': 'keep3'}", partitionFooSubDir, tmp),
+        CheckAnswer(("keep2", nullStr), ("keep3", nullStr)),
+
+        // Create new partition sub dir and write to it, should read only value, not partition
+        AddTextFileData("{'value': 'keep4'}", partitionBarSubDir, tmp),
+        CheckAnswer(("keep2", nullStr), ("keep3", nullStr), ("keep4", nullStr)),
+
+        // Append to same partition=bar sub dir, should read only value, not partition
+        AddTextFileData("{'value': 'keep5'}", partitionBarSubDir, tmp),
+        CheckAnswer(("keep2", nullStr), ("keep3", nullStr), ("keep4", nullStr), ("keep5", nullStr))
+      )
+    }
+  }
+
   test("fault tolerance") {
     withTempDirs { case (src, tmp) =>
       val fileStream = createFileStream("text", src.getCanonicalPath)
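
For quick reference, the paths that the glob dir/*/* in the nested-directories test above would
capture, per that test's assertions (file names here are hypothetical):

    dir/a.txt                                -> not read (files directly in dir are not matched)
    dir/subdir/b.txt                         -> read (matches dir/*/*)
    dir/subdir/subsubdir/c.txt               -> read (subsubdir matches dir/*/*, so its files are listed)
    dir/subdir/subsubdir/subsubsubdir/d.txt  -> not read (too deep for dir/*/*)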



