spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andrewo...@apache.org
Subject spark git commit: [SPARK-10431] [CORE] Fix intermittent test failure. Wait for event queue to be clear
Date Thu, 03 Sep 2015 20:47:38 GMT
Repository: spark
Updated Branches:
  refs/heads/master 49aff7b9a -> d911c682f


[SPARK-10431] [CORE] Fix intermittent test failure. Wait for event queue to be clear

Author: robbins <robbins@uk.ibm.com>

Closes #8582 from robbinspg/InputOutputMetricsSuite.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d911c682
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d911c682
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d911c682

Branch: refs/heads/master
Commit: d911c682f00cd5c438568c548098e03d3e7ea05c
Parents: 49aff7b
Author: robbins <robbins@uk.ibm.com>
Authored: Thu Sep 3 13:47:22 2015 -0700
Committer: Andrew Or <andrew@databricks.com>
Committed: Thu Sep 3 13:47:25 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala | 4 ++++
 1 file changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d911c682/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
index d3218a5..44eb5a0 100644
--- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
@@ -286,6 +286,10 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
   private def runAndReturnMetrics(job: => Unit,
       collector: (SparkListenerTaskEnd) => Option[Long]): Long = {
     val taskMetrics = new ArrayBuffer[Long]()
+
+    // Avoid receiving earlier taskEnd events
+    sc.listenerBus.waitUntilEmpty(500)
+
     sc.addSparkListener(new SparkListener() {
       override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
         collector(taskEnd).foreach(taskMetrics += _)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message