spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kayousterh...@apache.org
Subject spark git commit: [SPARK-13279] Remove O(n^2) operation from scheduler.
Date Wed, 17 Feb 2016 06:28:40 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 d95089190 -> 98354cae9


[SPARK-13279] Remove O(n^2) operation from scheduler.

This commit removes an unnecessary duplicate check in addPendingTask that meant
that scheduling a task set took time proportional to (# tasks)^2.

Author: Sital Kedia <skedia@fb.com>

Closes #11175 from sitalkedia/fix_stuck_driver.

(cherry picked from commit 1e1e31e03df14f2e7a9654e640fb2796cf059fe0)
Signed-off-by: Kay Ousterhout <kayousterhout@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/98354cae
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/98354cae
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/98354cae

Branch: refs/heads/branch-1.6
Commit: 98354cae984e3719a49050e7a6aa75dae78b12bb
Parents: d950891
Author: Sital Kedia <skedia@fb.com>
Authored: Tue Feb 16 22:27:34 2016 -0800
Committer: Kay Ousterhout <kayousterhout@gmail.com>
Committed: Tue Feb 16 22:28:32 2016 -0800

----------------------------------------------------------------------
 .../apache/spark/scheduler/TaskSetManager.scala | 28 +++++++++-----------
 1 file changed, 13 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/98354cae/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index a02f301..b5afd9d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -114,9 +114,14 @@ private[spark] class TaskSetManager(
   // treated as stacks, in which new tasks are added to the end of the
   // ArrayBuffer and removed from the end. This makes it faster to detect
   // tasks that repeatedly fail because whenever a task failed, it is put
-  // back at the head of the stack. They are also only cleaned up lazily;
-  // when a task is launched, it remains in all the pending lists except
-  // the one that it was launched from, but gets removed from them later.
+  // back at the head of the stack. These collections may contain duplicates
+  // for two reasons:
+  // (1): Tasks are only removed lazily; when a task is launched, it remains
+  // in all the pending lists except the one that it was launched from.
+  // (2): Tasks may be re-added to these lists multiple times as a result
+  // of failures.
+  // Duplicates are handled in dequeueTaskFromList, which ensures that a
+  // task hasn't already started running before launching it.
   private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]
 
   // Set of pending tasks for each host. Similar to pendingTasksForExecutor,
@@ -179,23 +184,16 @@ private[spark] class TaskSetManager(
 
   /** Add a task to all the pending-task lists that it should be on. */
   private def addPendingTask(index: Int) {
-    // Utility method that adds `index` to a list only if it's not already there
-    def addTo(list: ArrayBuffer[Int]) {
-      if (!list.contains(index)) {
-        list += index
-      }
-    }
-
     for (loc <- tasks(index).preferredLocations) {
       loc match {
         case e: ExecutorCacheTaskLocation =>
-          addTo(pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer))
+          pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
         case e: HDFSCacheTaskLocation => {
           val exe = sched.getExecutorsAliveOnHost(loc.host)
           exe match {
             case Some(set) => {
               for (e <- set) {
-                addTo(pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer))
+                pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
               }
               logInfo(s"Pending task $index has a cached location at ${e.host} " +
                 ", where there are executors " + set.mkString(","))
@@ -206,14 +204,14 @@ private[spark] class TaskSetManager(
         }
         case _ => Unit
       }
-      addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer))
+      pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
       for (rack <- sched.getRackForHost(loc.host)) {
-        addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer))
+        pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) += index
       }
     }
 
     if (tasks(index).preferredLocations == Nil) {
-      addTo(pendingTasksWithNoPrefs)
+      pendingTasksWithNoPrefs += index
     }
 
     allPendingTasks += index  // No point scanning this whole list to find the old task there


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message