aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject aurora git commit: Fixing slave/task reservation check.
Date Fri, 07 Aug 2015 17:14:45 GMT
Repository: aurora
Updated Branches:
  refs/heads/master 8adc9bd87 -> e8a61f68c


Fixing slave/task reservation check.

Bugs closed: AURORA-1431

Reviewed at https://reviews.apache.org/r/37206/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/e8a61f68
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/e8a61f68
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/e8a61f68

Branch: refs/heads/master
Commit: e8a61f68ce87e51629f85801b044c9a2cda7ba0c
Parents: 8adc9bd
Author: Maxim Khutornenko <maxim@apache.org>
Authored: Fri Aug 7 10:12:30 2015 -0700
Committer: Maxim Khutornenko <maxim@apache.org>
Committed: Fri Aug 7 10:12:30 2015 -0700

----------------------------------------------------------------------
 .../scheduler/filter/SchedulingFilterImpl.java  |  3 +
 .../aurora/scheduler/preemptor/BiCache.java     | 11 ++++
 .../scheduler/scheduling/SchedulingModule.java  |  2 +-
 .../scheduler/scheduling/TaskScheduler.java     | 12 ++--
 .../aurora/scheduler/state/TaskAssigner.java    | 17 ++++--
 .../aurora/scheduler/preemptor/BiCacheTest.java | 14 +++++
 .../scheduling/TaskSchedulerImplTest.java       | 57 +++++++++++-------
 .../scheduler/state/TaskAssignerImplTest.java   | 63 ++++++++++++++++++--
 8 files changed, 137 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/e8a61f68/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
index 54ffd8e..08d7ac7 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
@@ -23,6 +23,8 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Ordering;
 import com.google.inject.Inject;
+
+import com.twitter.common.inject.TimedInterceptor.Timed;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Data;
 
@@ -169,6 +171,7 @@ public class SchedulingFilterImpl implements SchedulingFilter {
         new ConstraintMatcher.NameFilter(DEDICATED_ATTRIBUTE));
   }
 
