spark-commits mailing list archives

From andrewo...@apache.org
Subject [5/6] spark git commit: SPARK-4338. [YARN] Ditch yarn-alpha.
Date Tue, 09 Dec 2014 19:02:55 GMT
http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
deleted file mode 100644
index f95d723..0000000
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ /dev/null
@@ -1,823 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.{HashMap, ListBuffer, Map}
-import scala.util.{Try, Success, Failure}
-
-import com.google.common.base.Objects
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs._
-import org.apache.hadoop.fs.permission.FsPermission
-import org.apache.hadoop.mapred.Master
-import org.apache.hadoop.mapreduce.MRJobConfig
-import org.apache.hadoop.security.{Credentials, UserGroupInformation}
-import org.apache.hadoop.util.StringUtils
-import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-import org.apache.hadoop.yarn.api.protocolrecords._
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.util.Records
-
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkException}
-import org.apache.spark.util.Utils
-
-/**
- * The entry point (starting in Client#main() and Client#run()) for launching Spark on YARN.
- * The Client submits an application to the YARN ResourceManager.
- */
-private[spark] trait ClientBase extends Logging {
-  import ClientBase._
-
-  protected val args: ClientArguments
-  protected val hadoopConf: Configuration
-  protected val sparkConf: SparkConf
-  protected val yarnConf: YarnConfiguration
-  protected val credentials = UserGroupInformation.getCurrentUser.getCredentials
-  protected val amMemoryOverhead = args.amMemoryOverhead // MB
-  protected val executorMemoryOverhead = args.executorMemoryOverhead // MB
-  private val distCacheMgr = new ClientDistributedCacheManager()
-  private val isLaunchingDriver = args.userClass != null
-
-  /**
-   * Fail fast if we have requested more resources per container than is available in the cluster.
-   */
-  protected def verifyClusterResources(newAppResponse: GetNewApplicationResponse): Unit = {
-    val maxMem = newAppResponse.getMaximumResourceCapability().getMemory()
-    logInfo("Verifying our application has not requested more than the maximum " +
-      s"memory capability of the cluster ($maxMem MB per container)")
-    val executorMem = args.executorMemory + executorMemoryOverhead
-    if (executorMem > maxMem) {
-      throw new IllegalArgumentException(s"Required executor memory (${args.executorMemory}" +
-        s"+$executorMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster!")
-    }
-    val amMem = args.amMemory + amMemoryOverhead
-    if (amMem > maxMem) {
-      throw new IllegalArgumentException(s"Required AM memory (${args.amMemory}" +
-        s"+$amMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster!")
-    }
-    logInfo("Will allocate AM container, with %d MB memory including %d MB overhead".format(
-      amMem,
-      amMemoryOverhead))
-
-    // We could add checks to make sure the entire cluster has enough resources but that involves
-    // getting all the node reports and computing ourselves.
-  }
-
-  /**
-   * Copy the given file to a remote file system (e.g. HDFS) if needed.
-   * The file is only copied if the source and destination file systems are different. This is used
-   * for preparing resources for launching the ApplicationMaster container. Exposed for testing.
-   */
-  def copyFileToRemote(
-      destDir: Path,
-      srcPath: Path,
-      replication: Short,
-      setPerms: Boolean = false): Path = {
-    val destFs = destDir.getFileSystem(hadoopConf)
-    val srcFs = srcPath.getFileSystem(hadoopConf)
-    var destPath = srcPath
-    if (!compareFs(srcFs, destFs)) {
-      destPath = new Path(destDir, srcPath.getName())
-      logInfo(s"Uploading resource $srcPath -> $destPath")
-      FileUtil.copy(srcFs, srcPath, destFs, destPath, false, hadoopConf)
-      destFs.setReplication(destPath, replication)
-      if (setPerms) {
-        destFs.setPermission(destPath, new FsPermission(APP_FILE_PERMISSION))
-      }
-    } else {
-      logInfo(s"Source and destination file systems are the same. Not copying $srcPath")
-    }
-    // Resolve any symlinks in the URI path so using a "current" symlink to point to a specific
-    // version shows the specific version in the distributed cache configuration
-    val qualifiedDestPath = destFs.makeQualified(destPath)
-    val fc = FileContext.getFileContext(qualifiedDestPath.toUri(), hadoopConf)
-    fc.resolvePath(qualifiedDestPath)
-  }
-
-  /**
-   * Given a local URI, resolve it and return a qualified local path that corresponds to the URI.
-   * This is used for preparing local resources to be included in the container launch context.
-   */
-  private def getQualifiedLocalPath(localURI: URI): Path = {
-    val qualifiedURI =
-      if (localURI.getScheme == null) {
-        // If not specified, assume this is in the local filesystem to keep the behavior
-        // consistent with that of Hadoop
-        new URI(FileSystem.getLocal(hadoopConf).makeQualified(new Path(localURI)).toString)
-      } else {
-        localURI
-      }
-    new Path(qualifiedURI)
-  }
-
-  /**
-   * Upload any resources to the distributed cache if needed. If a resource is intended to be
-   * consumed locally, set up the appropriate config for downstream code to handle it properly.
-   * This is used for setting up a container launch context for our ApplicationMaster.
-   * Exposed for testing.
-   */
-  def prepareLocalResources(appStagingDir: String): HashMap[String, LocalResource] = {
-    logInfo("Preparing resources for our AM container")
-    // Upload Spark and the application JAR to the remote file system if necessary,
-    // and add them as local resources to the application master.
-    val fs = FileSystem.get(hadoopConf)
-    val dst = new Path(fs.getHomeDirectory(), appStagingDir)
-    val nns = getNameNodesToAccess(sparkConf) + dst
-    obtainTokensForNamenodes(nns, hadoopConf, credentials)
-
-    val replication = sparkConf.getInt("spark.yarn.submit.file.replication",
-      fs.getDefaultReplication(dst)).toShort
-    val localResources = HashMap[String, LocalResource]()
-    FileSystem.mkdirs(fs, dst, new FsPermission(STAGING_DIR_PERMISSION))
-
-    val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
-
-    val oldLog4jConf = Option(System.getenv("SPARK_LOG4J_CONF"))
-    if (oldLog4jConf.isDefined) {
-      logWarning(
-        "SPARK_LOG4J_CONF detected in the system environment. This variable has been " +
-        "deprecated. Please refer to the \"Launching Spark on YARN\" documentation " +
-        "for alternatives.")
-    }
-
-    /**
-     * Copy the given main resource to the distributed cache if the scheme is not "local".
-     * Otherwise, set the corresponding key in our SparkConf to handle it downstream.
-     * Each resource is represented by a 4-tuple of:
-     *   (1) destination resource name,
-     *   (2) local path to the resource,
-     *   (3) Spark property key to set if the scheme is not local, and
-     *   (4) whether to set permissions for this resource
-     */
-    List(
-      (SPARK_JAR, sparkJar(sparkConf), CONF_SPARK_JAR, false),
-      (APP_JAR, args.userJar, CONF_SPARK_USER_JAR, true),
-      ("log4j.properties", oldLog4jConf.orNull, null, false)
-    ).foreach { case (destName, _localPath, confKey, setPermissions) =>
-      val localPath: String = if (_localPath != null) _localPath.trim() else ""
-      if (!localPath.isEmpty()) {
-        val localURI = new URI(localPath)
-        if (localURI.getScheme != LOCAL_SCHEME) {
-          val src = getQualifiedLocalPath(localURI)
-          val destPath = copyFileToRemote(dst, src, replication, setPermissions)
-          val destFs = FileSystem.get(destPath.toUri(), hadoopConf)
-          distCacheMgr.addResource(destFs, hadoopConf, destPath,
-            localResources, LocalResourceType.FILE, destName, statCache)
-        } else if (confKey != null) {
-          // If the resource is intended for local use only, handle this downstream
-          // by setting the appropriate property
-          sparkConf.set(confKey, localPath)
-        }
-      }
-    }
-
-    /**
-     * Do the same for any additional resources passed in through ClientArguments.
-     * Each resource category is represented by a 3-tuple of:
-     *   (1) comma separated list of resources in this category,
-     *   (2) resource type, and
-     *   (3) whether to add these resources to the classpath
-     */
-    val cachedSecondaryJarLinks = ListBuffer.empty[String]
-    List(
-      (args.addJars, LocalResourceType.FILE, true),
-      (args.files, LocalResourceType.FILE, false),
-      (args.archives, LocalResourceType.ARCHIVE, false)
-    ).foreach { case (flist, resType, addToClasspath) =>
-      if (flist != null && !flist.isEmpty()) {
-        flist.split(',').foreach { file =>
-          val localURI = new URI(file.trim())
-          if (localURI.getScheme != LOCAL_SCHEME) {
-            val localPath = new Path(localURI)
-            val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
-            val destPath = copyFileToRemote(dst, localPath, replication)
-            distCacheMgr.addResource(
-              fs, hadoopConf, destPath, localResources, resType, linkname, statCache)
-            if (addToClasspath) {
-              cachedSecondaryJarLinks += linkname
-            }
-          } else if (addToClasspath) {
-            // Resource is intended for local use only and should be added to the class path
-            cachedSecondaryJarLinks += file.trim()
-          }
-        }
-      }
-    }
-    if (cachedSecondaryJarLinks.nonEmpty) {
-      sparkConf.set(CONF_SPARK_YARN_SECONDARY_JARS, cachedSecondaryJarLinks.mkString(","))
-    }
-
-    localResources
-  }
-
-  /**
-   * Set up the environment for launching our ApplicationMaster container.
-   */
-  private def setupLaunchEnv(stagingDir: String): HashMap[String, String] = {
-    logInfo("Setting up the launch environment for our AM container")
-    val env = new HashMap[String, String]()
-    val extraCp = sparkConf.getOption("spark.driver.extraClassPath")
-    populateClasspath(args, yarnConf, sparkConf, env, extraCp)
-    env("SPARK_YARN_MODE") = "true"
-    env("SPARK_YARN_STAGING_DIR") = stagingDir
-    env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
-
-    // Set the environment variables to be passed on to the executors.
-    distCacheMgr.setDistFilesEnv(env)
-    distCacheMgr.setDistArchivesEnv(env)
-
-    // Pick up any environment variables for the AM provided through spark.yarn.appMasterEnv.*
-    val amEnvPrefix = "spark.yarn.appMasterEnv."
-    sparkConf.getAll
-      .filter { case (k, v) => k.startsWith(amEnvPrefix) }
-      .map { case (k, v) => (k.substring(amEnvPrefix.length), v) }
-      .foreach { case (k, v) => YarnSparkHadoopUtil.addPathToEnvironment(env, k, v) }
-
-    // Keep this for backwards compatibility but users should move to the config
-    sys.env.get("SPARK_YARN_USER_ENV").foreach { userEnvs =>
-      // Allow users to specify some environment variables.
-      YarnSparkHadoopUtil.setEnvFromInputString(env, userEnvs)
-      // Pass SPARK_YARN_USER_ENV itself to the AM so it can use it to set up executor environments.
-      env("SPARK_YARN_USER_ENV") = userEnvs
-    }
-
-    // In cluster mode, if the deprecated SPARK_JAVA_OPTS is set, we need to propagate it to
-    // executors. But we can't just set spark.executor.extraJavaOptions, because the driver's
-    // SparkContext will not let that set spark* system properties, which is expected behavior for
-    // Yarn clients. So propagate it through the environment.
-    //
-    // Note that to warn the user about the deprecation in cluster mode, some code from
-    // SparkConf#validateSettings() is duplicated here (to avoid triggering the condition
-    // described above).
-    if (isLaunchingDriver) {
-      sys.env.get("SPARK_JAVA_OPTS").foreach { value =>
-        val warning =
-          s"""
-            |SPARK_JAVA_OPTS was detected (set to '$value').
-            |This is deprecated in Spark 1.0+.
-            |
-            |Please instead use:
-            | - ./spark-submit with conf/spark-defaults.conf to set defaults for an application
-            | - ./spark-submit with --driver-java-options to set -X options for a driver
-            | - spark.executor.extraJavaOptions to set -X options for executors
-          """.stripMargin
-        logWarning(warning)
-        for (proc <- Seq("driver", "executor")) {
-          val key = s"spark.$proc.extraJavaOptions"
-          if (sparkConf.contains(key)) {
-            throw new SparkException(s"Found both $key and SPARK_JAVA_OPTS. Use only the former.")
-          }
-        }
-        env("SPARK_JAVA_OPTS") = value
-      }
-    }
-
-    env
-  }
-
-  /**
-   * Set up a ContainerLaunchContext to launch our ApplicationMaster container.
-   * This sets up the launch environment, java options, and the command for launching the AM.
-   */
-  protected def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
-      : ContainerLaunchContext = {
-    logInfo("Setting up container launch context for our AM")
-
-    val appId = newAppResponse.getApplicationId
-    val appStagingDir = getAppStagingDir(appId)
-    val localResources = prepareLocalResources(appStagingDir)
-    val launchEnv = setupLaunchEnv(appStagingDir)
-    val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
-    amContainer.setLocalResources(localResources)
-    amContainer.setEnvironment(launchEnv)
-
-    val javaOpts = ListBuffer[String]()
-
-    // Set the environment variable through a command prefix
-    // to append to the existing value of the variable
-    var prefixEnv: Option[String] = None
-
-    // Add Xmx for AM memory
-    javaOpts += "-Xmx" + args.amMemory + "m"
-
-    val tmpDir = new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
-    javaOpts += "-Djava.io.tmpdir=" + tmpDir
-
-    // TODO: Remove once cpuset version is pushed out.
-    // The context is, default gc for server class machines ends up using all cores to do gc -
-    // hence if there are multiple containers in same node, Spark GC affects all other containers'
-    // performance (which can be that of other Spark containers)
-    // Instead of using this, rely on cpusets by YARN to enforce "proper" Spark behavior in
-    // multi-tenant environments. Not sure how default Java GC behaves if it is limited to subset
-    // of cores on a node.
-    val useConcurrentAndIncrementalGC = launchEnv.get("SPARK_USE_CONC_INCR_GC").exists(_.toBoolean)
-    if (useConcurrentAndIncrementalGC) {
-      // In our experiments, using the (default) throughput collector has severe performance
-      // ramifications on multi-tenant machines
-      javaOpts += "-XX:+UseConcMarkSweepGC"
-      javaOpts += "-XX:+CMSIncrementalMode"
-      javaOpts += "-XX:+CMSIncrementalPacing"
-      javaOpts += "-XX:CMSIncrementalDutyCycleMin=0"
-      javaOpts += "-XX:CMSIncrementalDutyCycle=10"
-    }
-
-    // Forward the Spark configuration to the application master / executors.
-    // TODO: it might be nicer to pass these as an internal environment variable rather than
-    // as Java options, due to complications with string parsing of nested quotes.
-    for ((k, v) <- sparkConf.getAll) {
-      javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v")
-    }
-
-    // Include driver-specific java options if we are launching a driver
-    if (isLaunchingDriver) {
-      sparkConf.getOption("spark.driver.extraJavaOptions")
-        .orElse(sys.env.get("SPARK_JAVA_OPTS"))
-        .foreach(opts => javaOpts += opts)
-      val libraryPaths = Seq(sys.props.get("spark.driver.extraLibraryPath"),
-        sys.props.get("spark.driver.libraryPath")).flatten
-      if (libraryPaths.nonEmpty) {
-        prefixEnv = Some(Utils.libraryPathEnvPrefix(libraryPaths))
-      }
-    }
-
-    // For log4j configuration to reference
-    javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR)
-
-    val userClass =
-      if (isLaunchingDriver) {
-        Seq("--class", YarnSparkHadoopUtil.escapeForShell(args.userClass))
-      } else {
-        Nil
-      }
-    val userJar =
-      if (args.userJar != null) {
-        Seq("--jar", args.userJar)
-      } else {
-        Nil
-      }
-    val amClass =
-      if (isLaunchingDriver) {
-        Class.forName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
-      } else {
-        Class.forName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
-      }
-    val userArgs = args.userArgs.flatMap { arg =>
-      Seq("--arg", YarnSparkHadoopUtil.escapeForShell(arg))
-    }
-    val amArgs =
-      Seq(amClass) ++ userClass ++ userJar ++ userArgs ++
-      Seq(
-        "--executor-memory", args.executorMemory.toString + "m",
-        "--executor-cores", args.executorCores.toString,
-        "--num-executors ", args.numExecutors.toString)
-
-    // Command for the ApplicationMaster
-    val commands = prefixEnv ++ Seq(Environment.JAVA_HOME.$() + "/bin/java", "-server") ++
-      javaOpts ++ amArgs ++
-      Seq(
-        "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
-        "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
-
-    // TODO: it would be nicer to just make sure there are no null commands here
-    val printableCommands = commands.map(s => if (s == null) "null" else s).toList
-    amContainer.setCommands(printableCommands)
-
-    logDebug("===============================================================================")
-    logDebug("Yarn AM launch context:")
-    logDebug(s"    user class: ${Option(args.userClass).getOrElse("N/A")}")
-    logDebug("    env:")
-    launchEnv.foreach { case (k, v) => logDebug(s"        $k -> $v") }
-    logDebug("    resources:")
-    localResources.foreach { case (k, v) => logDebug(s"        $k -> $v")}
-    logDebug("    command:")
-    logDebug(s"        ${printableCommands.mkString(" ")}")
-    logDebug("===============================================================================")
-
-    // send the acl settings into YARN to control who has access via YARN interfaces
-    val securityManager = new SecurityManager(sparkConf)
-    amContainer.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager))
-    setupSecurityToken(amContainer)
-    UserGroupInformation.getCurrentUser().addCredentials(credentials)
-
-    amContainer
-  }
-
-  /**
-   * Report the state of an application until it has exited, either successfully or
-   * due to some failure, then return a pair of the yarn application state (FINISHED, FAILED,
-   * KILLED, or RUNNING) and the final application state (UNDEFINED, SUCCEEDED, FAILED,
-   * or KILLED).
-   *
-   * @param appId ID of the application to monitor.
-   * @param returnOnRunning Whether to also return the application state when it is RUNNING.
-   * @param logApplicationReport Whether to log details of the application report every iteration.
-   * @return A pair of the yarn application state and the final application state.
-   */
-  def monitorApplication(
-      appId: ApplicationId,
-      returnOnRunning: Boolean = false,
-      logApplicationReport: Boolean = true): (YarnApplicationState, FinalApplicationStatus) = {
-    val interval = sparkConf.getLong("spark.yarn.report.interval", 1000)
-    var lastState: YarnApplicationState = null
-    while (true) {
-      Thread.sleep(interval)
-      val report = getApplicationReport(appId)
-      val state = report.getYarnApplicationState
-
-      if (logApplicationReport) {
-        logInfo(s"Application report for $appId (state: $state)")
-        val details = Seq[(String, String)](
-          ("client token", getClientToken(report)),
-          ("diagnostics", report.getDiagnostics),
-          ("ApplicationMaster host", report.getHost),
-          ("ApplicationMaster RPC port", report.getRpcPort.toString),
-          ("queue", report.getQueue),
-          ("start time", report.getStartTime.toString),
-          ("final status", report.getFinalApplicationStatus.toString),
-          ("tracking URL", report.getTrackingUrl),
-          ("user", report.getUser)
-        )
-
-        // Use more loggable format if value is null or empty
-        val formattedDetails = details
-          .map { case (k, v) =>
-            val newValue = Option(v).filter(_.nonEmpty).getOrElse("N/A")
-            s"\n\t $k: $newValue" }
-          .mkString("")
-
-        // If DEBUG is enabled, log report details every iteration
-        // Otherwise, log them every time the application changes state
-        if (log.isDebugEnabled) {
-          logDebug(formattedDetails)
-        } else if (lastState != state) {
-          logInfo(formattedDetails)
-        }
-      }
-
-      if (state == YarnApplicationState.FINISHED ||
-        state == YarnApplicationState.FAILED ||
-        state == YarnApplicationState.KILLED) {
-        return (state, report.getFinalApplicationStatus)
-      }
-
-      if (returnOnRunning && state == YarnApplicationState.RUNNING) {
-        return (state, report.getFinalApplicationStatus)
-      }
-
-      lastState = state
-    }
-
-    // Never reached, but keeps compiler happy
-    throw new SparkException("While loop is depleted! This should never happen...")
-  }
-
-  /**
-   * Submit an application to the ResourceManager and monitor its state.
-   * This continues until the application has exited for any reason.
-   * If the application finishes with a failed, killed, or undefined status,
-   * throw an appropriate SparkException.
-   */
-  def run(): Unit = {
-    val (yarnApplicationState, finalApplicationStatus) = monitorApplication(submitApplication())
-    if (yarnApplicationState == YarnApplicationState.FAILED ||
-      finalApplicationStatus == FinalApplicationStatus.FAILED) {
-      throw new SparkException("Application finished with failed status")
-    }
-    if (yarnApplicationState == YarnApplicationState.KILLED ||
-      finalApplicationStatus == FinalApplicationStatus.KILLED) {
-      throw new SparkException("Application is killed")
-    }
-    if (finalApplicationStatus == FinalApplicationStatus.UNDEFINED) {
-      throw new SparkException("The final status of application is undefined")
-    }
-  }
-
-  /* --------------------------------------------------------------------------------------- *
-   |  Methods that cannot be implemented here due to API differences across hadoop versions  |
-   * --------------------------------------------------------------------------------------- */
-
-  /** Submit an application running our ApplicationMaster to the ResourceManager. */
-  def submitApplication(): ApplicationId
-
-  /** Set up security tokens for launching our ApplicationMaster container. */
-  protected def setupSecurityToken(containerContext: ContainerLaunchContext): Unit
-
-  /** Get the application report from the ResourceManager for an application we have submitted. */
-  protected def getApplicationReport(appId: ApplicationId): ApplicationReport
-
-  /**
-   * Return the security token used by this client to communicate with the ApplicationMaster.
-   * If no security is enabled, the token returned by the report is null.
-   */
-  protected def getClientToken(report: ApplicationReport): String
-}
-
-private[spark] object ClientBase extends Logging {
-
-  // Alias for the Spark assembly jar and the user jar
-  val SPARK_JAR: String = "__spark__.jar"
-  val APP_JAR: String = "__app__.jar"
-
-  // URI scheme that identifies local resources
-  val LOCAL_SCHEME = "local"
-
-  // Staging directory for any temporary jars or files
-  val SPARK_STAGING: String = ".sparkStaging"
-
-  // Location of any user-defined Spark jars
-  val CONF_SPARK_JAR = "spark.yarn.jar"
-  val ENV_SPARK_JAR = "SPARK_JAR"
-
-  // Internal config to propagate the location of the user's jar to the driver/executors
-  val CONF_SPARK_USER_JAR = "spark.yarn.user.jar"
-
-  // Internal config to propagate the locations of any extra jars to add to the classpath
-  // of the executors
-  val CONF_SPARK_YARN_SECONDARY_JARS = "spark.yarn.secondary.jars"
-
-  // Staging directory is private! -> rwx--------
-  val STAGING_DIR_PERMISSION: FsPermission =
-    FsPermission.createImmutable(Integer.parseInt("700", 8).toShort)
-
-  // App files are world readable and owner writable -> rw-r--r--
-  val APP_FILE_PERMISSION: FsPermission =
-    FsPermission.createImmutable(Integer.parseInt("644", 8).toShort)
-
-  /**
-   * Find the user-defined Spark jar if configured, or return the jar containing this
-   * class if not.
-   *
-   * This method first looks in the SparkConf object for the CONF_SPARK_JAR key, and in the
-   * user environment if that is not found (for backwards compatibility).
-   */
-  private def sparkJar(conf: SparkConf): String = {
-    if (conf.contains(CONF_SPARK_JAR)) {
-      conf.get(CONF_SPARK_JAR)
-    } else if (System.getenv(ENV_SPARK_JAR) != null) {
-      logWarning(
-        s"$ENV_SPARK_JAR detected in the system environment. This variable has been deprecated " +
-        s"in favor of the $CONF_SPARK_JAR configuration variable.")
-      System.getenv(ENV_SPARK_JAR)
-    } else {
-      SparkContext.jarOfClass(this.getClass).head
-    }
-  }
-
-  /**
-   * Return the path to the given application's staging directory.
-   */
-  private def getAppStagingDir(appId: ApplicationId): String = {
-    SPARK_STAGING + Path.SEPARATOR + appId.toString() + Path.SEPARATOR
-  }
-
-  /**
-   * Populate the classpath entry in the given environment map with any application
-   * classpath specified through the Hadoop and Yarn configurations.
-   */
-  def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]): Unit = {
-    val classPathElementsToAdd = getYarnAppClasspath(conf) ++ getMRAppClasspath(conf)
-    for (c <- classPathElementsToAdd.flatten) {
-      YarnSparkHadoopUtil.addPathToEnvironment(env, Environment.CLASSPATH.name, c.trim)
-    }
-  }
-
-  private def getYarnAppClasspath(conf: Configuration): Option[Seq[String]] =
-    Option(conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH)) match {
-      case Some(s) => Some(s.toSeq)
-      case None => getDefaultYarnApplicationClasspath
-  }
-
-  private def getMRAppClasspath(conf: Configuration): Option[Seq[String]] =
-    Option(conf.getStrings("mapreduce.application.classpath")) match {
-      case Some(s) => Some(s.toSeq)
-      case None => getDefaultMRApplicationClasspath
-    }
-
-  def getDefaultYarnApplicationClasspath: Option[Seq[String]] = {
-    val triedDefault = Try[Seq[String]] {
-      val field = classOf[YarnConfiguration].getField("DEFAULT_YARN_APPLICATION_CLASSPATH")
-      val value = field.get(null).asInstanceOf[Array[String]]
-      value.toSeq
-    } recoverWith {
-      case e: NoSuchFieldException => Success(Seq.empty[String])
-    }
-
-    triedDefault match {
-      case f: Failure[_] =>
-        logError("Unable to obtain the default YARN Application classpath.", f.exception)
-      case s: Success[_] =>
-        logDebug(s"Using the default YARN application classpath: ${s.get.mkString(",")}")
-    }
-
-    triedDefault.toOption
-  }
-
-  /**
-   * In Hadoop 0.23, the MR application classpath comes with the YARN application
-   * classpath. In Hadoop 2.0, it's an array of Strings, and in 2.2+ it's a String.
-   * So we need to use reflection to retrieve it.
-   */
-  def getDefaultMRApplicationClasspath: Option[Seq[String]] = {
-    val triedDefault = Try[Seq[String]] {
-      val field = classOf[MRJobConfig].getField("DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH")
-      val value = if (field.getType == classOf[String]) {
-        StringUtils.getStrings(field.get(null).asInstanceOf[String]).toArray
-      } else {
-        field.get(null).asInstanceOf[Array[String]]
-      }
-      value.toSeq
-    } recoverWith {
-      case e: NoSuchFieldException => Success(Seq.empty[String])
-    }
-
-    triedDefault match {
-      case f: Failure[_] =>
-        logError("Unable to obtain the default MR Application classpath.", f.exception)
-      case s: Success[_] =>
-        logDebug(s"Using the default MR application classpath: ${s.get.mkString(",")}")
-    }
-
-    triedDefault.toOption
-  }
-
-  /**
-   * Populate the classpath entry in the given environment map.
-   * This includes the user jar, Spark jar, and any extra application jars.
-   */
-  def populateClasspath(
-      args: ClientArguments,
-      conf: Configuration,
-      sparkConf: SparkConf,
-      env: HashMap[String, String],
-      extraClassPath: Option[String] = None): Unit = {
-    extraClassPath.foreach(addClasspathEntry(_, env))
-    addClasspathEntry(Environment.PWD.$(), env)
-
-    // Normally the users app.jar is last in case conflicts with spark jars
-    if (sparkConf.get("spark.yarn.user.classpath.first", "false").toBoolean) {
-      addUserClasspath(args, sparkConf, env)
-      addFileToClasspath(sparkJar(sparkConf), SPARK_JAR, env)
-      populateHadoopClasspath(conf, env)
-    } else {
-      addFileToClasspath(sparkJar(sparkConf), SPARK_JAR, env)
-      populateHadoopClasspath(conf, env)
-      addUserClasspath(args, sparkConf, env)
-    }
-
-    // Append all jar files under the working directory to the classpath.
-    addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + "*", env)
-  }
-
-  /**
-   * Adds the user jars which have local: URIs (or alternate names, such as APP_JAR) explicitly
-   * to the classpath.
-   */
-  private def addUserClasspath(
-      args: ClientArguments,
-      conf: SparkConf,
-      env: HashMap[String, String]): Unit = {
-
-    // If `args` is not null, we are launching an AM container.
-    // Otherwise, we are launching executor containers.
-    val (mainJar, secondaryJars) =
-      if (args != null) {
-        (args.userJar, args.addJars)
-      } else {
-        (conf.get(CONF_SPARK_USER_JAR, null), conf.get(CONF_SPARK_YARN_SECONDARY_JARS, null))
-      }
-
-    addFileToClasspath(mainJar, APP_JAR, env)
-    if (secondaryJars != null) {
-      secondaryJars.split(",").filter(_.nonEmpty).foreach { jar =>
-        addFileToClasspath(jar, null, env)
-      }
-    }
-  }
-
-  /**
-   * Adds the given path to the classpath, handling "local:" URIs correctly.
-   *
-   * If an alternate name for the file is given, and it's not a "local:" file, the alternate
-   * name will be added to the classpath (relative to the job's work directory).
-   *
-   * If not a "local:" file and no alternate name, the environment is not modified.
-   *
-   * @param path      Path to add to classpath (optional).
-   * @param fileName  Alternate name for the file (optional).
-   * @param env       Map holding the environment variables.
-   */
-  private def addFileToClasspath(
-      path: String,
-      fileName: String,
-      env: HashMap[String, String]): Unit = {
-    if (path != null) {
-      scala.util.control.Exception.ignoring(classOf[URISyntaxException]) {
-        val uri = new URI(path)
-        if (uri.getScheme == LOCAL_SCHEME) {
-          addClasspathEntry(uri.getPath, env)
-          return
-        }
-      }
-    }
-    if (fileName != null) {
-      addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + fileName, env)
-    }
-  }
-
-  /**
-   * Add the given path to the classpath entry of the given environment map.
-   * If the classpath is already set, this appends the new path to the existing classpath.
-   */
-  private def addClasspathEntry(path: String, env: HashMap[String, String]): Unit =
-    YarnSparkHadoopUtil.addPathToEnvironment(env, Environment.CLASSPATH.name, path)
-
-  /**
-   * Get the list of namenodes the user may access.
-   */
-  def getNameNodesToAccess(sparkConf: SparkConf): Set[Path] = {
-    sparkConf.get("spark.yarn.access.namenodes", "")
-      .split(",")
-      .map(_.trim())
-      .filter(!_.isEmpty)
-      .map(new Path(_))
-      .toSet
-  }
-
-  def getTokenRenewer(conf: Configuration): String = {
-    val delegTokenRenewer = Master.getMasterPrincipal(conf)
-    logDebug("delegation token renewer is: " + delegTokenRenewer)
-    if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
-      val errorMessage = "Can't get Master Kerberos principal for use as renewer"
-      logError(errorMessage)
-      throw new SparkException(errorMessage)
-    }
-    delegTokenRenewer
-  }
-
-  /**
-   * Obtains tokens for the namenodes passed in and adds them to the credentials.
-   */
-  def obtainTokensForNamenodes(
-      paths: Set[Path],
-      conf: Configuration,
-      creds: Credentials): Unit = {
-    if (UserGroupInformation.isSecurityEnabled()) {
-      val delegTokenRenewer = getTokenRenewer(conf)
-      paths.foreach { dst =>
-        val dstFs = dst.getFileSystem(conf)
-        logDebug("getting token for namenode: " + dst)
-        dstFs.addDelegationTokens(delegTokenRenewer, creds)
-      }
-    }
-  }
-
-  /**
-   * Return whether the two file systems are the same.
-   */
-  private def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = {
-    val srcUri = srcFs.getUri()
-    val dstUri = destFs.getUri()
-    if (srcUri.getScheme() == null || srcUri.getScheme() != dstUri.getScheme()) {
-      return false
-    }
-
-    var srcHost = srcUri.getHost()
-    var dstHost = dstUri.getHost()
-
-    // In HA or when using viewfs, the host part of the URI may not actually be a host, but the
-    // name of the HDFS namespace. Those names won't resolve, so avoid even trying if they
-    // match.
-    if (srcHost != null && dstHost != null && srcHost != dstHost) {
-      try {
-        srcHost = InetAddress.getByName(srcHost).getCanonicalHostName()
-        dstHost = InetAddress.getByName(dstHost).getCanonicalHostName()
-      } catch {
-        case e: UnknownHostException =>
-          return false
-      }
-    }
-
-    Objects.equal(srcHost, dstHost) && srcUri.getPort() == dstUri.getPort()
-  }
-
-}

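Most of the removed ClientBase above is plumbing, but its first safety check is worth restating on its own: the client rejects an application up front if the requested AM or executor memory plus overhead exceeds the largest container the cluster can grant. The sketch below is illustrative only; the object and parameter names are made up, and maxContainerMemoryMb stands in for the value verifyClusterResources reads from GetNewApplicationResponse.getMaximumResourceCapability.getMemory.

object MemoryCheckSketch {
  // Mirrors the shape of the check in verifyClusterResources, with plain Ints in
  // place of the YARN API objects so the snippet is self-contained.
  def verify(requestedMemoryMb: Int, overheadMb: Int, maxContainerMemoryMb: Int): Unit = {
    val total = requestedMemoryMb + overheadMb
    require(
      total <= maxContainerMemoryMb,
      s"Required memory ($requestedMemoryMb+$overheadMb MB) is above the max threshold " +
        s"($maxContainerMemoryMb MB) of this cluster!")
  }

  def main(args: Array[String]): Unit = {
    verify(requestedMemoryMb = 2048, overheadMb = 384, maxContainerMemoryMb = 8192) // passes
    // verify(8192, 1024, 8192) would throw, mirroring the IllegalArgumentException above
    println("memory request fits within the cluster's container limit")
  }
}

require throws IllegalArgumentException when the condition fails, which is the same failure mode the removed code used for an oversized request.
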
http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
deleted file mode 100644
index c592ecf..0000000
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.net.URI
-
-import scala.collection.mutable.{HashMap, LinkedHashMap, Map}
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
-import org.apache.hadoop.fs.permission.FsAction
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.util.{Records, ConverterUtils}
-
-import org.apache.spark.Logging
-
-/** Client side methods to setup the Hadoop distributed cache */
-private[spark] class ClientDistributedCacheManager() extends Logging {
-
-  // Mappings from remote URI to (file status, modification time, visibility)
-  private val distCacheFiles: Map[String, (String, String, String)] =
-    LinkedHashMap[String, (String, String, String)]()
-  private val distCacheArchives: Map[String, (String, String, String)] =
-    LinkedHashMap[String, (String, String, String)]()
-
-
-  /**
-   * Add a resource to the list of distributed cache resources. This list can
-   * be sent to the ApplicationMaster and possibly the executors so that it can
-   * be downloaded into the Hadoop distributed cache for use by this application.
-   * Adds the LocalResource to the localResources HashMap passed in and saves
-   * the stats of the resources so they can be sent to the executors and verified.
-   *
-   * @param fs FileSystem
-   * @param conf Configuration
-   * @param destPath path to the resource
-   * @param localResources localResource hashMap to insert the resource into
-   * @param resourceType LocalResourceType 
-   * @param link link presented in the distributed cache to the destination
-   * @param statCache cache to store the file/directory stats 
-   * @param appMasterOnly Whether to only add the resource to the app master
-   */
-  def addResource(
-      fs: FileSystem,
-      conf: Configuration,
-      destPath: Path, 
-      localResources: HashMap[String, LocalResource],
-      resourceType: LocalResourceType,
-      link: String,
-      statCache: Map[URI, FileStatus],
-      appMasterOnly: Boolean = false): Unit = {
-    val destStatus = fs.getFileStatus(destPath)
-    val amJarRsrc = Records.newRecord(classOf[LocalResource])
-    amJarRsrc.setType(resourceType)
-    val visibility = getVisibility(conf, destPath.toUri(), statCache)
-    amJarRsrc.setVisibility(visibility)
-    amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(destPath))
-    amJarRsrc.setTimestamp(destStatus.getModificationTime())
-    amJarRsrc.setSize(destStatus.getLen())
-    if (link == null || link.isEmpty()) throw new Exception("You must specify a valid link name")
-    localResources(link) = amJarRsrc
-    
-    if (!appMasterOnly) {
-      val uri = destPath.toUri()
-      val pathURI = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), null, link)
-      if (resourceType == LocalResourceType.FILE) {
-        distCacheFiles(pathURI.toString()) = (destStatus.getLen().toString(), 
-          destStatus.getModificationTime().toString(), visibility.name())
-      } else {
-        distCacheArchives(pathURI.toString()) = (destStatus.getLen().toString(), 
-          destStatus.getModificationTime().toString(), visibility.name())
-      }
-    }
-  }
-
-  /**
-   * Adds the necessary cache file env variables to the env passed in
-   */
-  def setDistFilesEnv(env: Map[String, String]): Unit = {
-    val (keys, tupleValues) = distCacheFiles.unzip
-    val (sizes, timeStamps, visibilities) = tupleValues.unzip3
-    if (keys.size > 0) {
-      env("SPARK_YARN_CACHE_FILES") = keys.reduceLeft[String] { (acc,n) => acc + "," + n }
-      env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") = 
-        timeStamps.reduceLeft[String] { (acc,n) => acc + "," + n }
-      env("SPARK_YARN_CACHE_FILES_FILE_SIZES") = 
-        sizes.reduceLeft[String] { (acc,n) => acc + "," + n }
-      env("SPARK_YARN_CACHE_FILES_VISIBILITIES") = 
-        visibilities.reduceLeft[String] { (acc,n) => acc + "," + n }
-    }
-  }
-
-  /**
-   * Adds the necessary cache archive env variables to the env passed in
-   */
-  def setDistArchivesEnv(env: Map[String, String]): Unit = {
-    val (keys, tupleValues) = distCacheArchives.unzip
-    val (sizes, timeStamps, visibilities) = tupleValues.unzip3
-    if (keys.size > 0) {
-      env("SPARK_YARN_CACHE_ARCHIVES") = keys.reduceLeft[String] { (acc,n) => acc + "," + n }
-      env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") = 
-        timeStamps.reduceLeft[String] { (acc,n) => acc + "," + n }
-      env("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") =
-        sizes.reduceLeft[String] { (acc,n) => acc + "," + n }
-      env("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") = 
-        visibilities.reduceLeft[String] { (acc,n) => acc + "," + n }
-    }
-  }
-
-  /**
-   * Returns the local resource visibility depending on the cache file permissions
-   * @return LocalResourceVisibility
-   */
-  def getVisibility(
-      conf: Configuration,
-      uri: URI,
-      statCache: Map[URI, FileStatus]): LocalResourceVisibility = {
-    if (isPublic(conf, uri, statCache)) {
-      LocalResourceVisibility.PUBLIC
-    } else {
-      LocalResourceVisibility.PRIVATE
-    }
-  }
-
-  /**
-   * Returns a boolean to denote whether a cache file is visible to all (public)
-   * @return true if the path in the uri is visible to all, false otherwise
-   */
-  def isPublic(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]): Boolean = {
-    val fs = FileSystem.get(uri, conf)
-    val current = new Path(uri.getPath())
-    // the leaf level file should be readable by others
-    if (!checkPermissionOfOther(fs, current, FsAction.READ, statCache)) {
-      return false
-    }
-    ancestorsHaveExecutePermissions(fs, current.getParent(), statCache)
-  }
-
-  /**
-   * Returns true if all ancestors of the specified path have the 'execute'
-   * permission set for all users (i.e. that other users can traverse
-   * the directory hierarchy to the given path)
-   * @return true if all ancestors have the 'execute' permission set for all users
-   */
-  def ancestorsHaveExecutePermissions(
-      fs: FileSystem,
-      path: Path,
-      statCache: Map[URI, FileStatus]): Boolean =  {
-    var current = path
-    while (current != null) {
-      // the subdirs in the path should have execute permissions for others
-      if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE, statCache)) {
-        return false
-      }
-      current = current.getParent()
-    }
-    true
-  }
-
-  /**
-   * Checks for a given path whether the Other permissions on it
-   * imply the permission in the passed FsAction
-   * @return true if the path in the uri is visible to all, false otherwise
-   */
-  def checkPermissionOfOther(
-      fs: FileSystem,
-      path: Path,
-      action: FsAction,
-      statCache: Map[URI, FileStatus]): Boolean = {
-    val status = getFileStatus(fs, path.toUri(), statCache)
-    val perms = status.getPermission()
-    val otherAction = perms.getOtherAction()
-    otherAction.implies(action)
-  }
-
-  /**
-   * Checks to see if the given uri exists in the cache, if it does it
-   * returns the existing FileStatus, otherwise it stats the uri, stores
-   * it in the cache, and returns the FileStatus.
-   * @return FileStatus
-   */
-  def getFileStatus(fs: FileSystem, uri: URI, statCache: Map[URI, FileStatus]): FileStatus = {
-    val stat = statCache.get(uri) match {
-      case Some(existstat) => existstat
-      case None => 
-        val newStat = fs.getFileStatus(new Path(uri))
-        statCache.put(uri, newStat)
-        newStat
-    }
-    stat
-  }
-}

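The visibility rule in the removed ClientDistributedCacheManager is compact enough to restate outside the diff: a cached resource is marked PUBLIC only when others can read the file itself and can execute (traverse) every ancestor directory; otherwise it falls back to PRIVATE. The standalone sketch below assumes plain octal mode strings instead of Hadoop FileStatus/FsPermission objects so it runs without a filesystem; the names are illustrative, not the real API.

object VisibilitySketch {
  // Octal "other" digit: read = 4, write = 2, execute = 1
  private def otherBits(mode: String): Int = Integer.parseInt(mode, 8) & 7

  // A file is effectively PUBLIC when others can read it and can traverse every ancestor dir.
  def isPublic(fileMode: String, ancestorDirModes: Seq[String]): Boolean =
    (otherBits(fileMode) & 4) != 0 && ancestorDirModes.forall(m => (otherBits(m) & 1) != 0)

  def main(args: Array[String]): Unit = {
    println(isPublic("644", Seq("755", "755")))  // true  -> would be PUBLIC
    println(isPublic("640", Seq("755", "755")))  // false -> PRIVATE (file not other-readable)
    println(isPublic("644", Seq("750", "755")))  // false -> PRIVATE (an ancestor blocks traversal)
  }
}

The sample run prints true, false, false, matching the PUBLIC/PRIVATE outcomes getVisibility would return for the same permissions.
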
http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
deleted file mode 100644
index 88dad0f..0000000
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.net.URI
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.{HashMap, ListBuffer}
-
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
-
-import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.util.Utils
-
-trait ExecutorRunnableUtil extends Logging {
-
-  val yarnConf: YarnConfiguration
-  val sparkConf: SparkConf
-  lazy val env = prepareEnvironment
-
-  def prepareCommand(
-      masterAddress: String,
-      slaveId: String,
-      hostname: String,
-      executorMemory: Int,
-      executorCores: Int,
-      appId: String,
-      localResources: HashMap[String, LocalResource]): List[String] = {
-    // Extra options for the JVM
-    val javaOpts = ListBuffer[String]()
-
-    // Set the environment variable through a command prefix
-    // to append to the existing value of the variable
-    var prefixEnv: Option[String] = None
-
-    // Set the JVM memory
-    val executorMemoryString = executorMemory + "m"
-    javaOpts += "-Xms" + executorMemoryString + " -Xmx" + executorMemoryString + " "
-
-    // Set extra Java options for the executor, if defined
-    sys.props.get("spark.executor.extraJavaOptions").foreach { opts =>
-      javaOpts += opts
-    }
-    sys.env.get("SPARK_JAVA_OPTS").foreach { opts =>
-      javaOpts += opts
-    }
-    sys.props.get("spark.executor.extraLibraryPath").foreach { p =>
-      prefixEnv = Some(Utils.libraryPathEnvPrefix(Seq(p)))
-    }
-
-    javaOpts += "-Djava.io.tmpdir=" +
-      new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
-
-    // Certain configs need to be passed here because they are needed before the Executor
-    // registers with the Scheduler and transfers the spark configs. Since the Executor backend
-    // uses Akka to connect to the scheduler, the akka settings are needed as well as the
-    // authentication settings.
-    sparkConf.getAll.
-      filter { case (k, v) => k.startsWith("spark.auth") || k.startsWith("spark.akka") }.
-      foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") }
-
-    sparkConf.getAkkaConf.
-      foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") }
-
-    // Commenting it out for now so that people can refer to the properties if required. Remove
-    // it once the cpuset version is pushed out.
-    // The context is, the default GC for server-class machines ends up using all cores to do GC -
-    // hence if there are multiple containers on the same node, Spark GC affects all other
-    // containers' performance (which can also be that of other Spark containers).
-    // Instead of using this, rely on cpusets by YARN to enforce that Spark behaves 'properly' in
-    // multi-tenant environments. Not sure how the default Java GC behaves if it is limited to a
-    // subset of cores on a node.
-    /*
-        else {
-          // If no java_opts are specified, default to using -XX:+CMSIncrementalMode.
-          // It might be possible that other modes/configs are being set in spark.executor.extraJavaOptions,
-          // so we don't want to mess with it.
-          // In our experiments, using the (default) throughput collector has severe performance
-          // ramifications on multi-tenant machines
-          // The options are based on
-          // http://www.oracle.com/technetwork/java/gc-tuning-5-138395.html#0.0.0.%20When%20to%20Use%20the%20Concurrent%20Low%20Pause%20Collector|outline
-          javaOpts += " -XX:+UseConcMarkSweepGC "
-          javaOpts += " -XX:+CMSIncrementalMode "
-          javaOpts += " -XX:+CMSIncrementalPacing "
-          javaOpts += " -XX:CMSIncrementalDutyCycleMin=0 "
-          javaOpts += " -XX:CMSIncrementalDutyCycle=10 "
-        }
-    */
-
-    // For log4j configuration to reference
-    javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR)
-
-    val commands = prefixEnv ++ Seq(Environment.JAVA_HOME.$() + "/bin/java",
-      "-server",
-      // Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
-      // Not killing the task leaves various aspects of the executor and (to some extent) the jvm in
-      // an inconsistent state.
-      // TODO: If the OOM is not recoverable by rescheduling it on different node, then do
-      // 'something' to fail job ... akin to blacklisting trackers in mapred ?
-      "-XX:OnOutOfMemoryError='kill %p'") ++
-      javaOpts ++
-      Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
-      masterAddress.toString,
-      slaveId.toString,
-      hostname.toString,
-      executorCores.toString,
-      appId,
-      "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
-      "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
-
-    // TODO: it would be nicer to just make sure there are no null commands here
-    commands.map(s => if (s == null) "null" else s).toList
-  }
-
-  private def setupDistributedCache(
-      file: String,
-      rtype: LocalResourceType,
-      localResources: HashMap[String, LocalResource],
-      timestamp: String,
-      size: String,
-      vis: String): Unit = {
-    val uri = new URI(file)
-    val amJarRsrc = Records.newRecord(classOf[LocalResource])
-    amJarRsrc.setType(rtype)
-    amJarRsrc.setVisibility(LocalResourceVisibility.valueOf(vis))
-    amJarRsrc.setResource(ConverterUtils.getYarnUrlFromURI(uri))
-    amJarRsrc.setTimestamp(timestamp.toLong)
-    amJarRsrc.setSize(size.toLong)
-    localResources(uri.getFragment()) = amJarRsrc
-  }
-
-  def prepareLocalResources: HashMap[String, LocalResource] = {
-    logInfo("Preparing Local resources")
-    val localResources = HashMap[String, LocalResource]()
-
-    if (System.getenv("SPARK_YARN_CACHE_FILES") != null) {
-      val timeStamps = System.getenv("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',')
-      val fileSizes = System.getenv("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',')
-      val distFiles = System.getenv("SPARK_YARN_CACHE_FILES").split(',')
-      val visibilities = System.getenv("SPARK_YARN_CACHE_FILES_VISIBILITIES").split(',')
-      for( i <- 0 to distFiles.length - 1) {
-        setupDistributedCache(distFiles(i), LocalResourceType.FILE, localResources, timeStamps(i),
-          fileSizes(i), visibilities(i))
-      }
-    }
-
-    if (System.getenv("SPARK_YARN_CACHE_ARCHIVES") != null) {
-      val timeStamps = System.getenv("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS").split(',')
-      val fileSizes = System.getenv("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES").split(',')
-      val distArchives = System.getenv("SPARK_YARN_CACHE_ARCHIVES").split(',')
-      val visibilities = System.getenv("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES").split(',')
-      for( i <- 0 to distArchives.length - 1) {
-        setupDistributedCache(distArchives(i), LocalResourceType.ARCHIVE, localResources,
-          timeStamps(i), fileSizes(i), visibilities(i))
-      }
-    }
-
-    logInfo("Prepared Local resources " + localResources)
-    localResources
-  }
-
-  def prepareEnvironment: HashMap[String, String] = {
-    val env = new HashMap[String, String]()
-    val extraCp = sparkConf.getOption("spark.executor.extraClassPath")
-    ClientBase.populateClasspath(null, yarnConf, sparkConf, env, extraCp)
-
-    sparkConf.getExecutorEnv.foreach { case (key, value) =>
-      // This assumes each executor environment variable set here is a path
-      // This is kept for backward compatibility and consistency with hadoop
-      YarnSparkHadoopUtil.addPathToEnvironment(env, key, value)
-    }
-
-    // Keep this for backwards compatibility but users should move to the config
-    sys.env.get("SPARK_YARN_USER_ENV").foreach { userEnvs =>
-      YarnSparkHadoopUtil.setEnvFromInputString(env, userEnvs)
-    }
-
-    System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k, v) => env(k) = v }
-    env
-  }
-
-}

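For context on what prepareCommand in the removed ExecutorRunnableUtil produced: the executor launch line is roughly the JVM memory flags, selected Spark settings forwarded as -D options (only spark.auth* and spark.akka* keys, since those are needed before the executor registers with the scheduler), the CoarseGrainedExecutorBackend main class with its positional arguments, and stdout/stderr redirection. A simplified sketch, not the actual implementation: escape here is a crude stand-in for YarnSparkHadoopUtil.escapeForShell, and the {{JAVA_HOME}} / <LOG_DIR> strings stand in for the YARN environment expansions.

object ExecutorCommandSketch {
  // Simplified stand-in for YarnSparkHadoopUtil.escapeForShell: single-quote the argument.
  private def escape(arg: String): String = "'" + arg.replace("'", "'\\''") + "'"

  def buildCommand(
      driverUrl: String,
      executorId: String,
      hostname: String,
      memoryMb: Int,
      cores: Int,
      appId: String,
      conf: Map[String, String]): Seq[String] = {
    val mem = s"${memoryMb}m"
    // Only authentication and Akka settings are forwarded; everything else arrives
    // after the executor registers with the scheduler.
    val forwarded = conf.collect {
      case (k, v) if k.startsWith("spark.auth") || k.startsWith("spark.akka") =>
        escape(s"-D$k=$v")
    }
    Seq("{{JAVA_HOME}}/bin/java", "-server", s"-Xms$mem", s"-Xmx$mem") ++
      forwarded ++
      Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
        driverUrl, executorId, hostname, cores.toString, appId,
        "1>", "<LOG_DIR>/stdout", "2>", "<LOG_DIR>/stderr")
  }

  def main(args: Array[String]): Unit = {
    val cmd = buildCommand("akka.tcp://sparkDriver@host:7077/user/CoarseGrainedScheduler",
      "1", "worker1", 2048, 4, "application_0000_0001",
      Map("spark.authenticate" -> "true", "spark.ui.port" -> "4040"))
    println(cmd.mkString(" "))  // spark.ui.port is filtered out; spark.authenticate is forwarded
  }
}
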
http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
deleted file mode 100644
index b32e157..0000000
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ /dev/null
@@ -1,538 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.util.{List => JList}
-import java.util.concurrent._
-import java.util.concurrent.atomic.AtomicInteger
-import java.util.regex.Pattern
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
-
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv}
-import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl}
-import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder
-import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
-
-object AllocationType extends Enumeration {
-  type AllocationType = Value
-  val HOST, RACK, ANY = Value
-}
-
-// TODO:
-// Too many params.
-// Needs to be mt-safe
-// Need to refactor this to make it 'cleaner' ... right now, all computation is reactive - should
-// make it more proactive and decoupled.
-
-// Note that right now, we assume all node asks are uniform in terms of capabilities and priority.
-// Refer to http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/ for
-// more info on how we are requesting for containers.
-
-/**
- * Common code for the Yarn container allocator. Contains all the version-agnostic code to
- * manage container allocation for a running Spark application.
- */
-private[yarn] abstract class YarnAllocator(
-    conf: Configuration,
-    sparkConf: SparkConf,
-    appAttemptId: ApplicationAttemptId,
-    args: ApplicationMasterArguments,
-    preferredNodes: collection.Map[String, collection.Set[SplitInfo]],
-    securityMgr: SecurityManager)
-  extends Logging {
-
-  import YarnAllocator._
-
-  // These three data structures are complementary and all locked on allocatedHostToContainersMap:
-  // allocatedHostToContainersMap: host -> set of ContainerIds running on that host
-  // allocatedContainerToHostMap: container -> host mapping.
-  private val allocatedHostToContainersMap =
-    new HashMap[String, collection.mutable.Set[ContainerId]]()
-
-  private val allocatedContainerToHostMap = new HashMap[ContainerId, String]()
-
-  // allocatedRackCount is updated ONLY when an allocation happens on a rack (and decremented
-  // when a container on that rack is released).
-  // As with the two data structures above, it is tightly coupled with them and must be locked
-  // on allocatedHostToContainersMap.
-  private val allocatedRackCount = new HashMap[String, Int]()
-
-  // Containers to be released in next request to RM
-  private val releasedContainers = new ConcurrentHashMap[ContainerId, Boolean]
-
-  // Number of container requests that have been sent to, but not yet allocated by, the
-  // ResourceManager.
-  private val numPendingAllocate = new AtomicInteger()
-  private val numExecutorsRunning = new AtomicInteger()
-  // Used to generate a unique id per executor
-  private val executorIdCounter = new AtomicInteger()
-  private val numExecutorsFailed = new AtomicInteger()
-
-  private var maxExecutors = args.numExecutors
-
-  // Keep track of which container is running which executor to remove the executors later
-  private val executorIdToContainer = new HashMap[String, Container]
-
-  protected val executorMemory = args.executorMemory
-  protected val executorCores = args.executorCores
-  protected val (preferredHostToCount, preferredRackToCount) =
-    generateNodeToWeight(conf, preferredNodes)
-
-  // Additional memory overhead, in MB.
-  protected val memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
-    math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN))
-
-  private val launcherPool = new ThreadPoolExecutor(
-    // max pool size of Integer.MAX_VALUE is ignored because we use an unbounded queue
-    sparkConf.getInt("spark.yarn.containerLauncherMaxThreads", 25), Integer.MAX_VALUE,
-    1, TimeUnit.MINUTES,
-    new LinkedBlockingQueue[Runnable](),
-    new ThreadFactoryBuilder().setNameFormat("ContainerLauncher #%d").setDaemon(true).build())
-  launcherPool.allowCoreThreadTimeOut(true)
-
-  def getNumExecutorsRunning: Int = numExecutorsRunning.intValue
-
-  def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
-
-  /**
-   * Request as many executors from the ResourceManager as needed to reach the desired total.
-   * This takes into account executors already running or pending.
-   */
-  def requestTotalExecutors(requestedTotal: Int): Unit = synchronized {
-    val currentTotal = numPendingAllocate.get + numExecutorsRunning.get
-    if (requestedTotal > currentTotal) {
-      maxExecutors += (requestedTotal - currentTotal)
-      // We need to call `allocateResources` here to avoid the following race condition:
-      // If we request executors twice before `allocateResources` is called, then we will end up
-      // double counting the number requested because `numPendingAllocate` is not updated yet.
-      allocateResources()
-    } else {
-      logInfo(s"Not allocating more executors because there are already $currentTotal " +
-        s"(application requested $requestedTotal total)")
-    }
-  }
-
-  /**
-   * Request that the ResourceManager release the container running the specified executor.
-   */
-  def killExecutor(executorId: String): Unit = synchronized {
-    if (executorIdToContainer.contains(executorId)) {
-      val container = executorIdToContainer.remove(executorId).get
-      internalReleaseContainer(container)
-      numExecutorsRunning.decrementAndGet()
-      maxExecutors -= 1
-      assert(maxExecutors >= 0, "Allocator killed more executors than are allocated!")
-    } else {
-      logWarning(s"Attempted to kill unknown executor $executorId!")
-    }
-  }
-
-  /**
-   * Allocate missing containers based on the number of executors currently pending and running.
-   *
-   * This method prioritizes the allocated container responses from the RM based on node and
-   * rack locality. Additionally, it releases any extra containers allocated for this application
-   * that are not needed. This must be synchronized because variables read in this block are
-   * mutated by other methods.
-   */
-  def allocateResources(): Unit = synchronized {
-    val missing = maxExecutors - numPendingAllocate.get() - numExecutorsRunning.get()
-
-    // This is needed by the alpha API; capture the pending count here since numPendingAllocate is updated just below.
-    val executorsPending = numPendingAllocate.get()
-    if (missing > 0) {
-      val totalExecutorMemory = executorMemory + memoryOverhead
-      numPendingAllocate.addAndGet(missing)
-      logInfo(s"Will allocate $missing executor containers, each with $totalExecutorMemory MB " +
-        s"memory including $memoryOverhead MB overhead")
-    } else {
-      logDebug("Empty allocation request ...")
-    }
-
-    val allocateResponse = allocateContainers(missing, executorsPending)
-    val allocatedContainers = allocateResponse.getAllocatedContainers()
-
-    if (allocatedContainers.size > 0) {
-      var numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * allocatedContainers.size)
-
-      if (numPendingAllocateNow < 0) {
-        numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * numPendingAllocateNow)
-      }
-
-      logDebug("""
-        Allocated containers: %d
-        Current executor count: %d
-        Containers released: %s
-        Cluster resources: %s
-        """.format(
-          allocatedContainers.size,
-          numExecutorsRunning.get(),
-          releasedContainers,
-          allocateResponse.getAvailableResources))
-
-      val hostToContainers = new HashMap[String, ArrayBuffer[Container]]()
-
-      for (container <- allocatedContainers) {
-        if (isResourceConstraintSatisfied(container)) {
-          // Add the accepted `container` to the host's list of already accepted,
-          // allocated containers
-          val host = container.getNodeId.getHost
-          val containersForHost = hostToContainers.getOrElseUpdate(host,
-            new ArrayBuffer[Container]())
-          containersForHost += container
-        } else {
-          // Release container, since it doesn't satisfy resource constraints.
-          internalReleaseContainer(container)
-        }
-      }
-
-      // Find the appropriate containers to use.
-      // TODO: Clean up this group-by...
-      val dataLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
-      val rackLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
-      val offRackContainers = new HashMap[String, ArrayBuffer[Container]]()
-
-      for (candidateHost <- hostToContainers.keySet) {
-        val maxExpectedHostCount = preferredHostToCount.getOrElse(candidateHost, 0)
-        val requiredHostCount = maxExpectedHostCount - allocatedContainersOnHost(candidateHost)
-
-        val remainingContainersOpt = hostToContainers.get(candidateHost)
-        assert(remainingContainersOpt.isDefined)
-        var remainingContainers = remainingContainersOpt.get
-
-        if (requiredHostCount >= remainingContainers.size) {
-          // Since we have <= required containers, add all remaining containers to
-          // `dataLocalContainers`.
-          dataLocalContainers.put(candidateHost, remainingContainers)
-          // There are no more free containers remaining.
-          remainingContainers = null
-        } else if (requiredHostCount > 0) {
-          // Container list has more containers than we need for data locality.
-          // Split the list into two: one based on the data local container count,
-          // (`remainingContainers.size` - `requiredHostCount`), and the other to hold remaining
-          // containers.
-          val (dataLocal, remaining) = remainingContainers.splitAt(
-            remainingContainers.size - requiredHostCount)
-          dataLocalContainers.put(candidateHost, dataLocal)
-
-          // Invariant: remainingContainers == remaining
-
-          // YARN has a nasty habit of allocating a ton of containers on a host - discourage this.
-          // Add each container in `remaining` to the list of containers to release. If we end up
-          // with an insufficient number of containers, the next allocation cycle will reallocate
-          // (but won't treat them as data local).
-          // TODO(harvey): Rephrase this comment some more.
-          for (container <- remaining) internalReleaseContainer(container)
-          remainingContainers = null
-        }
-
-        // For rack local containers
-        if (remainingContainers != null) {
-          val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost)
-          if (rack != null) {
-            val maxExpectedRackCount = preferredRackToCount.getOrElse(rack, 0)
-            val requiredRackCount = maxExpectedRackCount - allocatedContainersOnRack(rack) -
-              rackLocalContainers.getOrElse(rack, List()).size
-
-            if (requiredRackCount >= remainingContainers.size) {
-              // Add all remaining containers to `dataLocalContainers`.
-              dataLocalContainers.put(rack, remainingContainers)
-              remainingContainers = null
-            } else if (requiredRackCount > 0) {
-              // Container list has more containers than we need for rack locality.
-              // Split the list into two: one based on the rack local container count,
-              // (`remainingContainers.size` - `requiredRackCount`), and the other to hold the
-              // remaining containers.
-              val (rackLocal, remaining) = remainingContainers.splitAt(
-                remainingContainers.size - requiredRackCount)
-              val existingRackLocal = rackLocalContainers.getOrElseUpdate(rack,
-                new ArrayBuffer[Container]())
-
-              existingRackLocal ++= rackLocal
-
-              remainingContainers = remaining
-            }
-          }
-        }
-
-        if (remainingContainers != null) {
-          // Not all containers have been consumed - add them to the list of off-rack containers.
-          offRackContainers.put(candidateHost, remainingContainers)
-        }
-      }
-
-      // Now that we have split the containers into various groups, go through them in order:
-      // first host-local, then rack-local, and finally off-rack.
-      // Note that the list we create below tries to ensure that not all containers end up on a
-      // single host when there is a sufficiently large number of hosts/containers.
-      val allocatedContainersToProcess = new ArrayBuffer[Container](allocatedContainers.size)
-      allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(dataLocalContainers)
-      allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(rackLocalContainers)
-      allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(offRackContainers)
-
-      // Run each of the allocated containers.
-      for (container <- allocatedContainersToProcess) {
-        val numExecutorsRunningNow = numExecutorsRunning.incrementAndGet()
-        val executorHostname = container.getNodeId.getHost
-        val containerId = container.getId
-
-        val executorMemoryOverhead = (executorMemory + memoryOverhead)
-        assert(container.getResource.getMemory >= executorMemoryOverhead)
-
-        if (numExecutorsRunningNow > maxExecutors) {
-          logInfo("""Ignoring container %s at host %s, since we already have the required number of
-            containers for it.""".format(containerId, executorHostname))
-          internalReleaseContainer(container)
-          numExecutorsRunning.decrementAndGet()
-        } else {
-          val executorId = executorIdCounter.incrementAndGet().toString
-          val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
-            SparkEnv.driverActorSystemName,
-            sparkConf.get("spark.driver.host"),
-            sparkConf.get("spark.driver.port"),
-            CoarseGrainedSchedulerBackend.ACTOR_NAME)
-
-          logInfo("Launching container %s for on host %s".format(containerId, executorHostname))
-          executorIdToContainer(executorId) = container
-
-          // To be safe, remove the container from `releasedContainers`.
-          releasedContainers.remove(containerId)
-
-          val rack = YarnSparkHadoopUtil.lookupRack(conf, executorHostname)
-          allocatedHostToContainersMap.synchronized {
-            val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
-              new HashSet[ContainerId]())
-
-            containerSet += containerId
-            allocatedContainerToHostMap.put(containerId, executorHostname)
-
-            if (rack != null) {
-              allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
-            }
-          }
-          logInfo("Launching ExecutorRunnable. driverUrl: %s,  executorHostname: %s".format(
-            driverUrl, executorHostname))
-          val executorRunnable = new ExecutorRunnable(
-            container,
-            conf,
-            sparkConf,
-            driverUrl,
-            executorId,
-            executorHostname,
-            executorMemory,
-            executorCores,
-            appAttemptId.getApplicationId.toString,
-            securityMgr)
-          launcherPool.execute(executorRunnable)
-        }
-      }
-      logDebug("""
-        Finished allocating %s containers (from %s originally).
-        Current number of executors running: %d,
-        Released containers: %s
-        """.format(
-          allocatedContainersToProcess,
-          allocatedContainers,
-          numExecutorsRunning.get(),
-          releasedContainers))
-    }
-
-    val completedContainers = allocateResponse.getCompletedContainersStatuses()
-    if (completedContainers.size > 0) {
-      logDebug("Completed %d containers".format(completedContainers.size))
-
-      for (completedContainer <- completedContainers) {
-        val containerId = completedContainer.getContainerId
-
-        if (releasedContainers.containsKey(containerId)) {
-          // YarnAllocationHandler already marked the container for release, so remove it from
-          // `releasedContainers`.
-          releasedContainers.remove(containerId)
-        } else {
-          // Decrement the number of executors running. The next iteration of
-          // the ApplicationMaster's reporting thread will take care of allocating.
-          numExecutorsRunning.decrementAndGet()
-          logInfo("Completed container %s (state: %s, exit status: %s)".format(
-            containerId,
-            completedContainer.getState,
-            completedContainer.getExitStatus))
-          // Hadoop 2.2.x added a ContainerExitStatus that we should switch to using.
-          // There are some exit statuses we shouldn't necessarily count against us, but for
-          // now this is OK, as none of the containers are expected to exit.
-          if (completedContainer.getExitStatus == -103) { // vmem limit exceeded
-            logWarning(memLimitExceededLogMessage(
-              completedContainer.getDiagnostics,
-              VMEM_EXCEEDED_PATTERN))
-          } else if (completedContainer.getExitStatus == -104) { // pmem limit exceeded
-            logWarning(memLimitExceededLogMessage(
-              completedContainer.getDiagnostics,
-              PMEM_EXCEEDED_PATTERN))
-          } else if (completedContainer.getExitStatus != 0) {
-            logInfo("Container marked as failed: " + containerId +
-              ". Exit status: " + completedContainer.getExitStatus +
-              ". Diagnostics: " + completedContainer.getDiagnostics)
-            numExecutorsFailed.incrementAndGet()
-          }
-        }
-
-        allocatedHostToContainersMap.synchronized {
-          if (allocatedContainerToHostMap.containsKey(containerId)) {
-            val hostOpt = allocatedContainerToHostMap.get(containerId)
-            assert(hostOpt.isDefined)
-            val host = hostOpt.get
-
-            val containerSetOpt = allocatedHostToContainersMap.get(host)
-            assert(containerSetOpt.isDefined)
-            val containerSet = containerSetOpt.get
-
-            containerSet.remove(containerId)
-            if (containerSet.isEmpty) {
-              allocatedHostToContainersMap.remove(host)
-            } else {
-              allocatedHostToContainersMap.update(host, containerSet)
-            }
-
-            allocatedContainerToHostMap.remove(containerId)
-
-            // TODO: Move this part outside the synchronized block?
-            val rack = YarnSparkHadoopUtil.lookupRack(conf, host)
-            if (rack != null) {
-              val rackCount = allocatedRackCount.getOrElse(rack, 0) - 1
-              if (rackCount > 0) {
-                allocatedRackCount.put(rack, rackCount)
-              } else {
-                allocatedRackCount.remove(rack)
-              }
-            }
-          }
-        }
-      }
-      logDebug("""
-        Finished processing %d completed containers.
-        Current number of executors running: %d,
-        Released containers: %s
-        """.format(
-          completedContainers.size,
-          numExecutorsRunning.get(),
-          releasedContainers))
-    }
-  }
-
-  protected def allocatedContainersOnHost(host: String): Int = {
-    var retval = 0
-    allocatedHostToContainersMap.synchronized {
-      retval = allocatedHostToContainersMap.getOrElse(host, Set()).size
-    }
-    retval
-  }
-
-  protected def allocatedContainersOnRack(rack: String): Int = {
-    var retval = 0
-    allocatedHostToContainersMap.synchronized {
-      retval = allocatedRackCount.getOrElse(rack, 0)
-    }
-    retval
-  }
-
-  private def isResourceConstraintSatisfied(container: Container): Boolean = {
-    container.getResource.getMemory >= (executorMemory + memoryOverhead)
-  }
-
-  // Builds host and rack weight maps from the preferred node (split info) map.
-  private def generateNodeToWeight(
-      conf: Configuration,
-      input: collection.Map[String, collection.Set[SplitInfo]]
-    ): (Map[String, Int], Map[String, Int]) = {
-
-    if (input == null) {
-      return (Map[String, Int](), Map[String, Int]())
-    }
-
-    val hostToCount = new HashMap[String, Int]
-    val rackToCount = new HashMap[String, Int]
-
-    for ((host, splits) <- input) {
-      val hostCount = hostToCount.getOrElse(host, 0)
-      hostToCount.put(host, hostCount + splits.size)
-
-      val rack = YarnSparkHadoopUtil.lookupRack(conf, host)
-      if (rack != null) {
-        val rackCount = rackToCount.getOrElse(rack, 0)
-        rackToCount.put(host, rackCount + splits.size)
-      }
-    }
-
-    (hostToCount.toMap, rackToCount.toMap)
-  }
-
-  private def internalReleaseContainer(container: Container) = {
-    releasedContainers.put(container.getId(), true)
-    releaseContainer(container)
-  }
-
-  /**
-   * Called to allocate containers in the cluster.
-   *
-   * @param count Number of containers to allocate.
-   *              If zero, should still contact RM (as a heartbeat).
-   * @param pending Number of containers pending allocation. Only used on alpha.
-   * @return Response to the allocation request.
-   */
-  protected def allocateContainers(count: Int, pending: Int): YarnAllocateResponse
-
-  /** Called to release a previously allocated container. */
-  protected def releaseContainer(container: Container): Unit
-
-  /**
-   * Defines the interface for an allocate response from the RM. This is needed since the alpha
-   * and stable interfaces differ here in ways that cannot be reconciled by other means.
-   */
-  protected trait YarnAllocateResponse {
-
-    def getAllocatedContainers(): JList[Container]
-
-    def getAvailableResources(): Resource
-
-    def getCompletedContainersStatuses(): JList[ContainerStatus]
-
-  }
-
-}
-
-private object YarnAllocator {
-  val MEM_REGEX = "[0-9.]+ [KMG]B"
-  val PMEM_EXCEEDED_PATTERN =
-    Pattern.compile(s"$MEM_REGEX of $MEM_REGEX physical memory used")
-  val VMEM_EXCEEDED_PATTERN =
-    Pattern.compile(s"$MEM_REGEX of $MEM_REGEX virtual memory used")
-
-  def memLimitExceededLogMessage(diagnostics: String, pattern: Pattern): String = {
-    val matcher = pattern.matcher(diagnostics)
-    val diag = if (matcher.find()) " " + matcher.group() + "." else ""
-    ("Container killed by YARN for exceeding memory limits." + diag
-      + " Consider boosting spark.yarn.executor.memoryOverhead.")
-  }
-}
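
For orientation while reading the diff above, here is a small, self-contained sketch of the sizing arithmetic and the memory-limit diagnostic message the allocator works with. AllocatorSizingSketch and its helpers are hypothetical names, and the MEMORY_OVERHEAD_FACTOR / MEMORY_OVERHEAD_MIN values are assumptions for illustration (the real constants live elsewhere in the YARN module); it is a sketch of the same shape, not the deleted class itself.

  import java.util.regex.Pattern

  object AllocatorSizingSketch {
    val MEMORY_OVERHEAD_FACTOR = 0.07   // assumed: fraction of executor memory
    val MEMORY_OVERHEAD_MIN    = 384    // assumed: floor, in MB

    // Mirrors the default for spark.yarn.executor.memoryOverhead:
    // max(factor * executorMemory, min), unless explicitly configured.
    def memoryOverheadMb(executorMemoryMb: Int, configured: Option[Int]): Int =
      configured.getOrElse(
        math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMb).toInt, MEMORY_OVERHEAD_MIN))

    // Mirrors the "missing" computation in allocateResources:
    // only ask for what is not already running or pending.
    def missingContainers(target: Int, pending: Int, running: Int): Int =
      target - pending - running

    // Same shape as memLimitExceededLogMessage: pull the "X of Y ... memory used"
    // fragment out of the container diagnostics, if present.
    val MEM_REGEX = "[0-9.]+ [KMG]B"
    val PMEM_EXCEEDED_PATTERN =
      Pattern.compile(s"$MEM_REGEX of $MEM_REGEX physical memory used")

    def memLimitMessage(diagnostics: String, pattern: Pattern): String = {
      val matcher = pattern.matcher(diagnostics)
      val diag = if (matcher.find()) " " + matcher.group() + "." else ""
      "Container killed by YARN for exceeding memory limits." + diag +
        " Consider boosting spark.yarn.executor.memoryOverhead."
    }

    def main(args: Array[String]): Unit = {
      val executorMemoryMb = 8192
      val overhead = memoryOverheadMb(executorMemoryMb, configured = None)
      println(s"container request: ${executorMemoryMb + overhead} MB " +
        s"($executorMemoryMb MB heap + $overhead MB overhead)")

      println(s"missing = ${missingContainers(target = 10, pending = 3, running = 4)}")

      val diagnostics = "Container killed: 2.3 GB of 2 GB physical memory used"
      println(memLimitMessage(diagnostics, PMEM_EXCEEDED_PATTERN))
    }
  }

The container request the allocator sends is executorMemory plus that overhead, and the warning string is what appears in the AM log when a container is killed for exceeding its limit.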

http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
deleted file mode 100644
index 2510b9c..0000000
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import scala.collection.{Map, Set}
-
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.api.records._
-
-import org.apache.spark.{SecurityManager, SparkConf, SparkContext}
-import org.apache.spark.scheduler.SplitInfo
-
-/**
- * Interface that defines a Yarn RM client. Abstracts away Yarn version-specific functionality that
- * is used by Spark's AM.
- */
-trait YarnRMClient {
-
-  /**
-   * Registers the application master with the RM.
-   *
-   * @param conf The Yarn configuration.
-   * @param sparkConf The Spark configuration.
-   * @param preferredNodeLocations Map with hints about where to allocate containers.
-   * @param uiAddress Address of the SparkUI.
-   * @param uiHistoryAddress Address of the application on the History Server.
-   */
-  def register(
-      conf: YarnConfiguration,
-      sparkConf: SparkConf,
-      preferredNodeLocations: Map[String, Set[SplitInfo]],
-      uiAddress: String,
-      uiHistoryAddress: String,
-      securityMgr: SecurityManager): YarnAllocator
-
-  /**
-   * Unregisters the AM. Guaranteed to be called only once.
-   *
-   * @param status The final status of the AM.
-   * @param diagnostics Diagnostics message to include in the final status.
-   */
-  def unregister(status: FinalApplicationStatus, diagnostics: String = ""): Unit
-
-  /** Returns the attempt ID. */
-  def getAttemptId(): ApplicationAttemptId
-
-  /** Returns the configuration for the AmIpFilter to add to the Spark UI. */
-  def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String): Map[String, String]
-
-  /** Returns the maximum number of attempts to register the AM. */
-  def getMaxRegAttempts(conf: YarnConfiguration): Int
-
-}
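
The YarnRMClient trait above only describes the contract, so as a rough illustration of the register -> allocate-loop -> unregister lifecycle the AM drives through it, here is a hedged, simplified mirror. None of these types are the real Spark or YARN classes (RmClientLifecycleSketch, RmClient, Allocator and FinalStatus are invented stand-ins), and the stub just prints instead of talking to a ResourceManager.

  object RmClientLifecycleSketch {

    sealed trait FinalStatus
    case object Succeeded extends FinalStatus
    case object Failed extends FinalStatus

    trait Allocator { def allocateResources(): Unit }

    trait RmClient {
      def register(uiAddress: String): Allocator
      def unregister(status: FinalStatus, diagnostics: String = ""): Unit
      def getMaxRegAttempts: Int
    }

    // A stub implementation so the sketch runs without a cluster.
    class StubRmClient extends RmClient {
      def register(uiAddress: String): Allocator = new Allocator {
        def allocateResources(): Unit = println("heartbeat / allocate request sent")
      }
      def unregister(status: FinalStatus, diagnostics: String): Unit =
        println(s"unregistered with status=$status diagnostics='$diagnostics'")
      def getMaxRegAttempts: Int = 2
    }

    def main(args: Array[String]): Unit = {
      val client: RmClient = new StubRmClient
      val allocator = client.register(uiAddress = "http://driver:4040")
      // The real AM calls the allocator periodically from its reporter thread; three beats here.
      (1 to 3).foreach(_ => allocator.allocateResources())
      client.unregister(Succeeded)   // guaranteed to be called only once
    }
  }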



