spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l...@apache.org
Subject spark git commit: [SPARK-9352] [SPARK-9353] [HOTFIX] Reverts PR #7668 on branch-1.4
Date Mon, 27 Jul 2015 05:41:05 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 5aed92613 -> 2b1973dd2


[SPARK-9352] [SPARK-9353] [HOTFIX] Reverts PR #7668 on branch-1.4

`MasterSuite.makeMaster()` doesn't compile under 1.4. `Master` is still an actor in branch-1.4,
and can only be created via `ActorSystem.actorOf`, or with `TestActorRef`.

Author: Cheng Lian <lian@databricks.com>

Closes #7686 from liancheng/hotfix-revert-pr-7668 and squashes the following commits:

089a1a8 [Cheng Lian] Revert "[SPARK-9352] [SPARK-9353] Add tests for standalone scheduling code"
54e8ab0 [Cheng Lian] Revert "[HOTFIX] Fix compile in MasterSuite"


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2b1973dd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2b1973dd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2b1973dd

Branch: refs/heads/branch-1.4
Commit: 2b1973dd2aa214b14f10bd597c963aded133186a
Parents: 5aed926
Author: Cheng Lian <lian@databricks.com>
Authored: Mon Jul 27 13:33:35 2015 +0800
Committer: Cheng Lian <lian@databricks.com>
Committed: Mon Jul 27 13:33:35 2015 +0800

