spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject spark git commit: [SPARK-4790][STREAMING] Fix ReceivedBlockTrackerSuite waits for old file...
Date Wed, 31 Dec 2014 22:35:13 GMT
Repository: spark
Updated Branches:
  refs/heads/master c88a3d7fc -> 3610d3c61


[SPARK-4790][STREAMING] Fix ReceivedBlockTrackerSuite waits for old files to get deleted before continuing.

Since the deletes happen asynchronously, the getFileStatus call might throw an exception in
older HDFS versions if the delete happens between the time listFiles is called on the directory
and the time getFileStatus is called on the file in the getFileStatus method.

This PR addresses this by adding an option to delete the files synchronously and then waiting
for the deletion to complete before proceeding.

Author: Hari Shreedharan <hshreedharan@apache.org>

Closes #3726 from harishreedharan/spark-4790 and squashes the following commits:

bbbacd1 [Hari Shreedharan] Call cleanUpOldLogs only once in the tests.
3255f17 [Hari Shreedharan] Add test for async deletion. Remove method from ReceiverTracker that does not take waitForCompletion.
e4c83ec [Hari Shreedharan] Making waitForCompletion a mandatory param. Remove eventually from WALSuite since the cleanup method returns only after all files are deleted.
af00fd1 [Hari Shreedharan] [SPARK-4790][STREAMING] Fix ReceivedBlockTrackerSuite waits for old files to get deleted before continuing.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3610d3c6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3610d3c6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3610d3c6

Branch: refs/heads/master
Commit: 3610d3c615112faef98d94f04efaea602cc4aa8f
Parents: c88a3d7
Author: Hari Shreedharan <hshreedharan@apache.org>
Authored: Wed Dec 31 14:35:07 2014 -0800
Committer: Tathagata Das <tathagata.das1565@gmail.com>
Committed: Wed Dec 31 14:35:07 2014 -0800

----------------------------------------------------------------------
 .../streaming/receiver/ReceivedBlockHandler.scala |  8 ++++----
 .../scheduler/ReceivedBlockTracker.scala          |  9 ++++++---
 .../streaming/scheduler/ReceiverTracker.scala     |  2 +-
 .../streaming/util/WriteAheadLogManager.scala     | 17 +++++++++++++----
 .../streaming/ReceivedBlockHandlerSuite.scala     |  2 +-
 .../streaming/ReceivedBlockTrackerSuite.scala     |  2 +-
 .../spark/streaming/util/WriteAheadLogSuite.scala | 18 ++++++++++++++++--
 7 files changed, 42 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3610d3c6/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index 8b97db8..f7a8ebe 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -42,7 +42,7 @@ private[streaming] trait ReceivedBlockHandler {
   def storeBlock(blockId: StreamBlockId, receivedBlock: ReceivedBlock): ReceivedBlockStoreResult
 
   /** Cleanup old blocks older than the given threshold time */
-  def cleanupOldBlock(threshTime: Long)
+  def cleanupOldBlocks(threshTime: Long)
 }
 
 
@@ -82,7 +82,7 @@ private[streaming] class BlockManagerBasedBlockHandler(
     BlockManagerBasedStoreResult(blockId)
   }
 
-  def cleanupOldBlock(threshTime: Long) {
+  def cleanupOldBlocks(threshTime: Long) {
     // this is not used as blocks inserted into the BlockManager are cleared by DStream's clearing
     // of BlockRDDs.
   }
@@ -192,8 +192,8 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
     WriteAheadLogBasedStoreResult(blockId, segment)
   }
 
