spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pwend...@apache.org
Subject [30/33] git commit: Miscellaneous fixes from code review.
Date Thu, 02 Jan 2014 05:30:08 GMT
Miscellaneous fixes from code review.

Also replaced SparkConf.getOrElse with just a "get" that takes a default
value, and added getInt, getLong, etc to make code that uses this
simpler later on.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/e2c68642
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/e2c68642
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/e2c68642

Branch: refs/heads/master
Commit: e2c68642c64345434e2034082cf9b299491e9e9f
Parents: 45ff8f4
Author: Matei Zaharia <matei@databricks.com>
Authored: Wed Jan 1 22:03:39 2014 -0500
Committer: Matei Zaharia <matei@databricks.com>
Committed: Wed Jan 1 22:03:39 2014 -0500

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/SparkConf.scala | 60 ++++++++++------
 .../scala/org/apache/spark/SparkContext.scala   | 73 ++++++++++++--------
 .../main/scala/org/apache/spark/SparkEnv.scala  | 10 +--
 .../org/apache/spark/api/python/PythonRDD.scala |  4 +-
 .../org/apache/spark/broadcast/Broadcast.scala  |  4 +-
 .../apache/spark/broadcast/HttpBroadcast.scala  |  4 +-
 .../spark/broadcast/TorrentBroadcast.scala      |  2 +-
 .../org/apache/spark/deploy/master/Master.scala | 12 ++--
 .../spark/deploy/master/MasterArguments.scala   |  6 +-
 .../deploy/master/SparkZooKeeperSession.scala   |  2 +-
 .../master/ZooKeeperLeaderElectionAgent.scala   |  2 +-
 .../master/ZooKeeperPersistenceEngine.scala     |  2 +-
 .../org/apache/spark/deploy/worker/Worker.scala |  2 +-
 .../spark/deploy/worker/ui/WorkerWebUI.scala    |  2 +-
 .../org/apache/spark/executor/Executor.scala    |  2 +-
 .../org/apache/spark/io/CompressionCodec.scala  |  4 +-
 .../apache/spark/metrics/MetricsSystem.scala    |  2 +-
 .../spark/network/ConnectionManager.scala       | 18 ++---
 .../spark/network/netty/ShuffleCopier.scala     |  2 +-
 .../org/apache/spark/rdd/CheckpointRDD.scala    |  4 +-
 .../spark/scheduler/TaskResultGetter.scala      |  2 +-
 .../spark/scheduler/TaskSchedulerImpl.scala     | 10 +--
 .../apache/spark/scheduler/TaskSetManager.scala | 16 ++---
 .../cluster/CoarseGrainedSchedulerBackend.scala |  9 +--
 .../cluster/SimrSchedulerBackend.scala          |  2 +-
 .../cluster/SparkDeploySchedulerBackend.scala   |  2 +-
 .../mesos/CoarseMesosSchedulerBackend.scala     |  4 +-
 .../cluster/mesos/MesosSchedulerBackend.scala   |  2 +-
 .../spark/serializer/KryoSerializer.scala       | 13 ++--
 .../spark/storage/BlockFetcherIterator.scala    |  2 +-
 .../org/apache/spark/storage/BlockManager.scala | 24 +++----
 .../spark/storage/BlockManagerMaster.scala      |  4 +-
 .../spark/storage/BlockManagerMasterActor.scala |  4 +-
 .../apache/spark/storage/DiskBlockManager.scala |  2 +-
 .../spark/storage/ShuffleBlockManager.scala     |  6 +-
 .../scala/org/apache/spark/ui/SparkUI.scala     |  4 +-
 .../org/apache/spark/ui/env/EnvironmentUI.scala |  2 +-
 .../spark/ui/jobs/JobProgressListener.scala     |  2 +-
 .../scala/org/apache/spark/util/AkkaUtils.scala | 18 ++---
 .../org/apache/spark/util/MetadataCleaner.scala |  4 +-
 .../scala/org/apache/spark/util/Utils.scala     |  4 +-
 .../scala/org/apache/spark/SparkConfSuite.scala |  2 +-
 .../spark/scheduler/TaskSetManagerSuite.scala   |  2 +-
 .../spark/storage/DiskBlockManagerSuite.scala   | 12 +---
 python/pyspark/conf.py                          | 12 ++--
 .../org/apache/spark/repl/SparkIMain.scala      |  2 +-
 .../streaming/dstream/NetworkInputDStream.scala |  6 +-
 .../streaming/scheduler/JobGenerator.scala      |  4 +-
 .../streaming/scheduler/JobScheduler.scala      |  2 +-
 49 files changed, 206 insertions(+), 189 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2c68642/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 96239cf..98343e9 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -42,6 +42,12 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
 
   /** Set a configuration variable. */
   def set(key: String, value: String): SparkConf = {
+    if (key == null) {
+      throw new NullPointerException("null key")
+    }
+    if (value == null) {
+      throw new NullPointerException("null value")
+    }
     settings(key) = value
     this
   }
@@ -51,26 +57,17 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
    * run locally with 4 cores, or "spark://master:7077" to run on a Spark standalone cluster.
    */
   def setMaster(master: String): SparkConf = {
-    if (master != null) {
-      settings("spark.master") = master
-    }
-    this
+    set("spark.master", master)
   }
 
   /** Set a name for your application. Shown in the Spark web UI. */
   def setAppName(name: String): SparkConf = {
-    if (name != null) {
-      settings("spark.app.name") = name
-    }
-    this
+    set("spark.app.name", name)
   }
 
   /** Set JAR files to distribute to the cluster. */
   def setJars(jars: Seq[String]): SparkConf = {
-    if (!jars.isEmpty) {
-      settings("spark.jars") = jars.mkString(",")
-    }
-    this
+    set("spark.jars", jars.mkString(","))
   }
 
   /** Set JAR files to distribute to the cluster. (Java-friendly version.) */
@@ -84,8 +81,7 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
    * (for example spark.executorEnv.PATH) but this method makes them easier to set.
    */
   def setExecutorEnv(variable: String, value: String): SparkConf = {
-    settings("spark.executorEnv." + variable) = value
-    this
+    set("spark.executorEnv." + variable, value)
   }
 
   /**
@@ -112,10 +108,7 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
    * Set the location where Spark is installed on worker nodes.
    */
   def setSparkHome(home: String): SparkConf = {
-    if (home != null) {
-      settings("spark.home") = home
-    }
-    this
+    set("spark.home", home)
   }
 
   /** Set multiple parameters together */
@@ -132,9 +125,20 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
     this
   }
 
-  /** Get a parameter; throws an exception if it's not set */
+  /** Remove a parameter from the configuration */
+  def remove(key: String): SparkConf = {
+    settings.remove(key)
+    this
+  }
+
+  /** Get a parameter; throws a NoSuchElementException if it's not set */
   def get(key: String): String = {
-    settings(key)
+    settings.getOrElse(key, throw new NoSuchElementException(key))
+  }
+
+  /** Get a parameter, falling back to a default if not set */
+  def get(key: String, defaultValue: String): String = {
+    settings.getOrElse(key, defaultValue)
   }
 
   /** Get a parameter as an Option */
@@ -145,9 +149,19 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
   /** Get all parameters as a list of pairs */
   def getAll: Array[(String, String)] = settings.clone().toArray
 
