spark-commits mailing list archives

From t...@apache.org
Subject spark git commit: [SPARK-17372][SQL][STREAMING] Avoid serialization issues by using Arrays to save file names in FileStreamSource
Date Wed, 07 Sep 2016 02:34:36 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 a23d4065c -> 796577b43


[SPARK-17372][SQL][STREAMING] Avoid serialization issues by using Arrays to save file names in FileStreamSource

## What changes were proposed in this pull request?

When we create a file stream on a directory that has partitioned subdirs (e.g. dir/x=y/), then
ListingFileCatalog.allFiles returns the files in the dir as a Seq[String] which internally is
a Stream[String]. This is because of this [line](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala#L93),
where LinkedHashSet.values.toSeq returns a Stream. Then when the [FileStreamSource](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala#L79)
filters this Stream[String] to remove the already-seen files, it creates a new Stream[String]
whose filter function holds a $outer reference to the FileStreamSource (in Scala 2.10).
Trying to serialize this Stream[String] causes a NotSerializableException. This happens
even if there is just one file in the dir.
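
To illustrate, the sketch below (not part of the patch; `FakeSource` and `newFiles` are made-up
stand-ins for FileStreamSource and its filtering logic) reproduces the shape of the Scala 2.10
problem: filtering a Stream from inside a class keeps the filter closure alive in the lazy tail,
and that closure drags in the enclosing, non-serializable instance.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Stand-in for FileStreamSource: a class that is not Serializable.
class FakeSource {
  val seenFiles = Set("a")

  // Lazily filtering a Stream; on Scala 2.10 the predicate closure carries
  // a $outer reference to this (non-serializable) instance.
  def newFiles(allFiles: Seq[String]): Seq[String] =
    allFiles.toStream.filter(f => !seenFiles.contains(f))
}

object NotSerializableRepro {
  def main(args: Array[String]): Unit = {
    val batch = new FakeSource().newFiles(Seq("a", "b", "c"))
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    out.writeObject(batch)  // java.io.NotSerializableException: FakeSource (on Scala 2.10)
  }
}
```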

It is important to note that this behavior is different in Scala 2.11: there is no $outer reference
to the FileStreamSource, so it does not throw a NotSerializableException. However, with a large
sequence of files (tested with 10000 files), it throws a StackOverflowError instead. This is because
of how the Stream class is implemented: it is essentially a linked list, and attempting to serialize
a long Stream requires *recursively* walking through that linked list, which results in a StackOverflowError.
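
A minimal sketch of that failure mode (illustrative names, not part of the patch, and assuming
default JVM stack sizes): default Java serialization recurses through a Stream's cons cells, so
a long enough Stream overflows the stack, while an Array with the same contents does not.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object StreamStackOverflowRepro {
  private def serialize(obj: AnyRef): Unit = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    out.writeObject(obj)
    out.close()
  }

  def main(args: Array[String]): Unit = {
    val fileNames = (1 to 10000).map(i => s"file-$i")

    serialize(fileNames.toArray)        // flat array: serialized without deep recursion
    serialize(fileNames.toStream.force) // one recursive writeObject call per cons cell,
                                        // which can blow the stack for long Streams
  }
}
```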

In short, across both Scala 2.10 and 2.11, serialization fails when both of the following conditions
are true:
- the file stream is defined on a partitioned directory
- the directory has 10k+ files

The right solution is to convert the Seq to an Array before writing it to the log. This PR implements
the fix in two ways (see the sketch after this list):
- Changing all uses of HDFSMetadataLog to ensure an Array is used instead of a Seq
- Adding a `require` in HDFSMetadataLog so that it can never be used with type Seq
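
A condensed sketch of that two-part approach; the authoritative changes are in the diff below,
and `MetadataLogSketch`, `FileEntry`, and `commitBatch` here are simplified stand-ins.

```scala
import scala.reflect.ClassTag

// Stand-in for HDFSMetadataLog: reject Seq at construction time.
class MetadataLogSketch[T: ClassTag] {
  require(implicitly[ClassTag[T]].runtimeClass != classOf[Seq[_]],
    "Should not create a log with type Seq, use Arrays instead - see SPARK-17372")

  def add(batchId: Long, metadata: T): Boolean = true  // real persistence elided
}

object CallerSketch {
  case class FileEntry(path: String, timestamp: Long)

  val metadataLog = new MetadataLogSketch[Array[FileEntry]]  // OK: Array, not Seq
  // new MetadataLogSketch[Seq[FileEntry]] would fail the require above

  def commitBatch(batchId: Long, batchFiles: Seq[FileEntry]): Unit = {
    metadataLog.add(batchId, batchFiles.toArray)  // convert to Array before writing to the log
  }
}
```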

## How was this patch tested?

Added a unit test that ensures the file stream source can handle 10000 files. Without the fix,
this test fails in both Scala 2.10 and 2.11, with the different errors described above.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #14987 from tdas/SPARK-17372.

(cherry picked from commit eb1ab88a86ce35f3d6ba03b3a798099fbcf6b3fc)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/796577b4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/796577b4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/796577b4

Branch: refs/heads/branch-2.0
Commit: 796577b43d3df94f5d3a8e4baeb0aa03fbbb3f21
Parents: a23d406
Author: Tathagata Das <tathagata.das1565@gmail.com>
Authored: Tue Sep 6 19:34:11 2016 -0700
Committer: Tathagata Das <tathagata.das1565@gmail.com>
Committed: Tue Sep 6 19:34:32 2016 -0700

----------------------------------------------------------------------
 .../execution/streaming/FileStreamSinkLog.scala | 12 +++---
 .../execution/streaming/FileStreamSource.scala  |  4 +-
 .../execution/streaming/HDFSMetadataLog.scala   |  4 ++
 .../execution/streaming/StreamExecution.scala   |  3 ++
 .../streaming/FileStreamSinkLogSuite.scala      | 18 ++++-----
 .../sql/streaming/FileStreamSourceSuite.scala   | 42 +++++++++++++++++++-
 6 files changed, 65 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/796577b4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
index 4254df4..7520163 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
@@ -80,7 +80,7 @@ object SinkFileStatus {
  * (drops the deleted files).
  */
 class FileStreamSinkLog(sparkSession: SparkSession, path: String)
-  extends HDFSMetadataLog[Seq[SinkFileStatus]](sparkSession, path) {
+  extends HDFSMetadataLog[Array[SinkFileStatus]](sparkSession, path) {
 
   import FileStreamSinkLog._
 
@@ -123,11 +123,11 @@ class FileStreamSinkLog(sparkSession: SparkSession, path: String)
     }
   }
 
-  override def serialize(logData: Seq[SinkFileStatus]): Array[Byte] = {
+  override def serialize(logData: Array[SinkFileStatus]): Array[Byte] = {
     (VERSION +: logData.map(write(_))).mkString("\n").getBytes(UTF_8)
   }
 
-  override def deserialize(bytes: Array[Byte]): Seq[SinkFileStatus] = {
+  override def deserialize(bytes: Array[Byte]): Array[SinkFileStatus] = {
     val lines = new String(bytes, UTF_8).split("\n")
     if (lines.length == 0) {
       throw new IllegalStateException("Incomplete log file")
@@ -136,10 +136,10 @@ class FileStreamSinkLog(sparkSession: SparkSession, path: String)
     if (version != VERSION) {
       throw new IllegalStateException(s"Unknown log version: ${version}")
     }
-    lines.toSeq.slice(1, lines.length).map(read[SinkFileStatus](_))
+    lines.slice(1, lines.length).map(read[SinkFileStatus](_))
   }
 
-  override def add(batchId: Long, logs: Seq[SinkFileStatus]): Boolean = {
+  override def add(batchId: Long, logs: Array[SinkFileStatus]): Boolean = {
     if (isCompactionBatch(batchId, compactInterval)) {
       compact(batchId, logs)
     } else {
@@ -186,7 +186,7 @@ class FileStreamSinkLog(sparkSession: SparkSession, path: String)
   private def compact(batchId: Long, logs: Seq[SinkFileStatus]): Boolean = {
     val validBatches = getValidBatchesBeforeCompactionBatch(batchId, compactInterval)
     val allLogs = validBatches.flatMap(batchId => get(batchId)).flatten ++ logs
-    if (super.add(batchId, compactLogs(allLogs))) {
+    if (super.add(batchId, compactLogs(allLogs).toArray)) {
       if (isDeletingExpiredLog) {
         deleteExpiredLog(batchId)
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/796577b4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index e8b969b..42fb454 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -49,7 +49,7 @@ class FileStreamSource(
     fs.makeQualified(new Path(path))  // can contains glob patterns
   }
 
-  private val metadataLog = new HDFSMetadataLog[Seq[FileEntry]](sparkSession, metadataPath)
+  private val metadataLog = new HDFSMetadataLog[Array[FileEntry]](sparkSession, metadataPath)
 
   private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L)
 
@@ -98,7 +98,7 @@ class FileStreamSource(
 
     if (batchFiles.nonEmpty) {
       maxBatchId += 1
-      metadataLog.add(maxBatchId, batchFiles)
+      metadataLog.add(maxBatchId, batchFiles.toArray)
       logInfo(s"Max batch id increased to $maxBatchId with ${batchFiles.size} new files")
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/796577b4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 127ece9..39a0f33 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -49,6 +49,10 @@ import org.apache.spark.util.UninterruptibleThread
 class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
   extends MetadataLog[T] with Logging {
 
+  // Avoid serializing generic sequences, see SPARK-17372
+  require(implicitly[ClassTag[T]].runtimeClass != classOf[Seq[_]],
+    "Should not create a log with type Seq, use Arrays instead - see SPARK-17372")
+
   import HDFSMetadataLog._
 
   val metadataPath = new Path(path)

http://git-wip-us.apache.org/repos/asf/spark/blob/796577b4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 4d05af0..5e1e5ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -407,6 +407,9 @@ class StreamExecution(
       awaitBatchLock.lock()
       try {
         awaitBatchLockCondition.await(100, TimeUnit.MILLISECONDS)
+        if (streamDeathCause != null) {
+          throw streamDeathCause
+        }
       } finally {
         awaitBatchLock.unlock()
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/796577b4/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
index 39fd1f0..26f8b98 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
@@ -98,7 +98,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
 
   test("serialize") {
     withFileStreamSinkLog { sinkLog =>
-      val logs = Seq(
+      val logs = Array(
         SinkFileStatus(
           path = "/a/b/x",
           size = 100L,
@@ -132,7 +132,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
       // scalastyle:on
       assert(expected === new String(sinkLog.serialize(logs), UTF_8))
 
-      assert(FileStreamSinkLog.VERSION === new String(sinkLog.serialize(Nil), UTF_8))
+      assert(FileStreamSinkLog.VERSION === new String(sinkLog.serialize(Array()), UTF_8))
     }
   }
 
@@ -196,7 +196,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
         for (batchId <- 0 to 10) {
           sinkLog.add(
             batchId,
-            Seq(newFakeSinkFileStatus("/a/b/" + batchId, FileStreamSinkLog.ADD_ACTION)))
+            Array(newFakeSinkFileStatus("/a/b/" + batchId, FileStreamSinkLog.ADD_ACTION)))
           val expectedFiles = (0 to batchId).map {
             id => newFakeSinkFileStatus("/a/b/" + id, FileStreamSinkLog.ADD_ACTION)
           }
@@ -230,17 +230,17 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
           }.toSet
         }
 
-        sinkLog.add(0, Seq(newFakeSinkFileStatus("/a/b/0", FileStreamSinkLog.ADD_ACTION)))
+        sinkLog.add(0, Array(newFakeSinkFileStatus("/a/b/0", FileStreamSinkLog.ADD_ACTION)))
         assert(Set("0") === listBatchFiles())
-        sinkLog.add(1, Seq(newFakeSinkFileStatus("/a/b/1", FileStreamSinkLog.ADD_ACTION)))
+        sinkLog.add(1, Array(newFakeSinkFileStatus("/a/b/1", FileStreamSinkLog.ADD_ACTION)))
         assert(Set("0", "1") === listBatchFiles())
-        sinkLog.add(2, Seq(newFakeSinkFileStatus("/a/b/2", FileStreamSinkLog.ADD_ACTION)))
+        sinkLog.add(2, Array(newFakeSinkFileStatus("/a/b/2", FileStreamSinkLog.ADD_ACTION)))
         assert(Set("2.compact") === listBatchFiles())
-        sinkLog.add(3, Seq(newFakeSinkFileStatus("/a/b/3", FileStreamSinkLog.ADD_ACTION)))
+        sinkLog.add(3, Array(newFakeSinkFileStatus("/a/b/3", FileStreamSinkLog.ADD_ACTION)))
         assert(Set("2.compact", "3") === listBatchFiles())
-        sinkLog.add(4, Seq(newFakeSinkFileStatus("/a/b/4", FileStreamSinkLog.ADD_ACTION)))
+        sinkLog.add(4, Array(newFakeSinkFileStatus("/a/b/4", FileStreamSinkLog.ADD_ACTION)))
         assert(Set("2.compact", "3", "4") === listBatchFiles())
-        sinkLog.add(5, Seq(newFakeSinkFileStatus("/a/b/5", FileStreamSinkLog.ADD_ACTION)))
+        sinkLog.add(5, Array(newFakeSinkFileStatus("/a/b/5", FileStreamSinkLog.ADD_ACTION)))
         assert(Set("5.compact") === listBatchFiles())
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/796577b4/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 03222b4..886f7be 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -18,7 +18,9 @@
 package org.apache.spark.sql.streaming
 
 import java.io.File
-import java.util.UUID
+
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.util._
@@ -142,6 +144,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
 
   import testImplicits._
 
+  override val streamingTimeout = 20.seconds
+
   /** Use `format` and `path` to create FileStreamSource via DataFrameReader */
   private def createFileStreamSource(
       format: String,
@@ -761,6 +765,42 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       }
     }
   }
+
+  test("SPARK-17372 - write file names to WAL as Array[String]") {
+    // Note: If this test takes longer than the timeout, then its likely that this is actually
+    // running a Spark job with 10000 tasks. This test tries to avoid that by
+    // 1. Setting the threshold for parallel file listing to very high
+    // 2. Using a query that should use constant folding to eliminate reading of the files
+
+    val numFiles = 10000
+
+    // This is to avoid running a spark job to list of files in parallel
+    // by the ListingFileCatalog.
+    spark.sessionState.conf.setConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD, numFiles * 2)
+
+    withTempDirs { case (root, tmp) =>
+      val src = new File(root, "a=1")
+      src.mkdirs()
+
+      (1 to numFiles).map { _.toString }.foreach { i =>
+        val tempFile = Utils.tempFileWith(new File(tmp, "text"))
+        val finalFile = new File(src, tempFile.getName)
+        stringToFile(finalFile, i)
+      }
+      assert(src.listFiles().size === numFiles)
+
+      val files = spark.readStream.text(root.getCanonicalPath).as[String]
+
+      // Note this query will use constant folding to eliminate the file scan.
+      // This is to avoid actually running a Spark job with 10000 tasks
+      val df = files.filter("1 == 0").groupBy().count()
+
+      testStream(df, InternalOutputModes.Complete)(
+        AddTextFileData("0", src, tmp),
+        CheckAnswer(0)
+      )
+    }
+  }
 }
 
 class FileStreamSourceStressTestSuite extends FileStreamSourceTest {