----------------------------------------------------------------------
 .../org/apache/spark/deploy/master/Master.scala |   8 +-
 .../spark/deploy/master/MasterSuite.scala       | 199 +------------------
 2 files changed, 5 insertions(+), 202 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2b1973dd/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index b4c8771..ac74eba 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -551,7 +551,7 @@ private[master] class Master(
    * allocated at a time, 12 cores from each worker would be assigned to each executor.
    * Since 12 < 16, no executors would launch [SPARK-8881].
    */
-  private def scheduleExecutorsOnWorkers(
+  private[master] def scheduleExecutorsOnWorkers(
       app: ApplicationInfo,
       usableWorkers: Array[WorkerInfo],
       spreadOutApps: Boolean): Array[Int] = {
@@ -577,11 +577,7 @@ private[master] class Master(
         while (keepScheduling && canLaunchExecutor(pos) && coresToAssign >= coresPerExecutor) {
           coresToAssign -= coresPerExecutor
           assignedCores(pos) += coresPerExecutor
-          // If cores per executor is not set, we are assigning 1 core at a time
-          // without actually meaning to launch 1 executor for each core assigned
-          if (app.desc.coresPerExecutor.isDefined) {
-            assignedMemory(pos) += memoryPerExecutor
-          }
+          assignedMemory(pos) += memoryPerExecutor
 
           // Spreading out an application means spreading out its executors across as
           // many workers as possible. If we are not spreading out, then we should keep

http://git-wip-us.apache.org/repos/asf/spark/blob/2b1973dd/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index e585c98..014e87b 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -27,15 +27,14 @@ import scala.language.postfixOps
 import akka.actor.Address
 import org.json4s._
 import org.json4s.jackson.JsonMethods._
-import org.scalatest.{Matchers, PrivateMethodTester}
+import org.scalatest.Matchers
 import org.scalatest.concurrent.Eventually
 import other.supplier.{CustomPersistenceEngine, CustomRecoveryModeFactory}
 
-import org.apache.spark.{SecurityManager, SparkConf, SparkException, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
 import org.apache.spark.deploy._
-import org.apache.spark.rpc.RpcEnv
 
-class MasterSuite extends SparkFunSuite with Matchers with Eventually with PrivateMethodTester {
+class MasterSuite extends SparkFunSuite with Matchers with Eventually {
 
   test("toAkkaUrl") {
     val conf = new SparkConf(loadDefaults = false)
@@ -185,196 +184,4 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually with Priva
     }
   }
 
-  test("basic scheduling - spread out") {
-    testBasicScheduling(spreadOut = true)
-  }
-
-  test("basic scheduling - no spread out") {
-    testBasicScheduling(spreadOut = false)
-  }
-
-  test("scheduling with max cores - spread out") {
-    testSchedulingWithMaxCores(spreadOut = true)
-  }
-
-  test("scheduling with max cores - no spread out") {
-    testSchedulingWithMaxCores(spreadOut = false)
-  }
-
-  test("scheduling with cores per executor - spread out") {
-    testSchedulingWithCoresPerExecutor(spreadOut = true)
-  }
-
-  test("scheduling with cores per executor - no spread out") {
-    testSchedulingWithCoresPerExecutor(spreadOut = false)
-  }
-
-  test("scheduling with cores per executor AND max cores - spread out") {
-    testSchedulingWithCoresPerExecutorAndMaxCores(spreadOut = true)
-  }
-
-  test("scheduling with cores per executor AND max cores - no spread out") {
-    testSchedulingWithCoresPerExecutorAndMaxCores(spreadOut = false)
-  }
-
-  private def testBasicScheduling(spreadOut: Boolean): Unit = {
-    val master = makeMaster()
-    val appInfo = makeAppInfo(1024)
-    val workerInfo = makeWorkerInfo(4096, 10)
-    val workerInfos = Array(workerInfo, workerInfo, workerInfo)
-    val scheduledCores = master.invokePrivate(
-      _scheduleExecutorsOnWorkers(appInfo, workerInfos, spreadOut))
-    assert(scheduledCores.length === 3)
-    assert(scheduledCores(0) === 10)
-    assert(scheduledCores(1) === 10)
-    assert(scheduledCores(2) === 10)
-  }
-
-  private def testSchedulingWithMaxCores(spreadOut: Boolean): Unit = {
-    val master = makeMaster()
-    val appInfo1 = makeAppInfo(1024, maxCores = Some(8))
-    val appInfo2 = makeAppInfo(1024, maxCores = Some(16))
-    val workerInfo = makeWorkerInfo(4096, 10)
-    val workerInfos = Array(workerInfo, workerInfo, workerInfo)
-    var scheduledCores = master.invokePrivate(
-      _scheduleExecutorsOnWorkers(appInfo1, workerInfos, spreadOut))
-    assert(scheduledCores.length === 3)
-    // With spreading out, each worker should be assigned a few cores
-    if (spreadOut) {
-      assert(scheduledCores(0) === 3)
-      assert(scheduledCores(1) === 3)
-      assert(scheduledCores(2) === 2)
-    } else {
-      // Without spreading out, the cores should be concentrated on the first worker
-      assert(scheduledCores(0) === 8)
-      assert(scheduledCores(1) === 0)
-      assert(scheduledCores(2) === 0)
-    }
-    // Now test the same thing with max cores > cores per worker
-    scheduledCores = master.invokePrivate(
-      _scheduleExecutorsOnWorkers(appInfo2, workerInfos, spreadOut))
-    assert(scheduledCores.length === 3)
-    if (spreadOut) {
-      assert(scheduledCores(0) === 6)
-      assert(scheduledCores(1) === 5)
-      assert(scheduledCores(2) === 5)
-    } else {
-      // Without spreading out, the first worker should be fully booked,
-      // and the leftover cores should spill over to the second worker only.
-      assert(scheduledCores(0) === 10)
-      assert(scheduledCores(1) === 6)
-      assert(scheduledCores(2) === 0)
-    }
-  }
-
-  private def testSchedulingWithCoresPerExecutor(spreadOut: Boolean): Unit = {
-    val master = makeMaster()
-    val appInfo1 = makeAppInfo(1024, coresPerExecutor = Some(2))
-    val appInfo2 = makeAppInfo(256, coresPerExecutor = Some(2))
-    val appInfo3 = makeAppInfo(256, coresPerExecutor = Some(3))
-    val workerInfo = makeWorkerInfo(4096, 10)
-    val workerInfos = Array(workerInfo, workerInfo, workerInfo)
-    // Each worker should end up with 4 executors with 2 cores each
-    // This should be 4 because of the memory restriction on each worker
-    var scheduledCores = master.invokePrivate(
-      _scheduleExecutorsOnWorkers(appInfo1, workerInfos, spreadOut))
-    assert(scheduledCores.length === 3)
-    assert(scheduledCores(0) === 8)
-    assert(scheduledCores(1) === 8)
-    assert(scheduledCores(2) === 8)
-    // Now test the same thing without running into the worker memory limit
-    // Each worker should now end up with 5 executors with 2 cores each
-    scheduledCores = master.invokePrivate(
-      _scheduleExecutorsOnWorkers(appInfo2, workerInfos, spreadOut))
-    assert(scheduledCores.length === 3)
-    assert(scheduledCores(0) === 10)
-    assert(scheduledCores(1) === 10)
-    assert(scheduledCores(2) === 10)
-    // Now test the same thing with a cores per executor that 10 is not divisible by
-    scheduledCores = master.invokePrivate(
-      _scheduleExecutorsOnWorkers(appInfo3, workerInfos, spreadOut))
-    assert(scheduledCores.length === 3)
-    assert(scheduledCores(0) === 9)
-    assert(scheduledCores(1) === 9)
-    assert(scheduledCores(2) === 9)
-  }
-
-  // Sorry for the long method name!
-  private def testSchedulingWithCoresPerExecutorAndMaxCores(spreadOut: Boolean): Unit = {
-    val master = makeMaster()
-    val appInfo1 = makeAppInfo(256, coresPerExecutor = Some(2), maxCores = Some(4))
-    val appInfo2 = makeAppInfo(256, coresPerExecutor = Some(2), maxCores = Some(20))
-    val appInfo3 = makeAppInfo(256, coresPerExecutor = Some(3), maxCores = Some(20))
-    val workerInfo = makeWorkerInfo(4096, 10)
-    val workerInfos = Array(workerInfo, workerInfo, workerInfo)
-    // We should only launch two executors, each with exactly 2 cores
-    var scheduledCores = master.invokePrivate(
-      _scheduleExecutorsOnWorkers(appInfo1, workerInfos, spreadOut))
-    assert(scheduledCores.length === 3)
-    if (spreadOut) {
-      assert(scheduledCores(0) === 2)
-      assert(scheduledCores(1) === 2)
-      assert(scheduledCores(2) === 0)
-    } else {
-      assert(scheduledCores(0) === 4)
-      assert(scheduledCores(1) === 0)
-      assert(scheduledCores(2) === 0)
-    }
-    // Test max cores > number of cores per worker
-    scheduledCores = master.invokePrivate(
-      _scheduleExecutorsOnWorkers(appInfo2, workerInfos, spreadOut))
-    assert(scheduledCores.length === 3)
-    if (spreadOut) {
-      assert(scheduledCores(0) === 8)
-      assert(scheduledCores(1) === 6)
-      assert(scheduledCores(2) === 6)
-    } else {
-      assert(scheduledCores(0) === 10)
-      assert(scheduledCores(1) === 10)
-      assert(scheduledCores(2) === 0)
-    }
-    // Test max cores > number of cores per worker AND
-    // a cores per executor that is 10 is not divisible by
-    scheduledCores = master.invokePrivate(
-      _scheduleExecutorsOnWorkers(appInfo3, workerInfos, spreadOut))
-    assert(scheduledCores.length === 3)
-    if (spreadOut) {
-      assert(scheduledCores(0) === 6)
-      assert(scheduledCores(1) === 6)
-      assert(scheduledCores(2) === 6)
-    } else {
-      assert(scheduledCores(0) === 9)
-      assert(scheduledCores(1) === 9)
-      assert(scheduledCores(2) === 0)
-    }
-  }
-
-  // ===============================
-  // | Utility methods for testing |
-  // ===============================
-
-  private val _scheduleExecutorsOnWorkers = PrivateMethod[Array[Int]]('scheduleExecutorsOnWorkers)
-
-  private def makeMaster(conf: SparkConf = new SparkConf): Master = {
-    val securityMgr = new SecurityManager(conf)
-    val rpcEnv = RpcEnv.create(Master.systemName, "localhost", 7077, conf, securityMgr)
-    val master = new Master(rpcEnv, rpcEnv.address, 8080, securityMgr, conf)
-    master
-  }
-
-  private def makeAppInfo(
-      memoryPerExecutorMb: Int,
-      coresPerExecutor: Option[Int] = None,
-      maxCores: Option[Int] = None): ApplicationInfo = {
-    val desc = new ApplicationDescription(
-      "test", maxCores, memoryPerExecutorMb, null, "", None, None, coresPerExecutor)
-    val appId = System.currentTimeMillis.toString
-    new ApplicationInfo(0, appId, desc, new Date, null, Int.MaxValue)
-  }
-
-  private def makeWorkerInfo(memoryMb: Int, cores: Int): WorkerInfo = {
-    val workerId = System.currentTimeMillis.toString
-    new WorkerInfo(workerId, "host", 100, cores, memoryMb, null, 101, "address")
-  }
-
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message