spark-reviews mailing list archives

From tgravescs <...@git.apache.org>
Subject [GitHub] spark pull request: [SPARK-3477] Clean up code in Yarn Client / Cl...
Date Thu, 11 Sep 2014 16:28:50 GMT
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2350#discussion_r17431362
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala ---
    @@ -40,151 +39,102 @@ import org.apache.hadoop.yarn.util.Records
     import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkException}
     
     /**
    - * The entry point (starting in Client#main() and Client#run()) for launching Spark on YARN. The
    - * Client submits an application to the YARN ResourceManager.
    + * The entry point (starting in Client#main() and Client#run()) for launching Spark on YARN.
    + * The Client submits an application to the YARN ResourceManager.
      */
    -trait ClientBase extends Logging {
    -  val args: ClientArguments
    -  val conf: Configuration
    -  val sparkConf: SparkConf
    -  val yarnConf: YarnConfiguration
    -  val credentials = UserGroupInformation.getCurrentUser().getCredentials()
    -  private val SPARK_STAGING: String = ".sparkStaging"
    +private[spark] trait ClientBase extends Logging {
    +  import ClientBase._
    +
    +  protected val args: ClientArguments
    +  protected val hadoopConf: Configuration
    +  protected val sparkConf: SparkConf
    +  protected val yarnConf: YarnConfiguration
    +  protected val credentials = UserGroupInformation.getCurrentUser.getCredentials
    +  protected val amMemoryOverhead = args.amMemoryOverhead // MB
    +  protected val executorMemoryOverhead = args.executorMemoryOverhead // MB
       private val distCacheMgr = new ClientDistributedCacheManager()
     
    -  // Staging directory is private! -> rwx--------
    -  val STAGING_DIR_PERMISSION: FsPermission =
    -    FsPermission.createImmutable(Integer.parseInt("700", 8).toShort)
    -  // App files are world-wide readable and owner writable -> rw-r--r--
    -  val APP_FILE_PERMISSION: FsPermission =
    -    FsPermission.createImmutable(Integer.parseInt("644", 8).toShort)
    -
    -  // Additional memory overhead - in mb.
    -  protected def memoryOverhead: Int = sparkConf.getInt("spark.yarn.driver.memoryOverhead",
    -    YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
    -
    -  // TODO(harvey): This could just go in ClientArguments.
    -  def validateArgs() = {
    -    Map(
    -      (args.numExecutors <= 0) -> "Error: You must specify at least 1 executor!",
    -      (args.amMemory <= memoryOverhead) -> ("Error: AM memory size must be" +
    -        "greater than: " + memoryOverhead),
    -      (args.executorMemory <= memoryOverhead) -> ("Error: Executor memory size" +
    -        "must be greater than: " + memoryOverhead.toString)
    -    ).foreach { case(cond, errStr) =>
    -      if (cond) {
    -        logError(errStr)
    -        throw new IllegalArgumentException(args.getUsageMessage())
    -      }
    -    }
    -  }
    -
    -  def getAppStagingDir(appId: ApplicationId): String = {
    -    SPARK_STAGING + Path.SEPARATOR + appId.toString() + Path.SEPARATOR
    -  }
    -
    -  def verifyClusterResources(app: GetNewApplicationResponse) = {
    -    val maxMem = app.getMaximumResourceCapability().getMemory()
    -    logInfo("Max mem capabililty of a single resource in this cluster " + maxMem)
    -
    -    // If we have requested more then the clusters max for a single resource then exit.
    -    if (args.executorMemory > maxMem) {
    -      val errorMessage =
    -        "Required executor memory (%d MB), is above the max threshold (%d MB) of this
cluster."
    -          .format(args.executorMemory, maxMem)
    -
    -      logError(errorMessage)
    -      throw new IllegalArgumentException(errorMessage)
    -    }
    -    val amMem = args.amMemory + memoryOverhead
    +  /**
    +   * Fail fast if we have requested more resources per container than is available in the cluster.
    +   */
    +  protected def verifyClusterResources(newAppResponse: GetNewApplicationResponse): Unit = {
    +    val maxMem = newAppResponse.getMaximumResourceCapability().getMemory()
    +    logInfo("Verifying our application has not requested more than the maximum " +
    +      s"memory capability of the cluster ($maxMem MB per container)")
    +    val executorMem = args.executorMemory + executorMemoryOverhead
    +    if (executorMem > maxMem) {
    +      throw new IllegalArgumentException(s"Required executor memory ($executorMem MB) " +
    --- End diff --
    
    Yes, I would assume so, but please verify by testing it.
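
    For anyone skimming the archive: below is a minimal, standalone sketch of the
    fail-fast check that the new verifyClusterResources performs. The object name and
    the example values are hypothetical; in ClientBase the per-container maximum comes
    from newAppResponse.getMaximumResourceCapability().getMemory().

    // Hypothetical standalone sketch, not the actual Spark code.
    object VerifyResourcesSketch {
      // Fail fast if requested memory plus overhead exceeds the cluster's
      // per-container maximum, mirroring the check in the diff above.
      def verify(requestedMem: Int, overheadMem: Int, clusterMaxMem: Int): Unit = {
        val totalMem = requestedMem + overheadMem // all values in MB
        if (totalMem > clusterMaxMem) {
          throw new IllegalArgumentException(
            s"Required memory ($totalMem MB) is above the max threshold " +
            s"($clusterMaxMem MB) of this cluster!")
        }
      }

      def main(args: Array[String]): Unit = {
        verify(requestedMem = 1024, overheadMem = 384, clusterMaxMem = 8192)  // passes
        // verify(8192, 384, 8192) would throw IllegalArgumentException
      }
    }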




