spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zsxw...@apache.org
Subject spark git commit: [SPARK-18868][FLAKY-TEST] Deflake StreamingQueryListenerSuite: single listener, check trigger...
Date Thu, 15 Dec 2016 23:46:13 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 a7364a82e -> 08e427287


[SPARK-18868][FLAKY-TEST] Deflake StreamingQueryListenerSuite: single listener, check trigger...

## What changes were proposed in this pull request?

Use `recentProgress` instead of `lastProgress` and filter out last non-zero value. Also add
eventually to the latest assertQuery similar to first `assertQuery`

## How was this patch tested?

Ran test 1000 times

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #16287 from brkyvz/SPARK-18868.

(cherry picked from commit 9c7f83b0289ba4550b156e6af31cf7c44580eb12)
Signed-off-by: Shixiong Zhu <shixiong@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/08e42728
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/08e42728
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/08e42728

Branch: refs/heads/branch-2.1
Commit: 08e4272872fc17c43f0dc79d329b946e8e85694d
Parents: a7364a8
Author: Burak Yavuz <brkyvz@gmail.com>
Authored: Thu Dec 15 15:46:03 2016 -0800
Committer: Shixiong Zhu <shixiong@databricks.com>
Committed: Thu Dec 15 15:46:10 2016 -0800

----------------------------------------------------------------------
 .../streaming/StreamingQueryListenerSuite.scala | 25 +++++++++++++-------
 1 file changed, 16 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/08e42728/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 7c6745ac..a057d1d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -84,7 +84,11 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter
{
         CheckAnswer(10, 5),
         AssertOnQuery { query =>
           assert(listener.progressEvents.nonEmpty)
-          assert(listener.progressEvents.last.json === query.lastProgress.json)
+          // SPARK-18868: We can't use query.lastProgress, because in progressEvents, we
filter
+          // out non-zero input rows, but the lastProgress may be a zero input row trigger
+          val lastNonZeroProgress = query.recentProgress.filter(_.numInputRows > 0).lastOption
+            .getOrElse(fail("No progress updates received in StreamingQuery!"))
+          assert(listener.progressEvents.last.json === lastNonZeroProgress.json)
           assert(listener.terminationEvent === null)
           true
         },
@@ -109,14 +113,17 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter
{
         AdvanceManualClock(100),
         ExpectFailure[SparkException],
         AssertOnQuery { query =>
-          assert(listener.terminationEvent !== null)
-          assert(listener.terminationEvent.id === query.id)
-          assert(listener.terminationEvent.exception.nonEmpty)
-          // Make sure that the exception message reported through listener
-          // contains the actual exception and relevant stack trace
-          assert(!listener.terminationEvent.exception.get.contains("StreamingQueryException"))
-          assert(listener.terminationEvent.exception.get.contains("java.lang.ArithmeticException"))
-          assert(listener.terminationEvent.exception.get.contains("StreamingQueryListenerSuite"))
+          eventually(Timeout(streamingTimeout)) {
+            assert(listener.terminationEvent !== null)
+            assert(listener.terminationEvent.id === query.id)
+            assert(listener.terminationEvent.exception.nonEmpty)
+            // Make sure that the exception message reported through listener
+            // contains the actual exception and relevant stack trace
+            assert(!listener.terminationEvent.exception.get.contains("StreamingQueryException"))
+            assert(
+              listener.terminationEvent.exception.get.contains("java.lang.ArithmeticException"))
+            assert(listener.terminationEvent.exception.get.contains("StreamingQueryListenerSuite"))
+          }
           listener.checkAsyncErrors()
           true
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message