spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pwend...@apache.org
Subject [2/2] spark git commit: [SPARK-4525] Mesos should decline unused offers
Date Tue, 25 Nov 2014 03:21:27 GMT
[SPARK-4525] Mesos should decline unused offers

Functionally, this is just a small change on top of #3393 (by jongyoul). The issue being addressed
is discussed in the comments there. I have not yet added a test for the bug there. I will
add one shortly.

I've also done some minor renaming/clean-up of variables in this class and tests.

Author: Patrick Wendell <pwendell@gmail.com>
Author: Jongyoul Lee <jongyoul@gmail.com>

Closes #3436 from pwendell/mesos-issue and squashes the following commits:

58c35b5 [Patrick Wendell] Adding unit test for this situation
c4f0697 [Patrick Wendell] Additional clean-up and fixes on top of existing fix
f20f1b3 [Jongyoul Lee] [SPARK-4525] MesosSchedulerBackend.resourceOffers cannot decline unused
offers from acceptedOffers - Added code for declining unused offers among acceptedOffers -
Edited testCase for checking declining unused offers

(cherry picked from commit b043c27424d05e3200e7ba99a1a65656b57fa2f0)
Signed-off-by: Patrick Wendell <pwendell@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/10e43391
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/10e43391
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/10e43391

Branch: refs/heads/branch-1.2
Commit: 10e433919a9a3520007099a3876b47f74c046f12
Parents: e7b8bf0
Author: Jongyoul Lee <jongyoul@gmail.com>
Authored: Mon Nov 24 19:14:14 2014 -0800
Committer: Patrick Wendell <pwendell@gmail.com>
Committed: Mon Nov 24 19:21:02 2014 -0800

