spark-commits mailing list archives

From zsxw...@apache.org
Subject spark git commit: [SPARK-23788][SS] Fix race in StreamingQuerySuite
Date Sun, 25 Mar 2018 01:21:06 GMT
Repository: spark
Updated Branches:
  refs/heads/master a33655348 -> 816a5496b


[SPARK-23788][SS] Fix race in StreamingQuerySuite

## What changes were proposed in this pull request?

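The serializability test uses the same MemoryStream instance for 3 different queries. If any
of those queries asks it to commit before the others have run, the remaining queries will see
empty DataFrames. This can fail the test if q3 is affected: q3 is expected to fail because its
map closure calls q1.explain() on an executor, but with no input rows the closure never runs.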

We should use one instance per query instead.
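To make the race concrete, here is a simplified, hypothetical model in Scala (the language of the patched suite). ToyMemorySource, runQuery and the other names are invented for illustration; this is not Spark's MemoryStream, and the sketch assumes only that a commit discards buffered records for every reader of the same source. The toy queries run one after another, which reproduces the worst-case interleaving deterministically instead of depending on timing.

```scala
import scala.collection.mutable

// Simplified, hypothetical model of the race; this is NOT Spark's MemoryStream.
object SharedSourceRace {

  // Toy memory-backed source: added records stay buffered until someone
  // commits, and a commit drops them for every reader of this source.
  final class ToyMemorySource {
    private val buffer = mutable.ArrayBuffer.empty[Int]

    def addData(xs: Int*): Unit = buffer ++= xs

    // Returns a snapshot of everything currently buffered.
    def getBatch(): List[Int] = buffer.toList

    // Drops everything buffered so far.
    def commit(): Unit = buffer.clear()
  }

  // A toy "query": read one batch from its source, then commit it.
  def runQuery(source: ToyMemorySource): List[Int] = {
    val batch = source.getBatch()
    source.commit()
    batch
  }

  // Before the fix: one source shared by q1, q2 and q3.
  def sharedSource(): List[List[Int]] = {
    val input = new ToyMemorySource
    input.addData(1)
    List(runQuery(input), runQuery(input), runQuery(input))
  }

  // After the fix: one source per query, mirroring
  // `MemoryStream[Int] :: MemoryStream[Int] :: MemoryStream[Int] :: Nil`.
  def perQuerySources(): List[List[Int]] = {
    val input = List.fill(3)(new ToyMemorySource)
    input.foreach(_.addData(1))
    input.map(runQuery)
  }

  def main(args: Array[String]): Unit = {
    println(sharedSource())    // List(List(1), List(), List())
    println(perQuerySources()) // List(List(1), List(1), List(1))
  }
}
```

With a shared source only the first query to commit sees the record; with one source per query, as in the patch, every query sees it regardless of ordering.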

## How was this patch tested?

Existing unit test. If I move q2.processAllAvailable() before starting q3, the test always
fails without the fix.
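The reordering used to reproduce the failure can be sketched with the same kind of toy model (again hypothetical, not Spark code; ToySource and q3ClosureRuns are invented names). It assumes, as the description above states, that q3's failure comes from its map closure, so whether q3 fails depends only on whether q2 commits the shared source before q3 reads from it. Forcing q2 to finish first turns the intermittent failure into a deterministic one.

```scala
import scala.collection.mutable

// Hypothetical toy model (not Spark code) of why running q2 to completion
// before q3 starts makes the failure deterministic with a shared source.
object ReorderRepro {

  // Shared in-memory source: commit() drops all buffered data.
  final class ToySource {
    private val buffer = mutable.ArrayBuffer.empty[Int]
    def addData(xs: Int*): Unit = buffer ++= xs
    def read(): List[Int] = buffer.toList
    def commit(): Unit = buffer.clear()
  }

  // Returns true if q3's map closure runs at least once. In the real test
  // that closure calls q1.explain(), which is what should make q3 fail;
  // with no rows the closure never executes.
  def q3ClosureRuns(q2CommitsBeforeQ3Reads: Boolean): Boolean = {
    val shared = new ToySource
    shared.addData(1)
    var closureCalls = 0
    def q3Batch(): Unit = shared.read().foreach { _ => closureCalls += 1 }

    if (q2CommitsBeforeQ3Reads) {
      shared.read(); shared.commit() // q2 processes and commits its batch first
      q3Batch()                      // q3 then starts and finds nothing
    } else {
      q3Batch()                      // q3 reads the record before q2 commits
      shared.read(); shared.commit()
    }
    closureCalls > 0
  }

  def main(args: Array[String]): Unit = {
    println(q3ClosureRuns(q2CommitsBeforeQ3Reads = false)) // true
    println(q3ClosureRuns(q2CommitsBeforeQ3Reads = true))  // false
  }
}
```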

Author: Jose Torres <torres.joseph.f+github@gmail.com>

Closes #20896 from jose-torres/fixrace.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/816a5496
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/816a5496
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/816a5496

Branch: refs/heads/master
Commit: 816a5496ba4caac438f70400f72bb10bfcc02418
Parents: a336553
Author: Jose Torres <torres.joseph.f+github@gmail.com>
Authored: Sat Mar 24 18:21:01 2018 -0700
Committer: Shixiong Zhu <zsxwing@gmail.com>
Committed: Sat Mar 24 18:21:01 2018 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/streaming/StreamingQuerySuite.scala  | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/816a5496/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index ebc9a87..08749b4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -550,22 +550,22 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
         .start()
     }
 
-    val input = MemoryStream[Int]
-    val q1 = startQuery(input.toDS, "stream_serializable_test_1")
-    val q2 = startQuery(input.toDS.map { i =>
+    val input = MemoryStream[Int] :: MemoryStream[Int] :: MemoryStream[Int] :: Nil
+    val q1 = startQuery(input(0).toDS, "stream_serializable_test_1")
+    val q2 = startQuery(input(1).toDS.map { i =>
       // Emulate that `StreamingQuery` get captured with normal usage unintentionally.
       // It should not fail the query.
       q1
       i
     }, "stream_serializable_test_2")
-    val q3 = startQuery(input.toDS.map { i =>
+    val q3 = startQuery(input(2).toDS.map { i =>
       // Emulate that `StreamingQuery` is used in executors. We should fail the query with a clear
       // error message.
       q1.explain()
       i
     }, "stream_serializable_test_3")
     try {
-      input.addData(1)
+      input.foreach(_.addData(1))
 
       // q2 should not fail since it doesn't use `q1` in the closure
       q2.processAllAvailable()

