spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject spark git commit: [SQL][STREAMING][TEST] Fix flaky tests in StreamingQueryListenerSuite
Date Tue, 18 Oct 2016 07:50:04 GMT
Repository: spark
Updated Branches:
  refs/heads/master 1c5a7d7f6 -> 7d878cf2d


[SQL][STREAMING][TEST] Fix flaky tests in StreamingQueryListenerSuite

This work has largely been done by lw-lin in his PR #15497. This is a slight refactoring of
it.

## What changes were proposed in this pull request?
There were two sources of flakiness in the StreamingQueryListenerSuite tests.

- When testing with a manual clock, consecutive attempts to advance the clock can occur without
the stream execution thread being unblocked and doing some work between the two attempts.
Hence the following can happen with the current ManualClock:
```
+-----------------------------------+--------------------------------+
|      StreamExecution thread       |         testing thread         |
+-----------------------------------+--------------------------------+
|  ManualClock.waitTillTime(100) {  |                                |
|        _isWaiting = true          |                                |
|            wait(10)               |                                |
|        still in wait(10)          |  if (_isWaiting) advance(100)  |
|        still in wait(10)          |  if (_isWaiting) advance(200)  | <- this should be disallowed!
|        still in wait(10)          |  if (_isWaiting) advance(300)  | <- this should be disallowed!
|      wake up from wait(10)        |                                |
|       current time is 600         |                                |
|       _isWaiting = false          |                                |
|  }                                |                                |
+-----------------------------------+--------------------------------+
```

- The second source of flakiness is that data added to the memory stream may get processed in
any trigger, not just the first trigger.

My fix is to make the testing thread wait, before advancing the manual clock, until the stream
execution thread has started waiting on the clock at the expected wait start time. That is,
`advance(200)` (see above) will wait for the stream execution thread to complete the wait that
started at time 0 and start a new wait at time 100 (i.e. the timestamp after the previous
`advance(100)`).

In addition, since this is a feature that is solely used by StreamExecution, I removed all
the non-generic code from ManualClock and put it in StreamManualClock inside StreamTest.

## How was this patch tested?
Ran the existing unit tests MANY TIMES in Jenkins.

Author: Tathagata Das <tathagata.das1565@gmail.com>
Author: Liwei Lin <lwlin7@gmail.com>

Closes #15519 from tdas/metrics-flaky-test-fix.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7d878cf2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7d878cf2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7d878cf2

Branch: refs/heads/master
Commit: 7d878cf2da04800bc4147b05610170865b148c64
Parents: 1c5a7d7
Author: Liwei Lin <lwlin7@gmail.com>
Authored: Tue Oct 18 00:49:57 2016 -0700
Committer: Tathagata Das <tathagata.das1565@gmail.com>
Committed: Tue Oct 18 00:49:57 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/util/ManualClock.scala     | 18 ++--------
 .../spark/sql/streaming/StreamSuite.scala       |  4 +--
 .../apache/spark/sql/streaming/StreamTest.scala | 38 ++++++++++++++++----
 .../streaming/StreamingQueryListenerSuite.scala |  8 ++---
 4 files changed, 41 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7d878cf2/core/src/main/scala/org/apache/spark/util/ManualClock.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ManualClock.scala b/core/src/main/scala/org/apache/spark/util/ManualClock.scala
