spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pwend...@apache.org
Subject [21/37] git commit: Support job cancellation in multi-pool scheduler.
Date Tue, 15 Oct 2013 05:27:07 GMT
Support job cancellation in multi-pool scheduler.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/ddf64f01
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/ddf64f01
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/ddf64f01

Branch: refs/heads/master
Commit: ddf64f019fa98010e0a59e6e1559f4e3f8b25b5f
Parents: 3bd2890
Author: Reynold Xin <rxin@apache.org>
Authored: Thu Oct 10 13:20:27 2013 -0700
Committer: Reynold Xin <rxin@apache.org>
Committed: Thu Oct 10 13:20:27 2013 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/scheduler/Pool.scala  |  2 +-
 .../spark/scheduler/SchedulableBuilder.scala     | 19 +++++++++++++++++--
 .../scheduler/cluster/ClusterScheduler.scala     |  2 +-
 3 files changed, 19 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ddf64f01/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
index 9eb8d48..8b33319 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
@@ -43,7 +43,7 @@ private[spark] class Pool(
   var runningTasks = 0
 
   var priority = 0
-  var stageId = 0
+  var stageId = -1
   var name = poolName
   var parent: Pool = null
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ddf64f01/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
index c4f555b..a4e8653 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
@@ -36,8 +36,23 @@ private[spark] trait SchedulableBuilder {
 
   def addTaskSetManager(manager: Schedulable, properties: Properties)
 
-  def getTaskSetManagers(stageId: Int): Iterable[Schedulable] = {
-    rootPool.schedulableQueue.filter(_.stageId == stageId)
+  /**
+   * Find the TaskSetManager for the given stage. In fair scheduler, this function examines
+   * all the pools to find the TaskSetManager.
+   */
+  def getTaskSetManagers(stageId: Int): Option[TaskSetManager] = {
+    def getTsm(pool: Pool): Option[TaskSetManager] = {
+      pool.schedulableQueue.foreach {
+        case tsm: TaskSetManager =>
+          if (tsm.stageId == stageId) {
+            return Some(tsm)
+          }
+        case pool: Pool =>
+          getTsm(pool)
+      }
+      return None
+    }
+    getTsm(rootPool)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ddf64f01/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
index 031d0b1..250dec5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
@@ -172,7 +172,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
 
   override def cancelTasks(stageId: Int): Unit = synchronized {
     logInfo("Cancelling stage " + stageId)
-    schedulableBuilder.getTaskSetManagers(stageId).foreach { case tsm: TaskSetManager =>
+    schedulableBuilder.getTaskSetManagers(stageId).foreach { tsm =>
       // There are two possible cases here:
       // 1. The task set manager has been created and some tasks have been scheduled.
       //    In this case, send a kill signal to the executors to kill the task.


Mime
View raw message