From: pwendell@apache.org
To: commits@spark.incubator.apache.org
Date: Thu, 02 Jan 2014 05:30:03 -0000
Subject: [25/33] git commit: Updated docs for SparkConf and handled review comments

Updated docs for SparkConf and handled review comments

Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/0fa58097
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/0fa58097
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/0fa58097

Branch: refs/heads/master
Commit: 0fa5809768cf60ec62b4277f04e23a44dc1582e2
Parents: 994f080
Author: Matei Zaharia
Authored: Mon Dec 30 22:17:28 2013 -0500
Committer: Matei Zaharia
Committed: Mon Dec 30 22:17:28 2013 -0500

----------------------------------------------------------------------
 .../scala/org/apache/spark/Partitioner.scala    |  2 +-
 .../main/scala/org/apache/spark/SparkConf.scala | 31 +++++----
 .../scala/org/apache/spark/SparkContext.scala   | 18 +++--
 .../main/scala/org/apache/spark/SparkEnv.scala  | 13 ++--
 .../spark/api/java/JavaSparkContext.scala       |  6 ++
 .../spark/deploy/FaultToleranceTest.scala       |  4 +-
 .../org/apache/spark/executor/Executor.scala    |  2 +-
 .../scala/org/apache/spark/util/Utils.scala     | 10 +--
 core/src/test/resources/spark.conf              |  2 +
 docs/_config.yml                                |  2 +-
 docs/configuration.md                           | 71 +++++++++++++++-----
 docs/css/bootstrap.min.css                      |  2 +-
 docs/job-scheduling.md                          | 21 +++---
 docs/monitoring.md                              |  3 +-
 docs/python-programming-guide.md                | 15 +++--
 docs/quick-start.md                             | 52 ++++++++++----
 docs/running-on-mesos.md                        | 19 +++---
 docs/scala-programming-guide.md                 |  4 +-
 docs/spark-standalone.md                        | 15 +++--
 docs/streaming-programming-guide.md             |  4 +-
 docs/tuning.md                                  | 21 +++--
 python/pyspark/conf.py                          | 24 +++++--
 python/pyspark/context.py                       | 24 +++---
 23 files changed, 241 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0fa58097/core/src/main/scala/org/apache/spark/Partitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 7cb545a..31b0773 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -52,7 +52,7 @@ object Partitioner {
     for (r <- bySize if r.partitioner != None) {
       return r.partitioner.get
     }
-    if (rdd.context.conf.getOrElse("spark.default.parallelism", null) != null) {
+    if (rdd.context.conf.contains("spark.default.parallelism")) {
       return new HashPartitioner(rdd.context.defaultParallelism)
     } else {
       return new HashPartitioner(bySize.head.partitions.size)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0fa58097/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index ae52de4..96239cf 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -16,6 +16,12 @@ import com.typesafe.config.ConfigFactory
  * For unit tests, you can also call `new SparkConf(false)` to skip loading external settings and
  * get the same configuration no matter what is on the classpath.
  *
+ * All setter methods in this class support chaining. For example, you can write
+ * `new SparkConf().setMaster("local").setAppName("My app")`.
+ *
+ * Note that once a SparkConf object is passed to Spark, it is cloned and can no longer be modified
+ * by the user. Spark does not support modifying the configuration at runtime.
+ *
  * @param loadDefaults whether to load values from the system properties and classpath
  */
 class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
@@ -69,10 +75,7 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {

   /** Set JAR files to distribute to the cluster. (Java-friendly version.) */
   def setJars(jars: Array[String]): SparkConf = {
-    if (!jars.isEmpty) {
-      settings("spark.jars") = jars.mkString(",")
-    }
-    this
+    setJars(jars.toSeq)
   }

   /**
@@ -102,15 +105,11 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
    * (Java-friendly version.)
    */
   def setExecutorEnv(variables: Array[(String, String)]): SparkConf = {
-    for ((k, v) <- variables) {
-      setExecutorEnv(k, v)
-    }
-    this
+    setExecutorEnv(variables.toSeq)
   }

   /**
-   * Set the location where Spark is installed on worker nodes. This is only needed on Mesos if
-   * you are not using `spark.executor.uri` to disseminate the Spark binary distribution.
+   * Set the location where Spark is installed on worker nodes.
    */
   def setSparkHome(home: String): SparkConf = {
     if (home != null) {
@@ -154,8 +153,8 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {

   /** Get all executor environment variables set on this SparkConf */
   def getExecutorEnv: Seq[(String, String)] = {
     val prefix = "spark.executorEnv."
-    getAll.filter(pair => pair._1.startsWith(prefix))
-          .map(pair => (pair._1.substring(prefix.length), pair._2))
+    getAll.filter{case (k, v) => k.startsWith(prefix)}
+          .map{case (k, v) => (k.substring(prefix.length), v)}
   }

   /** Does the configuration contain a given parameter? */
@@ -165,4 +164,12 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
   override def clone: SparkConf = {
     new SparkConf(false).setAll(settings)
   }
+
+  /**
+   * Return a string listing all keys and values, one per line. This is useful to print the
+   * configuration out for debugging.
+   */
+  def toDebugString: String = {
+    settings.toArray.sorted.map{case (k, v) => k + "=" + v}.mkString("\n")
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0fa58097/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 810ed18..8134ce7 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -55,14 +55,14 @@ import org.apache.spark.util._
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
  * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
  *
- * @param conf_ a Spark Config object describing the application configuration. Any settings in
+ * @param config a Spark Config object describing the application configuration. Any settings in
  *   this config overrides the default configs as well as system properties.
  * @param preferredNodeLocationData used in YARN mode to select nodes to launch containers on. Can
  *   be generated using [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]]
  *   from a list of input files or InputFormats for the application.
  */
 class SparkContext(
-    conf_ : SparkConf,
+    config: SparkConf,
     // This is used only by YARN for now, but should be relevant to other cluster types (Mesos, etc)
     // too. This is typically generated from InputFormatInfo.computePreferredLocations. It contains
     // a map from hostname to a list of input format splits on the host.
@@ -107,7 +107,13 @@ class SparkContext(
       preferredNodeLocationData)
   }

-  val conf = conf_.clone()
+  private[spark] val conf = config.clone()
+
+  /**
+   * Return a copy of this SparkContext's configuration. The configuration ''cannot'' be
+   * changed at runtime.
+   */
+  def getConf: SparkConf = conf.clone()

   if (!conf.contains("spark.master")) {
     throw new SparkException("A master URL must be set in your configuration")
@@ -135,11 +141,11 @@ class SparkContext(
   initLogging()

   // Create the Spark execution environment (cache, map output tracker, etc)
-  private[spark] val env = SparkEnv.createFromSystemProperties(
+  private[spark] val env = SparkEnv.create(
+    conf,
     "",
     conf.get("spark.driver.host"),
     conf.get("spark.driver.port").toInt,
-    conf,
     isDriver = true,
     isLocal = isLocal)
   SparkEnv.set(env)
@@ -730,7 +736,7 @@ class SparkContext(
    * (in that order of preference). If neither of these is set, return None.
   */
  private[spark] def getSparkHome(): Option[String] = {
-    if (conf.getOrElse("spark.home", null) != null) {
+    if (conf.contains("spark.home")) {
      Some(conf.get("spark.home"))
    } else if (System.getenv("SPARK_HOME") != null) {
      Some(System.getenv("SPARK_HOME"))

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0fa58097/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 34fad3e..d06af8e 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -40,7 +40,7 @@ import com.google.common.collect.MapMaker
  * objects needs to have the right SparkEnv set. You can get the current environment with
  * SparkEnv.get (e.g. after creating a SparkContext) and set it with SparkEnv.set.
  */
-class SparkEnv (
+class SparkEnv private[spark] (
     val executorId: String,
     val actorSystem: ActorSystem,
     val serializerManager: SerializerManager,
@@ -63,7 +63,7 @@ class SparkEnv (
   // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
   private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()

-  def stop() {
+  private[spark] def stop() {
     pythonWorkers.foreach { case(key, worker) => worker.stop() }
     httpFileServer.stop()
     mapOutputTracker.stop()
@@ -79,6 +79,7 @@ class SparkEnv (
     //actorSystem.awaitTermination()
   }

+  private[spark]
   def createPythonWorker(pythonExec: String, envVars: Map[String, String]): java.net.Socket = {
     synchronized {
       val key = (pythonExec, envVars)
@@ -111,11 +112,11 @@ object SparkEnv extends Logging {
     env.get()
   }

-  def createFromSystemProperties(
+  private[spark] def create(
+      conf: SparkConf,
       executorId: String,
       hostname: String,
       port: Int,
-      conf: SparkConf,
       isDriver: Boolean,
       isLocal: Boolean): SparkEnv = {

@@ -129,7 +130,7 @@ object SparkEnv extends Logging {
     }

     // set only if unset until now.
-    if (conf.getOrElse("spark.hostPort", null) == null) {
+    if (!conf.contains("spark.hostPort")) {
       if (!isDriver){
         // unexpected
         Utils.logErrorWithStack("Unexpected NOT to have spark.hostPort set")
@@ -216,7 +217,7 @@ object SparkEnv extends Logging {
     }

     // Warn about deprecated spark.cache.class property
-    if (conf.getOrElse("spark.cache.class", null) != null) {
+    if (conf.contains("spark.cache.class")) {
       logWarning("The spark.cache.class property is no longer being used! Specify storage " +
         "levels using the RDD.persist() method instead.")
     }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0fa58097/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index e03cf9d..d6aeed7 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -418,6 +418,12 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
     new JavaRDD(sc.checkpointFile(path))
   }
+
+  /**
+   * Return a copy of this JavaSparkContext's configuration. The configuration ''cannot'' be
+   * changed at runtime.
+   */
+  def getConf: SparkConf = sc.getConf
 }

 object JavaSparkContext {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0fa58097/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
index 0aa8852..4dfb19e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
@@ -190,7 +190,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
   /** Creates a SparkContext, which constructs a Client to interact with our cluster. */
   def createClient() = {
     if (sc != null) { sc.stop() }
-    // Counter-hack: Because of a hack in SparkEnv#createFromSystemProperties() that changes this
+    // Counter-hack: Because of a hack in SparkEnv#create() that changes this
     // property, we need to reset it.
     System.setProperty("spark.driver.port", "0")
     sc = new SparkContext(getMasterUrls(masters), "fault-tolerance", containerSparkHome)
@@ -417,4 +417,4 @@ private[spark] object Docker extends Logging {
     "docker ps -l -q".!(ProcessLogger(line => id = line))
     new DockerId(id)
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0fa58097/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index a6eabc4..2400154 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -109,7 +109,7 @@ private[spark] class Executor(
   // Initialize Spark environment (using system properties read above)
   private val env = {
     if (!isLocal) {
-      val _env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, conf,
+      val _env = SparkEnv.create(conf, executorId, slaveHostname, 0,
         isDriver = false, isLocal = false)
       SparkEnv.set(_env)
       _env.metricsSystem.registerSource(executorSource)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0fa58097/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index b6b89cc..ca3320b 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -397,12 +397,11 @@ private[spark] object Utils extends Logging {
   }

   def localHostPort(conf: SparkConf): String = {
-    val retval = conf.getOrElse("spark.hostPort", null)
+    val retval = conf.getOrElse("spark.hostPort", null)
     if (retval == null) {
       logErrorWithStack("spark.hostPort not set but invoking localHostPort")
       return localHostName()
     }
-
     retval
   }

@@ -414,9 +413,12 @@ private[spark] object Utils extends Logging {
     assert(hostPort.indexOf(':') != -1, message)
   }

-  // Used by DEBUG code : remove when all testing done
   def logErrorWithStack(msg: String) {
-    try { throw new Exception } catch { case ex: Exception => { logError(msg, ex) } }
+    try {
+      throw new Exception
+    } catch {
+      case ex: Exception => logError(msg, ex)
+    }
   }

   // Typically, this will be of order of number of nodes in cluster
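
For reference, the following is a minimal sketch (not part of this commit) of how the reworked configuration API
shown in the Scala changes above fits together: chained SparkConf setters, contains() in place of the old
getOrElse(key, null) != null idiom, the cloned SparkContext.getConf, and toDebugString. The object name and the
specific master URL and memory values are illustrative assumptions.

import org.apache.spark.{SparkConf, SparkContext}

object ConfExample {
  def main(args: Array[String]) {
    // Build a configuration with chained setters; the master URL and memory size are illustrative.
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("My application")
      .set("spark.executor.memory", "1g")

    // contains() replaces the old getOrElse(key, null) != null checks used throughout this diff.
    if (!conf.contains("spark.default.parallelism")) {
      conf.set("spark.default.parallelism", "4")
    }

    val sc = new SparkContext(conf)

    // getConf returns a copy; the running context's configuration cannot be changed at runtime.
    println(sc.getConf.toDebugString)

    sc.stop()
  }
}

Because the SparkConf is cloned when it is passed to the SparkContext, later calls to set() on conf have no
effect on the running context.
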
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0fa58097/core/src/test/resources/spark.conf
----------------------------------------------------------------------
diff --git a/core/src/test/resources/spark.conf b/core/src/test/resources/spark.conf
index 6c99bdc..aa4e751 100644
--- a/core/src/test/resources/spark.conf
+++ b/core/src/test/resources/spark.conf
@@ -1,3 +1,5 @@
+# A simple spark.conf file used only in our unit tests
+
 spark.test.intTestProperty = 1

 spark.test {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0fa58097/docs/_config.yml
----------------------------------------------------------------------
diff --git a/docs/_config.yml b/docs/_config.yml
index 02067f9..11d18f0 100644
--- a/docs/_config.yml
+++ b/docs/_config.yml
@@ -4,7 +4,7 @@ markdown: kramdown
 # These allow the documentation to be updated with new releases
 # of Spark, Scala, and Mesos.
 SPARK_VERSION: 0.9.0-incubating-SNAPSHOT
-SPARK_VERSION_SHORT: 0.9.0-SNAPSHOT
+SPARK_VERSION_SHORT: 0.9.0
 SCALA_VERSION: 2.10
 MESOS_VERSION: 0.13.0
 SPARK_ISSUE_TRACKER_URL: https://spark-project.atlassian.net

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0fa58097/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 677d182..567aba0 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -3,26 +3,37 @@ layout: global
 title: Spark Configuration
 ---

-Spark provides three main locations to configure the system:
+Spark provides three locations to configure the system:

-* [Java system properties](#system-properties), which control internal configuration parameters and can be set
-  either programmatically (by calling `System.setProperty` *before* creating a `SparkContext`) or through
-  JVM arguments.
-* [Environment variables](#environment-variables) for configuring per-machine settings such as the IP address,
-  which can be set in the `conf/spark-env.sh` script.
-* [Logging configuration](#configuring-logging), which is done through `log4j.properties`.
+* [Spark properties](#spark-properties) control most application parameters and can be set by passing
+  a [SparkConf](api/core/index.html#org.apache.spark.SparkConf) object to SparkContext, or through Java
+  system properties.
+* [Environment variables](#environment-variables) can be used to set per-machine settings, such as
+  the IP address, through the `conf/spark-env.sh` script on each node.
+* [Logging](#configuring-logging) can be configured through `log4j.properties`.

-# System Properties
+# Spark Properties

-To set a system property for configuring Spark, you need to either pass it with a -D flag to the JVM (for example `java -Dspark.cores.max=5 MyProgram`) or call `System.setProperty` in your code *before* creating your Spark context, as follows:
+Spark properties control most application settings and are configured separately for each application.
+The preferred way to set them is by passing a [SparkConf](api/core/index.html#org.apache.spark.SparkConf)
+class to your SparkContext constructor.
+Alternatively, Spark will also load them from Java system properties (for compatibility with old versions
+of Spark) and from a [`spark.conf` file](#configuration-files) on your classpath.
+
+SparkConf lets you configure most of the common properties to initialize a cluster (e.g., master URL and
+application name), as well as arbitrary key-value pairs through the `set()` method.
+For example, we could initialize an application as follows:

 {% highlight scala %}
-System.setProperty("spark.cores.max", "5")
-val sc = new SparkContext(...)
+val conf = new SparkConf()
+             .setMaster("local")
+             .setAppName("My application")
+             .set("spark.executor.memory", "1g")
+val sc = new SparkContext(conf)
 {% endhighlight %}

-Most of the configurable system properties control internal settings that have reasonable default values. However,
+Most of the properties control internal settings that have reasonable default values. However,
 there are at least five properties that you will commonly want to control:

@@ -385,11 +396,40 @@ Apart from these, the following properties are also available, and may be useful
+## Viewing Spark Properties
+
+The application web UI at `http://<driver>:4040` lists Spark properties in the "Environment" tab.
+This is a useful place to check to make sure that your properties have been set correctly.
+
+## Configuration Files
+
+You can also configure Spark properties through a `spark.conf` file on your Java classpath.
+Because these properties are usually application-specific, we recommend putting this file *only* on your
+application's classpath, and not in a global Spark classpath.
+
+The `spark.conf` file uses Typesafe Config's [HOCON format](https://github.com/typesafehub/config#json-superset),
+which is a superset of Java properties files and JSON. For example, the following is a simple config file:
+
+{% highlight awk %}
+# Comments are allowed
+spark.executor.memory = 512m
+spark.serializer = org.apache.spark.serializer.KryoSerializer
+{% endhighlight %}
+
+The format also allows hierarchical nesting, as follows:
+
+{% highlight awk %}
+spark.akka {
+  threads = 8
+  timeout = 200
+}
+{% endhighlight %}
+
 # Environment Variables

-Certain Spark settings can also be configured through environment variables, which are read from the `conf/spark-env.sh`
+Certain Spark settings can be configured through environment variables, which are read from the `conf/spark-env.sh`
 script in the directory where Spark is installed (or `conf/spark-env.cmd` on Windows). These variables are meant to be for machine-specific settings, such
-as library search paths. While Java system properties can also be set here, for application settings, we recommend setting
+as library search paths. While Spark properties can also be set there through `SPARK_JAVA_OPTS`, for per-application settings, we recommend setting
 these properties within the application instead of in `spark-env.sh` so that different applications can use different
 settings.

@@ -406,7 +446,8 @@ The following variables can be set in `spark-env.sh`:
   Note that applications can also add dependencies for themselves through `SparkContext.addJar` -- we recommend
   doing that when possible.
 * `SPARK_JAVA_OPTS`, to add JVM options. This includes Java options like garbage collector settings and any system
-  properties that you'd like to pass with `-D` (e.g., `-Dspark.local.dir=/disk1,/disk2`).
+  properties that you'd like to pass with `-D`. One use case is to set some Spark properties differently on this
+  machine, e.g., `-Dspark.local.dir=/disk1,/disk2`.
 * Options for the Spark [standalone cluster scripts](spark-standalone.html#cluster-launch-scripts), such as number of cores
   to use on each machine and maximum memory.
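
As a rough illustration of the compatibility path described in the documentation changes above (Spark properties
can also arrive as Java system properties, for example via `SPARK_JAVA_OPTS`), the sketch below, which is not part
of this commit and uses an invented object name, contrasts the default SparkConf, which loads spark.* system
properties and any spark.conf on the classpath, with `new SparkConf(false)`, which skips external settings:

import org.apache.spark.SparkConf

object SystemPropertyFallback {
  def main(args: Array[String]) {
    // Simulate a property passed on the JVM command line, e.g. -Dspark.local.dir=/disk1,/disk2
    System.setProperty("spark.local.dir", "/disk1,/disk2")

    // The default constructor loads external settings (spark.* system properties, spark.conf on the classpath).
    val conf = new SparkConf()
    println(conf.contains("spark.local.dir"))      // expected: true
    println(conf.get("spark.local.dir"))           // expected: /disk1,/disk2

    // new SparkConf(false) skips external settings, so the property is not picked up here.
    val isolated = new SparkConf(false)
    println(isolated.contains("spark.local.dir"))  // expected: false
  }
}
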