spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject [01/14] git commit: Misc style changes in the 'yarn' package.
Date Thu, 05 Dec 2013 07:33:12 GMT
Updated Branches:
  refs/heads/master 182f9baee -> 72b696156


Misc style changes in the 'yarn' package.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/a98f5a0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/a98f5a0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/a98f5a0e

Branch: refs/heads/master
Commit: a98f5a0ebb3e94f55439b81bee77b1def079d67c
Parents: e2ebc3a
Author: Harvey Feng <harvey@databricks.com>
Authored: Fri Nov 15 03:32:37 2013 -0800
Committer: Harvey Feng <harvey@databricks.com>
Committed: Sun Nov 17 22:35:56 2013 -0800

----------------------------------------------------------------------
 .../spark/deploy/yarn/ApplicationMaster.scala   | 166 +++++++-----
 .../org/apache/spark/deploy/yarn/Client.scala   | 134 +++++-----
 .../spark/deploy/yarn/WorkerRunnable.scala      |  85 +++---
 .../deploy/yarn/YarnAllocationHandler.scala     | 264 ++++++++++++-------
 4 files changed, 376 insertions(+), 273 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a98f5a0e/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 0e47bd7..e4f3d3e 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -21,9 +21,13 @@ import java.io.IOException
 import java.net.Socket
 import java.util.concurrent.CopyOnWriteArrayList
 import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
+
+import scala.collection.JavaConversions._
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.net.NetUtils
+import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.util.ShutdownHookManager
 import org.apache.hadoop.yarn.api._
 import org.apache.hadoop.yarn.api.records._
@@ -31,35 +35,36 @@ import org.apache.hadoop.yarn.api.protocolrecords._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
+
 import org.apache.spark.{SparkContext, Logging}
 import org.apache.spark.util.Utils
-import scala.collection.JavaConversions._
+
 
 class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
 
   def this(args: ApplicationMasterArguments) = this(args, new Configuration())
   
   private var rpc: YarnRPC = YarnRPC.create(conf)
-  private var resourceManager: AMRMProtocol = null
-  private var appAttemptId: ApplicationAttemptId = null
-  private var userThread: Thread = null
+  private var resourceManager: AMRMProtocol = _
+  private var appAttemptId: ApplicationAttemptId = _
+  private var userThread: Thread = _
   private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
   private val fs = FileSystem.get(yarnConf)
 
-  private var yarnAllocator: YarnAllocationHandler = null
-  private var isFinished:Boolean = false
-  private var uiAddress: String = ""
+  private var yarnAllocator: YarnAllocationHandler = _
+  private var isFinished: Boolean = false
+  private var uiAddress: String = _
   private val maxAppAttempts: Int = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES,
     YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
   private var isLastAMRetry: Boolean = true
 
 
   def run() {
-    // setup the directories so things go to yarn approved directories rather
-    // then user specified and /tmp
+    // Setup the directories so things go to yarn approved directories rather
+    // then user specified and /tmp.
     System.setProperty("spark.local.dir", getLocalDirs())
 
-    // use priority 30 as its higher then HDFS. Its same priority as MapReduce is using
+    // Use priority 30 as its higher then HDFS. Its same priority as MapReduce is using.
     ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30)
     
     appAttemptId = getApplicationAttemptId()
@@ -68,9 +73,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
 
     // Workaround until hadoop moves to something which has
     // https://issues.apache.org/jira/browse/HADOOP-8406 - fixed in (2.0.2-alpha but no 0.23 line)
-    // ignore result
+    // ignore result.
     // This does not, unfortunately, always work reliably ... but alleviates the bug a lot of times
-    // Hence args.workerCores = numCore disabled above. Any better option ?
+    // Hence args.workerCores = numCore disabled above. Any better option?
 
     // Compute number of threads for akka
     //val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
@@ -96,7 +101,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
 
     waitForSparkContextInitialized()
 
-    // do this after spark master is up and SparkContext is created so that we can register UI Url
+    // Do this after spark master is up and SparkContext is created so that we can register UI Url
     val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
     
     // Allocate all containers
@@ -115,12 +120,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
     val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
       .getOrElse(Option(System.getenv("LOCAL_DIRS"))
-      .getOrElse(""))
+        .getOrElse(""))
 
     if (localDirs.isEmpty()) {
       throw new Exception("Yarn Local dirs can't be empty")
     }
-    return localDirs
+    localDirs
   }
   
   private def getApplicationAttemptId(): ApplicationAttemptId = {
@@ -129,7 +134,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     val containerId = ConverterUtils.toContainerId(containerIdString)
     val appAttemptId = containerId.getApplicationAttemptId()
     logInfo("ApplicationAttemptId: " + appAttemptId)
-    return appAttemptId
+    appAttemptId
   }
   
   private def registerWithResourceManager(): AMRMProtocol = {
@@ -137,7 +142,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
       YarnConfiguration.RM_SCHEDULER_ADDRESS,
       YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS))
     logInfo("Connecting to ResourceManager at " + rmAddress)
-    return rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol]
+    rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol]
   }
   
   private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
@@ -145,12 +150,13 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     val appMasterRequest = Records.newRecord(classOf[RegisterApplicationMasterRequest])
       .asInstanceOf[RegisterApplicationMasterRequest]
     appMasterRequest.setApplicationAttemptId(appAttemptId)
-    // Setting this to master host,port - so that the ApplicationReport at client has some sensible info. 
+    // Setting this to master host,port - so that the ApplicationReport at client has some
+    // sensible info. 
     // Users can then monitor stderr/stdout on that node if required.
     appMasterRequest.setHost(Utils.localHostName())
     appMasterRequest.setRpcPort(0)
     appMasterRequest.setTrackingUrl(uiAddress)
