From iras...@apache.org
Subject spark git commit: [SPARK-22764][CORE] Fix flakiness in SparkContextSuite.
Date Wed, 13 Dec 2017 22:06:25 GMT
Repository: spark
Updated Branches:
  refs/heads/master ba0e79f57 -> a83e8e6c2


[SPARK-22764][CORE] Fix flakiness in SparkContextSuite.

Use a semaphore to synchronize the tasks with the listener code
that is trying to cancel the job or stage, so that the listener
won't try to cancel a job or stage that has already finished.
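
For reference, a minimal standalone sketch of that handshake (hypothetical names, no Spark dependency, assuming Scala 2.12+ for the Runnable lambda): the tasks block on a semaphore that starts with zero permits, and the listener releases one permit per task only after it has issued the cancellation, so no task can finish before the cancel request lands.

import java.util.concurrent.Semaphore

// Hypothetical, simplified stand-ins for the test's "tasks" and "listener";
// this is not the actual Spark test code.
object CancelHandshakeSketch {
  val semaphore = new Semaphore(0)   // starts with zero permits

  def main(args: Array[String]): Unit = {
    val slices = 10

    // Each "task" parks on the semaphore, mirroring the mapPartitions body.
    val tasks = (1 to slices).map { i =>
      val t = new Thread(() => {
        semaphore.acquire()          // blocks until the listener releases permits
        println(s"task " + i + " unblocked after cancellation was requested")
      })
      t.start()
      t
    }

    // The "listener" cancels first, then releases one permit per task,
    // so no task can complete before the cancel request has been issued.
    println("listener: cancelling stage/job")
    semaphore.release(slices)

    tasks.foreach(_.join())
  }
}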

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #19956 from vanzin/SPARK-22764.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a83e8e6c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a83e8e6c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a83e8e6c

Branch: refs/heads/master
Commit: a83e8e6c223df8b819335cbabbfff9956942f2ad
Parents: ba0e79f
Author: Marcelo Vanzin <vanzin@cloudera.com>
Authored: Wed Dec 13 16:06:16 2017 -0600
Committer: Imran Rashid <irashid@cloudera.com>
Committed: Wed Dec 13 16:06:16 2017 -0600

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContextSuite.scala   | 15 +++++++++++----
 1 file changed, 11 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a83e8e6c/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 37fcc93..b30bd74 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark
 import java.io.File
 import java.net.{MalformedURLException, URI}
 import java.nio.charset.StandardCharsets
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{Semaphore, TimeUnit}
 
 import scala.concurrent.duration._
 
@@ -499,6 +499,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
   test("Cancelling stages/jobs with custom reasons.") {
     sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
     val REASON = "You shall not pass"
+    val slices = 10
 
     val listener = new SparkListener {
       override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
@@ -508,6 +509,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
           }
           sc.cancelStage(taskStart.stageId, REASON)
           SparkContextSuite.cancelStage = false
+          SparkContextSuite.semaphore.release(slices)
         }
       }
 
@@ -518,21 +520,25 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
           }
           sc.cancelJob(jobStart.jobId, REASON)
           SparkContextSuite.cancelJob = false
+          SparkContextSuite.semaphore.release(slices)
         }
       }
     }
     sc.addSparkListener(listener)
 
     for (cancelWhat <- Seq("stage", "job")) {
+      SparkContextSuite.semaphore.drainPermits()
       SparkContextSuite.isTaskStarted = false
       SparkContextSuite.cancelStage = (cancelWhat == "stage")
       SparkContextSuite.cancelJob = (cancelWhat == "job")
 
       val ex = intercept[SparkException] {
-        sc.range(0, 10000L).mapPartitions { x =>
-          org.apache.spark.SparkContextSuite.isTaskStarted = true
+        sc.range(0, 10000L, numSlices = slices).mapPartitions { x =>
+          SparkContextSuite.isTaskStarted = true
+          // Block waiting for the listener to cancel the stage or job.
+          SparkContextSuite.semaphore.acquire()
           x
-        }.cartesian(sc.range(0, 10L))count()
+        }.count()
       }
 
       ex.getCause() match {
@@ -636,4 +642,5 @@ object SparkContextSuite {
   @volatile var isTaskStarted = false
   @volatile var taskKilled = false
   @volatile var taskSucceeded = false
+  val semaphore = new Semaphore(0)
 }