----------------------------------------------------------------------
 .../cluster/mesos/MesosSchedulerBackend.scala   | 25 ++++++--
 .../mesos/MesosSchedulerBackendSuite.scala      | 61 +++++++++++++++-----
 2 files changed, 65 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/10e43391/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index d137951..10e6886 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -208,10 +208,12 @@ private[spark] class MesosSchedulerBackend(
    */
   override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
     inClassLoader() {
-      val (acceptedOffers, declinedOffers) = offers.partition { o =>
+      // Fail-fast on offers we know will be rejected
+      val (usableOffers, unUsableOffers) = offers.partition { o =>
         val mem = getResource(o.getResourcesList, "mem")
         val cpus = getResource(o.getResourcesList, "cpus")
         val slaveId = o.getSlaveId.getValue
+        // TODO(pwendell): Should below be 1 + scheduler.CPUS_PER_TASK?
         (mem >= MemoryUtils.calculateTotalMemory(sc) &&
           // need at least 1 for executor, 1 for task
           cpus >= 2 * scheduler.CPUS_PER_TASK) ||
@@ -219,11 +221,12 @@ private[spark] class MesosSchedulerBackend(
             cpus >= scheduler.CPUS_PER_TASK)
       }
 
-      val offerableWorkers = acceptedOffers.map { o =>
+      val workerOffers = usableOffers.map { o =>
         val cpus = if (slaveIdsWithExecutors.contains(o.getSlaveId.getValue)) {
           getResource(o.getResourcesList, "cpus").toInt
         } else {
           // If the executor doesn't exist yet, subtract CPU for executor
+          // TODO(pwendell): Should below just subtract "1"?
           getResource(o.getResourcesList, "cpus").toInt -
             scheduler.CPUS_PER_TASK
         }
@@ -233,17 +236,20 @@ private[spark] class MesosSchedulerBackend(
           cpus)
       }
 
-      val slaveIdToOffer = acceptedOffers.map(o => o.getSlaveId.getValue -> o).toMap
+      val slaveIdToOffer = usableOffers.map(o => o.getSlaveId.getValue -> o).toMap
 
       val mesosTasks = new HashMap[String, JArrayList[MesosTaskInfo]]
 
+      val slavesIdsOfAcceptedOffers = HashSet[String]()
+
       // Call into the TaskSchedulerImpl
-      scheduler.resourceOffers(offerableWorkers)
-        .filter(!_.isEmpty)
+      val acceptedOffers = scheduler.resourceOffers(workerOffers).filter(!_.isEmpty)
+      acceptedOffers
         .foreach { offer =>
           offer.foreach { taskDesc =>
             val slaveId = taskDesc.executorId
             slaveIdsWithExecutors += slaveId
+            slavesIdsOfAcceptedOffers += slaveId
             taskIdToSlaveId(taskDesc.taskId) = slaveId
             mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo])
               .add(createMesosTask(taskDesc, slaveId))
@@ -257,7 +263,14 @@ private[spark] class MesosSchedulerBackend(
         d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters)
       }
 
-      declinedOffers.foreach(o => d.declineOffer(o.getId))
+      // Decline offers that weren't used
+      // NOTE: This logic assumes that we only get a single offer for each host in a given
batch
+      for (o <- usableOffers if !slavesIdsOfAcceptedOffers.contains(o.getSlaveId.getValue))
{
+        d.declineOffer(o.getId)
+      }
+
+      // Decline offers we ruled out immediately
+      unUsableOffers.foreach(o => d.declineOffer(o.getId))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/10e43391/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
index bef8d3a..e60e70a 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
@@ -30,9 +30,11 @@ import java.nio.ByteBuffer
 import java.util.Collections
 import java.util
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 
 class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with EasyMockSugar {
-  test("mesos resource offer is launching tasks") {
+
+  test("mesos resource offers result in launching tasks") {
     def createOffer(id: Int, mem: Int, cpu: Int) = {
       val builder = Offer.newBuilder()
       builder.addResourcesBuilder()
@@ -43,46 +45,61 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea
         .setName("cpus")
         .setType(Value.Type.SCALAR)
         .setScalar(Scalar.newBuilder().setValue(cpu))
-      builder.setId(OfferID.newBuilder().setValue(id.toString).build()).setFrameworkId(FrameworkID.newBuilder().setValue("f1"))
-        .setSlaveId(SlaveID.newBuilder().setValue("s1")).setHostname("localhost").build()
+      builder.setId(OfferID.newBuilder().setValue(s"o${id.toString}").build()).setFrameworkId(FrameworkID.newBuilder().setValue("f1"))
+        .setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}")).setHostname(s"host${id.toString}").build()
     }
 
     val driver = EasyMock.createMock(classOf[SchedulerDriver])
     val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl])
 
     val sc = EasyMock.createMock(classOf[SparkContext])
-
     EasyMock.expect(sc.executorMemory).andReturn(100).anyTimes()
     EasyMock.expect(sc.getSparkHome()).andReturn(Option("/path")).anyTimes()
     EasyMock.expect(sc.executorEnvs).andReturn(new mutable.HashMap).anyTimes()
     EasyMock.expect(sc.conf).andReturn(new SparkConf).anyTimes()
     EasyMock.replay(sc)
+
     val minMem = MemoryUtils.calculateTotalMemory(sc).toInt
     val minCpu = 4
-    val offers = new java.util.ArrayList[Offer]
-    offers.add(createOffer(1, minMem, minCpu))
-    offers.add(createOffer(1, minMem - 1, minCpu))
+
+    val mesosOffers = new java.util.ArrayList[Offer]
+    mesosOffers.add(createOffer(1, minMem, minCpu))
+    mesosOffers.add(createOffer(2, minMem - 1, minCpu))
+    mesosOffers.add(createOffer(3, minMem, minCpu))
+
     val backend = new MesosSchedulerBackend(taskScheduler, sc, "master")
-    val workerOffers = Seq(offers.get(0)).map(o => new WorkerOffer(
-      o.getSlaveId.getValue,
-      o.getHostname,
+
+    val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](2)
+    expectedWorkerOffers.append(new WorkerOffer(
+      mesosOffers.get(0).getSlaveId.getValue,
+      mesosOffers.get(0).getHostname,
+      2
+    ))
+    expectedWorkerOffers.append(new WorkerOffer(
+      mesosOffers.get(2).getSlaveId.getValue,
+      mesosOffers.get(2).getHostname,
       2
     ))
     val taskDesc = new TaskDescription(1L, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0)))
-    EasyMock.expect(taskScheduler.resourceOffers(EasyMock.eq(workerOffers))).andReturn(Seq(Seq(taskDesc)))
+    EasyMock.expect(taskScheduler.resourceOffers(EasyMock.eq(expectedWorkerOffers))).andReturn(Seq(Seq(taskDesc)))
     EasyMock.expect(taskScheduler.CPUS_PER_TASK).andReturn(2).anyTimes()
     EasyMock.replay(taskScheduler)
+
     val capture = new Capture[util.Collection[TaskInfo]]
     EasyMock.expect(
       driver.launchTasks(
-        EasyMock.eq(Collections.singleton(offers.get(0).getId)),
+        EasyMock.eq(Collections.singleton(mesosOffers.get(0).getId)),
         EasyMock.capture(capture),
         EasyMock.anyObject(classOf[Filters])
       )
-    ).andReturn(Status.valueOf(1))
-    EasyMock.expect(driver.declineOffer(offers.get(1).getId)).andReturn(Status.valueOf(1))
+    ).andReturn(Status.valueOf(1)).once
+    EasyMock.expect(driver.declineOffer(mesosOffers.get(1).getId)).andReturn(Status.valueOf(1)).times(1)
+    EasyMock.expect(driver.declineOffer(mesosOffers.get(2).getId)).andReturn(Status.valueOf(1)).times(1)
     EasyMock.replay(driver)
-    backend.resourceOffers(driver, offers)
+
+    backend.resourceOffers(driver, mesosOffers)
+
+    EasyMock.verify(driver)
     assert(capture.getValue.size() == 1)
     val taskInfo = capture.getValue.iterator().next()
     assert(taskInfo.getName.equals("n1"))
@@ -90,5 +107,19 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea
     assert(cpus.getName.equals("cpus"))
     assert(cpus.getScalar.getValue.equals(2.0))
     assert(taskInfo.getSlaveId.getValue.equals("s1"))
+
+    // Unwanted resources offered on an existing node. Make sure they are declined
+    val mesosOffers2 = new java.util.ArrayList[Offer]
+    mesosOffers2.add(createOffer(1, minMem, minCpu))
+    EasyMock.reset(taskScheduler)
+    EasyMock.reset(driver)
+    EasyMock.expect(taskScheduler.resourceOffers(EasyMock.anyObject(classOf[Seq[WorkerOffer]])).andReturn(Seq(Seq())))
+    EasyMock.expect(taskScheduler.CPUS_PER_TASK).andReturn(2).anyTimes()
+    EasyMock.replay(taskScheduler)
+    EasyMock.expect(driver.declineOffer(mesosOffers2.get(0).getId)).andReturn(Status.valueOf(1)).times(1)
+    EasyMock.replay(driver)
+
+    backend.resourceOffers(driver, mesosOffers2)
+    EasyMock.verify(driver)
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message