-    return resourceManager.registerApplicationMaster(appMasterRequest)
+    resourceManager.registerApplicationMaster(appMasterRequest)
   }
   
   private def waitForSparkMaster() {
@@ -164,21 +170,25 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
       try {
         val socket = new Socket(driverHost, driverPort.toInt)
         socket.close()
-        logInfo("Driver now available: " + driverHost + ":" + driverPort)
+        logInfo("Driver now available: %s:%s".format(driverHost, driverPort))
         driverUp = true
       } catch {
-        case e: Exception =>
-          logWarning("Failed to connect to driver at " + driverHost + ":" + driverPort + ", retrying")
-        Thread.sleep(100)
-        tries = tries + 1
+        case e: Exception => {
+          logWarning("Failed to connect to driver at %s:%s, retrying ...").
+            format(driverHost, driverPort)
+          Thread.sleep(100)
+          tries = tries + 1
+        }
       }
     }
   }
 
   private def startUserClass(): Thread  = {
     logInfo("Starting the user JAR in a separate Thread")
-    val mainMethod = Class.forName(args.userClass, false, Thread.currentThread.getContextClassLoader)
-      .getMethod("main", classOf[Array[String]])
+    val mainMethod = Class.forName(
+      args.userClass,
+      false /* initialize */,
+      Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
     val t = new Thread {
       override def run() {
         var successed = false
@@ -203,7 +213,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
       }
     }
     t.start()
-    return t
+    t
   }
 
   // this need to happen before allocateWorkers
@@ -225,12 +235,20 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
 
         if (null != sparkContext) {
           uiAddress = sparkContext.ui.appUIAddress
-          this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args, 
-                                               sparkContext.preferredNodeLocationData) 
+          this.yarnAllocator = YarnAllocationHandler.newAllocator(
+            yarnConf,
+            resourceManager,
+            appAttemptId,
+            args, 
+            sparkContext.preferredNodeLocationData) 
         } else {
-          logWarning("Unable to retrieve sparkContext inspite of waiting for " + count * waitTime + 
-                  ", numTries = " + numTries)
-          this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args)
+          logWarning("Unable to retrieve sparkContext inspite of waiting for %d, numTries = %d".
+            format(count * waitTime, numTries))
+          this.yarnAllocator = YarnAllocationHandler.newAllocator(
+            yarnConf,
+            resourceManager,
+            appAttemptId,
+            args)
         }
       }
     } finally {
@@ -246,35 +264,37 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
       // Wait until all containers have finished
       // TODO: This is a bit ugly. Can we make it nicer?
       // TODO: Handle container failure
-      while(yarnAllocator.getNumWorkersRunning < args.numWorkers &&
-        // If user thread exists, then quit !
-        userThread.isAlive) {
 
-          this.yarnAllocator.allocateContainers(math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
-          ApplicationMaster.incrementAllocatorLoop(1)
-          Thread.sleep(100)
+      // Exists the loop if the user thread exits.
+      while (yarnAllocator.getNumWorkersRunning < args.numWorkers && userThread.isAlive) {
+        val numContainersToAllocate = math.max(
+          args.numWorkers - yarnAllocator.getNumWorkersRunning, 0)
+        this.yarnAllocator.allocateContainers(numContainersToAllocate)
+        ApplicationMaster.incrementAllocatorLoop(1)
+        Thread.sleep(100)
       }
     } finally {
-      // in case of exceptions, etc - ensure that count is atleast ALLOCATOR_LOOP_WAIT_COUNT : 
-      // so that the loop (in ApplicationMaster.sparkContextInitialized) breaks
+      // In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT,
+      // so that the loop in ApplicationMaster#sparkContextInitialized() breaks.
       ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
     }
     logInfo("All workers have launched.")
 
-    // Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout
+    // Launch a progress reporter thread, else the app will get killed after expiration
+    // (def: 10mins) timeout.
+    // TODO(harvey): Verify the timeout
     if (userThread.isAlive) {
-      // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
-
+      // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
       val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
-      // must be <= timeoutInterval/ 2.
-      // On other hand, also ensure that we are reasonably responsive without causing too many requests to RM.
-      // so atleast 1 minute or timeoutInterval / 10 - whichever is higher.
+      // Must be <= timeoutInterval/ 2.
+      // On other hand, also ensure that we are reasonably responsive without causing too many
+      // requests to RM. So, at least 1 minute or timeoutInterval / 10 - whichever is higher.
       val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval/ 10, 60000L))
       launchReporterThread(interval)
     }
   }
 
-  // TODO: We might want to extend this to allocate more containers in case they die !
+  // TODO: We might want to extend this to allocate more containers in case they die.
   private def launchReporterThread(_sleepTime: Long): Thread = {
     val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
 
@@ -283,7 +303,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
         while (userThread.isAlive) {
           val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning
           if (missingWorkerCount > 0) {
-            logInfo("Allocating " + missingWorkerCount + " containers to make up for (potentially ?) lost containers")
+            logInfo("Allocating %d containers to make up for (potentially) lost containers".
+              format(missingWorkerCount))
             yarnAllocator.allocateContainers(missingWorkerCount)
           }
           else sendProgress()
@@ -291,16 +312,16 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
         }
       }
     }
-    // setting to daemon status, though this is usually not a good idea.
+    // Setting to daemon status, though this is usually not a good idea.
     t.setDaemon(true)
     t.start()
     logInfo("Started progress reporter thread - sleep time : " + sleepTime)
-    return t
+    t
   }
 
   private def sendProgress() {
     logDebug("Sending progress")
-    // simulated with an allocate request with no nodes requested ...
+    // Simulated with an allocate request with no nodes requested ...
     yarnAllocator.allocateContainers(0)
   }
 
@@ -320,7 +341,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
   */
 
   def finishApplicationMaster(status: FinalApplicationStatus) {
-
     synchronized {
       if (isFinished) {
         return
@@ -333,14 +353,13 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
       .asInstanceOf[FinishApplicationMasterRequest]
     finishReq.setAppAttemptId(appAttemptId)
     finishReq.setFinishApplicationStatus(status)
-    // set tracking url to empty since we don't have a history server
+    // Set tracking url to empty since we don't have a history server.
     finishReq.setTrackingUrl("")
     resourceManager.finishApplicationMaster(finishReq)
-
   }
 
   /**
-   * clean up the staging directory. 
+   * Clean up the staging directory. 
    */
   private def cleanupStagingDir() { 
     var stagingDirPath: Path = null
@@ -356,13 +375,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
         fs.delete(stagingDirPath, true)
       }
     } catch {
-      case e: IOException =>
-        logError("Failed to cleanup staging dir " + stagingDirPath, e)
+      case ioe: IOException =>
+        logError("Failed to cleanup staging dir " + stagingDirPath, ioe)
     }
   }
 
-  // The shutdown hook that runs when a signal is received AND during normal
-  // close of the JVM. 
+  // The shutdown hook that runs when a signal is received AND during normal close of the JVM. 
   class AppMasterShutdownHook(appMaster: ApplicationMaster) extends Runnable {
 
     def run() {
@@ -372,15 +390,14 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
       if (appMaster.isLastAMRetry) appMaster.cleanupStagingDir()
     }
   }
