spark-commits mailing list archives

From r...@apache.org
Subject [2/4] git commit: Add unit test for SparkContext scheduler creation
Date Tue, 03 Dec 2013 05:59:05 GMT
Add unit test for SparkContext scheduler creation

Since YARN and Mesos are not necessarily available on the system
running the tests, the YARN/Mesos tests are allowed to pass as long
as the corresponding code paths are exercised.
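
The pattern the new suite uses for this is to attempt scheduler creation and
accept the well-known "not available" failure as a pass. A minimal sketch,
assuming the createTaskScheduler test helper introduced below (the full suite
appears at the end of this diff):

    try {
      createTaskScheduler("yarn-standalone")  // exercises the YARN code path
    } catch {
      // Without the YARN classes on the classpath, Class.forName fails and is
      // wrapped in a SparkException; the suite accepts that as a pass.
      case e: SparkException =>
        assert(e.getMessage.contains("YARN mode not available"))
    }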


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/081a0b68
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/081a0b68
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/081a0b68

Branch: refs/heads/master
Commit: 081a0b6861321d262a82166bc1df61959e9c6387
Parents: 37f161c
Author: Aaron Davidson <aaron@databricks.com>
Authored: Thu Nov 28 20:39:10 2013 -0800
Committer: Aaron Davidson <aaron@databricks.com>
Committed: Thu Nov 28 20:40:57 2013 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   | 234 ++++++++++---------
 .../spark/scheduler/local/LocalScheduler.scala  |   2 +-
 .../SparkContextSchedulerCreationSuite.scala    | 135 +++++++++++
 3 files changed, 255 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/081a0b68/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index cf1fd49..1eb00e7 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -153,121 +153,7 @@ class SparkContext(
   executorEnvs("SPARK_USER") = sparkUser
 
   // Create and start the scheduler
-  private[spark] var taskScheduler: TaskScheduler = {
-    // Regular expression used for local[N] master format
-    val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r
-    // Regular expression for local[N, maxRetries], used in tests with failing tasks
-    val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+)\s*,\s*([0-9]+)\]""".r
-    // Regular expression for simulating a Spark cluster of [N, cores, memory] locally
-    val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
-    // Regular expression for connecting to Spark deploy clusters
-    val SPARK_REGEX = """spark://(.*)""".r
-    // Regular expression for connection to Mesos cluster by mesos:// or zk:// url
-    val MESOS_REGEX = """(mesos|zk)://.*""".r
-    // Regular expression for connection to Simr cluster
-    val SIMR_REGEX = """simr://(.*)""".r
-
-    master match {
-      case "local" =>
-        new LocalScheduler(1, 0, this)
-
-      case LOCAL_N_REGEX(threads) =>
-        new LocalScheduler(threads.toInt, 0, this)
-
-      case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
-        new LocalScheduler(threads.toInt, maxFailures.toInt, this)
-
-      case SPARK_REGEX(sparkUrl) =>
-        val scheduler = new ClusterScheduler(this)
-        val masterUrls = sparkUrl.split(",").map("spark://" + _)
-        val backend = new SparkDeploySchedulerBackend(scheduler, this, masterUrls, appName)
-        scheduler.initialize(backend)
-        scheduler
-
-      case SIMR_REGEX(simrUrl) =>
-        val scheduler = new ClusterScheduler(this)
-        val backend = new SimrSchedulerBackend(scheduler, this, simrUrl)
-        scheduler.initialize(backend)
-        scheduler
-
-      case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
-        // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
-        val memoryPerSlaveInt = memoryPerSlave.toInt
-        if (SparkContext.executorMemoryRequested > memoryPerSlaveInt) {
-          throw new SparkException(
-            "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
-              memoryPerSlaveInt, SparkContext.executorMemoryRequested))
-        }
-
-        val scheduler = new ClusterScheduler(this)
-        val localCluster = new LocalSparkCluster(
-          numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt)
-        val masterUrls = localCluster.start()
-        val backend = new SparkDeploySchedulerBackend(scheduler, this, masterUrls, appName)
-        scheduler.initialize(backend)
-        backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => {
-          localCluster.stop()
-        }
-        scheduler
-
-      case "yarn-standalone" =>
-        val scheduler = try {
-          val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClusterScheduler")
-          val cons = clazz.getConstructor(classOf[SparkContext])
-          cons.newInstance(this).asInstanceOf[ClusterScheduler]
-        } catch {
-          // TODO: Enumerate the exact reasons why it can fail
-          // But irrespective of it, it means we cannot proceed !
-          case th: Throwable => {
-            throw new SparkException("YARN mode not available ?", th)
-          }
-        }
-        val backend = new CoarseGrainedSchedulerBackend(scheduler, this.env.actorSystem)
-        scheduler.initialize(backend)
-        scheduler
-
-      case "yarn-client" =>
-        val scheduler = try {
-          val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClientClusterScheduler")
-          val cons = clazz.getConstructor(classOf[SparkContext])
-          cons.newInstance(this).asInstanceOf[ClusterScheduler]
-
-        } catch {
-          case th: Throwable => {
-            throw new SparkException("YARN mode not available ?", th)
-          }
-        }
-
-        val backend = try {
-          val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend")
-          val cons = clazz.getConstructor(classOf[ClusterScheduler], classOf[SparkContext])
-          cons.newInstance(scheduler, this).asInstanceOf[CoarseGrainedSchedulerBackend]
-        } catch {
-          case th: Throwable => {
-            throw new SparkException("YARN mode not available ?", th)
-          }
-        }
-
-        scheduler.initialize(backend)
-        scheduler
-
-      case mesosUrl @ MESOS_REGEX(_) =>
-        MesosNativeLibrary.load()
-        val scheduler = new ClusterScheduler(this)
-        val coarseGrained = System.getProperty("spark.mesos.coarse", "false").toBoolean
-        val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs
-        val backend = if (coarseGrained) {
-          new CoarseMesosSchedulerBackend(scheduler, this, url, appName)
-        } else {
-          new MesosSchedulerBackend(scheduler, this, url, appName)
-        }
-        scheduler.initialize(backend)
-        scheduler
-
-      case _ =>
-        throw new SparkException("Could not parse Master URL: '" + master + "'")
-    }
-  }
+  private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master, appName)
   taskScheduler.start()
 
   @volatile private[spark] var dagScheduler = new DAGScheduler(taskScheduler)
@@ -1137,6 +1023,124 @@ object SparkContext {
       .map(Utils.memoryStringToMb)
       .getOrElse(512)
   }
+
+  // Creates a task scheduler based on a given master URL. Extracted for testing.
+  private
+  def createTaskScheduler(sc: SparkContext, master: String, appName: String): TaskScheduler = {
+    // Regular expression used for local[N] master format
+    val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r
+    // Regular expression for local[N, maxRetries], used in tests with failing tasks
+    val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+)\s*,\s*([0-9]+)\]""".r
+    // Regular expression for simulating a Spark cluster of [N, cores, memory] locally
+    val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
+    // Regular expression for connecting to Spark deploy clusters
+    val SPARK_REGEX = """spark://(.*)""".r
+    // Regular expression for connection to Mesos cluster by mesos:// or zk:// url
+    val MESOS_REGEX = """(mesos|zk)://.*""".r
+    // Regular expression for connection to Simr cluster
+    val SIMR_REGEX = """simr://(.*)""".r
+
+    master match {
+      case "local" =>
+        new LocalScheduler(1, 0, sc)
+
+      case LOCAL_N_REGEX(threads) =>
+        new LocalScheduler(threads.toInt, 0, sc)
+
+      case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
+        new LocalScheduler(threads.toInt, maxFailures.toInt, sc)
+
+      case SPARK_REGEX(sparkUrl) =>
+        val scheduler = new ClusterScheduler(sc)
+        val masterUrls = sparkUrl.split(",").map("spark://" + _)
+        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls, appName)
+        scheduler.initialize(backend)
+        scheduler
+
+      case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
+        // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
+        val memoryPerSlaveInt = memoryPerSlave.toInt
+        if (SparkContext.executorMemoryRequested > memoryPerSlaveInt) {
+          throw new SparkException(
+            "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
+              memoryPerSlaveInt, SparkContext.executorMemoryRequested))
+        }
+
+        val scheduler = new ClusterScheduler(sc)
+        val localCluster = new LocalSparkCluster(
+          numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt)
+        val masterUrls = localCluster.start()
+        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls, appName)
+        scheduler.initialize(backend)
+        backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => {
+          localCluster.stop()
+        }
+        scheduler
+
+      case "yarn-standalone" =>
+        val scheduler = try {
+          val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClusterScheduler")
+          val cons = clazz.getConstructor(classOf[SparkContext])
+          cons.newInstance(sc).asInstanceOf[ClusterScheduler]
+        } catch {
+          // TODO: Enumerate the exact reasons why it can fail
+          // But irrespective of it, it means we cannot proceed !
+          case th: Throwable => {
+            throw new SparkException("YARN mode not available ?", th)
+          }
+        }
+        val backend = new CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
+        scheduler.initialize(backend)
+        scheduler
+
+      case "yarn-client" =>
+        val scheduler = try {
+          val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClientClusterScheduler")
+          val cons = clazz.getConstructor(classOf[SparkContext])
+          cons.newInstance(sc).asInstanceOf[ClusterScheduler]
+
+        } catch {
+          case th: Throwable => {
+            throw new SparkException("YARN mode not available ?", th)
+          }
+        }
+
+        val backend = try {
+          val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend")
+          val cons = clazz.getConstructor(classOf[ClusterScheduler], classOf[SparkContext])
+          cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
+        } catch {
+          case th: Throwable => {
+            throw new SparkException("YARN mode not available ?", th)
+          }
+        }
+
+        scheduler.initialize(backend)
+        scheduler
+
+      case mesosUrl @ MESOS_REGEX(_) =>
+        MesosNativeLibrary.load()
+        val scheduler = new ClusterScheduler(sc)
+        val coarseGrained = System.getProperty("spark.mesos.coarse", "false").toBoolean
+        val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs
+        val backend = if (coarseGrained) {
+          new CoarseMesosSchedulerBackend(scheduler, sc, url, appName)
+        } else {
+          new MesosSchedulerBackend(scheduler, sc, url, appName)
+        }
+        scheduler.initialize(backend)
+        scheduler
+
+      case SIMR_REGEX(simrUrl) =>
+        val scheduler = new ClusterScheduler(sc)
+        val backend = new SimrSchedulerBackend(scheduler, sc, simrUrl)
+        scheduler.initialize(backend)
+        scheduler
+
+      case _ =>
+        throw new SparkException("Could not parse Master URL: '" + master + "'")
+    }
+  }
 }
 
 /**

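As a quick reference for the regular expressions above, the sketch below shows
how example master strings (illustrative, not taken from the commit) are routed
by createTaskScheduler, plus one runnable match:

    // "local"                     -> LocalScheduler(1 thread, 0 task retries)
    // "local[4]"                  -> LocalScheduler(4 threads, 0 task retries)
    // "local[4, 2]"               -> LocalScheduler(4 threads, 2 task retries)
    // "local-cluster[3, 14, 512]" -> SparkDeploySchedulerBackend on a LocalSparkCluster
    // "spark://host:7077"         -> SparkDeploySchedulerBackend
    // "mesos://host:5050"         -> MesosSchedulerBackend (coarse-grained variant
    //                                if spark.mesos.coarse=true)
    // "zk://host:2181"            -> Mesos masters discovered through ZooKeeper
    // "simr://uri"                -> SimrSchedulerBackend
    val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+)\s*,\s*([0-9]+)\]""".r
    "local[4, 2]" match {
      case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
        assert(threads.toInt == 4 && maxFailures.toInt == 2)
      case _ => sys.error("pattern did not match")
    }
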
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/081a0b68/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
index 2699f0b..5af5116 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
@@ -74,7 +74,7 @@ class LocalActor(localScheduler: LocalScheduler, private var freeCores: Int)
   }
 }
 
-private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc: SparkContext)
+private[spark] class LocalScheduler(val threads: Int, val maxFailures: Int, val sc: SparkContext)
   extends TaskScheduler
   with ExecutorBackend
   with Logging {

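The one-word change above promotes threads to a constructor val so the new
suite can assert on the parsed thread count. Roughly, assuming an
already-constructed SparkContext named sc:

    val s = new LocalScheduler(5, 0, sc)
    assert(s.threads == 5)       // readable only now that threads is a val
    assert(s.maxFailures == 0)   // maxFailures was already a val
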
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/081a0b68/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
new file mode 100644
index 0000000..61d6163
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.scalatest.{FunSuite, PrivateMethodTester}
+
+import org.apache.spark.scheduler.TaskScheduler
+import org.apache.spark.scheduler.cluster.{ClusterScheduler, SimrSchedulerBackend, SparkDeploySchedulerBackend}
+import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
+import org.apache.spark.scheduler.local.LocalScheduler
+
+class SparkContextSchedulerCreationSuite
+  extends FunSuite with PrivateMethodTester with LocalSparkContext with Logging {
+
+  def createTaskScheduler(master: String): TaskScheduler = {
+    // Create local SparkContext to set up a SparkEnv. We don't actually want to start() the
+    // real schedulers, so we don't want to create a full SparkContext with the desired scheduler.
+    sc = new SparkContext("local", "test")
+    val createTaskSchedulerMethod = PrivateMethod[TaskScheduler]('createTaskScheduler)
+    SparkContext invokePrivate createTaskSchedulerMethod(sc, master, "test")
+  }
+
+  test("bad-master") {
+    val e = intercept[SparkException] {
+      createTaskScheduler("localhost:1234")
+    }
+    assert(e.getMessage.contains("Could not parse Master URL"))
+  }
+
+  test("local") {
+    createTaskScheduler("local") match {
+      case s: LocalScheduler =>
+        assert(s.threads === 1)
+        assert(s.maxFailures === 0)
+      case _ => fail()
+    }
+  }
+
+  test("local-n") {
+    createTaskScheduler("local[5]") match {
+      case s: LocalScheduler =>
+        assert(s.threads === 5)
+        assert(s.maxFailures === 0)
+      case _ => fail()
+    }
+  }
+
+  test("local-n-failures") {
+    createTaskScheduler("local[4, 2]") match {
+      case s: LocalScheduler =>
+        assert(s.threads === 4)
+        assert(s.maxFailures === 2)
+      case _ => fail()
+    }
+  }
+
+  test("simr") {
+    createTaskScheduler("simr://uri") match {
+      case s: ClusterScheduler =>
+        assert(s.backend.isInstanceOf[SimrSchedulerBackend])
+      case _ => fail()
+    }
+  }
+
+  test("local-cluster") {
+    createTaskScheduler("local-cluster[3, 14, 512]") match {
+      case s: ClusterScheduler =>
+        assert(s.backend.isInstanceOf[SparkDeploySchedulerBackend])
+      case _ => fail()
+    }
+  }
+
+  def testYarn(master: String, expectedClassName: String) {
+    try {
+      createTaskScheduler(master) match {
+        case s: ClusterScheduler =>
+          assert(s.getClass === Class.forName(expectedClassName))
+        case _ => fail()
+      }
+    } catch {
+      case e: SparkException =>
+        assert(e.getMessage.contains("YARN mode not available"))
+        logWarning("YARN not available, could not test actual YARN scheduler creation")
+      case e: Throwable => fail(e)
+    }
+  }
+  test("yarn-standalone") {
+    testYarn("yarn-standalone", "org.apache.spark.scheduler.cluster.YarnClusterScheduler")
+  }
+  test("yarn-client") {
+    testYarn("yarn-client", "org.apache.spark.scheduler.cluster.YarnClientClusterScheduler")
+  }
+
+  def testMesos(master: String, expectedClass: Class[_]) {
+    try {
+      createTaskScheduler(master) match {
+        case s: ClusterScheduler =>
+          assert(s.backend.getClass === expectedClass)
+        case _ => fail()
+      }
+    } catch {
+      case e: UnsatisfiedLinkError =>
+        assert(e.getMessage.contains("no mesos in"))
+        logWarning("Mesos not available, could not test actual Mesos scheduler creation")
+      case e: Throwable => fail(e)
+    }
+  }
+  test("mesos fine-grained") {
+    System.setProperty("spark.mesos.coarse", "false")
+    testMesos("mesos://localhost:1234", classOf[MesosSchedulerBackend])
+  }
+  test("mesos coarse-grained") {
+    System.setProperty("spark.mesos.coarse", "true")
+    testMesos("mesos://localhost:1234", classOf[CoarseMesosSchedulerBackend])
+  }
+  test("mesos with zookeeper") {
+    System.setProperty("spark.mesos.coarse", "false")
+    testMesos("zk://localhost:1234,localhost:2345", classOf[MesosSchedulerBackend])
+  }
+}

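A note on the mechanism: the suite reaches the private createTaskScheduler via
ScalaTest's PrivateMethodTester, which resolves the method by symbol name
through reflection and fails at runtime if the name or argument types do not
line up. The general pattern, sketched against a hypothetical Greeter class
(invented here purely for illustration):

    import org.scalatest.{FunSuite, PrivateMethodTester}

    class PrivateMethodExampleSuite extends FunSuite with PrivateMethodTester {
      class Greeter {
        private def greet(name: String): String = "hi " + name
      }

      test("invoke a private method reflectively") {
        // The symbol must match the private method's name exactly.
        val greet = PrivateMethod[String]('greet)
        assert((new Greeter invokePrivate greet("spark")) === "hi spark")
      }
    }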
