spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zsxw...@apache.org
Subject spark git commit: [SPARK-17649][CORE] Log how many Spark events got dropped in AsynchronousListenerBus (branch 1.6)
Date Mon, 26 Sep 2016 18:03:09 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 94524cef4 -> 7aded55e7


[SPARK-17649][CORE] Log how many Spark events got dropped in AsynchronousListenerBus (branch 1.6)

## What changes were proposed in this pull request?

Backport #15220 to 1.6.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #15226 from zsxwing/SPARK-17649-branch-1.6.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7aded55e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7aded55e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7aded55e

Branch: refs/heads/branch-1.6
Commit: 7aded55e7329d331728ce2ec24d8ec915204d775
Parents: 94524ce
Author: Shixiong Zhu <shixiong@databricks.com>
Authored: Mon Sep 26 11:03:05 2016 -0700
Committer: Shixiong Zhu <shixiong@databricks.com>
Committed: Mon Sep 26 11:03:05 2016 -0700

----------------------------------------------------------------------
 .../spark/util/AsynchronousListenerBus.scala    | 27 +++++++++++++++++++-
 1 file changed, 26 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7aded55e/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
index 6c1fca7..b5455ff 100644
--- a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
@@ -18,7 +18,8 @@
 package org.apache.spark.util
 
 import java.util.concurrent._
-import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+
 import scala.util.DynamicVariable
 
 import org.apache.spark.SparkContext
@@ -51,6 +52,12 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri
   // Indicate if `stop()` is called
   private val stopped = new AtomicBoolean(false)
 
+  /** A counter for dropped events. It will be reset every time we log it. */
+  private val droppedEventsCounter = new AtomicLong(0L)
+
+  /** When `droppedEventsCounter` was logged last time in milliseconds. */
+  @volatile private var lastReportTimestamp = 0L
+
   // Indicate if we are processing some event
   // Guarded by `self`
   private var processingEvent = false
@@ -117,6 +124,24 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri
       eventLock.release()
     } else {
       onDropEvent(event)
+      droppedEventsCounter.incrementAndGet()
+    }
+
+    val droppedEvents = droppedEventsCounter.get
+    if (droppedEvents > 0) {
+      // Don't log too frequently
+      if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
+        // There may be multiple threads trying to decrease droppedEventsCounter.
+        // Use "compareAndSet" to make sure only one thread can win.
+        // And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and
+        // then that thread will update it.
+        if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) {
+          val prevLastReportTimestamp = lastReportTimestamp
+          lastReportTimestamp = System.currentTimeMillis()
+          logWarning(s"Dropped $droppedEvents SparkListenerEvents since " +
+            new java.util.Date(prevLastReportTimestamp))
+        }
+      }
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message