-  /** Get a parameter, falling back to a default if not set */
-  def getOrElse(k: String, defaultValue: String): String = {
-    settings.getOrElse(k, defaultValue)
+  /** Get a parameter as an integer, falling back to a default if not set */
+  def getInt(key: String, defaultValue: Int): Int = {
+    getOption(key).map(_.toInt).getOrElse(defaultValue)
+  }
+
+  /** Get a parameter as a long, falling back to a default if not set */
+  def getLong(key: String, defaultValue: Long): Long = {
+    getOption(key).map(_.toLong).getOrElse(defaultValue)
+  }
+
+  /** Get a parameter as a double, falling back to a default if not set */
+  def getDouble(key: String, defaultValue: Double): Double = {
+    getOption(key).map(_.toDouble).getOrElse(defaultValue)
   }
 
   /** Get all executor environment variables set on this SparkConf */

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2c68642/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 46874c4..84bd0f7 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -22,12 +22,11 @@ import java.net.URI
 import java.util.{UUID, Properties}
 import java.util.concurrent.atomic.AtomicInteger
 
-import scala.collection.{Map, Set, immutable}
+import scala.collection.{Map, Set}
 import scala.collection.generic.Growable
 
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.reflect.{ClassTag, classTag}
-import scala.util.Try
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -49,7 +48,8 @@ import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, Me
 import org.apache.spark.scheduler.local.LocalBackend
 import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.util._
+import org.apache.spark.util.{Utils, TimeStampedHashMap, MetadataCleaner, MetadataCleanerType,
+ClosureCleaner}
 
 /**
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -77,7 +77,7 @@ class SparkContext(
    * @param conf a [[org.apache.spark.SparkConf]] object specifying other Spark parameters
    */
   def this(master: String, appName: String, conf: SparkConf) =
-    this(conf.clone().setMaster(master).setAppName(appName))
+    this(SparkContext.updatedConf(conf, master, appName))
 
   /**
    * Alternative constructor that allows setting common Spark properties directly
@@ -97,13 +97,7 @@ class SparkContext(
       environment: Map[String, String] = Map(),
       preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()) =
   {
-    this(
-      new SparkConf()
-        .setMaster(master)
-        .setAppName(appName)
-        .setJars(jars)
-        .setExecutorEnv(environment.toSeq)
-        .setSparkHome(sparkHome),
+    this(SparkContext.updatedConf(new SparkConf(), master, appName, sparkHome, jars, environment),
       preferredNodeLocationData)
   }
 
@@ -175,11 +169,9 @@ class SparkContext(
   // Environment variables to pass to our executors
   private[spark] val executorEnvs = HashMap[String, String]()
   // Note: SPARK_MEM is included for Mesos, but overwritten for standalone mode in ExecutorRunner
-  for (key <- Seq("SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", "SPARK_JAVA_OPTS", "SPARK_TESTING")) {
-    val value = System.getenv(key)
-    if (value != null) {
-      executorEnvs(key) = value
-    }
+  for (key <- Seq("SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", "SPARK_JAVA_OPTS", "SPARK_TESTING");
+      value <- Option(System.getenv(key))) {
+    executorEnvs(key) = value
   }
   // Since memory can be set with a system property too, use that
   executorEnvs("SPARK_MEM") = executorMemory + "m"
@@ -220,7 +212,7 @@ class SparkContext(
         hadoopConf.set(key.substring("spark.hadoop.".length), value)
       }
     }
-    val bufferSize = conf.getOrElse("spark.buffer.size", "65536")
+    val bufferSize = conf.get("spark.buffer.size", "65536")
     hadoopConf.set("io.file.buffer.size", bufferSize)
     hadoopConf
   }
@@ -733,13 +725,7 @@ class SparkContext(
    * (in that order of preference). If neither of these is set, return None.
    */
   private[spark] def getSparkHome(): Option[String] = {
-    if (conf.contains("spark.home")) {
-      Some(conf.get("spark.home"))
-    } else if (System.getenv("SPARK_HOME") != null) {
-      Some(System.getenv("SPARK_HOME"))
-    } else {
-      None
-    }
+    conf.getOption("spark.home").orElse(Option(System.getenv("SPARK_HOME")))
   }
 
   /**
@@ -1026,7 +1012,7 @@ object SparkContext {
 
   /**
    * Find the JAR from which a given class was loaded, to make it easy for users to pass
-   * their JARs to SparkContext
+   * their JARs to SparkContext.
    */
   def jarOfClass(cls: Class[_]): Seq[String] = {
     val uri = cls.getResource("/" + cls.getName.replace('.', '/') + ".class")
@@ -1043,10 +1029,41 @@ object SparkContext {
     }
   }
 
-  /** Find the JAR that contains the class of a particular object */
+  /**
+   * Find the JAR that contains the class of a particular object, to make it easy for users
+   * to pass their JARs to SparkContext. In most cases you can call jarOfObject(this) in
+   * your driver program.
+   */
   def jarOfObject(obj: AnyRef): Seq[String] = jarOfClass(obj.getClass)
 
-  // Creates a task scheduler based on a given master URL. Extracted for testing.
+  /**
+   * Creates a modified version of a SparkConf with the parameters that can be passed separately
+   * to SparkContext, to make it easier to write SparkContext's constructors. This ignores
+   * parameters that are passed as the default value of null, instead of throwing an exception
+   * like SparkConf would.
+   */
+  private def updatedConf(
+      conf: SparkConf,
+      master: String,
+      appName: String,
+      sparkHome: String = null,
+      jars: Seq[String] = Nil,
+      environment: Map[String, String] = Map()): SparkConf =
+  {
+    val res = conf.clone()
+    res.setMaster(master)
+    res.setAppName(appName)
+    if (sparkHome != null) {
+      res.setSparkHome(sparkHome)
+    }
+    if (!jars.isEmpty) {
+      res.setJars(jars)
+    }
+    res.setExecutorEnv(environment.toSeq)
+    res
+  }
+
+  /** Creates a task scheduler based on a given master URL. Extracted for testing. */
   private def createTaskScheduler(sc: SparkContext, master: String, appName: String)
       : TaskScheduler =
   {
@@ -1156,7 +1173,7 @@ object SparkContext {
       case mesosUrl @ MESOS_REGEX(_) =>
         MesosNativeLibrary.load()
         val scheduler = new TaskSchedulerImpl(sc)
-        val coarseGrained = sc.conf.getOrElse("spark.mesos.coarse", "false").toBoolean
+        val coarseGrained = sc.conf.get("spark.mesos.coarse", "false").toBoolean
         val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs
         val backend = if (coarseGrained) {
           new CoarseMesosSchedulerBackend(scheduler, sc, url, appName)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2c68642/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index d06af8e..634a94f 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -144,17 +144,17 @@ object SparkEnv extends Logging {
     // Create an instance of the class named by the given Java system property, or by
     // defaultClassName if the property is not set, and return it as a T
     def instantiateClass[T](propertyName: String, defaultClassName: String): T = {
-      val name = conf.getOrElse(propertyName,  defaultClassName)
+      val name = conf.get(propertyName,  defaultClassName)
       Class.forName(name, true, classLoader).newInstance().asInstanceOf[T]
     }
 
     val serializerManager = new SerializerManager
 
     val serializer = serializerManager.setDefault(
-      conf.getOrElse("spark.serializer", "org.apache.spark.serializer.JavaSerializer"), conf)
+      conf.get("spark.serializer", "org.apache.spark.serializer.JavaSerializer"), conf)
 
     val closureSerializer = serializerManager.get(
-      conf.getOrElse("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer"),
+      conf.get("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer"),
       conf)
 
     def registerOrLookup(name: String, newActor: => Actor): Either[ActorRef, ActorSelection] = {
@@ -162,8 +162,8 @@ object SparkEnv extends Logging {
         logInfo("Registering " + name)
         Left(actorSystem.actorOf(Props(newActor), name = name))
       } else {
-        val driverHost: String = conf.getOrElse("spark.driver.host", "localhost")
-        val driverPort: Int = conf.getOrElse("spark.driver.port", "7077").toInt
+        val driverHost: String = conf.get("spark.driver.host", "localhost")
+        val driverPort: Int = conf.get("spark.driver.port", "7077").toInt
         Utils.checkHost(driverHost, "Expected hostname")
         val url = "akka.tcp://spark@%s:%s/user/%s".format(driverHost, driverPort, name)
         logInfo("Connecting to " + name + ": " + url)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2c68642/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 05fd824..32cc70e 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -41,7 +41,7 @@ private[spark] class PythonRDD[T: ClassTag](
     accumulator: Accumulator[JList[Array[Byte]]])
   extends RDD[Array[Byte]](parent) {
 
-  val bufferSize = conf.getOrElse("spark.buffer.size", "65536").toInt
+  val bufferSize = conf.get("spark.buffer.size", "65536").toInt
 
   override def getPartitions = parent.partitions
 
@@ -250,7 +250,7 @@ private class PythonAccumulatorParam(@transient serverHost: String, serverPort:
 
   Utils.checkHost(serverHost, "Expected hostname")
 
-  val bufferSize = SparkEnv.get.conf.getOrElse("spark.buffer.size", "65536").toInt
+  val bufferSize = SparkEnv.get.conf.get("spark.buffer.size", "65536").toInt
 
   override def zero(value: JList[Array[Byte]]): JList[Array[Byte]] = new JArrayList
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2c68642/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
index be99d22..0fc478a 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
@@ -31,7 +31,7 @@ abstract class Broadcast[T](private[spark] val id: Long) extends Serializable {
   override def toString = "Broadcast(" + id + ")"
 }
 
-private[spark] 
+private[spark]
 class BroadcastManager(val _isDriver: Boolean, conf: SparkConf) extends Logging with Serializable {
 
   private var initialized = false
@@ -43,7 +43,7 @@ class BroadcastManager(val _isDriver: Boolean, conf: SparkConf) extends Logging
   private def initialize() {
     synchronized {
       if (!initialized) {
-        val broadcastFactoryClass = conf.getOrElse(
+        val broadcastFactoryClass = conf.get(
           "spark.broadcast.factory", "org.apache.spark.broadcast.HttpBroadcastFactory")
 
         broadcastFactory =

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2c68642/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index 47528bc..db596d5 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -92,8 +92,8 @@ private object HttpBroadcast extends Logging {
   def initialize(isDriver: Boolean, conf: SparkConf) {
     synchronized {
       if (!initialized) {
-        bufferSize = conf.getOrElse("spark.buffer.size", "65536").toInt
-        compress = conf.getOrElse("spark.broadcast.compress", "true").toBoolean
+        bufferSize = conf.get("spark.buffer.size", "65536").toInt
+        compress = conf.get("spark.broadcast.compress", "true").toBoolean
         if (isDriver) {
           createServer(conf)
           conf.set("spark.httpBroadcast.uri",  serverUri)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2c68642/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index 00ec3b9..9530938 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -180,7 +180,7 @@ extends Logging {
     initialized = false
   }
 
-  lazy val BLOCK_SIZE = conf.getOrElse("spark.broadcast.blockSize", "4096").toInt * 1024
+  lazy val BLOCK_SIZE = conf.get("spark.broadcast.blockSize", "4096").toInt * 1024
 
   def blockifyObject[T](obj: T): TorrentInfo = {
     val byteArray = Utils.serialize[T](obj)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2c68642/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 9c89e36..7b696cf 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -43,11 +43,11 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
   val conf = new SparkConf
 
   val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss")  // For application IDs
-  val WORKER_TIMEOUT = conf.getOrElse("spark.worker.timeout", "60").toLong * 1000
-  val RETAINED_APPLICATIONS = conf.getOrElse("spark.deploy.retainedApplications", "200").toInt
-  val REAPER_ITERATIONS = conf.getOrElse("spark.dead.worker.persistence", "15").toInt
-  val RECOVERY_DIR = conf.getOrElse("spark.deploy.recoveryDirectory", "")
-  val RECOVERY_MODE = conf.getOrElse("spark.deploy.recoveryMode", "NONE")
+  val WORKER_TIMEOUT = conf.get("spark.worker.timeout", "60").toLong * 1000
+  val RETAINED_APPLICATIONS = conf.get("spark.deploy.retainedApplications", "200").toInt
+  val REAPER_ITERATIONS = conf.get("spark.dead.worker.persistence", "15").toInt
+  val RECOVERY_DIR = conf.get("spark.deploy.recoveryDirectory", "")
+  val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE")
 
   var nextAppNumber = 0
   val workers = new HashSet[WorkerInfo]
@@ -88,7 +88,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
   // As a temporary workaround before better ways of configuring memory, we allow users to set
   // a flag that will perform round-robin scheduling across the nodes (spreading out each app
   // among all the nodes) instead of trying to consolidate each app onto a small # of nodes.
-  val spreadOutApps = conf.getOrElse("spark.deploy.spreadOut", "true").toBoolean
+  val spreadOutApps = conf.get("spark.deploy.spreadOut", "true").toBoolean
 
   override def preStart() {
     logInfo("Starting Spark master at " + masterUrl)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2c68642/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
index 7ce83f9..e7f3224 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
@@ -27,8 +27,8 @@ private[spark] class MasterArguments(args: Array[String], conf: SparkConf) {
   var host = Utils.localHostName()
   var port = 7077
   var webUiPort = 8080
-  
-  // Check for settings in environment variables 
+
+  // Check for settings in environment variables
   if (System.getenv("SPARK_MASTER_HOST") != null) {
     host = System.getenv("SPARK_MASTER_HOST")
   }
@@ -38,7 +38,7 @@ private[spark] class MasterArguments(args: Array[String], conf: SparkConf) {
   if (System.getenv("SPARK_MASTER_WEBUI_PORT") != null) {
     webUiPort = System.getenv("SPARK_MASTER_WEBUI_PORT").toInt
   }
-  if (conf.get("master.ui.port") != null) {
+  if (conf.contains("master.ui.port")) {
     webUiPort = conf.get("master.ui.port").toInt
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2c68642/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala b/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
index 60c7a7c..999090a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
@@ -37,7 +37,7 @@ import org.apache.spark.{SparkConf, Logging}
  */
 private[spark] class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher,
     conf: SparkConf) extends Logging {
-  val ZK_URL = conf.getOrElse("spark.deploy.zookeeper.url", "")
+  val ZK_URL = conf.get("spark.deploy.zookeeper.url", "")
 
   val ZK_ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE
   val ZK_TIMEOUT_MILLIS = 30000

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2c68642/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
index a61597b..77c23fb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
@@ -28,7 +28,7 @@ private[spark] class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef,
     masterUrl: String, conf: SparkConf)
   extends LeaderElectionAgent with SparkZooKeeperWatcher with Logging  {
 
-  val WORKING_DIR = conf.getOrElse("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"
+  val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"
 
   private val watcher = new ZooKeeperWatcher()
   private val zk = new SparkZooKeeperSession(this, conf)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2c68642/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
index 245a558..52000d4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
@@ -27,7 +27,7 @@ class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf)
   with SparkZooKeeperWatcher
   with Logging
 {
-  val WORKING_DIR = conf.getOrElse("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
+  val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
 
   val zk = new SparkZooKeeperSession(this, conf)
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2c68642/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index f844fcb..fcaf4e9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -55,7 +55,7 @@ private[spark] class Worker(
   val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss")  // For worker and executor IDs
 
   // Send a heartbeat every (heartbeat timeout) / 4 milliseconds
-  val HEARTBEAT_MILLIS = conf.getOrElse("spark.worker.timeout", "60").toLong * 1000 / 4
+  val HEARTBEAT_MILLIS = conf.get("spark.worker.timeout", "60").toLong * 1000 / 4
 
   val REGISTRATION_TIMEOUT = 20.seconds
   val REGISTRATION_RETRIES = 3

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2c68642/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
index a801d85..c382034 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -37,7 +37,7 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
   val timeout = AkkaUtils.askTimeout(worker.conf)
   val host = Utils.localHostName()
   val port = requestedPort.getOrElse(
-    worker.conf.getOrElse("worker.ui.port",  WorkerWebUI.DEFAULT_PORT).toInt)
+    worker.conf.get("worker.ui.port",  WorkerWebUI.DEFAULT_PORT).toInt)
 
   var server: Option[Server] = None
   var boundPort: Option[Int] = None

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2c68642/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 5b70165..3c92c20 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -302,7 +302,7 @@ private[spark] class Executor(
    * new classes defined by the REPL as the user types code
    */
   private def addReplClassLoaderIfNeeded(parent: ClassLoader): ClassLoader = {
-    val classUri = conf.getOrElse("spark.repl.class.uri", null)
+    val classUri = conf.get("spark.repl.class.uri", null)
     if (classUri != null) {
       logInfo("Using REPL class URI: " + classUri)
       try {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2c68642/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index 075a18b..a1e9884 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -39,7 +39,7 @@ trait CompressionCodec {
 
 private[spark] object CompressionCodec {
   def createCodec(conf: SparkConf): CompressionCodec = {
-    createCodec(conf, conf.getOrElse(
+    createCodec(conf, conf.get(
       "spark.io.compression.codec", classOf[LZFCompressionCodec].getName))
   }
 
@@ -71,7 +71,7 @@ class LZFCompressionCodec(conf: SparkConf) extends CompressionCodec {
 class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec {
 
   override def compressedOutputStream(s: OutputStream): OutputStream = {
-    val blockSize = conf.getOrElse("spark.io.compression.snappy.block.size", "32768").toInt
+    val blockSize = conf.get("spark.io.compression.snappy.block.size", "32768").toInt
     new SnappyOutputStream(s, blockSize)
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2c68642/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
index 0e41c73..9930537 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -65,7 +65,7 @@ import org.apache.spark.metrics.source.Source
 private[spark] class MetricsSystem private (val instance: String,
     conf: SparkConf) extends Logging {
 
-  val confFile = conf.getOrElse("spark.metrics.conf", null)
+  val confFile = conf.get("spark.metrics.conf", null)
   val metricsConfig = new MetricsConfig(Option(confFile))
 
   val sinks = new mutable.ArrayBuffer[Sink]

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2c68642/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
index 697096f..46c40d0 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
@@ -54,22 +54,22 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
   private val selector = SelectorProvider.provider.openSelector()
 
   private val handleMessageExecutor = new ThreadPoolExecutor(
-    conf.getOrElse("spark.core.connection.handler.threads.min", "20").toInt,
-    conf.getOrElse("spark.core.connection.handler.threads.max", "60").toInt,
-    conf.getOrElse("spark.core.connection.handler.threads.keepalive", "60").toInt, TimeUnit.SECONDS,
+    conf.get("spark.core.connection.handler.threads.min", "20").toInt,
+    conf.get("spark.core.connection.handler.threads.max", "60").toInt,
+    conf.get("spark.core.connection.handler.threads.keepalive", "60").toInt, TimeUnit.SECONDS,
     new LinkedBlockingDeque[Runnable]())
 
   private val handleReadWriteExecutor = new ThreadPoolExecutor(
-    conf.getOrElse("spark.core.connection.io.threads.min", "4").toInt,
-    conf.getOrElse("spark.core.connection.io.threads.max", "32").toInt,
-    conf.getOrElse("spark.core.connection.io.threads.keepalive", "60").toInt, TimeUnit.SECONDS,
+    conf.get("spark.core.connection.io.threads.min", "4").toInt,
+    conf.get("spark.core.connection.io.threads.max", "32").toInt,
+    conf.get("spark.core.connection.io.threads.keepalive", "60").toInt, TimeUnit.SECONDS,
     new LinkedBlockingDeque[Runnable]())
 
   // Use a different, yet smaller, thread pool - infrequently used with very short lived tasks : which should be executed asap
   private val handleConnectExecutor = new ThreadPoolExecutor(
-    conf.getOrElse("spark.core.connection.connect.threads.min", "1").toInt,
-    conf.getOrElse("spark.core.connection.connect.threads.max", "8").toInt,
-    conf.getOrElse("spark.core.connection.connect.threads.keepalive", "60").toInt, TimeUnit.SECONDS,
+    conf.get("spark.core.connection.connect.threads.min", "1").toInt,
+    conf.get("spark.core.connection.connect.threads.max", "8").toInt,
+    conf.get("spark.core.connection.connect.threads.keepalive", "60").toInt, TimeUnit.SECONDS,
     new LinkedBlockingDeque[Runnable]())
 
   private val serverChannel = ServerSocketChannel.open()

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2c68642/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
index db28ddf..b729eb1 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
@@ -36,7 +36,7 @@ private[spark] class ShuffleCopier(conf: SparkConf) extends Logging {
       resultCollectCallback: (BlockId, Long, ByteBuf) => Unit) {
 
     val handler = new ShuffleCopier.ShuffleClientHandler(resultCollectCallback)
-    val connectTimeout = conf.getOrElse("spark.shuffle.netty.connect.timeout", "60000").toInt
+    val connectTimeout = conf.get("spark.shuffle.netty.connect.timeout", "60000").toInt
     val fc = new FileClient(handler, connectTimeout)
 
     try {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2c68642/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
index 172ba6b..6d4f461 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -97,7 +97,7 @@ private[spark] object CheckpointRDD extends Logging {
       throw new IOException("Checkpoint failed: temporary path " +
         tempOutputPath + " already exists")
     }
-    val bufferSize = env.conf.getOrElse("spark.buffer.size", "65536").toInt
+    val bufferSize = env.conf.get("spark.buffer.size", "65536").toInt
 
     val fileOutputStream = if (blockSize < 0) {
       fs.create(tempOutputPath, false, bufferSize)
@@ -131,7 +131,7 @@ private[spark] object CheckpointRDD extends Logging {
     ): Iterator[T] = {
     val env = SparkEnv.get
     val fs = path.getFileSystem(broadcastedConf.value.value)
-    val bufferSize = env.conf.getOrElse("spark.buffer.size", "65536").toInt
+    val bufferSize = env.conf.get("spark.buffer.size", "65536").toInt
     val fileInputStream = fs.open(path, bufferSize)
     val serializer = env.serializer.newInstance()
     val deserializeStream = serializer.deserializeStream(fileInputStream)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2c68642/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index 29b0247..e22b1e5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -31,7 +31,7 @@ import org.apache.spark.util.Utils
 private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedulerImpl)
   extends Logging {
 
-  private val THREADS = sparkEnv.conf.getOrElse("spark.resultGetter.threads", "4").toInt
+  private val THREADS = sparkEnv.conf.get("spark.resultGetter.threads", "4").toInt
   private val getTaskResultExecutor = Utils.newDaemonFixedThreadPool(
     THREADS, "Result resolver thread")
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2c68642/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index bffd990..d94b706 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -51,15 +51,15 @@ private[spark] class TaskSchedulerImpl(
     isLocal: Boolean = false)
   extends TaskScheduler with Logging
 {
-  def this(sc: SparkContext) = this(sc, sc.conf.getOrElse("spark.task.maxFailures", "4").toInt)
+  def this(sc: SparkContext) = this(sc, sc.conf.get("spark.task.maxFailures", "4").toInt)
 
   val conf = sc.conf
 
   // How often to check for speculative tasks
-  val SPECULATION_INTERVAL = conf.getOrElse("spark.speculation.interval", "100").toLong
+  val SPECULATION_INTERVAL = conf.get("spark.speculation.interval", "100").toLong
 
   // Threshold above which we warn user initial TaskSet may be starved
-  val STARVATION_TIMEOUT = conf.getOrElse("spark.starvation.timeout", "15000").toLong
+  val STARVATION_TIMEOUT = conf.get("spark.starvation.timeout", "15000").toLong
 
   // TaskSetManagers are not thread safe, so any access to one should be synchronized
   // on this class.
@@ -96,7 +96,7 @@ private[spark] class TaskSchedulerImpl(
   var rootPool: Pool = null
   // default scheduler is FIFO
   val schedulingMode: SchedulingMode = SchedulingMode.withName(
-    conf.getOrElse("spark.scheduler.mode", "FIFO"))
+    conf.get("spark.scheduler.mode", "FIFO"))
 
   // This is a var so that we can reset it for testing purposes.
   private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this)
@@ -125,7 +125,7 @@ private[spark] class TaskSchedulerImpl(
   override def start() {
     backend.start()
 
-    if (!isLocal && conf.getOrElse("spark.speculation", "false").toBoolean) {
+    if (!isLocal && conf.get("spark.speculation", "false").toBoolean) {
       logInfo("Starting speculative execution thread")
       import sc.env.actorSystem.dispatcher
       sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2c68642/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index b99664a..67ad99a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -57,11 +57,11 @@ private[spark] class TaskSetManager(
   val conf = sched.sc.conf
 
   // CPUs to request per task
-  val CPUS_PER_TASK = conf.getOrElse("spark.task.cpus", "1").toInt
+  val CPUS_PER_TASK = conf.get("spark.task.cpus", "1").toInt
 
   // Quantile of tasks at which to start speculation
-  val SPECULATION_QUANTILE = conf.getOrElse("spark.speculation.quantile", "0.75").toDouble
-  val SPECULATION_MULTIPLIER = conf.getOrElse("spark.speculation.multiplier", "1.5").toDouble
+  val SPECULATION_QUANTILE = conf.get("spark.speculation.quantile", "0.75").toDouble
+  val SPECULATION_MULTIPLIER = conf.get("spark.speculation.multiplier", "1.5").toDouble
 
   // Serializer for closures and tasks.
   val env = SparkEnv.get
@@ -116,7 +116,7 @@ private[spark] class TaskSetManager(
 
   // How frequently to reprint duplicate exceptions in full, in milliseconds
   val EXCEPTION_PRINT_INTERVAL =
-    conf.getOrElse("spark.logging.exceptionPrintInterval", "10000").toLong
+    conf.get("spark.logging.exceptionPrintInterval", "10000").toLong
 
   // Map of recent exceptions (identified by string representation and top stack frame) to
   // duplicate count (how many times the same exception has appeared) and time the full exception
@@ -678,14 +678,14 @@ private[spark] class TaskSetManager(
   }
 
   private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
-    val defaultWait = conf.getOrElse("spark.locality.wait", "3000")
+    val defaultWait = conf.get("spark.locality.wait", "3000")
     level match {
       case TaskLocality.PROCESS_LOCAL =>
-        conf.getOrElse("spark.locality.wait.process", defaultWait).toLong
+        conf.get("spark.locality.wait.process", defaultWait).toLong
       case TaskLocality.NODE_LOCAL =>
-        conf.getOrElse("spark.locality.wait.node", defaultWait).toLong
+        conf.get("spark.locality.wait.node", defaultWait).toLong
       case TaskLocality.RACK_LOCAL =>
-        conf.getOrElse("spark.locality.wait.rack", defaultWait).toLong
+        conf.get("spark.locality.wait.rack", defaultWait).toLong
       case TaskLocality.ANY =>
         0L
     }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2c68642/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index b4a3ecc..2f5bcaf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -22,7 +22,6 @@ import java.util.concurrent.atomic.AtomicInteger
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.concurrent.Await
 import scala.concurrent.duration._
-import scala.util.Try
 
 import akka.actor._
 import akka.pattern.ask
@@ -64,7 +63,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
       context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
 
       // Periodically revive offers to allow delay scheduling to work
-      val reviveInterval = conf.getOrElse("spark.scheduler.revive.interval", "1000").toLong
+      val reviveInterval = conf.get("spark.scheduler.revive.interval", "1000").toLong
       import context.dispatcher
       context.system.scheduler.schedule(0.millis, reviveInterval.millis, self, ReviveOffers)
     }
@@ -209,8 +208,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
     driverActor ! KillTask(taskId, executorId)
   }
 
-  override def defaultParallelism() = Try(conf.get("spark.default.parallelism")).toOption
-      .map(_.toInt).getOrElse(math.max(totalCoreCount.get(), 2))
+  override def defaultParallelism(): Int = {
+    conf.getOption("spark.default.parallelism").map(_.toInt).getOrElse(
+      math.max(totalCoreCount.get(), 2))
+  }
 
   // Called by subclasses when notified of a lost worker
   def removeExecutor(executorId: String, reason: String) {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2c68642/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
index f41fbbd..b44d1e4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -33,7 +33,7 @@ private[spark] class SimrSchedulerBackend(
   val tmpPath = new Path(driverFilePath + "_tmp")
   val filePath = new Path(driverFilePath)
 
-  val maxCores = conf.getOrElse("spark.simr.executor.cores", "1").toInt
+  val maxCores = conf.get("spark.simr.executor.cores", "1").toInt
 
   override def start() {
     super.start()

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2c68642/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 2240775..9858717 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -38,7 +38,7 @@ private[spark] class SparkDeploySchedulerBackend(
   var stopping = false
   var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _
 
-  val maxCores = conf.getOrElse("spark.cores.max",  Int.MaxValue.toString).toInt
+  val maxCores = conf.get("spark.cores.max",  Int.MaxValue.toString).toInt
 
   override def start() {
     super.start()

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2c68642/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 9e2cd3f..d247fa4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -62,7 +62,7 @@ private[spark] class CoarseMesosSchedulerBackend(
   var driver: SchedulerDriver = null
 
   // Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
-  val maxCores = conf.getOrElse("spark.cores.max",  Int.MaxValue.toString).toInt
+  val maxCores = conf.get("spark.cores.max",  Int.MaxValue.toString).toInt
 
   // Cores we have acquired with each Mesos task ID
   val coresByTaskId = new HashMap[Int, Int]
@@ -77,7 +77,7 @@ private[spark] class CoarseMesosSchedulerBackend(
     "Spark home is not set; set it through the spark.home system " +
     "property, the SPARK_HOME environment variable or the SparkContext constructor"))
 
-  val extraCoresPerSlave = conf.getOrElse("spark.mesos.extra.cores", "0").toInt
+  val extraCoresPerSlave = conf.get("spark.mesos.extra.cores", "0").toInt
 
   var nextMesosTaskId = 0
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2c68642/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index be96382..c20fc41 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -340,5 +340,5 @@ private[spark] class MesosSchedulerBackend(
   }
 
   // TODO: query Mesos for number of cores
-  override def defaultParallelism() = sc.conf.getOrElse("spark.default.parallelism", "8").toInt
+  override def defaultParallelism() = sc.conf.get("spark.default.parallelism", "8").toInt
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2c68642/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 2367f3f..a24a3b0 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -29,17 +29,14 @@ import org.apache.spark._
 import org.apache.spark.broadcast.HttpBroadcast
 import org.apache.spark.scheduler.MapStatus
 import org.apache.spark.storage._
-import scala.util.Try
-import org.apache.spark.storage.PutBlock
-import org.apache.spark.storage.GetBlock
-import org.apache.spark.storage.GotBlock
+import org.apache.spark.storage.{GetBlock, GotBlock, PutBlock}
 
 /**
  * A Spark serializer that uses the [[https://code.google.com/p/kryo/ Kryo serialization library]].
  */
 class KryoSerializer(conf: SparkConf) extends org.apache.spark.serializer.Serializer with Logging {
   private val bufferSize = {
-    conf.getOrElse("spark.kryoserializer.buffer.mb", "2").toInt * 1024 * 1024
+    conf.get("spark.kryoserializer.buffer.mb", "2").toInt * 1024 * 1024
   }
 
   def newKryoOutput() = new KryoOutput(bufferSize)
@@ -51,7 +48,7 @@ class KryoSerializer(conf: SparkConf) extends org.apache.spark.serializer.Serial
 
     // Allow disabling Kryo reference tracking if user knows their object graphs don't have loops.
     // Do this before we invoke the user registrator so the user registrator can override this.
-    kryo.setReferences(conf.getOrElse("spark.kryo.referenceTracking", "true").toBoolean)
+    kryo.setReferences(conf.get("spark.kryo.referenceTracking", "true").toBoolean)
 
     for (cls <- KryoSerializer.toRegister) kryo.register(cls)
 
@@ -61,13 +58,13 @@ class KryoSerializer(conf: SparkConf) extends org.apache.spark.serializer.Serial
 
     // Allow the user to register their own classes by setting spark.kryo.registrator
     try {
-      Try(conf.get("spark.kryo.registrator")).toOption.foreach { regCls =>
+      for (regCls <- conf.getOption("spark.kryo.registrator")) {
         logDebug("Running user registrator: " + regCls)
         val reg = Class.forName(regCls, true, classLoader).newInstance().asInstanceOf[KryoRegistrator]
         reg.registerClasses(kryo)
       }
     } catch {
-      case _: Exception => println("Failed to register spark.kryo.registrator")
+      case e: Exception => logError("Failed to run spark.kryo.registrator", e)
     }
 
     // Register Chill's classes; we do this after our ranges and the user's own classes to let

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2c68642/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
index 3b25f68..4747863 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
@@ -327,7 +327,7 @@ object BlockFetcherIterator {
         fetchRequestsSync.put(request)
       }
 
-      copiers = startCopiers(conf.getOrElse("spark.shuffle.copier.threads", "6").toInt)
+      copiers = startCopiers(conf.get("spark.shuffle.copier.threads", "6").toInt)
       logInfo("Started " + fetchRequestsSync.size + " remote gets in " +
         Utils.getUsedTimeMs(startTime))
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2c68642/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 16ee208..6d2cda9 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -49,7 +49,7 @@ private[spark] class BlockManager(
 
   val shuffleBlockManager = new ShuffleBlockManager(this)
   val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
-    conf.getOrElse("spark.local.dir",  System.getProperty("java.io.tmpdir")))
+    conf.get("spark.local.dir",  System.getProperty("java.io.tmpdir")))
 
   private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo]
 
@@ -58,8 +58,8 @@ private[spark] class BlockManager(
 
   // If we use Netty for shuffle, start a new Netty-based shuffle sender service.
   private val nettyPort: Int = {
-    val useNetty = conf.getOrElse("spark.shuffle.use.netty", "false").toBoolean
-    val nettyPortConfig = conf.getOrElse("spark.shuffle.sender.port", "0").toInt
+    val useNetty = conf.get("spark.shuffle.use.netty", "false").toBoolean
+    val nettyPortConfig = conf.get("spark.shuffle.sender.port", "0").toInt
     if (useNetty) diskBlockManager.startShuffleBlockSender(nettyPortConfig) else 0
   }
 
@@ -72,14 +72,14 @@ private[spark] class BlockManager(
   // Max megabytes of data to keep in flight per reducer (to avoid over-allocating memory
   // for receiving shuffle outputs)
   val maxBytesInFlight =
-    conf.getOrElse("spark.reducer.maxMbInFlight", "48").toLong * 1024 * 1024
+    conf.get("spark.reducer.maxMbInFlight", "48").toLong * 1024 * 1024
 
   // Whether to compress broadcast variables that are stored
-  val compressBroadcast = conf.getOrElse("spark.broadcast.compress", "true").toBoolean
+  val compressBroadcast = conf.get("spark.broadcast.compress", "true").toBoolean
   // Whether to compress shuffle output that are stored
-  val compressShuffle = conf.getOrElse("spark.shuffle.compress", "true").toBoolean
+  val compressShuffle = conf.get("spark.shuffle.compress", "true").toBoolean
   // Whether to compress RDD partitions that are stored serialized
-  val compressRdds = conf.getOrElse("spark.rdd.compress", "false").toBoolean
+  val compressRdds = conf.get("spark.rdd.compress", "false").toBoolean
 
   val heartBeatFrequency = BlockManager.getHeartBeatFrequency(conf)
 
@@ -443,7 +443,7 @@ private[spark] class BlockManager(
       : BlockFetcherIterator = {
 
     val iter =
-      if (conf.getOrElse("spark.shuffle.use.netty", "false").toBoolean) {
+      if (conf.get("spark.shuffle.use.netty", "false").toBoolean) {
         new BlockFetcherIterator.NettyBlockFetcherIterator(this, blocksByAddress, serializer)
       } else {
         new BlockFetcherIterator.BasicBlockFetcherIterator(this, blocksByAddress, serializer)
@@ -469,7 +469,7 @@ private[spark] class BlockManager(
   def getDiskWriter(blockId: BlockId, file: File, serializer: Serializer, bufferSize: Int)
     : BlockObjectWriter = {
     val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _)
-    val syncWrites = conf.getOrElse("spark.shuffle.sync", "false").toBoolean
+    val syncWrites = conf.get("spark.shuffle.sync", "false").toBoolean
     new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream, syncWrites)
   }
 
@@ -864,15 +864,15 @@ private[spark] object BlockManager extends Logging {
   val ID_GENERATOR = new IdGenerator
 
   def getMaxMemory(conf: SparkConf): Long = {
-    val memoryFraction = conf.getOrElse("spark.storage.memoryFraction", "0.66").toDouble
+    val memoryFraction = conf.get("spark.storage.memoryFraction", "0.66").toDouble
     (Runtime.getRuntime.maxMemory * memoryFraction).toLong
   }
 
   def getHeartBeatFrequency(conf: SparkConf): Long =
-    conf.getOrElse("spark.storage.blockManagerTimeoutIntervalMs", "60000").toLong / 4
+    conf.get("spark.storage.blockManagerTimeoutIntervalMs", "60000").toLong / 4
 
   def getDisableHeartBeatsForTesting(conf: SparkConf): Boolean =
-    conf.getOrElse("spark.test.disableBlockManagerHeartBeat", "false").toBoolean
+    conf.get("spark.test.disableBlockManagerHeartBeat", "false").toBoolean
 
   /**
    * Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2c68642/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 8e4a88b..b5afe8c 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -31,8 +31,8 @@ private[spark]
 class BlockManagerMaster(var driverActor : Either[ActorRef, ActorSelection],
     conf: SparkConf) extends Logging {
 
-  val AKKA_RETRY_ATTEMPTS: Int = conf.getOrElse("spark.akka.num.retries", "3").toInt
-  val AKKA_RETRY_INTERVAL_MS: Int = conf.getOrElse("spark.akka.retry.wait", "3000").toInt
+  val AKKA_RETRY_ATTEMPTS: Int = conf.get("spark.akka.num.retries", "3").toInt
+  val AKKA_RETRY_INTERVAL_MS: Int = conf.get("spark.akka.retry.wait", "3000").toInt
 
   val DRIVER_AKKA_ACTOR_NAME = "BlockManagerMaster"
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2c68642/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index dbbeeb3..58452d9 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -50,10 +50,10 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act
 
   private val akkaTimeout = AkkaUtils.askTimeout(conf)
 
-  val slaveTimeout = conf.getOrElse("spark.storage.blockManagerSlaveTimeoutMs",
+  val slaveTimeout = conf.get("spark.storage.blockManagerSlaveTimeoutMs",
     "" + (BlockManager.getHeartBeatFrequency(conf) * 3)).toLong
 
-  val checkTimeoutInterval = conf.getOrElse("spark.storage.blockManagerTimeoutIntervalMs",
+  val checkTimeoutInterval = conf.get("spark.storage.blockManagerTimeoutIntervalMs",
     "60000").toLong
 
   var timeoutCheckingTask: Cancellable = null

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2c68642/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 7697092..55dcb37 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -38,7 +38,7 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
   extends PathResolver with Logging {
 
   private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
-  private val subDirsPerLocalDir = shuffleManager.conf.getOrElse("spark.diskStore.subDirectories", "64").toInt
+  private val subDirsPerLocalDir = shuffleManager.conf.get("spark.diskStore.subDirectories", "64").toInt
 
   // Create one local directory for each path mentioned in spark.local.dir; then, inside this
   // directory, create multiple subdirectories that we will hash files into, in order to avoid

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2c68642/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index 151eedb..39dc7bb 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -27,8 +27,6 @@ import org.apache.spark.serializer.Serializer
 import org.apache.spark.util.{MetadataCleanerType, MetadataCleaner, TimeStampedHashMap}
 import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
 import org.apache.spark.storage.ShuffleBlockManager.ShuffleFileGroup
-import scala.util.Try
-import org.apache.spark.SparkConf
 
 /** A group of writers for a ShuffleMapTask, one writer per reducer. */
 private[spark] trait ShuffleWriterGroup {
@@ -66,9 +64,9 @@ class ShuffleBlockManager(blockManager: BlockManager) {
   // Turning off shuffle file consolidation causes all shuffle Blocks to get their own file.
   // TODO: Remove this once the shuffle file consolidation feature is stable.
   val consolidateShuffleFiles =
-    conf.getOrElse("spark.shuffle.consolidateFiles", "false").toBoolean
+    conf.get("spark.shuffle.consolidateFiles", "false").toBoolean
 
-  private val bufferSize = conf.getOrElse("spark.shuffle.file.buffer.kb", "100").toInt * 1024
+  private val bufferSize = conf.get("spark.shuffle.file.buffer.kb", "100").toInt * 1024
 
   /**
    * Contains all the state related to a particular shuffle. This includes a pool of unused

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2c68642/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index 0ce8d9c..50dfdbd 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -32,7 +32,7 @@ import org.apache.spark.util.Utils
 /** Top level user interface for Spark */
 private[spark] class SparkUI(sc: SparkContext) extends Logging {
   val host = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(Utils.localHostName())
-  val port = sc.conf.getOrElse("spark.ui.port", SparkUI.DEFAULT_PORT).toInt
+  val port = sc.conf.get("spark.ui.port", SparkUI.DEFAULT_PORT).toInt
   var boundPort: Option[Int] = None
   var server: Option[Server] = None
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2c68642/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
index 6b4602f..88f41be 100644
--- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
@@ -66,7 +66,7 @@ private[spark] class EnvironmentUI(sc: SparkContext) {
       UIUtils.listingTable(propertyHeaders, propertyRow, otherProperties, fixedWidth = true)
 
     val classPathEntries = classPathProperty._2
-        .split(sc.conf.getOrElse("path.separator", ":"))
+        .split(sc.conf.get("path.separator", ":"))
         .filterNot(e => e.isEmpty)
         .map(e => (e, "System Classpath"))
     val addedJars = sc.addedJars.iterator.toSeq.map{case (path, time) => (path, "Added By User")}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2c68642/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 315014d..b7b8725 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -33,7 +33,7 @@ import org.apache.spark.scheduler._
  */
 private[spark] class JobProgressListener(val sc: SparkContext) extends SparkListener {
   // How many stages to remember
-  val RETAINED_STAGES = sc.conf.getOrElse("spark.ui.retained_stages", "1000").toInt
+  val RETAINED_STAGES = sc.conf.get("spark.ui.retained_stages", "1000").toInt
   val DEFAULT_POOL_NAME = "default"
 
   val stageIdToPool = new HashMap[Int, String]()

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2c68642/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 58b26f7..362cea5 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -41,19 +41,19 @@ private[spark] object AkkaUtils {
   def createActorSystem(name: String, host: String, port: Int, indestructible: Boolean = false,
     conf: SparkConf): (ActorSystem, Int) = {
 
-    val akkaThreads   = conf.getOrElse("spark.akka.threads", "4").toInt
-    val akkaBatchSize = conf.getOrElse("spark.akka.batchSize", "15").toInt
+    val akkaThreads   = conf.get("spark.akka.threads", "4").toInt
+    val akkaBatchSize = conf.get("spark.akka.batchSize", "15").toInt
 
-    val akkaTimeout = conf.getOrElse("spark.akka.timeout", "100").toInt
+    val akkaTimeout = conf.get("spark.akka.timeout", "100").toInt
 
-    val akkaFrameSize = conf.getOrElse("spark.akka.frameSize", "10").toInt
+    val akkaFrameSize = conf.get("spark.akka.frameSize", "10").toInt
     val lifecycleEvents =
-      if (conf.getOrElse("spark.akka.logLifecycleEvents", "false").toBoolean) "on" else "off"
+      if (conf.get("spark.akka.logLifecycleEvents", "false").toBoolean) "on" else "off"
 
-    val akkaHeartBeatPauses = conf.getOrElse("spark.akka.heartbeat.pauses", "600").toInt
+    val akkaHeartBeatPauses = conf.get("spark.akka.heartbeat.pauses", "600").toInt
     val akkaFailureDetector =
-      conf.getOrElse("spark.akka.failure-detector.threshold", "300.0").toDouble
-    val akkaHeartBeatInterval = conf.getOrElse("spark.akka.heartbeat.interval", "1000").toInt
+      conf.get("spark.akka.failure-detector.threshold", "300.0").toDouble
+    val akkaHeartBeatInterval = conf.get("spark.akka.heartbeat.interval", "1000").toInt
 
     val akkaConf = ConfigFactory.parseString(
       s"""
@@ -89,6 +89,6 @@ private[spark] object AkkaUtils {
 
   /** Returns the default Spark timeout to use for Akka ask operations. */
   def askTimeout(conf: SparkConf): FiniteDuration = {
-    Duration.create(conf.getOrElse("spark.akka.askTimeout", "30").toLong, "seconds")
+    Duration.create(conf.get("spark.akka.askTimeout", "30").toLong, "seconds")
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2c68642/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
index 9ea7fc2..aa7f52c 100644
--- a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
@@ -74,12 +74,12 @@ object MetadataCleanerType extends Enumeration {
 // initialization of StreamingContext. It's okay for users trying to configure stuff themselves.
 object MetadataCleaner {
   def getDelaySeconds(conf: SparkConf) = {
-    conf.getOrElse("spark.cleaner.ttl", "3500").toInt
+    conf.get("spark.cleaner.ttl", "3500").toInt
   }
 
   def getDelaySeconds(conf: SparkConf, cleanerType: MetadataCleanerType.MetadataCleanerType): Int =
   {
-    conf.getOrElse(MetadataCleanerType.systemProperty(cleanerType), getDelaySeconds(conf).toString)
+    conf.get(MetadataCleanerType.systemProperty(cleanerType), getDelaySeconds(conf).toString)
       .toInt
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2c68642/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index ca3320b..5f12531 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -311,7 +311,7 @@ private[spark] object Utils extends Logging {
    * multiple paths.
    */
   def getLocalDir(conf: SparkConf): String = {
-    conf.getOrElse("spark.local.dir",  System.getProperty("java.io.tmpdir")).split(',')(0)
+    conf.get("spark.local.dir",  System.getProperty("java.io.tmpdir")).split(',')(0)
   }
 
   /**
@@ -397,7 +397,7 @@ private[spark] object Utils extends Logging {
   }
 
   def localHostPort(conf: SparkConf): String = {
-    val retval = conf.getOrElse("spark.hostPort", null)
+    val retval = conf.get("spark.hostPort", null)
     if (retval == null) {
       logErrorWithStack("spark.hostPort not set but invoking localHostPort")
       return localHostName()

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2c68642/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 77c7b82..ef5936d 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -74,7 +74,7 @@ class SparkConfSuite extends FunSuite with LocalSparkContext {
     assert(!conf.contains("k4"), "conf contained k4")
     assert(conf.get("k1") === "v4")
     intercept[Exception] { conf.get("k4") }
-    assert(conf.getOrElse("k4", "not found") === "not found")
+    assert(conf.get("k4", "not found") === "not found")
     assert(conf.getOption("k1") === Some("v4"))
     assert(conf.getOption("k4") === None)
   }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2c68642/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 5d33e66..1eec672 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -83,7 +83,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
 
   private val conf = new SparkConf
 
-  val LOCALITY_WAIT = conf.getOrElse("spark.locality.wait", "3000").toLong
+  val LOCALITY_WAIT = conf.get("spark.locality.wait", "3000").toLong
   val MAX_TASK_FAILURES = 4
 
   test("TaskSet with no preferences") {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2c68642/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index f940448..af4b31d 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -27,8 +27,8 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
 import scala.util.Try
 import akka.actor.{Props, ActorSelection, ActorSystem}
 
-class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAfterAll {
-  private val testConf = new SparkConf
+class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach {
+  private val testConf = new SparkConf(false)
   val rootDir0 = Files.createTempDir()
   rootDir0.deleteOnExit()
   val rootDir1 = Files.createTempDir()
@@ -38,9 +38,7 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with Before
 
   // This suite focuses primarily on consolidation features,
   // so we coerce consolidation if not already enabled.
-  val consolidateProp = "spark.shuffle.consolidateFiles"
-  val oldConsolidate = Try(testConf.get(consolidateProp)).toOption
-  testConf.set(consolidateProp, "true")
+  testConf.set("spark.shuffle.consolidateFiles", "true")
 
   val shuffleBlockManager = new ShuffleBlockManager(null) {
     override def conf = testConf.clone
@@ -50,10 +48,6 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with Before
 
   var diskBlockManager: DiskBlockManager = _
 
-  override def afterAll() {
-    oldConsolidate.map(c => System.setProperty(consolidateProp, c))
-  }
-
   override def beforeEach() {
     diskBlockManager = new DiskBlockManager(shuffleBlockManager, rootDirs)
     shuffleBlockManager.idToSegmentMap.clear()

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2c68642/python/pyspark/conf.py
----------------------------------------------------------------------
diff --git a/python/pyspark/conf.py b/python/pyspark/conf.py
index 9dcdcfa..c111e2e 100644
--- a/python/pyspark/conf.py
+++ b/python/pyspark/conf.py
@@ -93,7 +93,7 @@ class SparkConf(object):
 
     def set(self, key, value):
         """Set a configuration property."""
-        self._jconf.set(key, value)
+        self._jconf.set(key, unicode(value))
         return self
 
     def setMaster(self, value):
@@ -132,13 +132,9 @@ class SparkConf(object):
             self._jconf.set(k, v)
         return self
 
-    def get(self, key):
-        """Get the configured value for some key, if set."""
-        return self._jconf.get(key)
-
-    def getOrElse(self, key, defaultValue):
-        """Get the value for some key, or return a default otherwise."""
-        return self._jconf.getOrElse(key, defaultValue)
+    def get(self, key, defaultValue=None):
+        """Get the configured value for some key, or return a default otherwise."""
+        return self._jconf.get(key, defaultValue)
 
     def getAll(self):
         """Get all values as a list of key-value pairs."""

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2c68642/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
index a993083..59fdb0b 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
@@ -89,7 +89,7 @@ import org.apache.spark.util.Utils
       /** Local directory to save .class files too */
       val outputDir = {
         val tmp = System.getProperty("java.io.tmpdir")
-        val rootDir = new SparkConf().getOrElse("spark.repl.classdir",  tmp)
+        val rootDir = new SparkConf().get("spark.repl.classdir",  tmp)
         Utils.createTempDir(rootDir)
       }
       if (SPARK_DEBUG_REPL) {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2c68642/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
index a230845..27d474c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
@@ -174,8 +174,8 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
   /** A helper actor that communicates with the NetworkInputTracker */
   private class NetworkReceiverActor extends Actor {
     logInfo("Attempting to register with tracker")
-    val ip = env.conf.getOrElse("spark.driver.host", "localhost")
-    val port = env.conf.getOrElse("spark.driver.port", "7077").toInt
+    val ip = env.conf.get("spark.driver.host", "localhost")
+    val port = env.conf.get("spark.driver.port", "7077").toInt
     val url = "akka.tcp://spark@%s:%s/user/NetworkInputTracker".format(ip, port)
     val tracker = env.actorSystem.actorSelection(url)
     val timeout = 5.seconds
@@ -212,7 +212,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
     case class Block(id: BlockId, buffer: ArrayBuffer[T], metadata: Any = null)
 
     val clock = new SystemClock()
-    val blockInterval = env.conf.getOrElse("spark.streaming.blockInterval", "200").toLong
+    val blockInterval = env.conf.get("spark.streaming.blockInterval", "200").toLong
     val blockIntervalTimer = new RecurringTimer(clock, blockInterval, updateCurrentBuffer)
     val blockStorageLevel = storageLevel
     val blocksForPushing = new ArrayBlockingQueue[Block](1000)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2c68642/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 844180c..5f8be93 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -46,7 +46,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     }
   }))
   val clock = {
-    val clockClass = ssc.sc.conf.getOrElse(
+    val clockClass = ssc.sc.conf.get(
       "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
     Class.forName(clockClass).newInstance().asInstanceOf[Clock]
   }
@@ -104,7 +104,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     // or if the property is defined set it to that time
     if (clock.isInstanceOf[ManualClock]) {
       val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds
-      val jumpTime = ssc.sc.conf.getOrElse("spark.streaming.manualClock.jump", "0").toLong
+      val jumpTime = ssc.sc.conf.get("spark.streaming.manualClock.jump", "0").toLong
       clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2c68642/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 651cdaa..9304fc1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -31,7 +31,7 @@ private[streaming]
 class JobScheduler(val ssc: StreamingContext) extends Logging {
 
   val jobSets = new ConcurrentHashMap[Time, JobSet]
-  val numConcurrentJobs = ssc.conf.getOrElse("spark.streaming.concurrentJobs", "1").toInt
+  val numConcurrentJobs = ssc.conf.get("spark.streaming.concurrentJobs", "1").toInt
   val executor = Executors.newFixedThreadPool(numConcurrentJobs)
   val generator = new JobGenerator(this)
   val listenerBus = new StreamingListenerBus()


Mime
View raw message