spark-commits mailing list archives

From pwend...@apache.org
Subject [19/37] git commit: Put the job cancellation handling into the DAGScheduler's main event loop.
Date Tue, 15 Oct 2013 05:27:05 GMT
Put the job cancellation handling into the DAGScheduler's main event loop.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/0353f74a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/0353f74a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/0353f74a

Branch: refs/heads/master
Commit: 0353f74a9a6882f4faa987a70a256786540a8727
Parents: dbae779
Author: Reynold Xin <rxin@apache.org>
Authored: Thu Oct 10 00:28:00 2013 -0700
Committer: Reynold Xin <rxin@apache.org>
Committed: Thu Oct 10 00:28:00 2013 -0700

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   | 16 +++----
 .../spark/scheduler/DAGSchedulerEvent.scala     |  2 +
 .../apache/spark/scheduler/SparkListener.scala  |  2 +-
 .../apache/spark/scheduler/TaskScheduler.scala  |  4 +-
 .../scheduler/cluster/ClusterScheduler.scala    | 46 ++++++++++++--------
 .../spark/scheduler/local/LocalScheduler.scala  |  2 +-
 .../apache/spark/rdd/AsyncRDDActionsSuite.scala | 39 ++++++++++++-----
 .../spark/scheduler/DAGSchedulerSuite.scala     |  2 +-
 8 files changed, 69 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
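In outline, the change replaces the old recursive, lock-holding stage kill (removed below) with a message posted to the DAGScheduler's event queue, so cancellation is serialized with job submission and every other scheduler action on the single event-processing thread. A minimal sketch of that pattern in Scala, with simplified names rather than Spark's actual classes:

    import java.util.concurrent.LinkedBlockingQueue

    sealed trait Event
    case class JobSubmitted(jobId: Int) extends Event
    case class JobCancelled(jobId: Int) extends Event

    class EventLoop {
      private val queue = new LinkedBlockingQueue[Event]()

      // Producers (any thread, e.g. one calling killJob) only ever enqueue.
      def post(event: Event): Unit = queue.put(event)

      // A single consumer thread owns all scheduler state, so the handlers
      // below need no locks of their own.
      def run(): Unit = while (true) {
        queue.take() match {
          case JobSubmitted(id) => println(s"submitting job $id")
          case JobCancelled(id) => println(s"cancelling job $id")
        }
      }
    }

Because the queue is the only shared structure, a caller's killJob is cheap and non-blocking, which is exactly what the new killJob body below does with eventQueue.put(JobCancelled(jobId)).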


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0353f74a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 714f430..93303a9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -338,15 +338,7 @@ class DAGScheduler(
    */
   def killJob(jobId: Int): Unit = this.synchronized {
     logInfo("Asked to kill job " + jobId)
-    activeJobs.find(job => job.jobId == jobId).foreach { job =>
-      killStage(job, job.finalStage)
-    }
-
-    def killStage(job: ActiveJob, stage: Stage): Unit = this.synchronized {
-      logDebug("Killing stage %s".format(stage.id))
-      taskSched.killTasks(stage.id)
-      stage.parents.foreach(parentStage => killStage(job, parentStage))
-    }
+    eventQueue.put(JobCancelled(jobId))
   }
 
   /**
@@ -375,6 +367,12 @@ class DAGScheduler(
           submitStage(finalStage)
         }
 
+      case JobCancelled(jobId) =>
+        // Cancel a job: find all the running stages that are linked to this job, and cancel them.
+        running.find(_.jobId == jobId).foreach { stage =>
+          taskSched.cancelTasks(stage.id)
+        }
+
       case ExecutorGained(execId, host) =>
         handleExecutorGained(execId, host)
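A note on the handler above for readers newer to Scala collections: find returns the first element satisfying the predicate, wrapped in an Option, whereas filter returns every match. A quick REPL demo with a stand-in for the scheduler's running set (a Seq here just to keep the output deterministic):

    case class Stage(id: Int, jobId: Int)
    val running = Seq(Stage(1, 7), Stage(2, 7), Stage(3, 8))

    running.find(_.jobId == 7)    // Some(Stage(1,7)) -- first match only
    running.filter(_.jobId == 7)  // List(Stage(1,7), Stage(2,7)) -- all matches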
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0353f74a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index 8dd8569..0d4d4ed 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -44,6 +44,8 @@ private[scheduler] case class JobSubmitted(
     properties: Properties = null)
   extends DAGSchedulerEvent
 
+private[scheduler] case class JobCancelled(jobId: Int) extends DAGSchedulerEvent
+
 private[scheduler]
 case class BeginEvent(task: Task[_], taskInfo: TaskInfo) extends DAGSchedulerEvent
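The private[scheduler] qualifier on the new event makes it visible everywhere under the org.apache.spark.scheduler package but nowhere else. For readers less familiar with Scala's qualified access modifiers, a small self-contained illustration (example package names, not Spark's):

    package org.example.scheduler {
      private[scheduler] case class Ping(n: Int)    // visible throughout org.example.scheduler
    }

    package org.example.scheduler.local {
      object Caller {
        val p = org.example.scheduler.Ping(1)       // compiles: still inside the scheduler package
      }
    }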
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0353f74a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 62b521a..466baf9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -54,7 +54,7 @@ trait SparkListener {
   /**
    * Called when a task starts
    */
-  def onTaskStart(taskEnd: SparkListenerTaskStart) { }
+  def onTaskStart(taskStart: SparkListenerTaskStart) { }
 
   /**
    * Called when a task ends

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0353f74a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index d25b0a5..6a51efe 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -45,8 +45,8 @@ private[spark] trait TaskScheduler {
   // Submit a sequence of tasks to run.
   def submitTasks(taskSet: TaskSet): Unit
 
-  // Kill the stage.
-  def killTasks(stageId: Int)
+  // Cancel a stage.
+  def cancelTasks(stageId: Int)
 
   // Set a listener for upcalls. This is guaranteed to be set before submitTasks is called.
   def setListener(listener: TaskSchedulerListener): Unit

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0353f74a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
index be0dabf..031d0b1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.scheduler.cluster
 
-import java.lang.{Boolean => JBoolean}
 import java.nio.ByteBuffer
 import java.util.concurrent.atomic.AtomicLong
 import java.util.{TimerTask, Timer}
@@ -171,28 +170,37 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
     backend.reviveOffers()
   }
 
-  override def killTasks(stageId: Int): Unit = synchronized {
-    schedulableBuilder.getTaskSetManagers(stageId).foreach { t =>
-      // Notify the executors to kill the tasks.
-      val ts = t.asInstanceOf[TaskSetManager].taskSet
-      val taskIds = taskSetTaskIds(ts.id)
-      taskIds.foreach { tid =>
-        val execId = taskIdToExecutorId(tid)
-        backend.killTask(tid, execId)
+  override def cancelTasks(stageId: Int): Unit = synchronized {
+    logInfo("Cancelling stage " + stageId)
+    schedulableBuilder.getTaskSetManagers(stageId).foreach { case tsm: TaskSetManager =>
+      // There are two possible cases here:
+      // 1. The task set manager has been created and some tasks have been scheduled.
+      //    In this case, send a kill signal to the executors to kill the task.
+      // 2. The task set manager has been created but no tasks have been scheduled. In this case,
+      //    simply abort the task set.
+      val taskIds = taskSetTaskIds(tsm.taskSet.id)
+      if (taskIds.size > 0) {
+        taskIds.foreach { tid =>
+          val execId = taskIdToExecutorId(tid)
+          backend.killTask(tid, execId)
+        }
+      } else {
+        tsm.error("Stage %d was cancelled before any tasks were launched".format(stageId))
       }
     }
   }
 
-  def taskSetFinished(manager: TaskSetManager) {
-    this.synchronized {
-      if (activeTaskSets.contains(manager.taskSet.id)) {
-        activeTaskSets -= manager.taskSet.id
-        manager.parent.removeSchedulable(manager)
-        logInfo("Remove TaskSet %s from pool %s".format(manager.taskSet.id, manager.parent.name))
-        taskIdToTaskSetId --= taskSetTaskIds(manager.taskSet.id)
-        taskIdToExecutorId --= taskSetTaskIds(manager.taskSet.id)
-        taskSetTaskIds.remove(manager.taskSet.id)
-      }
+  def taskSetFinished(manager: TaskSetManager): Unit = synchronized {
+    // Check to see if the given task set has been removed. This is possible in the case of
+    // multiple unrecoverable task failures (e.g. if the entire task set is killed when it has
+    // more than one running task).
+    if (activeTaskSets.contains(manager.taskSet.id)) {
+      activeTaskSets -= manager.taskSet.id
+      manager.parent.removeSchedulable(manager)
+      logInfo("Remove TaskSet %s from pool %s".format(manager.taskSet.id, manager.parent.name))
+      taskIdToTaskSetId --= taskSetTaskIds(manager.taskSet.id)
+      taskIdToExecutorId --= taskSetTaskIds(manager.taskSet.id)
+      taskSetTaskIds.remove(manager.taskSet.id)
     }
   }
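The two-case comment in cancelTasks above is the heart of this file's change. A condensed, self-contained Scala sketch of that decision, where killOnExecutor and abortTaskSet are illustrative stubs standing in for backend.killTask and tsm.error, not real Spark APIs:

    object CancelSketch {
      def killOnExecutor(tid: Long): Unit = println(s"kill task $tid on its executor")
      def abortTaskSet(reason: String): Unit = println(s"abort task set: $reason")

      def cancel(stageId: Int, launchedTaskIds: Set[Long]): Unit =
        if (launchedTaskIds.nonEmpty) {
          // Case 1: tasks are already running on executors; signal each to die.
          launchedTaskIds.foreach(killOnExecutor)
        } else {
          // Case 2: nothing launched yet; just fail the task set locally.
          abortTaskSet(s"Stage $stageId was cancelled before any tasks were launched")
        }
    }

    // e.g. CancelSketch.cancel(3, Set(10L, 11L)) kills tasks 10 and 11;
    //      CancelSketch.cancel(4, Set.empty) aborts the task set instead.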
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0353f74a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
index 35762b9..e132182 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
@@ -138,7 +138,7 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
     }
   }
 
-  override def killTasks(stageId: Int): Unit = synchronized {
+  override def cancelTasks(stageId: Int): Unit = synchronized {
     schedulableBuilder.getTaskSetManagers(stageId).foreach { sched =>
       val taskIds = taskSetTaskIds(sched.asInstanceOf[TaskSetManager].taskSet.id)
       for (tid <- taskIds) {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0353f74a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
index 0fd96ed..758670b 100644
--- a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
@@ -29,6 +29,8 @@ import org.scalatest.{BeforeAndAfterAll, FunSuite}
 
 import org.apache.spark.SparkContext._
 import org.apache.spark.{SparkContext, SparkException, LocalSparkContext}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}
+import org.apache.spark.scheduler._
 
 
 class AsyncRDDActionsSuite extends FunSuite with BeforeAndAfterAll {
@@ -46,24 +48,39 @@ class AsyncRDDActionsSuite extends FunSuite with BeforeAndAfterAll {
 
   lazy val zeroPartRdd = new EmptyRDD[Int](sc)
 
-  test("job cancellation") {
-    val f = sc.parallelize(1 to 1000, 2).map { i => Thread.sleep(1000); i }.countAsync()
+  test("job cancellation before any tasks is launched") {
+    val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.countAsync()
+    future { f.cancel() }
+    val e = intercept[SparkException] { f.get() }
+    assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
+  }
 
+  test("job cancellation after some tasks have been launched") {
+    // Add a listener to release the semaphore once any tasks are launched.
     val sem = new Semaphore(0)
+    sc.dagScheduler.addSparkListener(new SparkListener {
+      override def onTaskStart(taskStart: SparkListenerTaskStart) {
+        sem.release()
+      }
+    })
+
+    val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.countAsync()
     future {
-      //sem.acquire()
-      Thread.sleep(1000)
+      // Wait until some tasks have been launched before cancelling the job.
+      sem.acquire()
       f.cancel()
-      println("killing previous job")
-    }
-
-    intercept[SparkException] {
-      println("lalalalalala")
-      println(f.get())
-      println("hahahahah")
     }
+    val e = intercept[SparkException] { f.get() }
+    assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
+  }
 
+  test("cancelling take action") {
+    val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.takeAsync(5000)
+    future { f.cancel() }
+    val e = intercept[SparkException] { f.get() }
+    assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
   }
+
 //
 //  test("countAsync") {
 //    assert(zeroPartRdd.countAsync().get() === 0)
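The rewritten tests above no longer guess with Thread.sleep before cancelling; instead a SparkListener releases a semaphore when tasks start, so the cancel is only issued once work is genuinely running. The same gating pattern, sketched with a plain thread instead of a Spark job:

    import java.util.concurrent.Semaphore

    object GateSketch extends App {
      val started = new Semaphore(0)
      val worker = new Thread(new Runnable {
        def run(): Unit = {
          started.release()             // signal: the work has actually begun
          try Thread.sleep(10000) catch { case _: InterruptedException => () }
        }
      })
      worker.start()
      started.acquire()                 // block on the signal, not a fixed sleep
      worker.interrupt()                // only now "cancel" the work
      worker.join()
    }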

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0353f74a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 6643c9d..5e75444 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -60,7 +60,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkContext
       taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch)
       taskSets += taskSet
     }
-    override def killTasks(stageId: Int) {}
+    override def cancelTasks(stageId: Int) {}
     override def setListener(listener: TaskSchedulerListener) = {}
     override def defaultParallelism() = 2
   }

