spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject spark git commit: [SPARK-18031][TESTS] Fix flaky test ExecutorAllocationManagerSuite.basic functionality
Date Wed, 21 Dec 2016 19:17:47 GMT
Repository: spark
Updated Branches:
  refs/heads/master 607a1e63d -> ccfe60a83


[SPARK-18031][TESTS] Fix flaky test ExecutorAllocationManagerSuite.basic functionality

## What changes were proposed in this pull request?

The test is flaky because `test("basic functionality")` does not block until
`ExecutorAllocationManager.manageAllocation` has been called. This PR adds `StreamManualClock`
so that the test can block on the expected wait time, which makes the test deterministic.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16321 from zsxwing/SPARK-18031.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ccfe60a8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ccfe60a8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ccfe60a8

Branch: refs/heads/master
Commit: ccfe60a8304871779ff1b31b8c2d724f59d5b2af
Parents: 607a1e6
Author: Shixiong Zhu <shixiong@databricks.com>
Authored: Wed Dec 21 11:17:44 2016 -0800
Committer: Tathagata Das <tathagata.das1565@gmail.com>
Committed: Wed Dec 21 11:17:44 2016 -0800

----------------------------------------------------------------------
 .../ExecutorAllocationManagerSuite.scala        | 36 +++++++++++++++++---
 1 file changed, 32 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ccfe60a8/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
index b49e579..1d2bf35 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
@@ -36,11 +36,11 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite
 
   private val batchDurationMillis = 1000L
   private var allocationClient: ExecutorAllocationClient = null
-  private var clock: ManualClock = null
+  private var clock: StreamManualClock = null
 
   before {
     allocationClient = mock[ExecutorAllocationClient]
-    clock = new ManualClock()
+    clock = new StreamManualClock()
   }
 
   test("basic functionality") {
@@ -57,10 +57,14 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite
         reset(allocationClient)
         when(allocationClient.getExecutorIds()).thenReturn(Seq("1", "2"))
         addBatchProcTime(allocationManager, batchProcTimeMs.toLong)
-        clock.advance(SCALING_INTERVAL_DEFAULT_SECS * 1000 + 1)
+        val advancedTime = SCALING_INTERVAL_DEFAULT_SECS * 1000 + 1
+        val expectedWaitTime = clock.getTimeMillis() + advancedTime
+        clock.advance(advancedTime)
+        // Make sure ExecutorAllocationManager.manageAllocation is called
         eventually(timeout(10 seconds)) {
-          body
+          assert(clock.isStreamWaitingAt(expectedWaitTime))
         }
+        body
       }
 
       /** Verify that the expected number of total executor were requested */
@@ -394,3 +398,27 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite
     }
   }
 }
+
+/**
+ * A special manual clock that provide `isStreamWaitingAt` to allow the user to check if the clock
+ * is blocking.
+ */
+class StreamManualClock(time: Long = 0L) extends ManualClock(time) with Serializable {
+  private var waitStartTime: Option[Long] = None
+
+  override def waitTillTime(targetTime: Long): Long = synchronized {
+    try {
+      waitStartTime = Some(getTimeMillis())
+      super.waitTillTime(targetTime)
+    } finally {
+      waitStartTime = None
+    }
+  }
+
+  /**
+   * Returns if the clock is blocking and the time it started to block is the parameter `time`.
+   */
+  def isStreamWaitingAt(time: Long): Boolean = synchronized {
+    waitStartTime == Some(time)
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message