+  @Timed("scheduling_filter")
   @Override
   public Set<Veto> filter(UnusedResource resource, ResourceRequest request) {
     // Apply veto filtering rules from higher to lower score making sure we cut over and
return

http://git-wip-us.apache.org/repos/asf/aurora/blob/e8a61f68/src/main/java/org/apache/aurora/scheduler/preemptor/BiCache.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/BiCache.java b/src/main/java/org/apache/aurora/scheduler/preemptor/BiCache.java
index 2551057..98d18df 100644
--- a/src/main/java/org/apache/aurora/scheduler/preemptor/BiCache.java
+++ b/src/main/java/org/apache/aurora/scheduler/preemptor/BiCache.java
@@ -13,6 +13,7 @@
  */
 package org.apache.aurora.scheduler.preemptor;
 
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
@@ -26,6 +27,7 @@ import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.RemovalListener;
 import com.google.common.cache.RemovalNotification;
 import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Multimap;
 import com.twitter.common.quantity.Amount;
@@ -136,4 +138,13 @@ public class BiCache<K, V> {
     inverse.remove(value, key);
     cache.invalidate(key);
   }
+
+  /**
+   * Returns an immutable copy of entries stored in this cache.
+   *
+   * @return Immutable map of cache entries.
+   */
+  public synchronized Map<K, V> asMap() {
+    return ImmutableMap.copyOf(cache.asMap());
+  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/e8a61f68/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java b/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java
index b9dccc6..c7a1a46 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java
@@ -112,7 +112,7 @@ public class SchedulingModule extends AbstractModule {
     install(new PrivateModule() {
       @Override
       protected void configure() {
-        bind(new TypeLiteral<BiCache<TaskGroupKey, String>>() { }).in(Singleton.class);
+        bind(new TypeLiteral<BiCache<String, TaskGroupKey>>() { }).in(Singleton.class);
         bind(BiCache.BiCacheSettings.class).toInstance(
             new BiCache.BiCacheSettings(RESERVATION_DURATION.get(), "reservation_cache_size"));
         bind(TaskScheduler.class).to(TaskScheduler.TaskSchedulerImpl.class);

http://git-wip-us.apache.org/repos/asf/aurora/blob/e8a61f68/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
index 0f0bfca..7761745 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
@@ -87,7 +87,7 @@ public interface TaskScheduler extends EventSubscriber {
     private final Storage storage;
     private final TaskAssigner assigner;
     private final Preemptor preemptor;
-    private final BiCache<TaskGroupKey, String> reservations;
+    private final BiCache<String, TaskGroupKey> reservations;
 
     private final AtomicLong attemptsFired = Stats.exportLong("schedule_attempts_fired");
     private final AtomicLong attemptsFailed = Stats.exportLong("schedule_attempts_failed");
@@ -98,7 +98,7 @@ public interface TaskScheduler extends EventSubscriber {
         Storage storage,
         TaskAssigner assigner,
         Preemptor preemptor,
-        BiCache<TaskGroupKey, String> reservations) {
+        BiCache<String, TaskGroupKey> reservations) {
 
       this.storage = requireNonNull(storage);
       this.assigner = requireNonNull(assigner);
@@ -146,7 +146,7 @@ public interface TaskScheduler extends EventSubscriber {
             new ResourceRequest(task, aggregate),
             TaskGroupKey.from(task),
             taskId,
-            reservations.get(TaskGroupKey.from(task)));
+            reservations.asMap());
 
         if (!launched) {
           // Task could not be scheduled.
@@ -167,12 +167,12 @@ public interface TaskScheduler extends EventSubscriber {
         AttributeAggregate jobState,
         MutableStoreProvider storeProvider) {
 
-      if (reservations.get(TaskGroupKey.from(task.getTask())).isPresent()) {
+      if (!reservations.getByValue(TaskGroupKey.from(task.getTask())).isEmpty()) {
         return;
       }
       Optional<String> slaveId = preemptor.attemptPreemptionFor(task, jobState, storeProvider);
       if (slaveId.isPresent()) {
-        reservations.put(TaskGroupKey.from(task.getTask()), slaveId.get());
+        reservations.put(slaveId.get(), TaskGroupKey.from(task.getTask()));
       }
     }
 
@@ -181,7 +181,7 @@ public interface TaskScheduler extends EventSubscriber {
       if (Optional.of(PENDING).equals(stateChangeEvent.getOldState())) {
         IAssignedTask assigned = stateChangeEvent.getTask().getAssignedTask();
         if (assigned.getSlaveId() != null) {
-          reservations.remove(TaskGroupKey.from(assigned.getTask()), assigned.getSlaveId());
+          reservations.remove(assigned.getSlaveId(), TaskGroupKey.from(assigned.getTask()));
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/aurora/blob/e8a61f68/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
index 0e32990..ae59efa 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
@@ -27,6 +27,8 @@ import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.FluentIterable;
+
+import com.twitter.common.inject.TimedInterceptor.Timed;
 import com.twitter.common.stats.Stats;
 
 import org.apache.aurora.scheduler.HostOffer;
@@ -62,7 +64,7 @@ public interface TaskAssigner {
    * @param resourceRequest The request for resources being scheduled.
    * @param groupKey Task group key.
    * @param taskId Task id to assign.
-   * @param slaveReservation Slave reservation for a given {@code groupKey}.
+   * @param slaveReservations Slave reservations.
    * @return Assignment result.
    */
   boolean maybeAssign(
@@ -70,7 +72,7 @@ public interface TaskAssigner {
       ResourceRequest resourceRequest,
       TaskGroupKey groupKey,
       String taskId,
-      Optional<String> slaveReservation);
+      Map<String, TaskGroupKey> slaveReservations);
 
   class TaskAssignerImpl implements TaskAssigner {
     private static final Logger LOG = Logger.getLogger(TaskAssignerImpl.class.getName());
@@ -129,18 +131,21 @@ public interface TaskAssigner {
       return taskFactory.createFrom(assigned, offer.getSlaveId());
     }
 
+    @Timed("assigner_maybe_assign")
     @Override
     public boolean maybeAssign(
         MutableStoreProvider storeProvider,
         ResourceRequest resourceRequest,
         TaskGroupKey groupKey,
         String taskId,
-        Optional<String> slaveReservation) {
+        Map<String, TaskGroupKey> slaveReservations) {
 
       for (HostOffer offer : offerManager.getOffers(groupKey)) {
-        if (slaveReservation.isPresent()
-            && !slaveReservation.get().equals(offer.getOffer().getSlaveId().getValue()))
{
-          // Task group has a slave reserved but this offer is for a different slave ->
skip.
+        Optional<TaskGroupKey> reservedGroup = Optional.fromNullable(
+            slaveReservations.get(offer.getOffer().getSlaveId().getValue()));
+
+        if (reservedGroup.isPresent() && !reservedGroup.get().equals(groupKey)) {
+          // This slave is reserved for a different task group -> skip.
           continue;
         }
         Set<Veto> vetoes = filter.filter(

http://git-wip-us.apache.org/repos/asf/aurora/blob/e8a61f68/src/test/java/org/apache/aurora/scheduler/preemptor/BiCacheTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/BiCacheTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/BiCacheTest.java
index 7312091..f15bcd3 100644
--- a/src/test/java/org/apache/aurora/scheduler/preemptor/BiCacheTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/preemptor/BiCacheTest.java
@@ -13,7 +13,10 @@
  */
 package org.apache.aurora.scheduler.preemptor;
 
+import java.util.Map;
+
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
@@ -62,6 +65,7 @@ public class BiCacheTest {
     biCache.put(KEY_1, 1);
     assertEquals(1L, statsProvider.getLongValue(STAT_NAME));
     assertEquals(Optional.of(1), biCache.get(KEY_1));
+    assertEquals(NO_VALUE, biCache.get(KEY_2));
     biCache.remove(KEY_1, 1);
     assertEquals(NO_VALUE, biCache.get(KEY_1));
     assertEquals(0L, statsProvider.getLongValue(STAT_NAME));
@@ -104,4 +108,14 @@ public class BiCacheTest {
     assertEquals(ImmutableSet.of(KEY_1), biCache.getByValue(2));
     assertEquals(1L, statsProvider.getLongValue(STAT_NAME));
   }
+
+  @Test
+  public void testAsMap() {
+    biCache.put(KEY_1, 1);
+    assertEquals(Optional.of(1), biCache.get(KEY_1));
+    Map<String, Integer> map = biCache.asMap();
+
+    biCache.put(KEY_1, 2);
+    assertEquals(ImmutableMap.of(KEY_1, 1), map);
+  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/e8a61f68/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
index 350ec6f..492334b 100644
--- a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
@@ -13,8 +13,11 @@
  */
 package org.apache.aurora.scheduler.scheduling;
 
+import java.util.Map;
+
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
@@ -65,14 +68,16 @@ public class TaskSchedulerImplTest extends EasyMockTest {
 
   private static final IScheduledTask TASK_A =
       TaskTestUtil.makeTask("a", JobKeys.from("a", "a", "a"));
+  private static final TaskGroupKey GROUP_KEY =
+      TaskGroupKey.from(TASK_A.getAssignedTask().getTask());
   private static final String SLAVE_ID = "HOST_A";
-  private static final Optional<String> NO_RESERVATION = Optional.absent();
+  private static final Map<String, TaskGroupKey> NO_RESERVATION = ImmutableMap.of();
 
   private StorageTestUtil storageUtil;
   private TaskAssigner assigner;
   private TaskScheduler scheduler;
   private Preemptor preemptor;
-  private BiCache<TaskGroupKey, String> reservations;
+  private BiCache<String, TaskGroupKey> reservations;
   private EventSink eventSink;
 
   @Before
@@ -80,7 +85,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     storageUtil = new StorageTestUtil(this);
     assigner = createMock(TaskAssigner.class);
     preemptor = createMock(Preemptor.class);
-    reservations = createMock(new Clazz<BiCache<TaskGroupKey, String>>() { });
+    reservations = createMock(new Clazz<BiCache<String, TaskGroupKey>>() { });
 
     Injector injector = getInjector(storageUtil.storage);
     scheduler = injector.getInstance(TaskScheduler.class);
@@ -93,7 +98,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
         new AbstractModule() {
           @Override
           protected void configure() {
-            bind(new TypeLiteral<BiCache<TaskGroupKey, String>>() { }).toInstance(reservations);
+            bind(new TypeLiteral<BiCache<String, TaskGroupKey>>() { }).toInstance(reservations);
             bind(TaskScheduler.class).to(TaskSchedulerImpl.class);
             bind(Preemptor.class).toInstance(preemptor);
             bind(TaskAssigner.class).toInstance(assigner);
@@ -113,21 +118,21 @@ public class TaskSchedulerImplTest extends EasyMockTest {
 
   private IExpectationSetters<Boolean> expectAssigned(
       IScheduledTask task,
-      Optional<String> reservation) {
+      Map<String, TaskGroupKey> reservationMap) {
 
     return expect(assigner.maybeAssign(
         storageUtil.mutableStoreProvider,
         new ResourceRequest(task.getAssignedTask().getTask(), EMPTY),
         TaskGroupKey.from(task.getAssignedTask().getTask()),
         Tasks.id(task),
-        reservation));
+        reservationMap));
   }
 
   @Test
   public void testSchedule() throws Exception {
     storageUtil.expectOperations();
 
-    expectReservationCheck(TASK_A);
+    expectAsMap(NO_RESERVATION);
     expectTaskStillPendingQuery(TASK_A);
     expectActiveJobFetch(TASK_A);
     expectAssigned(TASK_A, NO_RESERVATION).andReturn(true);
@@ -157,22 +162,24 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     expectTaskStillPendingQuery(TASK_A);
     expectActiveJobFetch(TASK_A);
     expectAssigned(TASK_A, NO_RESERVATION).andReturn(false);
-    expectReservationCheck(TASK_A).times(2);
-    expectPreemptorCall(TASK_A, NO_RESERVATION);
+    expectAsMap(NO_RESERVATION);
+    expectNoReservation(TASK_A);
+    expectPreemptorCall(TASK_A, Optional.<String>absent());
 
     // Slave is reserved.
     expectTaskStillPendingQuery(TASK_A);
     expectActiveJobFetch(TASK_A);
     expectAssigned(TASK_A, NO_RESERVATION).andReturn(false);
-    expectReservationCheck(TASK_A).times(2);
+    expectAsMap(NO_RESERVATION);
+    expectNoReservation(TASK_A);
     expectPreemptorCall(TASK_A, Optional.of(SLAVE_ID));
     expectAddReservation(TASK_A, SLAVE_ID);
 
     // Use previously created reservation.
     expectTaskStillPendingQuery(TASK_A);
     expectActiveJobFetch(TASK_A);
-    expectGetReservation(TASK_A, SLAVE_ID);
-    expectAssigned(TASK_A, Optional.of(SLAVE_ID)).andReturn(true);
+    expectAsMap(ImmutableMap.of(SLAVE_ID, GROUP_KEY));
+    expectAssigned(TASK_A, ImmutableMap.of(SLAVE_ID, GROUP_KEY)).andReturn(true);
 
     control.replay();
 
@@ -187,7 +194,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
 
     expectTaskStillPendingQuery(TASK_A);
     expectActiveJobFetch(TASK_A);
-    expectReservationCheck(TASK_A);
+    expectAsMap(NO_RESERVATION);
     expectAssigned(TASK_A, NO_RESERVATION).andReturn(false);
     expectGetReservation(TASK_A, SLAVE_ID);
 
@@ -202,7 +209,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
 
     expectTaskStillPendingQuery(TASK_A);
     expectActiveJobFetch(TASK_A);
-    expectReservationCheck(TASK_A);
+    expectAsMap(NO_RESERVATION);
     expectAssigned(TASK_A, NO_RESERVATION).andReturn(false);
     expectGetReservation(TASK_A, SLAVE_ID);
 
@@ -220,7 +227,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
 
   @Test
   public void testPendingDeletedHandled() throws Exception {
-    reservations.remove(TaskGroupKey.from(TASK_A.getAssignedTask().getTask()), SLAVE_ID);
+    reservations.remove(SLAVE_ID, TaskGroupKey.from(TASK_A.getAssignedTask().getTask()));
 
     control.replay();
 
@@ -250,7 +257,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
       }
     });
 
-    expectReservationCheck(TASK_A);
+    expectAsMap(NO_RESERVATION);
     expect(assigner.maybeAssign(
         EasyMock.anyObject(),
         eq(new ResourceRequest(taskA.getAssignedTask().getTask(), EMPTY)),
@@ -267,7 +274,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
   public void testScheduleThrows() throws Exception {
     storageUtil.expectOperations();
 
-    expectReservationCheck(TASK_A);
+    expectAsMap(NO_RESERVATION);
     expectTaskStillPendingQuery(TASK_A);
     expectActiveJobFetch(TASK_A);
     expectAssigned(TASK_A, NO_RESERVATION).andThrow(new IllegalArgumentException("expected"));
@@ -292,16 +299,20 @@ public class TaskSchedulerImplTest extends EasyMockTest {
   }
 
   private void expectAddReservation(IScheduledTask task, String slaveId) {
-    reservations.put(TaskGroupKey.from(task.getAssignedTask().getTask()), slaveId);
+    reservations.put(slaveId, TaskGroupKey.from(task.getAssignedTask().getTask()));
   }
 
   private IExpectationSetters<?> expectGetReservation(IScheduledTask task, String slaveId)
{
-    return expect(reservations.get(TaskGroupKey.from(task.getAssignedTask().getTask())))
-        .andReturn(Optional.of(slaveId));
+    return expect(reservations.getByValue(TaskGroupKey.from(task.getAssignedTask().getTask())))
+        .andReturn(ImmutableSet.of(slaveId));
+  }
+
+  private IExpectationSetters<?> expectNoReservation(IScheduledTask task) {
+    return expect(reservations.getByValue(TaskGroupKey.from(task.getAssignedTask().getTask())))
+        .andReturn(ImmutableSet.of());
   }
 
-  private IExpectationSetters<?> expectReservationCheck(IScheduledTask task) {
-    return expect(reservations.get(TaskGroupKey.from(task.getAssignedTask().getTask())))
-        .andReturn(Optional.<String>absent());
+  private IExpectationSetters<?> expectAsMap(Map<String, TaskGroupKey> map) {
+    return expect(reservations.asMap()).andReturn(map);
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/e8a61f68/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
index c9c6f5d..1de1d1f 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
@@ -13,6 +13,8 @@
  */
 package org.apache.aurora.scheduler.state;
 
+import java.util.Map;
+
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
@@ -37,6 +39,7 @@ import org.apache.aurora.scheduler.offers.OfferManager;
 import org.apache.aurora.scheduler.state.TaskAssigner.TaskAssignerImpl;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.mesos.Protos.FrameworkID;
 import org.apache.mesos.Protos.OfferID;
 import org.apache.mesos.Protos.Resource;
@@ -63,10 +66,11 @@ import static org.junit.Assert.assertTrue;
 public class TaskAssignerImplTest extends EasyMockTest {
 
   private static final int PORT = 5000;
+  private static final String SLAVE_ID = "slaveId";
   private static final Offer MESOS_OFFER = Offer.newBuilder()
       .setId(OfferID.newBuilder().setValue("offerId"))
       .setFrameworkId(FrameworkID.newBuilder().setValue("frameworkId"))
-      .setSlaveId(SlaveID.newBuilder().setValue("slaveId"))
+      .setSlaveId(SlaveID.newBuilder().setValue(SLAVE_ID))
       .setHostname("hostName")
       .addResources(Resource.newBuilder()
           .setName("ports")
@@ -91,6 +95,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
       .setTaskId(TaskID.newBuilder().setValue(Tasks.id(TASK)))
       .setSlaveId(MESOS_OFFER.getSlaveId())
       .build();
+  private static final Map<String, TaskGroupKey> NO_RESERVATION = ImmutableMap.of();
 
   private MutableStoreProvider storeProvider;
   private StateManager stateManager;
@@ -134,7 +139,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
         new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY),
         TaskGroupKey.from(TASK.getAssignedTask().getTask()),
         Tasks.id(TASK),
-        Optional.of(MESOS_OFFER.getSlaveId().getValue())));
+        ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
   }
 
   @Test
@@ -153,7 +158,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
         new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY),
         TaskGroupKey.from(TASK.getAssignedTask().getTask()),
         Tasks.id(TASK),
-        Optional.<String>absent()));
+        NO_RESERVATION));
   }
 
   @Test
@@ -171,7 +176,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
         new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY),
         TaskGroupKey.from(TASK.getAssignedTask().getTask()),
         Tasks.id(TASK),
-        Optional.<String>absent()));
+        NO_RESERVATION));
   }
 
   @Test
@@ -207,7 +212,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
         new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY),
         TaskGroupKey.from(TASK.getAssignedTask().getTask()),
         Tasks.id(TASK),
-        Optional.<String>absent()));
+        NO_RESERVATION));
   }
 
   @Test
@@ -221,6 +226,52 @@ public class TaskAssignerImplTest extends EasyMockTest {
         new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY),
         TaskGroupKey.from(TASK.getAssignedTask().getTask()),
         Tasks.id(TASK),
-        Optional.of("invalid")));
+        ImmutableMap.of(SLAVE_ID, TaskGroupKey.from(
+            ITaskConfig.build(new TaskConfig().setJob(new JobKey("other", "e", "n")))))));
+  }
+
+  @Test
+  public void testTaskWithReservedSlaveLandsElsewhere() throws Exception {
+    // Ensures slave/task reservation relationship is only enforced in slave->task direction
+    // and permissive in task->slave direction. In other words, a task with a slave reservation
+    // should still be tried against other unreserved slaves.
+    HostOffer offer = new HostOffer(
+        Offer.newBuilder()
+            .setId(OfferID.newBuilder().setValue("offerId0"))
+            .setFrameworkId(FrameworkID.newBuilder().setValue("frameworkId"))
+            .setSlaveId(SlaveID.newBuilder().setValue("slaveId0"))
+            .setHostname("hostName0")
+            .addResources(Resource.newBuilder()
+                .setName("ports")
+                .setType(Type.RANGES)
+                .setRanges(
+                    Ranges.newBuilder().addRange(Range.newBuilder().setBegin(PORT).setEnd(PORT))))
+            .build(),
+        IHostAttributes.build(new HostAttributes()));
+
+    expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(offer, OFFER));
+    expect(filter.filter(
+        new UnusedResource(ResourceSlot.from(offer.getOffer()), offer.getAttributes()),
+        new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY)))
+        .andReturn(ImmutableSet.of());
+    expect(stateManager.assignTask(
+        storeProvider,
+        Tasks.id(TASK),
+        offer.getOffer().getHostname(),
+        offer.getOffer().getSlaveId(),
+        ImmutableMap.of(PORT_NAME, PORT)))
+        .andReturn(TASK.getAssignedTask());
+    expect(taskFactory.createFrom(TASK.getAssignedTask(), offer.getOffer().getSlaveId()))
+        .andReturn(TASK_INFO);
+    offerManager.launchTask(offer.getOffer().getId(), TASK_INFO);
+
+    control.replay();
+
+    assertTrue(assigner.maybeAssign(
+        storeProvider,
+        new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY),
+        TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+        Tasks.id(TASK),
+        ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
   }
 }


Mime
View raw message