spark-commits mailing list archives

From r...@apache.org
Subject git commit: Update Java api for setJobGroup with interruptOnCancel
Date Thu, 24 Apr 2014 05:00:25 GMT
Repository: spark
Updated Branches:
  refs/heads/master 4b2bab1d0 -> d485eecb7


Update Java api for setJobGroup with interruptOnCancel

Also adds a unit test.

Author: Aaron Davidson <aaron@databricks.com>

Closes #522 from aarondav/cancel2 and squashes the following commits:

565c253 [Aaron Davidson] Update Java api for setJobGroup with interruptOnCancel
65b33d8 [Aaron Davidson] Add unit test for Thread interruption on cancellation


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d485eecb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d485eecb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d485eecb

Branch: refs/heads/master
Commit: d485eecb7233dd339ae85a6f58f1c0686dd2037d
Parents: 4b2bab1
Author: Aaron Davidson <aaron@databricks.com>
Authored: Wed Apr 23 22:00:22 2014 -0700
Committer: Reynold Xin <rxin@apache.org>
Committed: Wed Apr 23 22:00:22 2014 -0700

----------------------------------------------------------------------
 .../spark/api/java/JavaSparkContext.scala       | 15 ++++++++
 .../org/apache/spark/JobCancellationSuite.scala | 36 ++++++++++++++++++--
 2 files changed, 49 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d485eecb/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index bda9272..8b95cda 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -570,6 +570,21 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
    * // In a separate thread:
    * sc.cancelJobGroup("some_job_to_cancel");
    * }}}
+   *
+   * If interruptOnCancel is set to true for the job group, then job cancellation will result
+   * in Thread.interrupt() being called on the job's executor threads. This is useful to help ensure
+   * that the tasks are actually stopped in a timely manner, but is off by default due to HDFS-1208,
+   * where HDFS may respond to Thread.interrupt() by marking nodes as dead.
+   */
+  def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean): Unit =
+    sc.setJobGroup(groupId, description, interruptOnCancel)
+
+  /**
+   * Assigns a group ID to all the jobs started by this thread until the group ID is set to a
+   * different value or cleared.
+   *
+   * @see `setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean)`.
+   *      This method sets interruptOnCancel to false.
    */
   def setJobGroup(groupId: String, description: String): Unit = sc.setJobGroup(groupId, description)
 

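For illustration only (not part of the commit): a minimal sketch of how a caller might use the new three-argument overload above. The master URL, group id, and description are made up; the cancelJobGroup call is assumed to be available on JavaSparkContext, as suggested by the scaladoc example earlier in this file. With interruptOnCancel set to true, cancelling the group interrupts the sleeping task threads instead of waiting for them to finish.

// Sketch only, assuming a local master; names here are illustrative.
import org.apache.spark.api.java.JavaSparkContext

object InterruptOnCancelSketch {
  def main(args: Array[String]): Unit = {
    val jsc = new JavaSparkContext("local[2]", "interrupt-on-cancel-sketch")

    // Opt in to Thread.interrupt() on cancellation for jobs started from this thread.
    jsc.setJobGroup("example-group", "cancellable work", true)

    // Cancel the group from a separate thread after a short delay.
    new Thread(new Runnable {
      override def run(): Unit = {
        Thread.sleep(1000)
        jsc.cancelJobGroup("example-group")
      }
    }).start()

    try {
      // The job group is a per-thread setting on the underlying SparkContext,
      // so this job (submitted via jsc.sc for brevity) belongs to "example-group".
      jsc.sc.parallelize(1 to 1000, 2).map { i => Thread.sleep(100); i }.count()
    } catch {
      case e: Exception => println("job was cancelled: " + e.getMessage)
    } finally {
      jsc.stop()
    }
  }
}
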
http://git-wip-us.apache.org/repos/asf/spark/blob/d485eecb/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index 7a39d1a..16cfdf1 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.Semaphore
 
 import scala.concurrent.Await
 import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.duration.Duration
+import scala.concurrent.duration._
 import scala.concurrent.future
 
 import org.scalatest.{BeforeAndAfter, FunSuite}
@@ -101,18 +101,50 @@ class JobCancellationSuite extends FunSuite with ShouldMatchers with BeforeAndAf
       sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.count()
     }
 
+    // Block until both tasks of job A have started and cancel job A.
+    sem.acquire(2)
+
     sc.clearJobGroup()
     val jobB = sc.parallelize(1 to 100, 2).countAsync()
+    sc.cancelJobGroup("jobA")
+    val e = intercept[SparkException] { Await.result(jobA, Duration.Inf) }
+    assert(e.getMessage contains "cancel")
+
+    // Once A is cancelled, job B should finish fairly quickly.
+    assert(jobB.get() === 100)
+  }
+
+
+  test("job group with interruption") {
+    sc = new SparkContext("local[2]", "test")
+
+    // Add a listener to release the semaphore once any tasks are launched.
+    val sem = new Semaphore(0)
+    sc.addSparkListener(new SparkListener {
+      override def onTaskStart(taskStart: SparkListenerTaskStart) {
+        sem.release()
+      }
+    })
+
+    // jobA is the one to be cancelled.
+    val jobA = future {
+      sc.setJobGroup("jobA", "this is a job to be cancelled", interruptOnCancel = true)
+      sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(100000); i }.count()
+    }
 
     // Block until both tasks of job A have started and cancel job A.
     sem.acquire(2)
+
+    sc.clearJobGroup()
+    val jobB = sc.parallelize(1 to 100, 2).countAsync()
     sc.cancelJobGroup("jobA")
-    val e = intercept[SparkException] { Await.result(jobA, Duration.Inf) }
+    val e = intercept[SparkException] { Await.result(jobA, 5.seconds) }
     assert(e.getMessage contains "cancel")
 
     // Once A is cancelled, job B should finish fairly quickly.
     assert(jobB.get() === 100)
   }
+
 /*
   test("two jobs sharing the same stage") {
     // sem1: make sure cancel is issued after some tasks are launched