index 91a9587..e7a65d7 100644
--- a/core/src/main/scala/org/apache/spark/util/ManualClock.scala
+++ b/core/src/main/scala/org/apache/spark/util/ManualClock.scala
@@ -26,8 +26,6 @@ package org.apache.spark.util
  */
 private[spark] class ManualClock(private var time: Long) extends Clock {
 
-  private var _isWaiting = false
-
   /**
    * @return `ManualClock` with initial time 0
    */
@@ -59,19 +57,9 @@ private[spark] class ManualClock(private var time: Long) extends Clock
{
    * @return current time reported by the clock when waiting finishes
    */
   def waitTillTime(targetTime: Long): Long = synchronized {
-    _isWaiting = true
-    try {
-      while (time < targetTime) {
-        wait(10)
-      }
-      getTimeMillis()
-    } finally {
-      _isWaiting = false
+    while (time < targetTime) {
+      wait(10)
     }
+    getTimeMillis()
   }
-
-  /**
-   * Returns whether there is any thread being blocked in `waitTillTime`.
-   */
-  def isWaiting: Boolean = synchronized { _isWaiting }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/7d878cf2/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index cdbad90..6bdf479 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -161,7 +161,7 @@ class StreamSuite extends StreamTest {
 
     val inputData = MemoryStream[Int]
     testStream(inputData.toDS())(
-      StartStream(ProcessingTime("10 seconds"), new ManualClock),
+      StartStream(ProcessingTime("10 seconds"), new StreamManualClock),
 
       /* -- batch 0 ----------------------- */
       // Add some data in batch 0
@@ -199,7 +199,7 @@ class StreamSuite extends StreamTest {
 
       /* Stop then restart the Stream  */
       StopStream,
-      StartStream(ProcessingTime("10 seconds"), new ManualClock),
+      StartStream(ProcessingTime("10 seconds"), new StreamManualClock(60 * 1000)),
 
       /* -- batch 1 rerun ----------------- */
       // this batch 1 would re-run because the latest batch id logged in offset log is 1

http://git-wip-us.apache.org/repos/asf/spark/blob/7d878cf2/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 3b9d378..254f823 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -204,6 +204,21 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts
{
   case class AssertOnLastQueryStatus(condition: StreamingQueryStatus => Unit)
     extends StreamAction
 
+  class StreamManualClock(time: Long = 0L) extends ManualClock(time) {
+    private var waitStartTime: Option[Long] = None
+
+    override def waitTillTime(targetTime: Long): Long = synchronized {
+      try {
+        waitStartTime = Some(getTimeMillis())
+        super.waitTillTime(targetTime)
+      } finally {
+        waitStartTime = None
+      }
+    }
+
+    def isStreamWaitingAt(time: Long): Boolean = synchronized { waitStartTime.contains(time)
}
+  }
+
 
   /**
    * Executes the specified actions on the given streaming DataFrame and provides helpful
@@ -307,7 +322,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts
{
     val testThread = Thread.currentThread()
     val metadataRoot = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
     val statusCollector = new QueryStatusCollector
-
+    var manualClockExpectedTime = -1L
     try {
       spark.streams.addListener(statusCollector)
       startedTest.foreach { action =>
@@ -315,6 +330,12 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts
{
         action match {
           case StartStream(trigger, triggerClock) =>
             verify(currentStream == null, "stream already running")
+            verify(triggerClock.isInstanceOf[SystemClock]
+              || triggerClock.isInstanceOf[StreamManualClock],
+              "Use either SystemClock or StreamManualClock to start the stream")
+            if (triggerClock.isInstanceOf[StreamManualClock]) {
+              manualClockExpectedTime = triggerClock.asInstanceOf[StreamManualClock].getTimeMillis()
+            }
             lastStream = currentStream
             currentStream =
               spark
@@ -338,14 +359,19 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts
{
           case AdvanceManualClock(timeToAdd) =>
             verify(currentStream != null,
                    "can not advance manual clock when a stream is not running")
-            verify(currentStream.triggerClock.isInstanceOf[ManualClock],
+            verify(currentStream.triggerClock.isInstanceOf[StreamManualClock],
                    s"can not advance clock of type ${currentStream.triggerClock.getClass}")
-            val clock = currentStream.triggerClock.asInstanceOf[ManualClock]
+            val clock = currentStream.triggerClock.asInstanceOf[StreamManualClock]
+            assert(manualClockExpectedTime >= 0)
             // Make sure we don't advance ManualClock too early. See SPARK-16002.
-            eventually("ManualClock has not yet entered the waiting state") {
-              assert(clock.isWaiting)
+            eventually("StreamManualClock has not yet entered the waiting state") {
+              assert(clock.isStreamWaitingAt(manualClockExpectedTime))
             }
-            currentStream.triggerClock.asInstanceOf[ManualClock].advance(timeToAdd)
+            clock.advance(timeToAdd)
+            manualClockExpectedTime += timeToAdd
+            verify(clock.getTimeMillis() === manualClockExpectedTime,
+              s"Unexpected clock time after updating: " +
+                s"expecting $manualClockExpectedTime, current ${clock.getTimeMillis()}")
 
           case StopStream =>
             verify(currentStream != null, "can not stop a stream that is not running")

http://git-wip-us.apache.org/repos/asf/spark/blob/7d878cf2/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 9e0eefb..623f66a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -43,9 +43,9 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter
{
     // Make sure we don't leak any events to the next test
   }
 
-  ignore("single listener, check trigger statuses") {
+  test("single listener, check trigger statuses") {
     import StreamingQueryListenerSuite._
-    clock = new ManualClock()
+    clock = new StreamManualClock
 
     /** Custom MemoryStream that waits for manual clock to reach a time */
     val inputData = new MemoryStream[Int](0, sqlContext) {
@@ -81,7 +81,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter
{
       AssertOnLastQueryStatus { status: StreamingQueryStatus =>
         // Check the correctness of the trigger info of the last completed batch reported
by
         // onQueryProgress
-        assert(status.triggerDetails.get("triggerId") == "0")
+        assert(status.triggerDetails.containsKey("triggerId"))
         assert(status.triggerDetails.get("isTriggerActive") === "false")
         assert(status.triggerDetails.get("isDataPresentInTrigger") === "true")
 
@@ -101,7 +101,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter
{
         assert(status.triggerDetails.get("numRows.state.aggregation1.updated") === "1")
 
         assert(status.sourceStatuses.length === 1)
-        assert(status.sourceStatuses(0).triggerDetails.get("triggerId") === "0")
+        assert(status.sourceStatuses(0).triggerDetails.containsKey("triggerId"))
         assert(status.sourceStatuses(0).triggerDetails.get("latency.getOffset.source") ===
"100")
         assert(status.sourceStatuses(0).triggerDetails.get("latency.getBatch.source") ===
"200")
         assert(status.sourceStatuses(0).triggerDetails.get("numRows.input.source") === "2")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message