spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tigerquoll <...@git.apache.org>
Subject [GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Date Sun, 28 Sep 2014 23:23:03 GMT
Github user tigerquoll commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2516#discussion_r18133582
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
    @@ -17,155 +17,195 @@
     
     package org.apache.spark.deploy
     
    -import java.io.{File, FileInputStream, IOException}
    -import java.util.Properties
    +import java.io.{InputStreamReader, File, FileInputStream, InputStream}
     import java.util.jar.JarFile
    +import java.util.Properties
     
    +import scala.collection._
    +import scala.collection.JavaConverters._
     import scala.collection.JavaConversions._
    -import scala.collection.mutable.{ArrayBuffer, HashMap}
    +import org.apache.commons.lang3.CharEncoding
     
    -import org.apache.spark.SparkException
    +import org.apache.spark.deploy.ConfigConstants._
     import org.apache.spark.util.Utils
     
    +
    +
     /**
    - * Parses and encapsulates arguments from the spark-submit script.
    - */
    + * Pulls configuration information together in order of priority
    + *
    + * Entries in the conf Map will be filled in the following priority order
    + * 1. entries specified on the command line (except from --conf entries)
    + * 2. Entries specified on the command line with --conf
    + * 3. Environment variables (including legacy variable mappings)
    + * 4. System config variables (eg by using -Dspark.var.name)
    + * 5  SPARK_DEFAULT_CONF/spark-defaults.conf or SPARK_HOME/conf/spark-defaults.conf if
either exist
    + * 6. hard coded defaults in class path at spark-submit-defaults.prop
    + *
    + * A property file specified by one of the means listed above gets read in and the properties
are
    + * considered to be at the priority of the method that specified the files. A property
specified in
    + * a property file will not override an existing config value at that same level
    +*/
     private[spark] class SparkSubmitArguments(args: Seq[String]) {
    -  var master: String = null
    -  var deployMode: String = null
    -  var executorMemory: String = null
    -  var executorCores: String = null
    -  var totalExecutorCores: String = null
    -  var propertiesFile: String = null
    -  var driverMemory: String = null
    -  var driverExtraClassPath: String = null
    -  var driverExtraLibraryPath: String = null
    -  var driverExtraJavaOptions: String = null
    -  var driverCores: String = null
    -  var supervise: Boolean = false
    -  var queue: String = null
    -  var numExecutors: String = null
    -  var files: String = null
    -  var archives: String = null
    -  var mainClass: String = null
    -  var primaryResource: String = null
    -  var name: String = null
    -  var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
    -  var jars: String = null
    -  var verbose: Boolean = false
    -  var isPython: Boolean = false
    -  var pyFiles: String = null
    -  val sparkProperties: HashMap[String, String] = new HashMap[String, String]()
    -
    -  /** Default properties present in the currently defined defaults file. */
    -  lazy val defaultSparkProperties: HashMap[String, String] = {
    -    val defaultProperties = new HashMap[String, String]()
    -    if (verbose) SparkSubmit.printStream.println(s"Using properties file: $propertiesFile")
    -    Option(propertiesFile).foreach { filename =>
    -      val file = new File(filename)
    -      SparkSubmitArguments.getPropertiesFromFile(file).foreach { case (k, v) =>
    -        if (k.startsWith("spark")) {
    -          defaultProperties(k) = v
    -          if (verbose) SparkSubmit.printStream.println(s"Adding default property: $k=$v")
    -        } else {
    -          SparkSubmit.printWarning(s"Ignoring non-spark config property: $k=$v")
    -        }
    -      }
    -    }
    -    defaultProperties
    -  }
    +  /**
    +   * Stores all configuration items except for child arguments,
    +   * referenced by the constants defined in ConfigConstants.scala
    +   */
    +  val conf = new mutable.HashMap[String, String]()
    +
    +  def master  = conf(SparkMaster)
    +  def master_= (value: String):Unit = conf.put(SparkMaster, value)
    +
    +  def deployMode = conf(SparkDeployMode)
    +  def deployMode_= (value: String):Unit = conf.put(SparkDeployMode, value)
    +
    +  def executorMemory = conf(SparkExecutorMemory)
    +  def executorMemory_= (value: String):Unit = conf.put(SparkExecutorMemory, value)
    +
    +  def executorCores = conf(SparkExecutorCores)
    +  def executorCores_= (value: String):Unit = conf.put(SparkExecutorCores, value)
    +
    +  def totalExecutorCores = conf.get(SparkCoresMax)
    +  def totalExecutorCores_= (value: String):Unit = conf.put(SparkCoresMax, value)
    +
    +  def driverMemory = conf(SparkDriverMemory)
    +  def driverMemory_= (value: String):Unit = conf.put(SparkDriverMemory, value)
    +
    +  def driverExtraClassPath = conf.get(SparkDriverExtraClassPath)
    +  def driverExtraClassPath_= (value: String):Unit = conf.put(SparkDriverExtraClassPath,
value)
    +
    +  def driverExtraLibraryPath = conf.get(SparkDriverExtraLibraryPath)
    +  def driverExtraLibraryPath_= (value: String):Unit = conf.put(SparkDriverExtraLibraryPath,
value)
    +
    +  def driverExtraJavaOptions = conf.get(SparkDriverExtraJavaOptions)
    +  def driverExtraJavaOptions_= (value: String):Unit = conf.put(SparkDriverExtraJavaOptions,
value)
    +
    +  def driverCores = conf(SparkDriverCores)
    +  def driverCores_= (value: String):Unit = conf.put(SparkDriverCores, value)
    +
    +  def supervise = conf(SparkDriverSupervise) == true.toString
    +  def supervise_= (value: String):Unit = conf.put(SparkDriverSupervise, value)
    +
    +  def queue = conf(SparkYarnQueue)
    +  def queue_= (value: String):Unit = conf.put(SparkYarnQueue, value)
    +
    +  def numExecutors = conf(SparkExecutorInstances)
    +  def numExecutors_= (value: String):Unit = conf.put(SparkExecutorInstances, value)
    +
    +  def files = conf.get(SparkFiles)
    +  def files_= (value: String):Unit = conf.put(SparkFiles, value)
     
    -  // Respect SPARK_*_MEMORY for cluster mode
    -  driverMemory = sys.env.get("SPARK_DRIVER_MEMORY").orNull
    -  executorMemory = sys.env.get("SPARK_EXECUTOR_MEMORY").orNull
    +  def archives = conf.get(SparkYarnDistArchives)
    +  def archives_= (value: String):Unit = conf.put(SparkYarnDistArchives, value)
     
    -  parseOpts(args.toList)
    -  mergeSparkProperties()
    -  checkRequiredArguments()
    +  def mainClass = conf.get(SparkAppClass)
    +  def mainClass_= (value: String):Unit = conf.put(SparkAppClass, value)
     
    +  def primaryResource = conf.get(SparkAppPrimaryResource).get
    +  def primaryResource_= (value: String):Unit = conf.put(SparkAppPrimaryResource, value)
    +
    +  def name = conf.get(SparkAppName)
    +  def name_= (value: String):Unit = conf.put(SparkAppName, value)
    +
    +  def jars = conf.get(SparkJars)
    +  def jars_= (value: String):Unit = conf.put(SparkJars, value)
    +
    +  def pyFiles = conf.get(SparkSubmitPyFiles)
    +  def pyFiles_= (value: String):Unit = conf.put(SparkSubmitPyFiles, value)
    +
    +  lazy val verbose: Boolean =  SparkVerbose == true.toString
    +  lazy val isPython: Boolean = primaryResource != null &&
    +    SparkSubmit.isPython(primaryResource)
    +
    +  var childArgs = new mutable.ArrayBuffer[String]()
    +  
       /**
    -   * Fill in any undefined values based on the default properties file or options passed
in through
    -   * the '--conf' flag.
    +   * Used to store parameters parsed from command line (except for --conf and child arguments)
        */
    -  private def mergeSparkProperties(): Unit = {
    -    // Use common defaults file, if not specified by user
    -    if (propertiesFile == null) {
    -      sys.env.get("SPARK_CONF_DIR").foreach { sparkConfDir =>
    -        val sep = File.separator
    -        val defaultPath = s"${sparkConfDir}${sep}spark-defaults.conf"
    -        val file = new File(defaultPath)
    -        if (file.exists()) {
    -          propertiesFile = file.getAbsolutePath
    -        }
    -      }
    -    }
    +  private val cmdLineConfig = new mutable.HashMap[String, String]()
     
    -    if (propertiesFile == null) {
    -      sys.env.get("SPARK_HOME").foreach { sparkHome =>
    -        val sep = File.separator
    -        val defaultPath = s"${sparkHome}${sep}conf${sep}spark-defaults.conf"
    -        val file = new File(defaultPath)
    -        if (file.exists()) {
    -          propertiesFile = file.getAbsolutePath
    -        }
    -      }
    -    }
    +  /**
    +   * arguments passed via --conf command line options
    +    */
    +  private val cmdLineConfConfig = new mutable.HashMap[String, String]()
    +
    +  try {
    +    parseOpts(args.toList)
    +
    +    // see comments at start of class definition detailing the location and priority
of configuration sources
    +    conf ++= SparkSubmitArguments.mergeSparkProperties(Seq(cmdLineConfig, cmdLineConfConfig))
     
    -    val properties = HashMap[String, String]()
    -    properties.putAll(defaultSparkProperties)
    -    properties.putAll(sparkProperties)
    -
    -    // Use properties file as fallback for values which have a direct analog to
    -    // arguments in this script.
    -    master = Option(master).getOrElse(properties.get("spark.master").orNull)
    -    executorMemory = Option(executorMemory)
    -      .getOrElse(properties.get("spark.executor.memory").orNull)
    -    executorCores = Option(executorCores)
    -      .getOrElse(properties.get("spark.executor.cores").orNull)
    -    totalExecutorCores = Option(totalExecutorCores)
    -      .getOrElse(properties.get("spark.cores.max").orNull)
    -    name = Option(name).getOrElse(properties.get("spark.app.name").orNull)
    -    jars = Option(jars).getOrElse(properties.get("spark.jars").orNull)
    -
    -    // This supports env vars in older versions of Spark
    -    master = Option(master).getOrElse(System.getenv("MASTER"))
    -    deployMode = Option(deployMode).getOrElse(System.getenv("DEPLOY_MODE"))
    +    // some configuration items can be derived if there are not present
    +    deriveConfigurations()
    +
    +    checkRequiredArguments()
    +  } catch {
    +    case e: SparkSubmit.ApplicationExitException =>
    +      // should only get here during test running
    +  }
    +
    +  private def deriveConfigurations() = {
    +
    +    // These config items point to file paths, but may need to be converted to absolute
file uris
    +    val configFileUris = List(SparkFiles, SparkSubmitPyFiles, SparkYarnDistArchives,
    +      SparkJars, SparkAppPrimaryResource)
    +
    +    // update configFileUris with resolvedURIs if present
    +    configFileUris
    +      .filter { key => conf.contains(key) &&
    +        ((key != SparkAppPrimaryResource) || (!SparkSubmit.isInternalOrShell(conf(key))))}
    +      .foreach { key =>
    +        conf.put(key, Utils.resolveURIs(conf(key), testWindows=false))
    +      }
     
         // Try to set main class from JAR if no --class argument is given
         if (mainClass == null && !isPython && primaryResource != null) {
           try {
             val jar = new JarFile(primaryResource)
             // Note that this might still return null if no main-class is set; we catch that
later
    -        mainClass = jar.getManifest.getMainAttributes.getValue("Main-Class")
    +        val manifestMainClass = jar.getManifest.getMainAttributes.getValue("Main-Class")
    +        if (manifestMainClass != null && !manifestMainClass.isEmpty) {
    +          mainClass = manifestMainClass
    +        }
           } catch {
             case e: Exception =>
               SparkSubmit.printErrorAndExit("Cannot load main class from JAR: " + primaryResource)
    -          return
           }
         }
    -
    -    // Global defaults. These should be keep to minimum to avoid confusing behavior.
    -    master = Option(master).getOrElse("local[*]")
    -
    -    // Set name from main class if not given
    -    name = Option(name).orElse(Option(mainClass)).orNull
    -    if (name == null && primaryResource != null) {
    +    if (master == "yarn-standalone") {
    +      SparkSubmit.printWarning("'yarn-standalone' is deprecated. Use 'yarn-cluster' instead.")
    +      master = "yarn-cluster"
    +    }
    +    if (name.isEmpty && primaryResource != null) {
           name = Utils.stripDirectory(primaryResource)
         }
       }
     
       /** Ensure that required fields exists. Call this only once all defaults are loaded.
*/
       private def checkRequiredArguments() = {
    -    if (args.length == 0) {
    -      printUsageAndExit(-1)
    +    conf.get(SparkPropertiesFile).foreach{propFilePath =>
    +      val propFile = new File(propFilePath)
    +      if (!propFile.exists()) {
    +        SparkSubmit.printErrorAndExit(s"--property-file $propFilePath does not exists")
    +      }
    +    }
    +
    +    if (!conf.isDefinedAt(SparkAppPrimaryResource)) {
    --- End diff --
    
    @vanzin - The intention was to use them as config items that are only documented for use
at the command line. This way I can cater for them with no further code or complexity.  If
you feel strongly about this I can break them out and treat them differently from all other
config items, with separate storage, and separate default value logic and separate getters
and setters.  Your call.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Mime
View raw message