- 
 }
 
 object ApplicationMaster {
-  // number of times to wait for the allocator loop to complete.
-  // each loop iteration waits for 100ms, so maximum of 3 seconds.
+  // Number of times to wait for the allocator loop to complete.
+  // Each loop iteration waits for 100ms, so maximum of 3 seconds.
   // This is to ensure that we have reasonable number of containers before we start
-  // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be optimal as more 
-  // containers are available. Might need to handle this better.
+  // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
+  // optimal as more containers are available. Might need to handle this better.
   private val ALLOCATOR_LOOP_WAIT_COUNT = 30
   def incrementAllocatorLoop(by: Int) {
     val count = yarnAllocatorLoop.getAndAdd(by)
@@ -398,7 +415,8 @@ object ApplicationMaster {
     applicationMasters.add(master)
   }
 
-  val sparkContextRef: AtomicReference[SparkContext] = new AtomicReference[SparkContext](null)
+  val sparkContextRef: AtomicReference[SparkContext] =
+    new AtomicReference[SparkContext](null /* initialValue */)
   val yarnAllocatorLoop: AtomicInteger = new AtomicInteger(0)
 
   def sparkContextInitialized(sc: SparkContext): Boolean = {
@@ -408,19 +426,21 @@ object ApplicationMaster {
       sparkContextRef.notifyAll()
     }
 
-    // Add a shutdown hook - as a best case effort in case users do not call sc.stop or do System.exit
-    // Should not really have to do this, but it helps yarn to evict resources earlier.
-    // not to mention, prevent Client declaring failure even though we exit'ed properly.
-    // Note that this will unfortunately not properly clean up the staging files because it gets called to 
-    // late and the filesystem is already shutdown.
+    // Add a shutdown hook - as a best case effort in case users do not call sc.stop or do
+    // System.exit.
+    // Should not really have to do this, but it helps YARN to evict resources earlier.
+    // Not to mention, prevent the Client from declaring failure even though we exited properly.
+    // Note that this will unfortunately not properly clean up the staging files because it gets
+    // called too late, after the filesystem is already shutdown.
     if (modified) {
       Runtime.getRuntime().addShutdownHook(new Thread with Logging { 
-        // This is not just to log, but also to ensure that log system is initialized for this instance when we actually are 'run'
+        // This is not only logs, but also ensures that log system is initialized for this instance
+        // when we are actually 'run'-ing.
         logInfo("Adding shutdown hook for context " + sc)
         override def run() { 
           logInfo("Invoking sc stop from shutdown hook") 
           sc.stop() 
-          // best case ...
+          // Best case ...
           for (master <- applicationMasters) {
             master.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
           }
@@ -428,7 +448,7 @@ object ApplicationMaster {
       } )
     }
 
-    // Wait for initialization to complete and atleast 'some' nodes can get allocated
+    // Wait for initialization to complete and atleast 'some' nodes can get allocated.
     yarnAllocatorLoop.synchronized {
       while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT) {
         yarnAllocatorLoop.wait(1000L)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a98f5a0e/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index c38bdd1..08699cc 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -20,43 +20,43 @@ package org.apache.spark.deploy.yarn
 import java.net.{InetAddress, InetSocketAddress, UnknownHostException, URI}
 import java.nio.ByteBuffer
 
+import scala.collection.JavaConversions._
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.Map
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileContext, FileStatus, FileSystem, Path, FileUtil}
-import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapred.Master
 import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.io.DataOutputBuffer
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.api._
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.api.protocolrecords._
+import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.client.YarnClientImpl
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{Apps, Records}
 
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.Map
-import scala.collection.JavaConversions._
-
 import org.apache.spark.Logging 
 import org.apache.spark.util.Utils
 import org.apache.spark.deploy.SparkHadoopUtil
 
+
 class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl with Logging {
-  
+
   def this(args: ClientArguments) = this(new Configuration(), args)
-  
+
   var rpc: YarnRPC = YarnRPC.create(conf)
   val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
   val credentials = UserGroupInformation.getCurrentUser().getCredentials()
   private val SPARK_STAGING: String = ".sparkStaging"
   private val distCacheMgr = new ClientDistributedCacheManager()
 
-  // staging directory is private! -> rwx--------
+  // Staging directory is private! -> rwx--------
   val STAGING_DIR_PERMISSION: FsPermission = FsPermission.createImmutable(0700:Short)
-  // app files are world-wide readable and owner writable -> rw-r--r--
+  // App files are world-wide readable and owner writable -> rw-r--r--
   val APP_FILE_PERMISSION: FsPermission = FsPermission.createImmutable(0644:Short) 
 
   def run() {
@@ -79,7 +79,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
     appContext.setUser(UserGroupInformation.getCurrentUser().getShortUserName())
 
     submitApp(appContext)
-    
+
     monitorApplication(appId)
     System.exit(0)
   }
@@ -90,20 +90,25 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
 
   def logClusterResourceDetails() {
     val clusterMetrics: YarnClusterMetrics = super.getYarnClusterMetrics
-    logInfo("Got Cluster metric info from ASM, numNodeManagers=" + clusterMetrics.getNumNodeManagers)
+    logInfo("Got Cluster metric info from ASM, numNodeManagers = " +
+      clusterMetrics.getNumNodeManagers)
 
     val queueInfo: QueueInfo = super.getQueueInfo(args.amQueue)
-    logInfo("Queue info .. queueName=" + queueInfo.getQueueName + ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity +
-      ", queueMaxCapacity=" + queueInfo.getMaximumCapacity + ", queueApplicationCount=" + queueInfo.getApplications.size +
-      ", queueChildQueueCount=" + queueInfo.getChildQueues.size)
+    logInfo("""Queue info ... queueName = %s, queueCurrentCapacity = %s, queueMaxCapacity = %s,
+      queueApplicationCount = %s, queueChildQueueCount = %s""".format(
+        queueInfo.getQueueName,
+        queueInfo.getCurrentCapacity,
+        queueInfo.getMaximumCapacity,
+        queueInfo.getApplications.size,
+        queueInfo.getChildQueues.size)
   }
 
-  
+
   def verifyClusterResources(app: GetNewApplicationResponse) = { 
     val maxMem = app.getMaximumResourceCapability().getMemory()
     logInfo("Max mem capabililty of a single resource in this cluster " + maxMem)
-    
-    // if we have requested more then the clusters max for a single resource then exit.
+
+    // If we have requested more then the clusters max for a single resource then exit.
     if (args.workerMemory > maxMem) {
       logError("the worker size is to large to run on this cluster " + args.workerMemory);
       System.exit(1)
@@ -114,10 +119,10 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
       System.exit(1)
     }
 
-    // We could add checks to make sure the entire cluster has enough resources but that involves getting
-    // all the node reports and computing ourselves 
+    // We could add checks to make sure the entire cluster has enough resources but that involves
+    // getting all the node reports and computing ourselves 
   }
-  
+
   def createApplicationSubmissionContext(appId: ApplicationId): ApplicationSubmissionContext = {
     logInfo("Setting up application submission context for ASM")
     val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
@@ -126,9 +131,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
     return appContext
   }
 
-  /*
-   * see if two file systems are the same or not.
-   */
+  /** See if two file systems are the same or not. */
   private def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = {
     val srcUri = srcFs.getUri()
     val dstUri = destFs.getUri()
@@ -163,9 +166,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
     return true;
   }
 
-  /**
-   * Copy the file into HDFS if needed.
-   */
+  /** Copy the file into HDFS if needed. */
   private def copyRemoteFile(
       dstDir: Path,
       originalPath: Path,
@@ -181,9 +182,8 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
       fs.setReplication(newPath, replication);
       if (setPerms) fs.setPermission(newPath, new FsPermission(APP_FILE_PERMISSION))
     } 
-    // resolve any symlinks in the URI path so using a "current" symlink
-    // to point to a specific version shows the specific version
-    // in the distributed cache configuration
+    // Resolve any symlinks in the URI path so using a "current" symlink to point to a specific
+    // version shows the specific version in the distributed cache configuration
     val qualPath = fs.makeQualified(newPath)
     val fc = FileContext.getFileContext(qualPath.toUri(), conf)
     val destPath = fc.resolvePath(qualPath)
@@ -192,8 +192,8 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
 
   def prepareLocalResources(appStagingDir: String): HashMap[String, LocalResource] = {
     logInfo("Preparing Local resources")
-    // Upload Spark and the application JAR to the remote file system if necessary
-    // Add them as local resources to the AM
+    // Upload Spark and the application JAR to the remote file system if necessary. Add them as
+    // local resources to the AM.
     val fs = FileSystem.get(conf)
 
     val delegTokenRenewer = Master.getMasterPrincipal(conf);
@@ -276,7 +276,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
     UserGroupInformation.getCurrentUser().addCredentials(credentials);
     return localResources
   }
-  
+
   def setupLaunchEnv(
       localResources: HashMap[String, LocalResource], 
       stagingDir: String): HashMap[String, String] = {
@@ -289,16 +289,16 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
     env("SPARK_YARN_MODE") = "true"
     env("SPARK_YARN_STAGING_DIR") = stagingDir
 
-    // set the environment variables to be passed on to the Workers
+    // Set the environment variables to be passed on to the Workers.
     distCacheMgr.setDistFilesEnv(env)
     distCacheMgr.setDistArchivesEnv(env)
 
-    // allow users to specify some environment variables
+    // Allow users to specify some environment variables.
     Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))
 
-    // Add each SPARK-* key to the environment
+    // Add each SPARK-* key to the environment.
     System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v }
-    return env
+    env
   }
 
   def userArgsToString(clientArgs: ClientArguments): String = {
@@ -308,13 +308,13 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
     for (arg <- args){
       retval.append(prefix).append(" '").append(arg).append("' ")
     }
-
     retval.toString
   }
 
-  def createContainerLaunchContext(newApp: GetNewApplicationResponse,
-                                   localResources: HashMap[String, LocalResource],
-                                   env: HashMap[String, String]): ContainerLaunchContext = {
+  def createContainerLaunchContext(
+      newApp: GetNewApplicationResponse,
+      localResources: HashMap[String, LocalResource],
+      env: HashMap[String, String]): ContainerLaunchContext = {
     logInfo("Setting up container launch context")
     val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
     amContainer.setLocalResources(localResources)
@@ -322,8 +322,10 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
 
     val minResMemory: Int = newApp.getMinimumResourceCapability().getMemory()
 
+    // TODO(harvey): This can probably be a val.
     var amMemory = ((args.amMemory / minResMemory) * minResMemory) +
-        (if (0 != (args.amMemory % minResMemory)) minResMemory else 0) - YarnAllocationHandler.MEMORY_OVERHEAD
+      ((if ((args.amMemory % minResMemory) == 0) 0 else minResMemory) -
+        YarnAllocationHandler.MEMORY_OVERHEAD)
 
     // Extra options for the JVM
     var JAVA_OPTS = ""
@@ -334,14 +336,18 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
     JAVA_OPTS += " -Djava.io.tmpdir=" + 
       new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
 
-
-    // Commenting it out for now - so that people can refer to the properties if required. Remove it once cpuset version is pushed out.
-    // The context is, default gc for server class machines end up using all cores to do gc - hence if there are multiple containers in same
-    // node, spark gc effects all other containers performance (which can also be other spark containers)
-    // Instead of using this, rely on cpusets by YARN to enforce spark behaves 'properly' in multi-tenant environments. Not sure how default java gc behaves if it is
-    // limited to subset of cores on a node.
-    if (env.isDefinedAt("SPARK_USE_CONC_INCR_GC") && java.lang.Boolean.parseBoolean(env("SPARK_USE_CONC_INCR_GC"))) {
-      // In our expts, using (default) throughput collector has severe perf ramnifications in multi-tenant machines
+    // Commenting it out for now - so that people can refer to the properties if required. Remove
+    // it once cpuset version is pushed out. The context is, default gc for server class machines
+    // end up using all cores to do gc - hence if there are multiple containers in same node,
+    // spark gc effects all other containers performance (which can also be other spark containers)
+    // Instead of using this, rely on cpusets by YARN to enforce spark behaves 'properly' in
+    // multi-tenant environments. Not sure how default java gc behaves if it is limited to subset
+    // of cores on a node.
+    val useConcurrentAndIncrementalGC = env.isDefinedAt("SPARK_USE_CONC_INCR_GC") &&
+      java.lang.Boolean.parseBoolean(env("SPARK_USE_CONC_INCR_GC"))
+    if (useConcurrentAndIncrementalGC) {
+      // In our expts, using (default) throughput collector has severe perf ramnifications in
+      // multi-tenant machines
       JAVA_OPTS += " -XX:+UseConcMarkSweepGC "
       JAVA_OPTS += " -XX:+CMSIncrementalMode "
       JAVA_OPTS += " -XX:+CMSIncrementalPacing "
@@ -353,7 +359,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
       JAVA_OPTS += env("SPARK_JAVA_OPTS") + " "
     }
 
-    // Command for the ApplicationMaster
+    // Command for the ApplicationMaster.
     var javaCommand = "java";
     val javaHome = System.getenv("JAVA_HOME")
     if ((javaHome != null && !javaHome.isEmpty()) || env.isDefinedAt("JAVA_HOME")) {
@@ -379,28 +385,28 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
       " 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
     logInfo("Command for the ApplicationMaster: " + commands(0))
     amContainer.setCommands(commands)
-    
+
     val capability = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
-    // Memory for the ApplicationMaster
+    // Memory for the ApplicationMaster.
     capability.setMemory(args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
     amContainer.setResource(capability)
 
-    // Setup security tokens
+    // Setup security tokens.
     val dob = new DataOutputBuffer()
     credentials.writeTokenStorageToStream(dob)
     amContainer.setContainerTokens(ByteBuffer.wrap(dob.getData()))
 
-    return amContainer
+    amContainer
   }
-  
+
   def submitApp(appContext: ApplicationSubmissionContext) = {
-    // Submit the application to the applications manager
+    // Submit the application to the applications manager.
     logInfo("Submitting application to ASM")
     super.submitApplication(appContext)
   }
-  
+
   def monitorApplication(appId: ApplicationId): Boolean = {  
-    while(true) {
+    while (true) {
       Thread.sleep(1000)
       val report = super.getApplicationReport(appId)
 
@@ -418,16 +424,16 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
         "\t appTrackingUrl: " + report.getTrackingUrl() + "\n" +
         "\t appUser: " + report.getUser()
       )
-      
+
       val state = report.getYarnApplicationState()
       val dsStatus = report.getFinalApplicationStatus()
       if (state == YarnApplicationState.FINISHED || 
         state == YarnApplicationState.FAILED ||
         state == YarnApplicationState.KILLED) {
-          return true
+        return true
       }
     }
-    return true
+    true
   }
 }
 
@@ -459,7 +465,7 @@ object Client {
       Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + 
         Path.SEPARATOR + LOG4J_PROP)
     }
-    // normally the users app.jar is last in case conflicts with spark jars
+    // Normally the users app.jar is last in case conflicts with spark jars
     val userClasspathFirst = System.getProperty("spark.yarn.user.classpath.first", "false")
       .toBoolean
     if (userClasspathFirst) {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a98f5a0e/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
index 7a66532..d9eabf3 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
@@ -21,53 +21,60 @@ import java.net.URI
 import java.nio.ByteBuffer
 import java.security.PrivilegedExceptionAction
 
+import scala.collection.JavaConversions._
+import scala.collection.mutable.HashMap
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.io.DataOutputBuffer
 import org.apache.hadoop.net.NetUtils
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.api._
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.api.protocolrecords._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils}
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.HashMap
 
 import org.apache.spark.Logging
 import org.apache.spark.util.Utils
 
-class WorkerRunnable(container: Container, conf: Configuration, masterAddress: String,
-    slaveId: String, hostname: String, workerMemory: Int, workerCores: Int) 
-    extends Runnable with Logging {
-  
+
+class WorkerRunnable(
+    container: Container,
+    conf: Configuration,
+    masterAddress: String,
+    slaveId: String,
+    hostname: String,
+    workerMemory: Int,
+    workerCores: Int) 
+  extends Runnable with Logging {
+
   var rpc: YarnRPC = YarnRPC.create(conf)
   var cm: ContainerManager = null
   val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
-  
+
   def run = {
     logInfo("Starting Worker Container")
     cm = connectToCM
     startContainer
   }
-  
+
   def startContainer = {
     logInfo("Setting up ContainerLaunchContext")
-    
+
     val ctx = Records.newRecord(classOf[ContainerLaunchContext])
       .asInstanceOf[ContainerLaunchContext]
-    
+
     ctx.setContainerId(container.getId())
     ctx.setResource(container.getResource())
     val localResources = prepareLocalResources
     ctx.setLocalResources(localResources)
-    
+
     val env = prepareEnvironment
     ctx.setEnvironment(env)
-    
+
     // Extra options for the JVM
     var JAVA_OPTS = ""
     // Set the JVM memory
@@ -80,17 +87,21 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
     JAVA_OPTS += " -Djava.io.tmpdir=" + 
       new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
 
-
-    // Commenting it out for now - so that people can refer to the properties if required. Remove it once cpuset version is pushed out.
-    // The context is, default gc for server class machines end up using all cores to do gc - hence if there are multiple containers in same
-    // node, spark gc effects all other containers performance (which can also be other spark containers)
-    // Instead of using this, rely on cpusets by YARN to enforce spark behaves 'properly' in multi-tenant environments. Not sure how default java gc behaves if it is
-    // limited to subset of cores on a node.
+    // Commenting it out for now - so that people can refer to the properties if required. Remove
+    // it once cpuset version is pushed out.
+    // The context is, default gc for server class machines end up using all cores to do gc - hence
+    // if there are multiple containers in same node, spark gc effects all other containers
+    // performance (which can also be other spark containers)
+    // Instead of using this, rely on cpusets by YARN to enforce spark behaves 'properly' in
+    // multi-tenant environments. Not sure how default java gc behaves if it is limited to subset
+    // of cores on a node.
 /*
     else {
       // If no java_opts specified, default to using -XX:+CMSIncrementalMode
-      // It might be possible that other modes/config is being done in SPARK_JAVA_OPTS, so we dont want to mess with it.
-      // In our expts, using (default) throughput collector has severe perf ramnifications in multi-tennent machines
+      // It might be possible that other modes/config is being done in SPARK_JAVA_OPTS, so we dont
+      // want to mess with it.
+      // In our expts, using (default) throughput collector has severe perf ramnifications in
+      // multi-tennent machines
       // The options are based on
       // http://www.oracle.com/technetwork/java/gc-tuning-5-138395.html#0.0.0.%20When%20to%20Use%20the%20Concurrent%20Low%20Pause%20Collector|outline
       JAVA_OPTS += " -XX:+UseConcMarkSweepGC "
@@ -117,8 +128,10 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
     val commands = List[String](javaCommand +
       " -server " +
       // Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
-      // Not killing the task leaves various aspects of the worker and (to some extent) the jvm in an inconsistent state.
-      // TODO: If the OOM is not recoverable by rescheduling it on different node, then do 'something' to fail job ... akin to blacklisting trackers in mapred ?
+      // Not killing the task leaves various aspects of the worker and (to some extent) the jvm in
+      // an inconsistent state.
+      // TODO: If the OOM is not recoverable by rescheduling it on different node, then do
+      // 'something' to fail job ... akin to blacklisting trackers in mapred ?
       " -XX:OnOutOfMemoryError='kill %p' " +
       JAVA_OPTS +
       " org.apache.spark.executor.CoarseGrainedExecutorBackend " +
@@ -130,7 +143,7 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
       " 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
     logInfo("Setting up worker with commands: " + commands)
     ctx.setCommands(commands)
-    
+
     // Send the start request to the ContainerManager
     val startReq = Records.newRecord(classOf[StartContainerRequest])
     .asInstanceOf[StartContainerRequest]
@@ -138,7 +151,8 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
     cm.startContainer(startReq)
   }
 
-  private def setupDistributedCache(file: String,
+  private def setupDistributedCache(
+      file: String,
       rtype: LocalResourceType,
       localResources: HashMap[String, LocalResource],
       timestamp: String,
@@ -153,12 +167,11 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
     amJarRsrc.setSize(size.toLong)
     localResources(uri.getFragment()) = amJarRsrc
   }
-  
-  
+
   def prepareLocalResources: HashMap[String, LocalResource] = {
     logInfo("Preparing Local resources")
     val localResources = HashMap[String, LocalResource]()
-    
+
     if (System.getenv("SPARK_YARN_CACHE_FILES") != null) {
       val timeStamps = System.getenv("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',')
       val fileSizes = System.getenv("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',')
@@ -180,30 +193,30 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
           timeStamps(i), fileSizes(i), visibilities(i))
       }
     }
-    
+
     logInfo("Prepared Local resources " + localResources)
     return localResources
   }
-  
+
   def prepareEnvironment: HashMap[String, String] = {
     val env = new HashMap[String, String]()
 
     Client.populateClasspath(yarnConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
 
-    // allow users to specify some environment variables
+    // Allow users to specify some environment variables
     Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))
 
     System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v }
     return env
   }
-  
+
   def connectToCM: ContainerManager = {
     val cmHostPortStr = container.getNodeId().getHost() + ":" + container.getNodeId().getPort()
     val cmAddress = NetUtils.createSocketAddr(cmHostPortStr)
     logInfo("Connecting to ContainerManager at " + cmHostPortStr)
 
-    // use doAs and remoteUser here so we can add the container token and not 
-    // pollute the current users credentials with all of the individual container tokens
+    // Use doAs and remoteUser here so we can add the container token and not pollute the current
+    // users credentials with all of the individual container tokens
     val user = UserGroupInformation.createRemoteUser(container.getId().toString());
     val containerToken = container.getContainerToken();
     if (containerToken != null) {
@@ -219,5 +232,5 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
         });
     return proxy;
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a98f5a0e/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 25da9aa..a9fbc27 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -17,55 +17,70 @@
 
 package org.apache.spark.deploy.yarn
 
+import java.lang.{Boolean => JBoolean}
+import java.util.{Collections, Set => JSet}
+import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap}
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+
 import org.apache.spark.Logging
-import org.apache.spark.util.Utils
 import org.apache.spark.scheduler.SplitInfo
-import scala.collection
-import org.apache.hadoop.yarn.api.records.{AMResponse, ApplicationAttemptId, ContainerId, Priority, Resource, ResourceRequest, ContainerStatus, Container}
 import org.apache.spark.scheduler.cluster.{ClusterScheduler, CoarseGrainedSchedulerBackend}
+import org.apache.spark.util.Utils
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.api.AMRMProtocol
+import org.apache.hadoop.yarn.api.records.{AMResponse, ApplicationAttemptId}
+import org.apache.hadoop.yarn.api.records.{Container, ContainerId, ContainerStatus}
+import org.apache.hadoop.yarn.api.records.{Priority, Resource, ResourceRequest}
 import org.apache.hadoop.yarn.api.protocolrecords.{AllocateRequest, AllocateResponse}
 import org.apache.hadoop.yarn.util.{RackResolver, Records}
-import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap}
-import java.util.concurrent.atomic.AtomicInteger
-import org.apache.hadoop.yarn.api.AMRMProtocol
-import collection.JavaConversions._
-import collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import org.apache.hadoop.conf.Configuration
-import java.util.{Collections, Set => JSet}
-import java.lang.{Boolean => JBoolean}
+
 
 object AllocationType extends Enumeration ("HOST", "RACK", "ANY") {
   type AllocationType = Value
   val HOST, RACK, ANY = Value
 }
 
-// too many params ? refactor it 'somehow' ?
-// needs to be mt-safe
-// Need to refactor this to make it 'cleaner' ... right now, all computation is reactive : should make it 
-// more proactive and decoupled.
+// TODO:
+// Too many params.
+// Needs to be mt-safe
+// Need to refactor this to make it 'cleaner' ... right now, all computation is reactive - should
+// make it more proactive and decoupled.
+
 // Note that right now, we assume all node asks as uniform in terms of capabilities and priority
 // Refer to http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/ for more info
 // on how we are requesting for containers.
-private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceManager: AMRMProtocol, 
-                                          val appAttemptId: ApplicationAttemptId,
-                                          val maxWorkers: Int, val workerMemory: Int, val workerCores: Int,
-                                          val preferredHostToCount: Map[String, Int], 
-                                          val preferredRackToCount: Map[String, Int])
+private[yarn] class YarnAllocationHandler(
+    val conf: Configuration,
+    val resourceManager: AMRMProtocol, 
+    val appAttemptId: ApplicationAttemptId,
+    val maxWorkers: Int,
+    val workerMemory: Int,
+    val workerCores: Int,
+    val preferredHostToCount: Map[String, Int], 
+    val preferredRackToCount: Map[String, Int])
   extends Logging {
-
-
   // These three are locked on allocatedHostToContainersMap. Complementary data structures
   // allocatedHostToContainersMap : containers which are running : host, Set<containerid>
-  // allocatedContainerToHostMap: container to host mapping
-  private val allocatedHostToContainersMap = new HashMap[String, collection.mutable.Set[ContainerId]]()
+  // allocatedContainerToHostMap: container to host mapping.
+  private val allocatedHostToContainersMap =
+    new HashMap[String, collection.mutable.Set[ContainerId]]()
+
   private val allocatedContainerToHostMap = new HashMap[ContainerId, String]()
-  // allocatedRackCount is populated ONLY if allocation happens (or decremented if this is an allocated node)
-  // As with the two data structures above, tightly coupled with them, and to be locked on allocatedHostToContainersMap
+
+  // allocatedRackCount is populated ONLY if allocation happens (or decremented if this is an
+  // allocated node)
+  // As with the two data structures above, tightly coupled with them, and to be locked on
+  // allocatedHostToContainersMap
   private val allocatedRackCount = new HashMap[String, Int]()
 
-  // containers which have been released.
+  // Containers which have been released.
   private val releasedContainerList = new CopyOnWriteArrayList[ContainerId]()
-  // containers to be released in next request to RM
+  // Containers to be released in next request to RM
   private val pendingReleaseContainers = new ConcurrentHashMap[ContainerId, Boolean]
 
   private val numWorkersRunning = new AtomicInteger()
@@ -75,13 +90,13 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
 
   def getNumWorkersRunning: Int = numWorkersRunning.intValue
 
-
   def isResourceConstraintSatisfied(container: Container): Boolean = {
     container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
   }
 
   def allocateContainers(workersToRequest: Int) {
-    // We need to send the request only once from what I understand ... but for now, not modifying this much.
+    // We need to send the request only once from what I understand ... but for now, not modifying
+    // this much.
 
     // Keep polling the Resource Manager for containers
     val amResp = allocateWorkerResources(workersToRequest).getAMResponse
@@ -97,7 +112,7 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
 
       val hostToContainers = new HashMap[String, ArrayBuffer[Container]]()
 
-      // ignore if not satisfying constraints      {
+      // Ignore if not satisfying constraints      {
       for (container <- _allocatedContainers) {
         if (isResourceConstraintSatisfied(container)) {
           // allocatedContainers += container
@@ -111,8 +126,7 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
         else releasedContainerList.add(container.getId())
       }
 
-      // Find the appropriate containers to use
-      // Slightly non trivial groupBy I guess ...
+      // Find the appropriate containers to use. Slightly non trivial groupBy ...
       val dataLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
       val rackLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
       val offRackContainers = new HashMap[String, ArrayBuffer[Container]]()
@@ -132,7 +146,7 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
           remainingContainers = null
         }
         else if (requiredHostCount > 0) {
-          // container list has more containers than we need for data locality.
+          // Container list has more containers than we need for data locality.
           // Split into two : data local container count of (remainingContainers.size - requiredHostCount) 
           // and rest as remainingContainer
           val (dataLocal, remaining) = remainingContainers.splitAt(remainingContainers.size - requiredHostCount)
@@ -140,13 +154,13 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
           // remainingContainers = remaining
 
           // yarn has nasty habit of allocating a tonne of containers on a host - discourage this :
-          // add remaining to release list. If we have insufficient containers, next allocation cycle 
-          // will reallocate (but wont treat it as data local)
+          // add remaining to release list. If we have insufficient containers, next allocation 
+          // cycle will reallocate (but wont treat it as data local)
           for (container <- remaining) releasedContainerList.add(container.getId())
           remainingContainers = null
         }
 
-        // now rack local
+        // Now rack local
         if (remainingContainers != null){
           val rack = YarnAllocationHandler.lookupRack(conf, candidateHost)
 
@@ -159,15 +173,17 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
             if (requiredRackCount >= remainingContainers.size){
               // Add all to dataLocalContainers
               dataLocalContainers.put(rack, remainingContainers)
-              // all consumed
+              // All consumed
               remainingContainers = null
             }
             else if (requiredRackCount > 0) {
               // container list has more containers than we need for data locality.
               // Split into two : data local container count of (remainingContainers.size - requiredRackCount) 
               // and rest as remainingContainer
-              val (rackLocal, remaining) = remainingContainers.splitAt(remainingContainers.size - requiredRackCount)
-              val existingRackLocal = rackLocalContainers.getOrElseUpdate(rack, new ArrayBuffer[Container]())
+              val (rackLocal, remaining) = remainingContainers.splitAt(
+                remainingContainers.size - requiredRackCount)
+              val existingRackLocal = rackLocalContainers.getOrElseUpdate(rack,
+                new ArrayBuffer[Container]())
 
               existingRackLocal ++= rackLocal
               remainingContainers = remaining
@@ -183,8 +199,8 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
 
       // Now that we have split the containers into various groups, go through them in order : 
       // first host local, then rack local and then off rack (everything else).
-      // Note that the list we create below tries to ensure that not all containers end up within a host 
-      // if there are sufficiently large number of hosts/containers.
+      // Note that the list we create below tries to ensure that not all containers end up within a
+      // host if there are sufficiently large number of hosts/containers.
 
       val allocatedContainers = new ArrayBuffer[Container](_allocatedContainers.size)
       allocatedContainers ++= ClusterScheduler.prioritizeContainers(dataLocalContainers)
@@ -197,7 +213,7 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
         val workerHostname = container.getNodeId.getHost
         val containerId = container.getId
 
-        assert (container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
+        assert(container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
 
         if (numWorkersRunningNow > maxWorkers) {
           logInfo("Ignoring container " + containerId + " at host " + workerHostname + 
@@ -207,19 +223,22 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
           numWorkersRunning.decrementAndGet()
         }
         else {
-          // deallocate + allocate can result in reusing id's wrongly - so use a different counter (workerIdCounter)
+          // Deallocate + allocate can result in reusing id's wrongly - so use a different counter
+          // (workerIdCounter)
           val workerId = workerIdCounter.incrementAndGet().toString
           val driverUrl = "akka://spark@%s:%s/user/%s".format(
             System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
             CoarseGrainedSchedulerBackend.ACTOR_NAME)
 
           logInfo("launching container on " + containerId + " host " + workerHostname)
-          // just to be safe, simply remove it from pendingReleaseContainers. Should not be there, but ..
+          // Just to be safe, simply remove it from pendingReleaseContainers.
+          // Should not be there, but ..
           pendingReleaseContainers.remove(containerId)
 
           val rack = YarnAllocationHandler.lookupRack(conf, workerHostname)
           allocatedHostToContainersMap.synchronized {
-            val containerSet = allocatedHostToContainersMap.getOrElseUpdate(workerHostname, new HashSet[ContainerId]())
+            val containerSet = allocatedHostToContainersMap.getOrElseUpdate(workerHostname,
+              new HashSet[ContainerId]())
 
             containerSet += containerId
             allocatedContainerToHostMap.put(containerId, workerHostname)
@@ -251,7 +270,7 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
           pendingReleaseContainers.remove(containerId)
         }
         else {
-          // simply decrement count - next iteration of ReporterThread will take care of allocating !
+          // Simply decrement count - next iteration of ReporterThread will take care of allocating.
           numWorkersRunning.decrementAndGet()
           logInfo("Container completed ? nodeId: " + containerId + ", state " + completedContainer.getState +
             " httpaddress: " + completedContainer.getDiagnostics)
@@ -271,7 +290,7 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
 
             allocatedContainerToHostMap -= containerId
 
-            // doing this within locked context, sigh ... move to outside ?
+            // Doing this within locked context, sigh ... move to outside ?
             val rack = YarnAllocationHandler.lookupRack(conf, host)
             if (rack != null) {
               val rackCount = allocatedRackCount.getOrElse(rack, 0) - 1
@@ -350,17 +369,24 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
         val requiredCount = candidateCount - allocatedContainersOnHost(candidateHost)
 
         if (requiredCount > 0) {
-          hostContainerRequests += 
-            createResourceRequest(AllocationType.HOST, candidateHost, requiredCount, YarnAllocationHandler.PRIORITY)
+          hostContainerRequests += createResourceRequest(
+            AllocationType.HOST,
+            candidateHost,
+            requiredCount,
+            YarnAllocationHandler.PRIORITY)
         }
       }
-      val rackContainerRequests: List[ResourceRequest] = createRackResourceRequests(hostContainerRequests.toList)
+      val rackContainerRequests: List[ResourceRequest] = createRackResourceRequests(
+        hostContainerRequests.toList)
 
-      val anyContainerRequests: ResourceRequest = 
-        createResourceRequest(AllocationType.ANY, null, numWorkers, YarnAllocationHandler.PRIORITY)
+      val anyContainerRequests: ResourceRequest = createResourceRequest(
+        AllocationType.ANY,
+        resource = null,
+        numWorkers,
+        YarnAllocationHandler.PRIORITY)
 
-      val containerRequests: ArrayBuffer[ResourceRequest] =
-        new ArrayBuffer[ResourceRequest](hostContainerRequests.size() + rackContainerRequests.size() + 1)
+      val containerRequests: ArrayBuffer[ResourceRequest] = new ArrayBuffer[ResourceRequest](
+        hostContainerRequests.size() + rackContainerRequests.size() + 1)
 
       containerRequests ++= hostContainerRequests
       containerRequests ++= rackContainerRequests
@@ -378,55 +404,60 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
     val releasedContainerList = createReleasedContainerList()
     req.addAllReleases(releasedContainerList)
 
-
-
     if (numWorkers > 0) {
-      logInfo("Allocating " + numWorkers + " worker containers with " + (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD) + " of memory each.")
+      logInfo("Allocating %d worker containers with %d of memory each.").format(numWorkers,
+        workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
     }
     else {
       logDebug("Empty allocation req ..  release : " + releasedContainerList)
     }
 
-    for (req <- resourceRequests) {
-      logInfo("rsrcRequest ... host : " + req.getHostName + ", numContainers : " + req.getNumContainers +
-        ", p = " + req.getPriority().getPriority + ", capability: "  + req.getCapability)
+    for (request <- resourceRequests) {
+      logInfo("ResourceRequest (host : %s, num containers: %d, priority = %d , capability : %s)").
+        format(
+          request.getHostName,
+          request.getNumContainers,
+          request.getPriority,
+          request.getCapability)
     }
     resourceManager.allocate(req)
   }
 
 
-  private def createResourceRequest(requestType: AllocationType.AllocationType, 
-                                    resource:String, numWorkers: Int, priority: Int): ResourceRequest = {
+  private def createResourceRequest(
+    requestType: AllocationType.AllocationType, 
+    resource:String,
+    numWorkers: Int,
+    priority: Int): ResourceRequest = {
 
     // If hostname specified, we need atleast two requests - node local and rack local.
     // There must be a third request - which is ANY : that will be specially handled.
     requestType match {
       case AllocationType.HOST => {
-        assert (YarnAllocationHandler.ANY_HOST != resource)
-
+        assert(YarnAllocationHandler.ANY_HOST != resource)
         val hostname = resource
         val nodeLocal = createResourceRequestImpl(hostname, numWorkers, priority)
 
-        // add to host->rack mapping
+        // Add to host->rack mapping
         YarnAllocationHandler.populateRackInfo(conf, hostname)
 
         nodeLocal
       }
-
       case AllocationType.RACK => {
         val rack = resource
         createResourceRequestImpl(rack, numWorkers, priority)
       }
-
-      case AllocationType.ANY => {
-        createResourceRequestImpl(YarnAllocationHandler.ANY_HOST, numWorkers, priority)
-      }
-
-      case _ => throw new IllegalArgumentException("Unexpected/unsupported request type .. " + requestType)
+      case AllocationType.ANY => createResourceRequestImpl(
+        YarnAllocationHandler.ANY_HOST, numWorkers, priority)
+      case _ => throw new IllegalArgumentException(
+        "Unexpected/unsupported request type: " + requestType)
     }
   }
 
-  private def createResourceRequestImpl(hostname:String, numWorkers: Int, priority: Int): ResourceRequest = {
+  private def createResourceRequestImpl(
+    hostname:String,
+    numWorkers: Int,
+    priority: Int): ResourceRequest = {
 
     val rsrcRequest = Records.newRecord(classOf[ResourceRequest])
     val memCapability = Records.newRecord(classOf[Resource])
@@ -447,11 +478,11 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
   def createReleasedContainerList(): ArrayBuffer[ContainerId] = {
 
     val retval = new ArrayBuffer[ContainerId](1)
-    // iterator on COW list ...
+    // Iterator on COW list ...
     for (container <- releasedContainerList.iterator()){
       retval += container
     }
-    // remove from the original list.
+    // Remove from the original list.
     if (! retval.isEmpty) {
       releasedContainerList.removeAll(retval)
       for (v <- retval) pendingReleaseContainers.put(v, true)
@@ -466,14 +497,14 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
 object YarnAllocationHandler {
 
   val ANY_HOST = "*"
-  // all requests are issued with same priority : we do not (yet) have any distinction between 
+  // All requests are issued with same priority : we do not (yet) have any distinction between 
   // request types (like map/reduce in hadoop for example)
   val PRIORITY = 1
 
   // Additional memory overhead - in mb
   val MEMORY_OVERHEAD = 384
 
-  // host to rack map - saved from allocation requests
+  // Host to rack map - saved from allocation requests
   // We are expecting this not to change.
   // Note that it is possible for this to change : and RM will indicate that to us via update 
   // response to allocate. But we are punting on handling that for now.
@@ -481,38 +512,69 @@ object YarnAllocationHandler {
   private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]()
 
 
-  def newAllocator(conf: Configuration,
-                   resourceManager: AMRMProtocol, appAttemptId: ApplicationAttemptId,
-                   args: ApplicationMasterArguments): YarnAllocationHandler = {
-
-    new YarnAllocationHandler(conf, resourceManager, appAttemptId, args.numWorkers, 
-      args.workerMemory, args.workerCores, Map[String, Int](), Map[String, Int]())
+  def newAllocator(
+    conf: Configuration,
+    resourceManager: AMRMProtocol,
+    appAttemptId: ApplicationAttemptId,
+    args: ApplicationMasterArguments): YarnAllocationHandler = {
+
+    new YarnAllocationHandler(
+      conf,
+      resourceManager,
+      appAttemptId,
+      args.numWorkers, 
+      args.workerMemory,
+      args.workerCores,
+      Map[String, Int](),
+      Map[String, Int]())
   }
 
-  def newAllocator(conf: Configuration,
-                   resourceManager: AMRMProtocol, appAttemptId: ApplicationAttemptId,
-                   args: ApplicationMasterArguments,
-                   map: collection.Map[String, collection.Set[SplitInfo]]): YarnAllocationHandler = {
+  def newAllocator(
+    conf: Configuration,
+    resourceManager: AMRMProtocol,
+    appAttemptId: ApplicationAttemptId,
+    args: ApplicationMasterArguments,
+    map: collection.Map[String,
+    collection.Set[SplitInfo]]): YarnAllocationHandler = {
 
     val (hostToCount, rackToCount) = generateNodeToWeight(conf, map)
-
-    new YarnAllocationHandler(conf, resourceManager, appAttemptId, args.numWorkers, 
-      args.workerMemory, args.workerCores, hostToCount, rackToCount)
+    new YarnAllocationHandler(
+      conf,
+      resourceManager,
+      appAttemptId,
+      args.numWorkers, 
+      args.workerMemory,
+      args.workerCores,
+      hostToCount,
+      rackToCount)
   }
 
-  def newAllocator(conf: Configuration,
-                   resourceManager: AMRMProtocol, appAttemptId: ApplicationAttemptId,
-                   maxWorkers: Int, workerMemory: Int, workerCores: Int,
-                   map: collection.Map[String, collection.Set[SplitInfo]]): YarnAllocationHandler = {
+  def newAllocator(
+    conf: Configuration,
+    resourceManager: AMRMProtocol,
+    appAttemptId: ApplicationAttemptId,
+    maxWorkers: Int,
+    workerMemory: Int,
+    workerCores: Int,
+    map: collection.Map[String, collection.Set[SplitInfo]]): YarnAllocationHandler = {
 
     val (hostToCount, rackToCount) = generateNodeToWeight(conf, map)
 
-    new YarnAllocationHandler(conf, resourceManager, appAttemptId, maxWorkers,
-      workerMemory, workerCores, hostToCount, rackToCount)
+    new YarnAllocationHandler(
+      conf,
+      resourceManager,
+      appAttemptId,
+      maxWorkers,
+      workerMemory,
+      workerCores,
+      hostToCount,
+      rackToCount)
   }
 
   // A simple method to copy the split info map.
-  private def generateNodeToWeight(conf: Configuration, input: collection.Map[String, collection.Set[SplitInfo]]) :
+  private def generateNodeToWeight(
+    conf: Configuration,
+    input: collection.Map[String, collection.Set[SplitInfo]]) :
   // host to count, rack to count
   (Map[String, Int], Map[String, Int]) = {
 
@@ -536,7 +598,7 @@ object YarnAllocationHandler {
   }
 
   def lookupRack(conf: Configuration, host: String): String = {
-    if (! hostToRack.contains(host)) populateRackInfo(conf, host)
+    if (!hostToRack.contains(host)) populateRackInfo(conf, host)
     hostToRack.get(host)
   }
 
@@ -559,10 +621,12 @@ object YarnAllocationHandler {
         val rack = rackInfo.getNetworkLocation
         hostToRack.put(hostname, rack)
         if (! rackToHostSet.containsKey(rack)) {
-          rackToHostSet.putIfAbsent(rack, Collections.newSetFromMap(new ConcurrentHashMap[String, JBoolean]()))
+          rackToHostSet.putIfAbsent(rack,
+            Collections.newSetFromMap(new ConcurrentHashMap[String, JBoolean]()))
         }
         rackToHostSet.get(rack).add(hostname)
 
+        // TODO(harvey): Figure out this comment...
         // Since RackResolver caches, we are disabling this for now ...
       } /* else {
         // right ? Else we will keep calling rack resolver in case we cant resolve rack info ...


Mime
View raw message