spark-commits mailing list archives

From joshro...@apache.org
Subject spark git commit: [SPARK-1010] Clean up uses of System.setProperty in unit tests
Date Wed, 31 Dec 2014 02:12:35 GMT
Repository: spark
Updated Branches:
  refs/heads/master 035bac88c -> 352ed6bbe


[SPARK-1010] Clean up uses of System.setProperty in unit tests

Several of our tests call System.setProperty (or test code which implicitly sets system properties) and don't always reset/clear the modified properties, which can create ordering dependencies between tests and cause hard-to-diagnose failures.
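
To illustrate the kind of ordering dependency described above, here is a minimal sketch in plain Scala (no ScalaTest or Spark). The object name, the two "test" methods, and the property name `spark.test.exampleProperty` are hypothetical and do not come from the patch; they only show how a property that is set and never cleared changes the outcome of whatever runs next in the same JVM.

```scala
object PropertyLeakDemo {
  // Hypothetical property name, used only for this illustration.
  val Key = "spark.test.exampleProperty"

  // "Test" A sets a system property and never clears it.
  def testA(): Unit = {
    System.setProperty(Key, "1")
  }

  // "Test" B implicitly assumes the property is unset, so its result
  // depends on whether A has already run in the same JVM.
  def testB(): Boolean = System.getProperty(Key) == null

  def main(args: Array[String]): Unit = {
    System.clearProperty(Key)
    val bAlone = testB()  // B sees a clean JVM
    testA()
    val bAfterA = testB() // B now sees the value leaked by A
    println(s"B alone: $bAlone, B after A: $bAfterA")
    System.clearProperty(Key)
  }
}
```

Because system properties are global to the JVM, whether `testB` passes depends on the order in which the two methods run.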

This patch removes most uses of System.setProperty from our tests, since in most cases we can use SparkConf to set these configurations (there are a few exceptions, including the tests of SparkConf itself).

For the cases where we continue to use System.setProperty, this patch introduces a `ResetSystemProperties` ScalaTest mixin trait that snapshots the system properties before each individual test and automatically restores them on test completion / failure. See the block comment at the top of the ResetSystemProperties trait for more details.
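
The snapshot-and-restore idea can be sketched without ScalaTest as a plain helper. This is not the `ResetSystemProperties` trait from the patch, which hooks into the per-test lifecycle; the object name, the method name `withRestoredSystemProperties`, and the property name below are assumptions made for illustration. The sketch copies the current properties entry by entry, runs the body, and reinstalls the copy in a `finally` block so that restoration also happens when the body throws.

```scala
import java.util.Properties

object SystemPropertiesSnapshot {
  /** Runs `body`, then restores the JVM system properties to their prior state. */
  def withRestoredSystemProperties[T](body: => T): T = {
    // Copy entry by entry so that later mutations cannot affect the snapshot.
    val snapshot = new Properties()
    val current = System.getProperties
    val names = current.stringPropertyNames().iterator()
    while (names.hasNext) {
      val name = names.next()
      snapshot.setProperty(name, current.getProperty(name))
    }
    // Reinstall the snapshot whether the body returns normally or throws.
    try body finally System.setProperties(snapshot)
  }

  def main(args: Array[String]): Unit = {
    val key = "spark.test.exampleProperty" // hypothetical property name
    System.clearProperty(key)
    withRestoredSystemProperties {
      System.setProperty(key, "2")
    }
    // The property set inside the block is gone once the block exits.
    assert(System.getProperty(key) == null)
  }
}
```

Restoring the whole property set, rather than tracking individual keys, also covers properties set indirectly by the code under test.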

Author: Josh Rosen <joshrosen@databricks.com>

Closes #3739 from JoshRosen/cleanup-system-properties-in-tests and squashes the following commits:

0236d66 [Josh Rosen] Replace setProperty uses in two example programs / tools
3888fe3 [Josh Rosen] Remove setProperty use in LocalJavaStreamingContext
4f4031d [Josh Rosen] Add note on why SparkSubmitSuite needs ResetSystemProperties
4742a5b [Josh Rosen] Clarify ResetSystemProperties trait inheritance ordering.
0eaf0b6 [Josh Rosen] Remove setProperty call in TaskResultGetterSuite.
7a3d224 [Josh Rosen] Fix trait ordering
3fdb554 [Josh Rosen] Remove setProperty call in TaskSchedulerImplSuite
bee20df [Josh Rosen] Remove setProperty calls in SparkContextSchedulerCreationSuite
655587c [Josh Rosen] Remove setProperty calls in JobCancellationSuite
3f2f955 [Josh Rosen] Remove System.setProperty calls in DistributedSuite
cfe9cce [Josh Rosen] Remove use of system properties in SparkContextSuite
8783ab0 [Josh Rosen] Remove TestUtils.setSystemProperty, since it is subsumed by the ResetSystemProperties trait.
633a84a [Josh Rosen] Remove use of system properties in FileServerSuite
25bfce2 [Josh Rosen] Use ResetSystemProperties in UtilsSuite
1d1aa5a [Josh Rosen] Use ResetSystemProperties in SizeEstimatorSuite
dd9492b [Josh Rosen] Use ResetSystemProperties in AkkaUtilsSuite
b0daff2 [Josh Rosen] Use ResetSystemProperties in BlockManagerSuite
e9ded62 [Josh Rosen] Use ResetSystemProperties in TaskSchedulerImplSuite
5b3cb54 [Josh Rosen] Use ResetSystemProperties in SparkListenerSuite
0995c4b [Josh Rosen] Use ResetSystemProperties in SparkContextSchedulerCreationSuite
c83ded8 [Josh Rosen] Use ResetSystemProperties in SparkConfSuite
51aa870 [Josh Rosen] Use withSystemProperty in ShuffleSuite
60a63a1 [Josh Rosen] Use ResetSystemProperties in JobCancellationSuite
14a92e4 [Josh Rosen] Use withSystemProperty in FileServerSuite
628f46c [Josh Rosen] Use ResetSystemProperties in DistributedSuite
9e3e0dd [Josh Rosen] Add ResetSystemProperties test fixture mixin; use it in SparkSubmitSuite.
4dcea38 [Josh Rosen] Move withSystemProperty to TestUtils class.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/352ed6bb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/352ed6bb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/352ed6bb

Branch: refs/heads/master
Commit: 352ed6bbe3c3b67e52e298e7c535ae414d96beca
Parents: 035bac8
Author: Josh Rosen <joshrosen@databricks.com>
Authored: Tue Dec 30 18:12:20 2014 -0800
Committer: Josh Rosen <joshrosen@databricks.com>
Committed: Tue Dec 30 18:12:20 2014 -0800

----------------------------------------------------------------------
 .../org/apache/spark/DistributedSuite.scala     | 21 ++-----
 .../org/apache/spark/FileServerSuite.scala      | 16 ++---
 .../org/apache/spark/JobCancellationSuite.scala | 21 ++++---
 .../scala/org/apache/spark/ShuffleSuite.scala   | 22 +++----
 .../scala/org/apache/spark/SparkConfSuite.scala | 51 ++++++----------
 .../SparkContextSchedulerCreationSuite.scala    | 31 ++++------
 .../org/apache/spark/SparkContextSuite.scala    | 62 +++++++-------------
 .../apache/spark/deploy/SparkSubmitSuite.scala  |  6 +-
 .../spark/scheduler/SparkListenerSuite.scala    |  9 +--
 .../spark/scheduler/TaskResultGetterSuite.scala | 23 +++-----
 .../scheduler/TaskSchedulerImplSuite.scala      |  6 +-
 .../spark/storage/BlockManagerSuite.scala       | 23 +++-----
 .../org/apache/spark/util/AkkaUtilsSuite.scala  |  2 +-
 .../spark/util/ResetSystemProperties.scala      | 57 ++++++++++++++++++
 .../apache/spark/util/SizeEstimatorSuite.scala  | 38 +++---------
 .../org/apache/spark/util/UtilsSuite.scala      |  2 +-
 .../apache/spark/examples/BroadcastTest.scala   |  6 +-
 .../streaming/LocalJavaStreamingContext.java    |  8 ++-
 .../streaming/LocalJavaStreamingContext.java    |  8 ++-
 .../streaming/LocalJavaStreamingContext.java    |  8 ++-
 .../streaming/LocalJavaStreamingContext.java    |  8 ++-
 .../streaming/LocalJavaStreamingContext.java    |  8 ++-
 .../apache/spark/tools/StoragePerfTester.scala  | 12 ++--
 23 files changed, 216 insertions(+), 232 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/352ed6bb/core/src/test/scala/org/apache/spark/DistributedSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index 998f300..97ea357 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark
 
-import org.scalatest.BeforeAndAfter
 import org.scalatest.FunSuite
 import org.scalatest.concurrent.Timeouts._
 import org.scalatest.Matchers
@@ -29,16 +28,10 @@ class NotSerializableClass
 class NotSerializableExn(val notSer: NotSerializableClass) extends Throwable() {}
 
 
-class DistributedSuite extends FunSuite with Matchers with BeforeAndAfter
-  with LocalSparkContext {
+class DistributedSuite extends FunSuite with Matchers with LocalSparkContext {
 
   val clusterUrl = "local-cluster[2,1,512]"
 
-  after {
-    System.clearProperty("spark.reducer.maxMbInFlight")
-    System.clearProperty("spark.storage.memoryFraction")
-  }
-
   test("task throws not serializable exception") {
     // Ensures that executors do not crash when an exn is not serializable. If executors crash,
     // this test will hang. Correct behavior is that executors don't crash but fail tasks
@@ -84,15 +77,14 @@ class DistributedSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("groupByKey where map output sizes exceed maxMbInFlight") {
-    System.setProperty("spark.reducer.maxMbInFlight", "1")
-    sc = new SparkContext(clusterUrl, "test")
+    val conf = new SparkConf().set("spark.reducer.maxMbInFlight", "1")
+    sc = new SparkContext(clusterUrl, "test", conf)
     // This data should be around 20 MB, so even with 4 mappers and 2 reducers, each map output
     // file should be about 2.5 MB
     val pairs = sc.parallelize(1 to 2000, 4).map(x => (x % 16, new Array[Byte](10000)))
     val groups = pairs.groupByKey(2).map(x => (x._1, x._2.size)).collect()
     assert(groups.length === 16)
     assert(groups.map(_._2).sum === 2000)
-    // Note that spark.reducer.maxMbInFlight will be cleared in the test suite's after{} block
   }
 
   test("accumulators") {
@@ -210,7 +202,6 @@ class DistributedSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("compute without caching when no partitions fit in memory") {
-    System.setProperty("spark.storage.memoryFraction", "0.0001")
     sc = new SparkContext(clusterUrl, "test")
     // data will be 4 million * 4 bytes = 16 MB in size, but our memoryFraction set the cache
     // to only 50 KB (0.0001 of 512 MB), so no partitions should fit in memory
@@ -218,12 +209,11 @@ class DistributedSuite extends FunSuite with Matchers with BeforeAndAfter
     assert(data.count() === 4000000)
     assert(data.count() === 4000000)
     assert(data.count() === 4000000)
-    System.clearProperty("spark.storage.memoryFraction")
   }
 
   test("compute when only some partitions fit in memory") {
-    System.setProperty("spark.storage.memoryFraction", "0.01")
-    sc = new SparkContext(clusterUrl, "test")
+    val conf = new SparkConf().set("spark.storage.memoryFraction", "0.01")
+    sc = new SparkContext(clusterUrl, "test", conf)
     // data will be 4 million * 4 bytes = 16 MB in size, but our memoryFraction set the cache
     // to only 5 MB (0.01 of 512 MB), so not all of it will fit in memory; we use 20 partitions
     // to make sure that *some* of them do fit though
@@ -231,7 +221,6 @@ class DistributedSuite extends FunSuite with Matchers with BeforeAndAfter
     assert(data.count() === 4000000)
     assert(data.count() === 4000000)
     assert(data.count() === 4000000)
-    System.clearProperty("spark.storage.memoryFraction")
   }
 
   test("passing environment variables to cluster") {

http://git-wip-us.apache.org/repos/asf/spark/blob/352ed6bb/core/src/test/scala/org/apache/spark/FileServerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
index 4942654..0f49ce4 100644
--- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
@@ -31,10 +31,11 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
   @transient var tmpFile: File = _
   @transient var tmpJarUrl: String = _
 
+  def newConf: SparkConf = new SparkConf(loadDefaults = false).set("spark.authenticate", "false")
+
   override def beforeEach() {
     super.beforeEach()
     resetSparkContext()
-    System.setProperty("spark.authenticate", "false")
   }
 
   override def beforeAll() {
@@ -52,7 +53,6 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
     val jarFile = new File(testTempDir, "test.jar")
     val jarStream = new FileOutputStream(jarFile)
     val jar = new JarOutputStream(jarStream, new java.util.jar.Manifest())
-    System.setProperty("spark.authenticate", "false")
 
     val jarEntry = new JarEntry(textFile.getName)
     jar.putNextEntry(jarEntry)
@@ -74,7 +74,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
   }
 
   test("Distributing files locally") {
-    sc = new SparkContext("local[4]", "test")
+    sc = new SparkContext("local[4]", "test", newConf)
     sc.addFile(tmpFile.toString)
     val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
     val result = sc.parallelize(testData).reduceByKey {
@@ -108,7 +108,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
 
   test("Distributing files locally using URL as input") {
     // addFile("file:///....")
-    sc = new SparkContext("local[4]", "test")
+    sc = new SparkContext("local[4]", "test", newConf)
     sc.addFile(new File(tmpFile.toString).toURI.toString)
     val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
     val result = sc.parallelize(testData).reduceByKey {
@@ -122,7 +122,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
   }
 
   test ("Dynamically adding JARS locally") {
-    sc = new SparkContext("local[4]", "test")
+    sc = new SparkContext("local[4]", "test", newConf)
     sc.addJar(tmpJarUrl)
     val testData = Array((1, 1))
     sc.parallelize(testData).foreach { x =>
@@ -133,7 +133,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
   }
 
   test("Distributing files on a standalone cluster") {
-    sc = new SparkContext("local-cluster[1,1,512]", "test")
+    sc = new SparkContext("local-cluster[1,1,512]", "test", newConf)
     sc.addFile(tmpFile.toString)
     val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
     val result = sc.parallelize(testData).reduceByKey {
@@ -147,7 +147,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
   }
 
   test ("Dynamically adding JARS on a standalone cluster") {
-    sc = new SparkContext("local-cluster[1,1,512]", "test")
+    sc = new SparkContext("local-cluster[1,1,512]", "test", newConf)
     sc.addJar(tmpJarUrl)
     val testData = Array((1,1))
     sc.parallelize(testData).foreach { x =>
@@ -158,7 +158,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
   }
 
   test ("Dynamically adding JARS on a standalone cluster using local: URL") {
-    sc = new SparkContext("local-cluster[1,1,512]", "test")
+    sc = new SparkContext("local-cluster[1,1,512]", "test", newConf)
     sc.addJar(tmpJarUrl.replace("file", "local"))
     val testData = Array((1,1))
     sc.parallelize(testData).foreach { x =>

http://git-wip-us.apache.org/repos/asf/spark/blob/352ed6bb/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index 41ed2bc..7584ae7 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -40,12 +40,11 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter
   override def afterEach() {
     super.afterEach()
     resetSparkContext()
-    System.clearProperty("spark.scheduler.mode")
   }
 
   test("local mode, FIFO scheduler") {
-    System.setProperty("spark.scheduler.mode", "FIFO")
-    sc = new SparkContext("local[2]", "test")
+    val conf = new SparkConf().set("spark.scheduler.mode", "FIFO")
+    sc = new SparkContext("local[2]", "test", conf)
     testCount()
     testTake()
     // Make sure we can still launch tasks.
@@ -53,10 +52,10 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("local mode, fair scheduler") {
-    System.setProperty("spark.scheduler.mode", "FAIR")
+    val conf = new SparkConf().set("spark.scheduler.mode", "FAIR")
     val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
-    System.setProperty("spark.scheduler.allocation.file", xmlPath)
-    sc = new SparkContext("local[2]", "test")
+    conf.set("spark.scheduler.allocation.file", xmlPath)
+    sc = new SparkContext("local[2]", "test", conf)
     testCount()
     testTake()
     // Make sure we can still launch tasks.
@@ -64,8 +63,8 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("cluster mode, FIFO scheduler") {
-    System.setProperty("spark.scheduler.mode", "FIFO")
-    sc = new SparkContext("local-cluster[2,1,512]", "test")
+    val conf = new SparkConf().set("spark.scheduler.mode", "FIFO")
+    sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
     testCount()
     testTake()
     // Make sure we can still launch tasks.
@@ -73,10 +72,10 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("cluster mode, fair scheduler") {
-    System.setProperty("spark.scheduler.mode", "FAIR")
+    val conf = new SparkConf().set("spark.scheduler.mode", "FAIR")
     val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
-    System.setProperty("spark.scheduler.allocation.file", xmlPath)
-    sc = new SparkContext("local-cluster[2,1,512]", "test")
+    conf.set("spark.scheduler.allocation.file", xmlPath)
+    sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
     testCount()
     testTake()
     // Make sure we can still launch tasks.

http://git-wip-us.apache.org/repos/asf/spark/blob/352ed6bb/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index 58a9624..f57921b 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -35,19 +35,15 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
   conf.set("spark.test.noStageRetry", "true")
 
   test("groupByKey without compression") {
-    try {
-      System.setProperty("spark.shuffle.compress", "false")
-      sc = new SparkContext("local", "test", conf)
-      val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)), 4)
-      val groups = pairs.groupByKey(4).collect()
-      assert(groups.size === 2)
-      val valuesFor1 = groups.find(_._1 == 1).get._2
-      assert(valuesFor1.toList.sorted === List(1, 2, 3))
-      val valuesFor2 = groups.find(_._1 == 2).get._2
-      assert(valuesFor2.toList.sorted === List(1))
-    } finally {
-      System.setProperty("spark.shuffle.compress", "true")
-    }
+    val myConf = conf.clone().set("spark.shuffle.compress", "false")
+    sc = new SparkContext("local", "test", myConf)
+    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)), 4)
+    val groups = pairs.groupByKey(4).collect()
+    assert(groups.size === 2)
+    val valuesFor1 = groups.find(_._1 == 1).get._2
+    assert(valuesFor1.toList.sorted === List(1, 2, 3))
+    val valuesFor2 = groups.find(_._1 == 2).get._2
+    assert(valuesFor2.toList.sorted === List(1))
   }
 
   test("shuffle non-zero block size") {

http://git-wip-us.apache.org/repos/asf/spark/blob/352ed6bb/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 5d018ea..790976a 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -19,27 +19,20 @@ package org.apache.spark
 
 import org.scalatest.FunSuite
 import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer}
+import org.apache.spark.util.ResetSystemProperties
 import com.esotericsoftware.kryo.Kryo
 
-class SparkConfSuite extends FunSuite with LocalSparkContext {
+class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemProperties {
   test("loading from system properties") {
-    try {
-      System.setProperty("spark.test.testProperty", "2")
-      val conf = new SparkConf()
-      assert(conf.get("spark.test.testProperty") === "2")
-    } finally {
-      System.clearProperty("spark.test.testProperty")
-    }
+    System.setProperty("spark.test.testProperty", "2")
+    val conf = new SparkConf()
+    assert(conf.get("spark.test.testProperty") === "2")
   }
 
   test("initializing without loading defaults") {
-    try {
-      System.setProperty("spark.test.testProperty", "2")
-      val conf = new SparkConf(false)
-      assert(!conf.contains("spark.test.testProperty"))
-    } finally {
-      System.clearProperty("spark.test.testProperty")
-    }
+    System.setProperty("spark.test.testProperty", "2")
+    val conf = new SparkConf(false)
+    assert(!conf.contains("spark.test.testProperty"))
   }
 
   test("named set methods") {
@@ -117,23 +110,17 @@ class SparkConfSuite extends FunSuite with LocalSparkContext {
 
   test("nested property names") {
     // This wasn't supported by some external conf parsing libraries
-    try {
-      System.setProperty("spark.test.a", "a")
-      System.setProperty("spark.test.a.b", "a.b")
-      System.setProperty("spark.test.a.b.c", "a.b.c")
-      val conf = new SparkConf()
-      assert(conf.get("spark.test.a") === "a")
-      assert(conf.get("spark.test.a.b") === "a.b")
-      assert(conf.get("spark.test.a.b.c") === "a.b.c")
-      conf.set("spark.test.a.b", "A.B")
-      assert(conf.get("spark.test.a") === "a")
-      assert(conf.get("spark.test.a.b") === "A.B")
-      assert(conf.get("spark.test.a.b.c") === "a.b.c")
-    } finally {
-      System.clearProperty("spark.test.a")
-      System.clearProperty("spark.test.a.b")
-      System.clearProperty("spark.test.a.b.c")
-    }
+    System.setProperty("spark.test.a", "a")
+    System.setProperty("spark.test.a.b", "a.b")
+    System.setProperty("spark.test.a.b.c", "a.b.c")
+    val conf = new SparkConf()
+    assert(conf.get("spark.test.a") === "a")
+    assert(conf.get("spark.test.a.b") === "a.b")
+    assert(conf.get("spark.test.a.b.c") === "a.b.c")
+    conf.set("spark.test.a.b", "A.B")
+    assert(conf.get("spark.test.a") === "a")
+    assert(conf.get("spark.test.a.b") === "A.B")
+    assert(conf.get("spark.test.a.b.c") === "a.b.c")
   }
 
   test("register kryo classes through registerKryoClasses") {

http://git-wip-us.apache.org/repos/asf/spark/blob/352ed6bb/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
index 0390a2e..8ae4f24 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
@@ -27,10 +27,13 @@ import org.apache.spark.scheduler.local.LocalBackend
 class SparkContextSchedulerCreationSuite
   extends FunSuite with LocalSparkContext with PrivateMethodTester with Logging {
 
-  def createTaskScheduler(master: String): TaskSchedulerImpl = {
+  def createTaskScheduler(master: String): TaskSchedulerImpl =
+    createTaskScheduler(master, new SparkConf())
+
+  def createTaskScheduler(master: String, conf: SparkConf): TaskSchedulerImpl = {
     // Create local SparkContext to setup a SparkEnv. We don't actually want to start() the
     // real schedulers, so we don't want to create a full SparkContext with the desired scheduler.
-    sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test", conf)
     val createTaskSchedulerMethod =
       PrivateMethod[Tuple2[SchedulerBackend, TaskScheduler]]('createTaskScheduler)
     val (_, sched) = SparkContext invokePrivate createTaskSchedulerMethod(sc, master)
@@ -102,19 +105,13 @@ class SparkContextSchedulerCreationSuite
   }
 
   test("local-default-parallelism") {
-    val defaultParallelism = System.getProperty("spark.default.parallelism")
-    System.setProperty("spark.default.parallelism", "16")
-    val sched = createTaskScheduler("local")
+    val conf = new SparkConf().set("spark.default.parallelism", "16")
+    val sched = createTaskScheduler("local", conf)
 
     sched.backend match {
       case s: LocalBackend => assert(s.defaultParallelism() === 16)
       case _ => fail()
     }
-
-    Option(defaultParallelism) match {
-      case Some(v) => System.setProperty("spark.default.parallelism", v)
-      case _ => System.clearProperty("spark.default.parallelism")
-    }
   }
 
   test("simr") {
@@ -155,9 +152,10 @@ class SparkContextSchedulerCreationSuite
     testYarn("yarn-client", "org.apache.spark.scheduler.cluster.YarnClientClusterScheduler")
   }
 
-  def testMesos(master: String, expectedClass: Class[_]) {
+  def testMesos(master: String, expectedClass: Class[_], coarse: Boolean) {
+    val conf = new SparkConf().set("spark.mesos.coarse", coarse.toString)
     try {
-      val sched = createTaskScheduler(master)
+      val sched = createTaskScheduler(master, conf)
       assert(sched.backend.getClass === expectedClass)
     } catch {
       case e: UnsatisfiedLinkError =>
@@ -168,17 +166,14 @@ class SparkContextSchedulerCreationSuite
   }
 
   test("mesos fine-grained") {
-    System.setProperty("spark.mesos.coarse", "false")
-    testMesos("mesos://localhost:1234", classOf[MesosSchedulerBackend])
+    testMesos("mesos://localhost:1234", classOf[MesosSchedulerBackend], coarse = false)
   }
 
   test("mesos coarse-grained") {
-    System.setProperty("spark.mesos.coarse", "true")
-    testMesos("mesos://localhost:1234", classOf[CoarseMesosSchedulerBackend])
+    testMesos("mesos://localhost:1234", classOf[CoarseMesosSchedulerBackend], coarse = true)
   }
 
   test("mesos with zookeeper") {
-    System.setProperty("spark.mesos.coarse", "false")
-    testMesos("zk://localhost:1234,localhost:2345", classOf[MesosSchedulerBackend])
+    testMesos("zk://localhost:1234,localhost:2345", classOf[MesosSchedulerBackend], coarse = false)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/352ed6bb/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 1362022..8b3c687 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -23,55 +23,37 @@ import org.apache.hadoop.io.BytesWritable
 
 class SparkContextSuite extends FunSuite with LocalSparkContext {
 
-  /** Allows system properties to be changed in tests */
-  private def withSystemProperty[T](property: String, value: String)(block: => T): T = {
-    val originalValue = System.getProperty(property)
-    try {
-      System.setProperty(property, value)
-      block
-    } finally {
-      if (originalValue == null) {
-        System.clearProperty(property)
-      } else {
-        System.setProperty(property, originalValue)
-      }
-    }
-  }
-
   test("Only one SparkContext may be active at a time") {
     // Regression test for SPARK-4180
-    withSystemProperty("spark.driver.allowMultipleContexts", "false") {
-      val conf = new SparkConf().setAppName("test").setMaster("local")
-      sc = new SparkContext(conf)
-      // A SparkContext is already running, so we shouldn't be able to create a second one
-      intercept[SparkException] { new SparkContext(conf) }
-      // After stopping the running context, we should be able to create a new one
-      resetSparkContext()
-      sc = new SparkContext(conf)
-    }
+    val conf = new SparkConf().setAppName("test").setMaster("local")
+      .set("spark.driver.allowMultipleContexts", "false")
+    sc = new SparkContext(conf)
+    // A SparkContext is already running, so we shouldn't be able to create a second one
+    intercept[SparkException] { new SparkContext(conf) }
+    // After stopping the running context, we should be able to create a new one
+    resetSparkContext()
+    sc = new SparkContext(conf)
   }
 
   test("Can still construct a new SparkContext after failing to construct a previous one") {
-    withSystemProperty("spark.driver.allowMultipleContexts", "false") {
-      // This is an invalid configuration (no app name or master URL)
-      intercept[SparkException] {
-        new SparkContext(new SparkConf())
-      }
-      // Even though those earlier calls failed, we should still be able to create a new context
-      sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test"))
+    val conf = new SparkConf().set("spark.driver.allowMultipleContexts", "false")
+    // This is an invalid configuration (no app name or master URL)
+    intercept[SparkException] {
+      new SparkContext(conf)
     }
+    // Even though those earlier calls failed, we should still be able to create a new context
+    sc = new SparkContext(conf.setMaster("local").setAppName("test"))
   }
 
   test("Check for multiple SparkContexts can be disabled via undocumented debug option") {
-    withSystemProperty("spark.driver.allowMultipleContexts", "true") {
-      var secondSparkContext: SparkContext = null
-      try {
-        val conf = new SparkConf().setAppName("test").setMaster("local")
-        sc = new SparkContext(conf)
-        secondSparkContext = new SparkContext(conf)
-      } finally {
-        Option(secondSparkContext).foreach(_.stop())
-      }
+    var secondSparkContext: SparkContext = null
+    try {
+      val conf = new SparkConf().setAppName("test").setMaster("local")
+        .set("spark.driver.allowMultipleContexts", "true")
+      sc = new SparkContext(conf)
+      secondSparkContext = new SparkContext(conf)
+    } finally {
+      Option(secondSparkContext).foreach(_.stop())
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/352ed6bb/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index eb7bd7a..5eda2d4 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -23,11 +23,13 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark._
 import org.apache.spark.deploy.SparkSubmit._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ResetSystemProperties, Utils}
 import org.scalatest.FunSuite
 import org.scalatest.Matchers
 
-class SparkSubmitSuite extends FunSuite with Matchers {
+// Note: this suite mixes in ResetSystemProperties because SparkSubmit.main() sets a bunch
+// of properties that need to be cleared after tests.
+class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties {
   def beforeAll() {
     System.setProperty("spark.testing", "true")
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/352ed6bb/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index b276343..24f41bf 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -26,9 +26,10 @@ import org.scalatest.Matchers
 
 import org.apache.spark.{LocalSparkContext, SparkContext}
 import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.util.ResetSystemProperties
 
-class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
-  with BeforeAndAfter with BeforeAndAfterAll {
+class SparkListenerSuite extends FunSuite  with LocalSparkContext with Matchers with BeforeAndAfter
+  with BeforeAndAfterAll with ResetSystemProperties {
 
   /** Length of time to wait while draining listener events. */
   val WAIT_TIMEOUT_MILLIS = 10000
@@ -37,10 +38,6 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
     sc = new SparkContext("local", "SparkListenerSuite")
   }
 
-  override def afterAll() {
-    System.clearProperty("spark.akka.frameSize")
-  }
-
   test("basic creation and shutdown of LiveListenerBus") {
     val counter = new BasicJobCounter
     val bus = new LiveListenerBus

http://git-wip-us.apache.org/repos/asf/spark/blob/352ed6bb/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
index 5768a3a..3aab5a1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer
 
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
 
-import org.apache.spark.{LocalSparkContext, SparkContext, SparkEnv}
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkEnv}
 import org.apache.spark.storage.TaskResultBlockId
 
 /**
@@ -55,27 +55,20 @@ class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedule
 /**
  * Tests related to handling task results (both direct and indirect).
  */
-class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll
-  with LocalSparkContext {
+class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
 
-  override def beforeAll {
-    // Set the Akka frame size to be as small as possible (it must be an integer, so 1 is as small
-    // as we can make it) so the tests don't take too long.
-    System.setProperty("spark.akka.frameSize", "1")
-  }
-
-  override def afterAll {
-    System.clearProperty("spark.akka.frameSize")
-  }
+  // Set the Akka frame size to be as small as possible (it must be an integer, so 1 is as small
+  // as we can make it) so the tests don't take too long.
+  def conf: SparkConf = new SparkConf().set("spark.akka.frameSize", "1")
 
   test("handling results smaller than Akka frame size") {
-    sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test", conf)
     val result = sc.parallelize(Seq(1), 1).map(x => 2 * x).reduce((x, y) => x)
     assert(result === 2)
   }
 
   test("handling results larger than Akka frame size") {
-    sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test", conf)
     val akkaFrameSize =
       sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt
     val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x)
@@ -89,7 +82,7 @@ class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with BeforeAndA
   test("task retried if result missing from block manager") {
     // Set the maximum number of task failures to > 0, so that the task set isn't aborted
     // after the result is missing.
-    sc = new SparkContext("local[1,2]", "test")
+    sc = new SparkContext("local[1,2]", "test", conf)
     // If this test hangs, it's probably because no resource offers were made after the task
     // failed.
     val scheduler: TaskSchedulerImpl = sc.taskScheduler match {
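
The change above moves the frame-size setting from a JVM-wide system property into a per-test configuration object. A small sketch of the difference, using only the JDK: a plain Map stands in for SparkConf, and the property name `example.frameSize` is made up for this illustration.

```java
import java.util.HashMap;
import java.util.Map;

final class ConfigIsolationDemo {
    // Hypothetical property name, used only for this illustration.
    static final String KEY = "example.frameSize";

    // Stand-in for a test that configures itself through JVM-wide state and never cleans up.
    static void testUsingSystemProperty() {
        System.setProperty(KEY, "1");
    }

    // Stand-in for a test that builds its own configuration object
    // (a Map here, playing the role that SparkConf plays in the patch).
    static Map<String, String> testUsingLocalConf() {
        Map<String, String> conf = new HashMap<>();
        conf.put(KEY, "1");
        return conf;
    }

    // What any later test in the same JVM would observe when reading the global setting.
    static String seenByLaterTest() {
        return System.getProperty(KEY);
    }
}
```

The local configuration disappears with the test that created it, while the system property stays visible to every test that runs afterwards in the same JVM, which is how ordering dependencies arise.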

http://git-wip-us.apache.org/repos/asf/spark/blob/352ed6bb/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 7532da8..40aaf9d 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -162,12 +162,12 @@ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Loggin
   }
 
   test("Fair Scheduler Test") {
-    sc = new SparkContext("local", "TaskSchedulerImplSuite")
+    val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
+    val conf = new SparkConf().set("spark.scheduler.allocation.file", xmlPath)
+    sc = new SparkContext("local", "TaskSchedulerImplSuite", conf)
     val taskScheduler = new TaskSchedulerImpl(sc)
     val taskSet = FakeTask.createTaskSet(1)
 
-    val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
-    System.setProperty("spark.scheduler.allocation.file", xmlPath)
     val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
     val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
     schedulableBuilder.buildPools()

http://git-wip-us.apache.org/repos/asf/spark/blob/352ed6bb/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 5554efb..ffe6f03 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -33,7 +33,7 @@ import akka.util.Timeout
 
 import org.mockito.Mockito.{mock, when}
 
-import org.scalatest.{BeforeAndAfter, FunSuite, Matchers, PrivateMethodTester}
+import org.scalatest._
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.concurrent.Timeouts._
 
@@ -44,18 +44,17 @@ import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
 import org.apache.spark.shuffle.hash.HashShuffleManager
 import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
-import org.apache.spark.util.{AkkaUtils, ByteBufferInputStream, SizeEstimator, Utils}
+import org.apache.spark.util._
 
 
-class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
-  with PrivateMethodTester {
+class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
+  with PrivateMethodTester with ResetSystemProperties {
 
   private val conf = new SparkConf(false)
   var store: BlockManager = null
   var store2: BlockManager = null
   var actorSystem: ActorSystem = null
   var master: BlockManagerMaster = null
-  var oldArch: String = null
   conf.set("spark.authenticate", "false")
   val securityMgr = new SecurityManager(conf)
   val mapOutputTracker = new MapOutputTrackerMaster(conf)
@@ -79,13 +78,13 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     manager
   }
 
-  before {
+  override def beforeEach(): Unit = {
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
       "test", "localhost", 0, conf = conf, securityManager = securityMgr)
     this.actorSystem = actorSystem
 
     // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
-    oldArch = System.setProperty("os.arch", "amd64")
+    System.setProperty("os.arch", "amd64")
     conf.set("os.arch", "amd64")
     conf.set("spark.test.useCompressedOops", "true")
     conf.set("spark.driver.port", boundPort.toString)
@@ -100,7 +99,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     SizeEstimator invokePrivate initialize()
   }
 
-  after {
+  override def afterEach(): Unit = {
     if (store != null) {
       store.stop()
       store = null
@@ -113,14 +112,6 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     actorSystem.awaitTermination()
     actorSystem = null
     master = null
-
-    if (oldArch != null) {
-      conf.set("os.arch", oldArch)
-    } else {
-      System.clearProperty("os.arch")
-    }
-
-    System.clearProperty("spark.test.useCompressedOops")
   }
 
   test("StorageLevel object caching") {

http://git-wip-us.apache.org/repos/asf/spark/blob/352ed6bb/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
index 7bca171..6bbf72e 100644
--- a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
@@ -31,7 +31,7 @@ import org.apache.spark.storage.BlockManagerId
 /**
   * Test the AkkaUtils with various security settings.
   */
-class AkkaUtilsSuite extends FunSuite with LocalSparkContext {
+class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemProperties {
 
   test("remote fetch security bad password") {
     val conf = new SparkConf

http://git-wip-us.apache.org/repos/asf/spark/blob/352ed6bb/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala b/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala
new file mode 100644
index 0000000..d4b92f3
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.util.Properties
+
+import org.scalatest.{BeforeAndAfterEach, Suite}
+
+/**
+ * Mixin for automatically resetting system properties that are modified in ScalaTest tests.
+ * This resets the properties after each individual test.
+ *
+ * The order in which fixtures are mixed in affects the order in which they are invoked by tests.
+ * If we have a suite `MySuite extends FunSuite with Foo with Bar`, then
+ * Bar's `super` is Foo, so Bar's beforeEach() and afterEach() methods will be invoked first
+ * by the test runner.
+ *
+ * This means that ResetSystemProperties should appear as the last trait in test suites that it's
+ * mixed into in order to ensure that the system properties snapshot occurs as early as possible.
+ * ResetSystemProperties calls super.afterEach() before performing its own cleanup, ensuring that
+ * the old properties are restored as late as possible.
+ *
+ * See the "Composing fixtures by stacking traits" section at
+ * http://www.scalatest.org/user_guide/sharing_fixtures for more details about this pattern.
+ */
+private[spark] trait ResetSystemProperties extends BeforeAndAfterEach { this: Suite =>
+  var oldProperties: Properties = null
+
+  override def beforeEach(): Unit = {
+    oldProperties = System.getProperties.clone().asInstanceOf[Properties]
+    super.beforeEach()
+  }
+
+  override def afterEach(): Unit = {
+    try {
+      super.afterEach()
+    } finally {
+      System.setProperties(oldProperties)
+      oldProperties = null
+    }
+  }
+}
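
The snapshot-and-restore idea behind this trait can be sketched with the JDK alone. The helper below is illustrative and not part of the patch; the class and method names are hypothetical. It copies the current entries with `clone()`. The one-argument `Properties` constructor, by contrast, creates an empty table that only uses its argument as a fallback for lookups, so it does not produce an independent copy.

```java
import java.util.Properties;

// Illustrative helper (hypothetical name): snapshot and restore JVM system properties.
final class SystemPropertiesSnapshot {
    private final Properties saved;

    private SystemPropertiesSnapshot(Properties saved) {
        this.saved = saved;
    }

    // Copy the current entries so that later System.setProperty calls do not affect the snapshot.
    static SystemPropertiesSnapshot take() {
        return new SystemPropertiesSnapshot((Properties) System.getProperties().clone());
    }

    // Replace the JVM-wide properties with the saved copy.
    void restore() {
        System.setProperties(saved);
    }

    // Run `body`, restoring the properties afterwards even if it throws.
    static void withRestoredProperties(Runnable body) {
        SystemPropertiesSnapshot snapshot = take();
        try {
            body.run();
        } finally {
            snapshot.restore();
        }
    }
}
```

A caller would wrap any code that mutates properties, for example `SystemPropertiesSnapshot.withRestoredProperties(() -> System.setProperty("os.arch", "x86"))`, and the previous value of `os.arch` would be back in place once the call returns.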

http://git-wip-us.apache.org/repos/asf/spark/blob/352ed6bb/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
index 0ea2d13..7424c2e 100644
--- a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
@@ -17,9 +17,7 @@
 
 package org.apache.spark.util
 
-import org.scalatest.BeforeAndAfterAll
-import org.scalatest.FunSuite
-import org.scalatest.PrivateMethodTester
+import org.scalatest.{BeforeAndAfterEach, FunSuite, PrivateMethodTester}
 
 class DummyClass1 {}
 
@@ -46,20 +44,12 @@ class DummyString(val arr: Array[Char]) {
 }
 
 class SizeEstimatorSuite
-  extends FunSuite with BeforeAndAfterAll with PrivateMethodTester {
+  extends FunSuite with BeforeAndAfterEach with PrivateMethodTester with ResetSystemProperties {
 
-  var oldArch: String = _
-  var oldOops: String = _
-
-  override def beforeAll() {
+  override def beforeEach() {
     // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
-    oldArch = System.setProperty("os.arch", "amd64")
-    oldOops = System.setProperty("spark.test.useCompressedOops", "true")
-  }
-
-  override def afterAll() {
-    resetOrClear("os.arch", oldArch)
-    resetOrClear("spark.test.useCompressedOops", oldOops)
+    System.setProperty("os.arch", "amd64")
+    System.setProperty("spark.test.useCompressedOops", "true")
   }
 
   test("simple classes") {
@@ -122,7 +112,7 @@ class SizeEstimatorSuite
   }
 
   test("32-bit arch") {
-    val arch = System.setProperty("os.arch", "x86")
+    System.setProperty("os.arch", "x86")
 
     val initialize = PrivateMethod[Unit]('initialize)
     SizeEstimator invokePrivate initialize()
@@ -131,14 +121,13 @@ class SizeEstimatorSuite
     assertResult(48)(SizeEstimator.estimate(DummyString("a")))
     assertResult(48)(SizeEstimator.estimate(DummyString("ab")))
     assertResult(56)(SizeEstimator.estimate(DummyString("abcdefgh")))
-    resetOrClear("os.arch", arch)
   }
 
   // NOTE: The String class definition varies across JDK versions (1.6 vs. 1.7) and vendors
   // (Sun vs IBM). Use a DummyString class to make tests deterministic.
   test("64-bit arch with no compressed oops") {
-    val arch = System.setProperty("os.arch", "amd64")
-    val oops = System.setProperty("spark.test.useCompressedOops", "false")
+    System.setProperty("os.arch", "amd64")
+    System.setProperty("spark.test.useCompressedOops", "false")
     val initialize = PrivateMethod[Unit]('initialize)
     SizeEstimator invokePrivate initialize()
 
@@ -146,16 +135,5 @@ class SizeEstimatorSuite
     assertResult(64)(SizeEstimator.estimate(DummyString("a")))
     assertResult(64)(SizeEstimator.estimate(DummyString("ab")))
     assertResult(72)(SizeEstimator.estimate(DummyString("abcdefgh")))
-
-    resetOrClear("os.arch", arch)
-    resetOrClear("spark.test.useCompressedOops", oops)
-  }
-
-  def resetOrClear(prop: String, oldValue: String) {
-    if (oldValue != null) {
-      System.setProperty(prop, oldValue)
-    } else {
-      System.clearProperty(prop)
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/352ed6bb/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index f9d4bea..4544382 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -31,7 +31,7 @@ import org.scalatest.FunSuite
 
 import org.apache.spark.SparkConf
 
-class UtilsSuite extends FunSuite {
+class UtilsSuite extends FunSuite with ResetSystemProperties {
 
   test("bytesToString") {
     assert(Utils.bytesToString(10) === "10.0 B")

http://git-wip-us.apache.org/repos/asf/spark/blob/352ed6bb/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
index adecd93..1b53f3e 100644
--- a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
@@ -28,11 +28,9 @@ object BroadcastTest {
     val bcName = if (args.length > 2) args(2) else "Http"
     val blockSize = if (args.length > 3) args(3) else "4096"
 
-    System.setProperty("spark.broadcast.factory", "org.apache.spark.broadcast." + bcName +
-      "BroadcastFactory")
-    System.setProperty("spark.broadcast.blockSize", blockSize)
     val sparkConf = new SparkConf().setAppName("Broadcast Test")
-
+      .set("spark.broadcast.factory", s"org.apache.spark.broadcast.${bcName}BroadcastFactory")
+      .set("spark.broadcast.blockSize", blockSize)
     val sc = new SparkContext(sparkConf)
 
     val slices = if (args.length > 0) args(0).toInt else 2

http://git-wip-us.apache.org/repos/asf/spark/blob/352ed6bb/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
----------------------------------------------------------------------
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
index 6e1f019..1e24da7 100644
--- a/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ b/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
@@ -17,6 +17,7 @@
 
 package org.apache.spark.streaming;
 
+import org.apache.spark.SparkConf;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
 import org.junit.After;
 import org.junit.Before;
@@ -27,8 +28,11 @@ public abstract class LocalJavaStreamingContext {
 
     @Before
     public void setUp() {
-        System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
-        ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+        SparkConf conf = new SparkConf()
+            .setMaster("local[2]")
+            .setAppName("test")
+            .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
+        ssc = new JavaStreamingContext(conf, new Duration(1000));
         ssc.checkpoint("checkpoint");
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/352ed6bb/external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
----------------------------------------------------------------------
diff --git a/external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
index 6e1f019..1e24da7 100644
--- a/external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ b/external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
@@ -17,6 +17,7 @@
 
 package org.apache.spark.streaming;
 
+import org.apache.spark.SparkConf;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
 import org.junit.After;
 import org.junit.Before;
@@ -27,8 +28,11 @@ public abstract class LocalJavaStreamingContext {
 
     @Before
     public void setUp() {
-        System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
-        ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+        SparkConf conf = new SparkConf()
+            .setMaster("local[2]")
+            .setAppName("test")
+            .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
+        ssc = new JavaStreamingContext(conf, new Duration(1000));
         ssc.checkpoint("checkpoint");
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/352ed6bb/external/twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
----------------------------------------------------------------------
diff --git a/external/twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
index 6e1f019..1e24da7 100644
--- a/external/twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ b/external/twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
@@ -17,6 +17,7 @@
 
 package org.apache.spark.streaming;
 
+import org.apache.spark.SparkConf;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
 import org.junit.After;
 import org.junit.Before;
@@ -27,8 +28,11 @@ public abstract class LocalJavaStreamingContext {
 
     @Before
     public void setUp() {
-        System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
-        ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+        SparkConf conf = new SparkConf()
+            .setMaster("local[2]")
+            .setAppName("test")
+            .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
+        ssc = new JavaStreamingContext(conf, new Duration(1000));
         ssc.checkpoint("checkpoint");
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/352ed6bb/external/zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
----------------------------------------------------------------------
diff --git a/external/zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
index 6e1f019..1e24da7 100644
--- a/external/zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ b/external/zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
@@ -17,6 +17,7 @@
 
 package org.apache.spark.streaming;
 
+import org.apache.spark.SparkConf;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
 import org.junit.After;
 import org.junit.Before;
@@ -27,8 +28,11 @@ public abstract class LocalJavaStreamingContext {
 
     @Before
     public void setUp() {
-        System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
-        ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+        SparkConf conf = new SparkConf()
+            .setMaster("local[2]")
+            .setAppName("test")
+            .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
+        ssc = new JavaStreamingContext(conf, new Duration(1000));
         ssc.checkpoint("checkpoint");
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/352ed6bb/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
index 6e1f019..1e24da7 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
@@ -17,6 +17,7 @@
 
 package org.apache.spark.streaming;
 
+import org.apache.spark.SparkConf;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
 import org.junit.After;
 import org.junit.Before;
@@ -27,8 +28,11 @@ public abstract class LocalJavaStreamingContext {
 
     @Before
     public void setUp() {
-        System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
-        ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+        SparkConf conf = new SparkConf()
+            .setMaster("local[2]")
+            .setAppName("test")
+            .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
+        ssc = new JavaStreamingContext(conf, new Duration(1000));
         ssc.checkpoint("checkpoint");
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/352ed6bb/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
index db58eb6..15ee950 100644
--- a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
+++ b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.{CountDownLatch, Executors}
 import java.util.concurrent.atomic.AtomicLong
 
 import org.apache.spark.executor.ShuffleWriteMetrics
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.serializer.KryoSerializer
 import org.apache.spark.shuffle.hash.HashShuffleManager
 import org.apache.spark.util.Utils
@@ -49,13 +49,13 @@ object StoragePerfTester {
     val writeData = "1" * recordLength
     val executor = Executors.newFixedThreadPool(numMaps)
 
-    System.setProperty("spark.shuffle.compress", "false")
-    System.setProperty("spark.shuffle.sync", "true")
-    System.setProperty("spark.shuffle.manager",
-      "org.apache.spark.shuffle.hash.HashShuffleManager")
+    val conf = new SparkConf()
+      .set("spark.shuffle.compress", "false")
+      .set("spark.shuffle.sync", "true")
+      .set("spark.shuffle.manager", "org.apache.spark.shuffle.hash.HashShuffleManager")
 
     // This is only used to instantiate a BlockManager. All thread scheduling is done manually.
-    val sc = new SparkContext("local[4]", "Write Tester")
+    val sc = new SparkContext("local[4]", "Write Tester", conf)
     val hashShuffleManager = sc.env.shuffleManager.asInstanceOf[HashShuffleManager]
 
     def writeOutputBytes(mapId: Int, total: AtomicLong) = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org

