spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject [01/11] git commit: Merge pull request #928 from jerryshao/fairscheduler-refactor
Date Thu, 26 Sep 2013 22:01:51 GMT
Updated Branches:
  refs/heads/branch-0.8 df0a40ccf -> 3cb9040f9


Merge pull request #928 from jerryshao/fairscheduler-refactor

Refactor FairSchedulableBuilder
(cherry picked from commit 834686b108ce31cbee531d89de6c6e80913448f4)

Signed-off-by: Reynold Xin <rxin@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/d5a8dbfb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/d5a8dbfb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/d5a8dbfb

Branch: refs/heads/branch-0.8
Commit: d5a8dbfb2495c42f609960c28915c30fbe5541a6
Parents: df0a40c
Author: Reynold Xin <reynoldx@gmail.com>
Authored: Sun Sep 22 15:06:48 2013 -0700
Committer: Reynold Xin <rxin@apache.org>
Committed: Thu Sep 26 13:09:30 2013 -0700

----------------------------------------------------------------------
 .../scheduler/cluster/SchedulableBuilder.scala  | 99 +++++++++++---------
 1 file changed, 56 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d5a8dbfb/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala
b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala
index f808233..114617c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala
@@ -17,14 +17,12 @@
 
 package org.apache.spark.scheduler.cluster
 
-import java.io.{File, FileInputStream, FileOutputStream, FileNotFoundException}
-import java.util.Properties
-
-import scala.xml.XML
+import java.io.{FileInputStream, InputStream}
+import java.util.{NoSuchElementException, Properties}
 
 import org.apache.spark.Logging
-import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode
 
+import scala.xml.XML
 
 /**
  * An interface to build Schedulable tree
@@ -51,7 +49,8 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
 private[spark] class FairSchedulableBuilder(val rootPool: Pool)
   extends SchedulableBuilder with Logging {
 
-  val schedulerAllocFile = System.getProperty("spark.scheduler.allocation.file")
+  val schedulerAllocFile = Option(System.getProperty("spark.scheduler.allocation.file"))
+  val DEFAULT_SCHEDULER_FILE = "fairscheduler.xml"
   val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.pool"
   val DEFAULT_POOL_NAME = "default"
   val MINIMUM_SHARES_PROPERTY = "minShare"
@@ -64,48 +63,26 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool)
   val DEFAULT_WEIGHT = 1
 
   override def buildPools() {
-    if (schedulerAllocFile != null) {
-    val file = new File(schedulerAllocFile)
-      if (file.exists()) {
-        val xml = XML.loadFile(file)
-        for (poolNode <- (xml \\ POOLS_PROPERTY)) {
-
-          val poolName = (poolNode \ POOL_NAME_PROPERTY).text
-          var schedulingMode = DEFAULT_SCHEDULING_MODE
-          var minShare = DEFAULT_MINIMUM_SHARE
-          var weight = DEFAULT_WEIGHT
-
-          val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text
-          if (xmlSchedulingMode != "") {
-            try {
-              schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
-            } catch {
-              case e: Exception => logInfo("Error xml schedulingMode, using default schedulingMode")
-            }
-          }
-
-          val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text
-          if (xmlMinShare != "") {
-            minShare = xmlMinShare.toInt
-          }
-
-          val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text
-          if (xmlWeight != "") {
-            weight = xmlWeight.toInt
-          }
-
-          val pool = new Pool(poolName, schedulingMode, minShare, weight)
-          rootPool.addSchedulable(pool)
-          logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
-            poolName, schedulingMode, minShare, weight))
+    var is: Option[InputStream] = None
+    try {
+      is = Option {
+        schedulerAllocFile.map { f =>
+          new FileInputStream(f)
+        }.getOrElse {
+          getClass.getClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
         }
-      } else {
-        throw new java.io.FileNotFoundException(
-          "Fair scheduler allocation file not found: " + schedulerAllocFile)
       }
+
+      is.foreach { i => buildFairSchedulerPool(i) }
+    } finally {
+      is.foreach(_.close())
     }
 
     // finally create "default" pool
+    buildDefaultPool()
+  }
+
+  private def buildDefaultPool() {
     if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) {
       val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE,
         DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
@@ -115,6 +92,42 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool)
     }
   }
 
+  private def buildFairSchedulerPool(is: InputStream) {
+    val xml = XML.load(is)
+    for (poolNode <- (xml \\ POOLS_PROPERTY)) {
+
+      val poolName = (poolNode \ POOL_NAME_PROPERTY).text
+      var schedulingMode = DEFAULT_SCHEDULING_MODE
+      var minShare = DEFAULT_MINIMUM_SHARE
+      var weight = DEFAULT_WEIGHT
+
+      val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text
+      if (xmlSchedulingMode != "") {
+        try {
+          schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
+        } catch {
+          case e: NoSuchElementException =>
+            logWarning("Error xml schedulingMode, using default schedulingMode")
+        }
+      }
+
+      val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text
+      if (xmlMinShare != "") {
+        minShare = xmlMinShare.toInt
+      }
+
+      val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text
+      if (xmlWeight != "") {
+        weight = xmlWeight.toInt
+      }
+
+      val pool = new Pool(poolName, schedulingMode, minShare, weight)
+      rootPool.addSchedulable(pool)
+      logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
+        poolName, schedulingMode, minShare, weight))
+    }
+  }
+
   override def addTaskSetManager(manager: Schedulable, properties: Properties) {
     var poolName = DEFAULT_POOL_NAME
     var parentPool = rootPool.getSchedulableByName(poolName)


Mime
View raw message