spark-commits mailing list archives

From pwend...@apache.org
Subject [25/37] git commit: Switched to using daemon threads in the executor and fixed a bug in job cancellation for the fair scheduler.
Date Tue, 15 Oct 2013 05:27:11 GMT
Switched to using daemon threads in the executor and fixed a bug in job cancellation for the fair scheduler.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/80cdbf4f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/80cdbf4f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/80cdbf4f

Branch: refs/heads/master
Commit: 80cdbf4f49cdb07bfd765d3fdd1d16d5aec2e60a
Parents: 058508b
Author: Reynold Xin <rxin@apache.org>
Authored: Thu Oct 10 22:40:48 2013 -0700
Committer: Reynold Xin <rxin@apache.org>
Committed: Thu Oct 10 22:40:48 2013 -0700

----------------------------------------------------------------------
 .../org/apache/spark/executor/Executor.scala    |  2 +-
 .../spark/scheduler/SchedulableBuilder.scala    |  7 +++++--
 .../scheduler/cluster/ClusterScheduler.scala    |  9 --------
 .../scheduler/cluster/SchedulerBackend.scala    |  6 +-----
 .../spark/scheduler/local/LocalScheduler.scala  |  1 -
 .../scheduler/local/LocalSchedulerSuite.scala   | 22 +++++++++++---------
 6 files changed, 19 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/80cdbf4f/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 4c54427..16258f3 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -122,7 +122,7 @@ private[spark] class Executor(
 
   // Start worker thread pool
   val threadPool = new ThreadPoolExecutor(
-    1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
+    1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable], Utils.daemonThreadFactory)
 
   // Maintains the list of running tasks.
   private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
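
The daemon flag is the whole point of this change: the pool's worker threads previously defaulted to non-daemon, so a lingering task could keep the JVM alive after shutdown. Utils.daemonThreadFactory is Spark's own helper; a minimal sketch of such a factory, assuming only standard java.util.concurrent APIs (the real helper may also name its threads), looks like:

    import java.util.concurrent.{SynchronousQueue, ThreadFactory, ThreadPoolExecutor, TimeUnit}

    // Sketch of a daemon thread factory: every thread it hands to the pool is
    // marked as a daemon, so stuck workers cannot block JVM exit.
    object DaemonThreadFactory extends ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r)
        t.setDaemon(true)
        t
      }
    }

    // Wired into a pool the same way as the change above:
    val threadPool = new ThreadPoolExecutor(
      1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable], DaemonThreadFactory)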

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/80cdbf4f/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
index a4e8653..873801e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
@@ -48,9 +48,12 @@ private[spark] trait SchedulableBuilder {
             return Some(tsm)
           }
         case pool: Pool =>
-          getTsm(pool)
+          val found = getTsm(pool)
+          if (found.isDefined) {
+            return found
+          }
       }
-      return None
+      None
     }
     getTsm(rootPool)
   }
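
The bug fixed here: the old code computed getTsm(pool) for a nested pool and threw the result away, so a TaskSetManager sitting below the root pool, which is exactly where the fair scheduler places them, was never returned and cancellation quietly did nothing. A self-contained sketch of the corrected traversal, using simplified stand-ins rather than Spark's real Schedulable classes:

    // Illustrative stand-ins for Spark's Schedulable hierarchy.
    sealed trait Schedulable
    case class TaskSetManager(stageId: Int) extends Schedulable
    case class Pool(children: Seq[Schedulable]) extends Schedulable

    // Depth-first search that propagates a hit from nested pools instead of
    // discarding it, which is the essence of the fix above.
    def getTsm(pool: Pool, stageId: Int): Option[TaskSetManager] = {
      pool.children.foreach {
        case tsm: TaskSetManager if tsm.stageId == stageId => return Some(tsm)
        case _: TaskSetManager => // different stage, keep scanning this pool
        case child: Pool =>
          val found = getTsm(child, stageId)
          if (found.isDefined) return found
      }
      None
    }

With a nested layout such as Pool(Seq(Pool(Seq(TaskSetManager(3))))), the old version returned None for stage 3; this version returns Some(TaskSetManager(3)).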

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/80cdbf4f/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
index 250dec5..6c12ff7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
@@ -78,12 +78,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
 
   private val executorIdToHost = new HashMap[String, String]
 
-  // JAR server, if any JARs were added by the user to the SparkContext
-  var jarServer: HttpServer = null
-
-  // URIs of JARs to pass to executor
-  var jarUris: String = ""
-
   // Listener object to pass upcalls into
   var listener: TaskSchedulerListener = null
 
@@ -356,9 +350,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
     if (backend != null) {
       backend.stop()
     }
-    if (jarServer != null) {
-      jarServer.stop()
-    }
     if (taskResultGetter != null) {
       taskResultGetter.stop()
     }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/80cdbf4f/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackend.scala
index c0578dc..5367218 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackend.scala
@@ -30,12 +30,8 @@ private[spark] trait SchedulerBackend {
   def reviveOffers(): Unit
   def defaultParallelism(): Int
 
-  def killTask(taskId: Long, executorId: String) {
-    throw new UnsupportedOperationException
-  }
+  def killTask(taskId: Long, executorId: String): Unit = throw new UnsupportedOperationException
 
   // Memory used by each executor (in megabytes)
   protected val executorMemory: Int = SparkContext.executorMemoryRequested
-
-  // TODO: Probably want to add a killTask too
 }
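
Folding the default body onto one line keeps the contract the same: killTask still throws unless a concrete backend overrides it, which is why the stale "TODO: Probably want to add a killTask" comment could go. A sketch of how a backend that does support cancellation might override the default (the trait is trimmed, and the println stands in for a real message to the executor):

    // Trimmed copy of the trait, just enough for a runnable example.
    trait SchedulerBackend {
      def killTask(taskId: Long, executorId: String): Unit = throw new UnsupportedOperationException
    }

    // A backend that supports job cancellation overrides the throwing default.
    class IllustrativeBackend extends SchedulerBackend {
      override def killTask(taskId: Long, executorId: String): Unit = {
        println(s"asking executor $executorId to kill task $taskId")
      }
    }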

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/80cdbf4f/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
index 0a6f4df..dc6509d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
@@ -209,7 +209,6 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
   }
 
   override def stop() {
-    //threadPool.shutdownNow()
   }
 
   override def defaultParallelism() = threads
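
Deleting the commented-out threadPool.shutdownNow() is consistent with the executor change above: once the pool's threads are daemons, stop() no longer needs to tear them down for the process to exit. A standalone demonstration of that JVM property:

    // Daemon threads never block JVM shutdown; non-daemon threads do.
    object DaemonExitDemo {
      def main(args: Array[String]): Unit = {
        val worker = new Thread(new Runnable {
          def run(): Unit = Thread.sleep(60 * 1000) // pretend to be a stuck task
        })
        worker.setDaemon(true) // with false, the JVM would hang for a minute
        worker.start()
        println("main finished") // the process exits right after this line
      }
    }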

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/80cdbf4f/core/src/test/scala/org/apache/spark/scheduler/local/LocalSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/local/LocalSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/local/LocalSchedulerSuite.scala
index af76c84..d46a746 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/local/LocalSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/local/LocalSchedulerSuite.scala
@@ -17,17 +17,15 @@
 
 package org.apache.spark.scheduler.local
 
-import org.scalatest.FunSuite
-import org.scalatest.BeforeAndAfter
-
-import org.apache.spark._
-import org.apache.spark.scheduler._
-import org.apache.spark.scheduler.cluster._
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.{ConcurrentMap, HashMap}
 import java.util.concurrent.Semaphore
 import java.util.concurrent.CountDownLatch
-import java.util.Properties
+
+import scala.collection.mutable.HashMap
+
+import org.scalatest.{BeforeAndAfterEach, FunSuite}
+
+import org.apache.spark._
+
 
 class Lock() {
   var finished = false
@@ -63,7 +61,11 @@ object TaskThreadInfo {
 * 5. each task(pending) must use "sleep" to  make sure it has been added to taskSetManager queue,
  *    thus it will be scheduled later when cluster has free cpu cores.
  */
-class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
+class LocalSchedulerSuite extends FunSuite with LocalSparkContext with BeforeAndAfterEach {
+
+  override def afterEach() {
+    System.clearProperty("spark.scheduler.mode")
+  }
 
  def createThread(threadIndex: Int, poolName: String, sc: SparkContext, sem: Semaphore) {
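
The new afterEach hook matters because these tests pick the scheduling mode through a JVM-wide system property, and without cleanup the FAIR setting would leak into whatever suite runs next in the same JVM. A minimal sketch of the pattern (ScalaTest's FunSuite and BeforeAndAfterEach, with the same property name as above):

    import org.scalatest.{BeforeAndAfterEach, FunSuite}

    class SchedulerModePropertySuite extends FunSuite with BeforeAndAfterEach {

      override def afterEach() {
        // Undo per-test global state so later suites see a clean JVM.
        System.clearProperty("spark.scheduler.mode")
      }

      test("fair mode is visible only while the test runs") {
        System.setProperty("spark.scheduler.mode", "FAIR")
        assert(System.getProperty("spark.scheduler.mode") === "FAIR")
      }
    }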
 

