spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zsxw...@apache.org
Subject spark git commit: [SPARK-19594][STRUCTURED STREAMING] StreamingQueryListener fails to handle QueryTerminatedEvent if more than one listener exists
Date Sun, 26 Feb 2017 23:57:35 GMT
Repository: spark
Updated Branches:
  refs/heads/master 68f2142cf -> 9f8e39215


[SPARK-19594][STRUCTURED STREAMING] StreamingQueryListener fails to handle QueryTerminatedEvent
if more than one listener exists

## What changes were proposed in this pull request?

Currently, if multiple streaming query listeners exist, when a QueryTerminatedEvent is triggered,
only one of the listeners will be invoked while the rest of the listeners will ignore the
event.
This happens because the streaming query listener bus holds a set of running query
ids, and when a termination event is triggered, after the first listener handles the event,
the terminated query id is removed from the set.
In this PR, the query id is removed from the set only after all the listeners have handled
the event.

## How was this patch tested?

a test with multiple listeners has been added to StreamingQueryListenerSuite

Author: Eyal Zituny <eyal.zituny@equalum.io>

Closes #16991 from eyalzit/master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9f8e3921
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9f8e3921
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9f8e3921

Branch: refs/heads/master
Commit: 9f8e392159ba65decddf62eb3cd85b6821db01b4
Parents: 68f2142
Author: Eyal Zituny <eyal.zituny@equalum.io>
Authored: Sun Feb 26 15:57:32 2017 -0800
Committer: Shixiong Zhu <shixiong@databricks.com>
Committed: Sun Feb 26 15:57:32 2017 -0800

----------------------------------------------------------------------
 .../org/apache/spark/util/ListenerBus.scala     |  2 +-
 .../streaming/StreamingQueryListenerBus.scala   | 14 ++++++++++-
 .../streaming/StreamingQueryListenerSuite.scala | 25 ++++++++++++++++++++
 3 files changed, 39 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9f8e3921/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
index 79fc2e9..fa5ad4e 100644
--- a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
@@ -52,7 +52,7 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
    * Post the event to all registered listeners. The `postToAll` caller should guarantee
calling
    * `postToAll` in the same thread for all events.
    */
-  final def postToAll(event: E): Unit = {
+  def postToAll(event: E): Unit = {
     // JavaConverters can create a JIterableWrapper if we use asScala.
     // However, this method will be called frequently. To avoid the wrapper cost, here we
use
     // Java Iterator directly.

http://git-wip-us.apache.org/repos/asf/spark/blob/9f8e3921/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
index a2153d2..4207013 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
@@ -75,6 +75,19 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
     }
   }
 
+  /**
+   * Override the parent `postToAll` to remove the query id from `activeQueryRunIds` after
all
+   * the listeners process `QueryTerminatedEvent`. (SPARK-19594)
+   */
+  override def postToAll(event: Event): Unit = {
+    super.postToAll(event)
+    event match {
+      case t: QueryTerminatedEvent =>
+        activeQueryRunIds.synchronized { activeQueryRunIds -= t.runId }
+      case _ =>
+    }
+  }
+
   override def onOtherEvent(event: SparkListenerEvent): Unit = {
     event match {
       case e: StreamingQueryListener.Event =>
@@ -112,7 +125,6 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
       case queryTerminated: QueryTerminatedEvent =>
         if (shouldReport(queryTerminated.runId)) {
           listener.onQueryTerminated(queryTerminated)
-          activeQueryRunIds.synchronized { activeQueryRunIds -= queryTerminated.runId }
         }
       case _ =>
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/9f8e3921/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 4596aa1..eb09b9f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -133,6 +133,31 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter
{
     }
   }
 
+  test("SPARK-19594: all of listeners should receive QueryTerminatedEvent") {
+    val df = MemoryStream[Int].toDS().as[Long]
+    val listeners = (1 to 5).map(_ => new EventCollector)
+    try {
+      listeners.foreach(listener => spark.streams.addListener(listener))
+      testStream(df, OutputMode.Append)(
+        StartStream(),
+        StopStream,
+        AssertOnQuery { query =>
+          eventually(Timeout(streamingTimeout)) {
+            listeners.foreach(listener => assert(listener.terminationEvent !== null))
+            listeners.foreach(listener => assert(listener.terminationEvent.id === query.id))
+            listeners.foreach(listener => assert(listener.terminationEvent.runId === query.runId))
+            listeners.foreach(listener => assert(listener.terminationEvent.exception ===
None))
+          }
+          listeners.foreach(listener => listener.checkAsyncErrors())
+          listeners.foreach(listener => listener.reset())
+          true
+        }
+      )
+    } finally {
+      listeners.foreach(spark.streams.removeListener)
+    }
+  }
+
   test("adding and removing listener") {
     def isListenerActive(listener: EventCollector): Boolean = {
       listener.reset()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message