spark-commits mailing list archives

From joshro...@apache.org
Subject spark git commit: SPARK-960 [CORE] [TEST] JobCancellationSuite "two jobs sharing the same stage" is broken
Date Mon, 26 Jan 2015 22:32:33 GMT
Repository: spark
Updated Branches:
  refs/heads/master b38034e87 -> 0497ea51a


SPARK-960 [CORE] [TEST] JobCancellationSuite "two jobs sharing the same stage" is broken

This reenables and fixes this test, after addressing two issues:

- The Semaphore that was intended to be shared locally was being serialized and copied; it's
now a static member in the companion object as in other tests
- Later changes to Spark mean that cancelling the first task will not cancel the shared stage,
and therefore the second task should succeed
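
The first issue can be illustrated without Spark. The sketch below is not taken from the commit: it assumes that an object captured by a task closure is round-tripped through Java serialization, and the names `SemaphoreCopyDemo`, `roundTrip` and `shared` are hypothetical. It shows that the deserialized Semaphore is an independent copy, whereas a member of a singleton object is reached by reference and so stays shared within one JVM.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.concurrent.Semaphore

object SemaphoreCopyDemo {
  // Hypothetical helper: round-trips a value through Java serialization,
  // assumed here to approximate what happens to a closure-captured object.
  def roundTrip[T <: java.io.Serializable](value: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // Stand-in for a companion-object member such as
  // JobCancellationSuite.twoJobsSharingStageSemaphore: code refers to it
  // through the singleton, so it is never captured and copied.
  val shared = new Semaphore(0)

  def main(args: Array[String]): Unit = {
    // A local Semaphore, like the old sem2, gets copied on serialization.
    val local = new Semaphore(0)
    val copy = roundTrip(local)

    // Releasing permits on the copy does not touch the original, so a
    // thread waiting on the other instance would never be woken.
    copy.release(10)
    println(s"local permits: ${local.availablePermits()}")
    println(s"copy permits:  ${copy.availablePermits()}")

    // The singleton member is the same instance wherever it is accessed.
    SemaphoreCopyDemo.shared.release(10)
    println(s"shared permits: ${shared.availablePermits()}")
  }
}
```

In the old test the task side called `acquire()` on one instance while the driver side called `release(10)` on another, which is consistent with the first issue described above.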

Author: Sean Owen <sowen@cloudera.com>

Closes #4180 from srowen/SPARK-960 and squashes the following commits:

43da66f [Sean Owen] Fix 'two jobs sharing the same stage' test and reenable it: truly share
a Semaphore locally as intended, and update expectation of failure in non-cancelled task


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0497ea51
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0497ea51
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0497ea51

Branch: refs/heads/master
Commit: 0497ea51ac345f8057d222a18dbbf8eae78f5b92
Parents: b38034e
Author: Sean Owen <sowen@cloudera.com>
Authored: Mon Jan 26 14:32:27 2015 -0800
Committer: Josh Rosen <joshrosen@databricks.com>
Committed: Mon Jan 26 14:32:27 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/JobCancellationSuite.scala    | 17 +++++++++--------
 1 file changed, 9 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0497ea51/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index 7584ae7..21487bc 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -171,11 +171,11 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter
     assert(jobB.get() === 100)
   }
 
-  ignore("two jobs sharing the same stage") {
+  test("two jobs sharing the same stage") {
     // sem1: make sure cancel is issued after some tasks are launched
-    // sem2: make sure the first stage is not finished until cancel is issued
+    // twoJobsSharingStageSemaphore:
+    //   make sure the first stage is not finished until cancel is issued
     val sem1 = new Semaphore(0)
-    val sem2 = new Semaphore(0)
 
     sc = new SparkContext("local[2]", "test")
     sc.addSparkListener(new SparkListener {
@@ -186,7 +186,7 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter
 
     // Create two actions that would share the some stages.
     val rdd = sc.parallelize(1 to 10, 2).map { i =>
-      sem2.acquire()
+      JobCancellationSuite.twoJobsSharingStageSemaphore.acquire()
       (i, i)
     }.reduceByKey(_+_)
     val f1 = rdd.collectAsync()
@@ -196,13 +196,13 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter
     future {
       sem1.acquire()
       f1.cancel()
-      sem2.release(10)
+      JobCancellationSuite.twoJobsSharingStageSemaphore.release(10)
     }
 
-    // Expect both to fail now.
-    // TODO: update this test when we change Spark so cancelling f1 wouldn't affect f2.
+    // Expect f1 to fail due to cancellation,
     intercept[SparkException] { f1.get() }
-    intercept[SparkException] { f2.get() }
+    // but f2 should not be affected
+    f2.get()
   }
 
   def testCount() {
@@ -268,4 +268,5 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter
 object JobCancellationSuite {
   val taskStartedSemaphore = new Semaphore(0)
   val taskCancelledSemaphore = new Semaphore(0)
+  val twoJobsSharingStageSemaphore = new Semaphore(0)
 }

