spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pwend...@apache.org
Subject [33/37] git commit: Job cancellation: addressed code review feedback round 2 from Kay.
Date Tue, 15 Oct 2013 05:27:19 GMT
Job cancellation: addressed code review feedback round 2 from Kay.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/ab0940f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/ab0940f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/ab0940f0

Branch: refs/heads/master
Commit: ab0940f0c258085bbf930d43be0b9034aad039cf
Parents: 97ffebb
Author: Reynold Xin <rxin@apache.org>
Authored: Fri Oct 11 18:15:04 2013 -0700
Committer: Reynold Xin <rxin@apache.org>
Committed: Fri Oct 11 18:15:04 2013 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/FutureAction.scala   |  4 +-
 .../org/apache/spark/scheduler/JobWaiter.scala  |  7 +-
 .../cluster/ClusterTaskSetManager.scala         | 80 ++++++++++----------
 3 files changed, 47 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ab0940f0/core/src/main/scala/org/apache/spark/FutureAction.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala
index 9185b95..85018cb 100644
--- a/core/src/main/scala/org/apache/spark/FutureAction.scala
+++ b/core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -56,7 +56,7 @@ trait FutureAction[T] extends Future[T] {
   override def result(atMost: Duration)(implicit permit: CanAwait): T
 
   /**
-   * When this action is completed, either through an exception, or a value, apply the provided
+   * When this action is completed, either through an exception, or a value, applies the provided
    * function.
    */
   def onComplete[U](func: (Try[T]) => U)(implicit executor: ExecutionContext)
@@ -91,7 +91,7 @@ class FutureJob[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: => T)
   extends FutureAction[T] {
 
   override def cancel() {
-    jobWaiter.kill()
+    jobWaiter.cancel()
   }
 
   override def ready(atMost: Duration)(implicit permit: CanAwait): FutureJob.this.type = {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ab0940f0/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
index 7274547..58f238d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
@@ -39,7 +39,12 @@ private[spark] class JobWaiter[T](
   // partition RDDs), we set the jobResult directly to JobSucceeded.
   private var jobResult: JobResult = if (jobFinished) JobSucceeded else null
 
-  def kill() {
+  /**
+   * Sends a signal to the DAGScheduler to cancel the job. The cancellation itself is handled
+   * asynchronously. After the low level scheduler cancels all the tasks belonging to this job, it
+   * will fail this job with a SparkException.
+   */
+  def cancel() {
     dagScheduler.cancelJob(jobId)
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ab0940f0/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
index 1198bac..7e0667b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -461,54 +461,52 @@ private[spark] class ClusterTaskSetManager(
       // Check if the problem is a map output fetch failure. In that case, this
       // task will never succeed on any node, so tell the scheduler about it.
       reason.foreach {
-        _ match {
-          case fetchFailed: FetchFailed =>
-            logInfo("Loss was due to fetch failure from " + fetchFailed.bmAddress)
-            sched.listener.taskEnded(tasks(index), fetchFailed, null, null, info, null)
-            successful(index) = true
-            tasksSuccessful += 1
-            sched.taskSetFinished(this)
-            removeAllRunningTasks()
-            return
-
-          case TaskKilled =>
-            logInfo("Task %d was killed.".format(tid))
-            abort("Task %d was killed".format(tid))
-            return
-
-          case ef: ExceptionFailure =>
-            sched.listener.taskEnded(tasks(index), ef, null, null, info, ef.metrics.getOrElse(null))
-            val key = ef.description
-            val now = clock.getTime()
-            val (printFull, dupCount) = {
-              if (recentExceptions.contains(key)) {
-                val (dupCount, printTime) = recentExceptions(key)
-                if (now - printTime > EXCEPTION_PRINT_INTERVAL) {
-                  recentExceptions(key) = (0, now)
-                  (true, 0)
-                } else {
-                  recentExceptions(key) = (dupCount + 1, printTime)
-                  (false, dupCount + 1)
-                }
-              } else {
+        case fetchFailed: FetchFailed =>
+          logInfo("Loss was due to fetch failure from " + fetchFailed.bmAddress)
+          sched.listener.taskEnded(tasks(index), fetchFailed, null, null, info, null)
+          successful(index) = true
+          tasksSuccessful += 1
+          sched.taskSetFinished(this)
+          removeAllRunningTasks()
+          return
+
+        case TaskKilled =>
+          logInfo("Task %d was killed.".format(tid))
+          sched.listener.taskEnded(tasks(index), reason.get, null, null, info, null)
+          return
+
+        case ef: ExceptionFailure =>
+          sched.listener.taskEnded(tasks(index), ef, null, null, info, ef.metrics.getOrElse(null))
+          val key = ef.description
+          val now = clock.getTime()
+          val (printFull, dupCount) = {
+            if (recentExceptions.contains(key)) {
+              val (dupCount, printTime) = recentExceptions(key)
+              if (now - printTime > EXCEPTION_PRINT_INTERVAL) {
                 recentExceptions(key) = (0, now)
                 (true, 0)
+              } else {
+                recentExceptions(key) = (dupCount + 1, printTime)
+                (false, dupCount + 1)
               }
-            }
-            if (printFull) {
-              val locs = ef.stackTrace.map(loc => "\tat %s".format(loc.toString))
-              logInfo("Loss was due to %s\n%s\n%s".format(
-                ef.className, ef.description, locs.mkString("\n")))
             } else {
-              logInfo("Loss was due to %s [duplicate %d]".format(ef.description, dupCount))
+              recentExceptions(key) = (0, now)
+              (true, 0)
             }
+          }
+          if (printFull) {
+            val locs = ef.stackTrace.map(loc => "\tat %s".format(loc.toString))
+            logInfo("Loss was due to %s\n%s\n%s".format(
+              ef.className, ef.description, locs.mkString("\n")))
+          } else {
+            logInfo("Loss was due to %s [duplicate %d]".format(ef.description, dupCount))
+          }
 
-          case TaskResultLost =>
-            logInfo("Lost result for TID %s on host %s".format(tid, info.host))
-            sched.listener.taskEnded(tasks(index), TaskResultLost, null, null, info, null)
+        case TaskResultLost =>
+          logInfo("Lost result for TID %s on host %s".format(tid, info.host))
+          sched.listener.taskEnded(tasks(index), TaskResultLost, null, null, info, null)
 
-          case _ => {}
-        }
+        case _ => {}
       }
       // On non-fetch failures, re-enqueue the task as pending for a max number of retries
       addPendingTask(index)


Mime
View raw message