-  def cleanupOldBlock(threshTime: Long) {
-    logManager.cleanupOldLogs(threshTime)
+  def cleanupOldBlocks(threshTime: Long) {
+    logManager.cleanupOldLogs(threshTime, waitForCompletion = false)
   }
 
   def stop() {

http://git-wip-us.apache.org/repos/asf/spark/blob/3610d3c6/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index 02758e0..2ce458c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -139,14 +139,17 @@ private[streaming] class ReceivedBlockTracker(
     getReceivedBlockQueue(streamId).toSeq
   }
 
-  /** Clean up block information of old batches. */
-  def cleanupOldBatches(cleanupThreshTime: Time): Unit = synchronized {
+  /**
+   * Clean up block information of old batches. If waitForCompletion is true, this method
+   * returns only after the files are cleaned up.
+   */
+  def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized {
     assert(cleanupThreshTime.milliseconds < clock.currentTime())
     val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq
     logInfo("Deleting batches " + timesToCleanup)
     writeToLog(BatchCleanupEvent(timesToCleanup))
     timeToAllocatedBlocks --= timesToCleanup
-    logManagerOption.foreach(_.cleanupOldLogs(cleanupThreshTime.milliseconds))
+    logManagerOption.foreach(_.cleanupOldLogs(cleanupThreshTime.milliseconds, waitForCompletion))
     log
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3610d3c6/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 1f0e442..8dbb42a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -121,7 +121,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
 
     /** Clean up metadata older than the given threshold time */
   def cleanupOldMetadata(cleanupThreshTime: Time) {
-    receivedBlockTracker.cleanupOldBatches(cleanupThreshTime)
+    receivedBlockTracker.cleanupOldBatches(cleanupThreshTime, waitForCompletion = false)
   }
 
   /** Register a receiver */

http://git-wip-us.apache.org/repos/asf/spark/blob/3610d3c6/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
index 70d2343..166661b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
@@ -19,11 +19,11 @@ package org.apache.spark.streaming.util
 import java.nio.ByteBuffer
 
 import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, ExecutionContext, Future}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.fs.permission.FsPermission
 import org.apache.spark.Logging
 import org.apache.spark.util.Utils
 import WriteAheadLogManager._
@@ -124,8 +124,12 @@ private[streaming] class WriteAheadLogManager(
    * files, which is usually based on the local system time. So if there is coordination necessary
    * between the node calculating the threshTime (say, driver node), and the local system time
    * (say, worker node), the caller has to take account of possible time skew.
+   *
+   * If waitForCompletion is set to true, this method will return only after old logs have been
+   * deleted. This should be set to true only for testing. Else the files will be deleted
+   * asynchronously.
    */
-  def cleanupOldLogs(threshTime: Long): Unit = {
+  def cleanupOldLogs(threshTime: Long, waitForCompletion: Boolean): Unit = {
     val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime } }
     logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " +
       s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}")
@@ -146,10 +150,15 @@ private[streaming] class WriteAheadLogManager(
       logInfo(s"Cleared log files in $logDirectory older than $threshTime")
     }
     if (!executionContext.isShutdown) {
-      Future { deleteFiles() }
+      val f = Future { deleteFiles() }
+      if (waitForCompletion) {
+        import scala.concurrent.duration._
+        Await.ready(f, 1 second)
+      }
     }
   }
 
+
   /** Stop the manager, close any open log writer */
   def stop(): Unit = synchronized {
     if (currentLogWriter != null) {

http://git-wip-us.apache.org/repos/asf/spark/blob/3610d3c6/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index 3661e16..132ff24 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -168,7 +168,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with
Matche
       manualClock.currentTime() shouldEqual 5000L
 
       val cleanupThreshTime = 3000L
-      handler.cleanupOldBlock(cleanupThreshTime)
+      handler.cleanupOldBlocks(cleanupThreshTime)
       eventually(timeout(10000 millis), interval(10 millis)) {
         getWriteAheadLogFiles().size should be < preCleanupLogFiles.size
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/3610d3c6/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index 01a09b6..de7e9d6 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -166,7 +166,7 @@ class ReceivedBlockTrackerSuite
     // Cleanup first batch but not second batch
     val oldestLogFile = getWriteAheadLogFiles().head
     incrementTime()
-    tracker3.cleanupOldBatches(batchTime2)
+    tracker3.cleanupOldBatches(batchTime2, waitForCompletion = true)
 
     // Verify that the batch allocations have been cleaned, and the act has been written to log
     tracker3.getBlocksOfBatchAndStream(batchTime1, streamId) shouldEqual Seq.empty

http://git-wip-us.apache.org/repos/asf/spark/blob/3610d3c6/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index 8f69bcb..7ce9499 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -182,15 +182,29 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
   }
 
   test("WriteAheadLogManager - cleanup old logs") {
+    logCleanUpTest(waitForCompletion = false)
+  }
+
+  test("WriteAheadLogManager - cleanup old logs synchronously") {
+    logCleanUpTest(waitForCompletion = true)
+  }
+
+  private def logCleanUpTest(waitForCompletion: Boolean): Unit = {
     // Write data with manager, recover with new manager and verify
     val manualClock = new ManualClock
     val dataToWrite = generateRandomData()
     manager = writeDataUsingManager(testDir, dataToWrite, manualClock, stopManager = false)
     val logFiles = getLogFilesInDirectory(testDir)
     assert(logFiles.size > 1)
-    manager.cleanupOldLogs(manualClock.currentTime() / 2)
-    eventually(timeout(1 second), interval(10 milliseconds)) {
+
+    manager.cleanupOldLogs(manualClock.currentTime() / 2, waitForCompletion)
+
+    if (waitForCompletion) {
       assert(getLogFilesInDirectory(testDir).size < logFiles.size)
+    } else {
+      eventually(timeout(1 second), interval(10 milliseconds)) {
+        assert(getLogFilesInDirectory(testDir).size < logFiles.size)
+      }
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message