spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zsxw...@apache.org
Subject spark git commit: [SPARK-22094][SS] processAllAvailable should check the query state
Date Fri, 22 Sep 2017 05:08:53 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 765fd92e7 -> 090b987e6


[SPARK-22094][SS] processAllAvailable should check the query state

`processAllAvailable` should also check the query state and return immediately if the query
has been stopped; otherwise it can block forever waiting on a query that is no longer active.

Tested with the new unit test.

Author: Shixiong Zhu <zsxwing@gmail.com>

Closes #19314 from zsxwing/SPARK-22094.

(cherry picked from commit fedf6961be4e99139eb7ab08d5e6e29187ea5ccf)
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/090b987e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/090b987e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/090b987e

Branch: refs/heads/branch-2.2
Commit: 090b987e665a47f08e2dc9fc5f22c427bc260fbc
Parents: 765fd92
Author: Shixiong Zhu <zsxwing@gmail.com>
Authored: Thu Sep 21 21:55:07 2017 -0700
Committer: Shixiong Zhu <zsxwing@gmail.com>
Committed: Thu Sep 21 22:08:45 2017 -0700

----------------------------------------------------------------------
 .../spark/sql/execution/streaming/StreamExecution.scala |  2 +-
 .../spark/sql/streaming/StreamingQuerySuite.scala       | 12 ++++++++++++
 2 files changed, 13 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/090b987e/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 16db353..33f81d9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -777,7 +777,7 @@ class StreamExecution(
         if (streamDeathCause != null) {
           throw streamDeathCause
         }
-        if (noNewData) {
+        if (noNewData || !isActive) {
           return
         }
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/090b987e/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index b69536e..ee5af65 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -613,6 +613,18 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     }
   }
 
+  test("processAllAvailable should not block forever when a query is stopped") {
+    val input = MemoryStream[Int]
+    input.addData(1)
+    val query = input.toDF().writeStream
+      .trigger(Trigger.Once())
+      .format("console")
+      .start()
+    failAfter(streamingTimeout) {
+      query.processAllAvailable()
+    }
+  }
+
   /** Create a streaming DF that only execute one batch in which it returns the given static DF */
   private def createSingleTriggerStreamingDF(triggerDF: DataFrame): DataFrame = {
     require(!triggerDF.isStreaming)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message