aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject incubator-aurora git commit: Offer filtering for static vetoes. Part 3 of 4: Offer filtering.
Date Tue, 24 Feb 2015 21:40:37 GMT
Repository: incubator-aurora
Updated Branches:
  refs/heads/master 012059148 -> d93b863cc


Offer filtering for static vetoes. Part 3 of 4: Offer filtering.

Bugs closed: AURORA-909

Reviewed at https://reviews.apache.org/r/30891/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/d93b863c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/d93b863c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/d93b863c

Branch: refs/heads/master
Commit: d93b863cc6cb5d7e769d2af36d522e8aedf30cf3
Parents: 0120591
Author: Maxim Khutornenko <maxim@apache.org>
Authored: Tue Feb 24 13:38:26 2015 -0800
Committer: Maxim Khutornenko <maxim@apache.org>
Committed: Tue Feb 24 13:38:26 2015 -0800

----------------------------------------------------------------------
 .../aurora/scheduler/async/OfferManager.java    |  58 ++++++++-
 .../aurora/scheduler/async/TaskScheduler.java   |   4 +-
 .../aurora/scheduler/state/TaskAssigner.java    |  14 ++-
 .../scheduler/async/OfferManagerImplTest.java   | 120 +++++++++++++++++--
 .../scheduler/async/TaskSchedulerImplTest.java  |  26 ++--
 .../scheduler/async/TaskSchedulerTest.java      |   4 +-
 6 files changed, 196 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d93b863c/src/main/java/org/apache/aurora/scheduler/async/OfferManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/OfferManager.java b/src/main/java/org/apache/aurora/scheduler/async/OfferManager.java
index b241d7b..7d37139 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/OfferManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/OfferManager.java
@@ -20,15 +20,19 @@ import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import javax.inject.Inject;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Supplier;
+import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
 import com.google.common.collect.Ordering;
 import com.google.common.eventbus.Subscribe;
 import com.twitter.common.inject.TimedInterceptor.Timed;
@@ -38,6 +42,7 @@ import com.twitter.common.stats.Stats;
 
 import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.async.TaskGroups.GroupKey;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
 import org.apache.aurora.scheduler.mesos.Driver;
@@ -78,12 +83,14 @@ public interface OfferManager extends EventSubscriber {
    * Launches the first task that satisfies the {@code acceptor} by returning a {@link Assignment}.
    *
    * @param acceptor Function that determines if an offer is accepted.
+   * @param groupKey Task group key.
    * @return {@code true} if the task was launched, {@code false} if no offers satisfied
the
    *         {@code acceptor}.
    * @throws LaunchException If the acceptor accepted an offer, but there was an error launching
the
    *                         task.
    */
-  boolean launchFirst(Function<HostOffer, Assignment> acceptor) throws LaunchException;
+  boolean launchFirst(Function<HostOffer, Assignment> acceptor, GroupKey groupKey)
+      throws LaunchException;
 
   /**
    * Notifies the offer queue that a host's attributes have changed.
@@ -121,7 +128,8 @@ public interface OfferManager extends EventSubscriber {
   }
 
   class OfferManagerImpl implements OfferManager {
-    private static final Logger LOG = Logger.getLogger(OfferManagerImpl.class.getName());
+    @VisibleForTesting
+    static final Logger LOG = Logger.getLogger(OfferManagerImpl.class.getName());
 
     private final HostOffers hostOffers = new HostOffers();
     private final AtomicLong offerRaces = Stats.exportLong("offer_accept_races");
@@ -243,6 +251,10 @@ public interface OfferManager extends EventSubscriber {
       private final Map<OfferID, HostOffer> offersById = Maps.newHashMap();
       private final Map<SlaveID, HostOffer> offersBySlave = Maps.newHashMap();
       private final Map<String, HostOffer> offersByHost = Maps.newHashMap();
+      // TODO(maxim): Expose via a debug endpoint. AURORA-1136.
+      // Keep track of offer->groupKey mappings that will never be matched to avoid redundant
+      // scheduling attempts. See Assignment.Result for more details on static ban.
+      private final Multimap<OfferID, GroupKey> staticallyBannedOffers = HashMultimap.create();
 
       HostOffers() {
         // Potential gotcha - since this is a ConcurrentSkipListSet, size() is more expensive.
@@ -267,6 +279,7 @@ public interface OfferManager extends EventSubscriber {
           offers.remove(removed);
           offersBySlave.remove(removed.getOffer().getSlaveId());
           offersByHost.remove(removed.getOffer().getHostname());
+          staticallyBannedOffers.removeAll(id);
         }
         return removed != null;
       }
@@ -284,24 +297,50 @@ public interface OfferManager extends EventSubscriber {
         return Iterables.unmodifiableIterable(offers);
       }
 
+      synchronized boolean isStaticallyBanned(HostOffer offer, GroupKey groupKey) {
+        boolean result = staticallyBannedOffers.containsEntry(offer.getOffer().getId(), groupKey);
+        if (LOG.isLoggable(Level.FINE)) {
+          LOG.fine(String.format(
+              "Host offer %s is statically banned for %s: %s",
+              offer,
+              groupKey,
+              result));
+        }
+        return result;
+      }
+
+      synchronized void addStaticGroupBan(HostOffer offer, GroupKey groupKey) {
+        OfferID offerId = offer.getOffer().getId();
+        if (offersById.containsKey(offerId)) {
+          staticallyBannedOffers.put(offerId, groupKey);
+
+          if (LOG.isLoggable(Level.FINE)) {
+            LOG.fine(
+                String.format("Adding static ban for offer: %s, groupKey: %s", offer, groupKey));
+          }
+        }
+      }
+
       synchronized void clear() {
         offers.clear();
         offersById.clear();
         offersBySlave.clear();
         offersByHost.clear();
+        staticallyBannedOffers.clear();
       }
     }
 
     @Timed("offer_queue_launch_first")
     @Override
-    public boolean launchFirst(Function<HostOffer, Assignment> acceptor)
+    public boolean launchFirst(Function<HostOffer, Assignment> acceptor, GroupKey groupKey)
         throws LaunchException {
 
       // It's important that this method is not called concurrently - doing so would open
up the
       // possibility of a race between the same offers being accepted by different threads.
 
       for (HostOffer offer : hostOffers.getWeaklyConsistentOffers()) {
-        if (acceptOffer(offer, acceptor)) {
+        if (!hostOffers.isStaticallyBanned(offer, groupKey)
+            && acceptOffer(offer, acceptor, groupKey)) {
           return true;
         }
       }
@@ -312,7 +351,8 @@ public interface OfferManager extends EventSubscriber {
     @Timed("offer_queue_accept_offer")
     protected boolean acceptOffer(
         HostOffer offer,
-        Function<HostOffer, Assignment> acceptor) throws LaunchException {
+        Function<HostOffer, Assignment> acceptor,
+        GroupKey groupKey) throws LaunchException {
 
       Assignment assignment = acceptor.apply(offer);
       switch (assignment.getResult()) {
@@ -338,7 +378,13 @@ public interface OfferManager extends EventSubscriber {
             throw new LaunchException(
                 "Accepted offer no longer exists in offer queue, likely data race.");
           }
-        case FAILURE:
+
+        case FAILURE_STATIC_MISMATCH:
+          // Exclude an offer that results in a static mismatch from further attempts to
match
+          // against all tasks from the same group.
+          hostOffers.addStaticGroupBan(offer, groupKey);
+          return false;
+
         default:
           return false;
       }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d93b863c/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
index 21ea7d2..d0fe3e1 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
@@ -42,6 +42,7 @@ import com.twitter.common.stats.StatsProvider;
 import com.twitter.common.util.Clock;
 
 import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.async.TaskGroups.GroupKey;
 import org.apache.aurora.scheduler.async.preemptor.Preemptor;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
@@ -216,7 +217,8 @@ public interface TaskScheduler extends EventSubscriber {
         AttributeAggregate aggregate = getJobState(store, task.getJob());
         try {
           boolean launched = offerManager.launchFirst(
-              getAssignerFunction(store, new ResourceRequest(task, taskId, aggregate)));
+              getAssignerFunction(store, new ResourceRequest(task, taskId, aggregate)),
+              new GroupKey(task));
 
           if (!launched) {
             // Task could not be scheduled.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d93b863c/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
index 5a0f7dd..c44ff33 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
@@ -30,6 +30,7 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.VetoGroup;
 import org.apache.aurora.scheduler.mesos.MesosTaskFactory;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.mesos.Protos.TaskInfo;
@@ -55,7 +56,14 @@ public interface TaskAssigner {
       /**
        * Assignment failed.
        */
-      FAILURE
+      FAILURE,
+
+      /**
+       * Assignment failed with static mismatch (i.e. all {@link Veto} instances group
+       * as {@link VetoGroup}).
+       * @see VetoGroup#STATIC
+       */
+      FAILURE_STATIC_MISMATCH,
     }
 
     private static final Optional<TaskInfo> NO_TASK_INFO = Optional.absent();
@@ -107,7 +115,9 @@ public interface TaskAssigner {
         return Result.SUCCESS;
       }
 
-      return Result.FAILURE;
+      return Veto.identifyGroup(vetoes) == VetoGroup.STATIC
+          ? Result.FAILURE_STATIC_MISMATCH
+          : Result.FAILURE;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d93b863c/src/test/java/org/apache/aurora/scheduler/async/OfferManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/OfferManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/OfferManagerImplTest.java
index 7ee2bb9..0cbc71d 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/OfferManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/OfferManagerImplTest.java
@@ -14,22 +14,30 @@
 package org.apache.aurora.scheduler.async;
 
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.logging.Level;
 
 import com.google.common.base.Function;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
 import com.google.common.testing.TearDown;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
 import com.twitter.common.testing.easymock.EasyMockTest;
 
 import org.apache.aurora.gen.HostAttributes;
+import org.apache.aurora.gen.JobKey;
 import org.apache.aurora.gen.MaintenanceMode;
+import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.async.OfferManager.OfferManagerImpl;
 import org.apache.aurora.scheduler.async.OfferManager.OfferReturnDelay;
+import org.apache.aurora.scheduler.async.TaskGroups.GroupKey;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import org.apache.aurora.scheduler.mesos.Driver;
 import org.apache.aurora.scheduler.state.TaskAssigner.Assignment;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.aurora.scheduler.testing.FakeScheduledExecutor;
 import org.apache.mesos.Protos.TaskInfo;
 import org.junit.Before;
@@ -38,6 +46,7 @@ import org.junit.Test;
 import static org.apache.aurora.gen.MaintenanceMode.DRAINING;
 import static org.apache.aurora.gen.MaintenanceMode.NONE;
 import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
@@ -56,14 +65,23 @@ public class OfferManagerImplTest extends EasyMockTest {
   private static final HostOffer OFFER_C = new HostOffer(
       Offers.makeOffer("OFFER_C", HOST_C),
       IHostAttributes.build(new HostAttributes().setMode(NONE)));
+  private static final GroupKey GROUP_KEY =
+      new GroupKey(ITaskConfig.build(new TaskConfig().setJob(new JobKey("role", "env", "name"))));
 
   private Driver driver;
   private FakeScheduledExecutor clock;
   private Function<HostOffer, Assignment> offerAcceptor;
-  private OfferManagerImpl offerQueue;
+  private OfferManagerImpl offerManager;
 
   @Before
   public void setUp() {
+    offerManager.LOG.setLevel(Level.FINE);
+    addTearDown(new TearDown() {
+      @Override
+      public void tearDown() throws Exception {
+        offerManager.LOG.setLevel(Level.INFO);
+      }
+    });
     driver = createMock(Driver.class);
     ScheduledExecutorService executorMock = createMock(ScheduledExecutorService.class);
     clock = FakeScheduledExecutor.scheduleExecutor(executorMock);
@@ -81,7 +99,7 @@ public class OfferManagerImplTest extends EasyMockTest {
         return RETURN_DELAY;
       }
     };
-    offerQueue = new OfferManagerImpl(driver, returnDelay, executorMock);
+    offerManager = new OfferManagerImpl(driver, returnDelay, executorMock);
   }
 
   @Test
@@ -100,10 +118,90 @@ public class OfferManagerImplTest extends EasyMockTest {
 
     control.replay();
 
-    offerQueue.addOffer(offerA);
-    offerQueue.addOffer(OFFER_B);
-    offerQueue.addOffer(offerC);
-    assertTrue(offerQueue.launchFirst(offerAcceptor));
+    offerManager.addOffer(offerA);
+    offerManager.addOffer(OFFER_B);
+    offerManager.addOffer(offerC);
+    assertTrue(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
+    clock.advance(RETURN_DELAY);
+  }
+
+  @Test
+  public void testGetOffersReturnsAllOffers() throws Exception {
+    expect(offerAcceptor.apply(OFFER_A))
+        .andReturn(Assignment.failure(ImmutableSet.of(Veto.constraintMismatch("denied"))));
+
+    control.replay();
+
+    offerManager.addOffer(OFFER_A);
+    assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
+    assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers()));
+
+    offerManager.cancelOffer(OFFER_A.getOffer().getId());
+    assertTrue(Iterables.isEmpty(offerManager.getOffers()));
+
+    clock.advance(RETURN_DELAY);
+  }
+
+  @Test
+  public void testOfferFilteringDueToStaticBan() throws Exception {
+    expect(offerAcceptor.apply(OFFER_A))
+        .andReturn(Assignment.failure(ImmutableSet.of(Veto.constraintMismatch("denied"))));
+
+    TaskInfo task = TaskInfo.getDefaultInstance();
+    expect(offerAcceptor.apply(OFFER_B)).andReturn(Assignment.success(task));
+    driver.launchTask(OFFER_B.getOffer().getId(), task);
+
+    driver.declineOffer(OFFER_A.getOffer().getId());
+
+    control.replay();
+
+    offerManager.addOffer(OFFER_A);
+    assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
+    // Run again to make sure all offers are banned (via no expectations set).
+    assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
+
+    // Add a new offer to accept the task previously banned for OFFER_A.
+    offerManager.addOffer(OFFER_B);
+    assertTrue(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
+
+    clock.advance(RETURN_DELAY);
+  }
+
+  @Test
+  public void testStaticBanIsCleared() throws Exception {
+    expect(offerAcceptor.apply(OFFER_A))
+        .andReturn(Assignment.failure(ImmutableSet.of(Veto.insufficientResources("ram", 100))));
+
+    TaskInfo task = TaskInfo.getDefaultInstance();
+    expect(offerAcceptor.apply(OFFER_A)).andReturn(Assignment.success(task));
+    driver.launchTask(OFFER_A.getOffer().getId(), task);
+
+    expect(offerAcceptor.apply(OFFER_A))
+        .andReturn(Assignment.failure(ImmutableSet.of(Veto.maintenance("draining"))));
+
+    expect(offerAcceptor.apply(OFFER_A)).andReturn(Assignment.success(task));
+    driver.launchTask(OFFER_A.getOffer().getId(), task);
+
+    driver.declineOffer(OFFER_A.getOffer().getId());
+
+    control.replay();
+
+    offerManager.addOffer(OFFER_A);
+    assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
+
+    // Make sure the static ban is cleared when the offers are returned.
+    clock.advance(RETURN_DELAY);
+    offerManager.addOffer(OFFER_A);
+    assertTrue(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
+
+    offerManager.addOffer(OFFER_A);
+    assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
+
+    // Make sure the static ban is cleared when driver is disconnected.
+    offerManager.driverDisconnected(new DriverDisconnected());
+    offerManager.addOffer(OFFER_A);
+    assertTrue(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
+
     clock.advance(RETURN_DELAY);
   }
 
@@ -111,10 +209,10 @@ public class OfferManagerImplTest extends EasyMockTest {
   public void testFlushOffers() throws Exception {
     control.replay();
 
-    offerQueue.addOffer(OFFER_A);
-    offerQueue.addOffer(OFFER_B);
-    offerQueue.driverDisconnected(new DriverDisconnected());
-    assertFalse(offerQueue.launchFirst(offerAcceptor));
+    offerManager.addOffer(OFFER_A);
+    offerManager.addOffer(OFFER_B);
+    offerManager.driverDisconnected(new DriverDisconnected());
+    assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
     clock.advance(RETURN_DELAY);
   }
 
@@ -124,7 +222,7 @@ public class OfferManagerImplTest extends EasyMockTest {
 
     control.replay();
 
-    offerQueue.addOffer(OFFER_A);
+    offerManager.addOffer(OFFER_A);
     clock.advance(RETURN_DELAY);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d93b863c/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
index 9eef52a..4ee13c8 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
@@ -36,6 +36,7 @@ import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.async.TaskGroups.GroupKey;
 import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl;
 import org.apache.aurora.scheduler.async.preemptor.Preemptor;
 import org.apache.aurora.scheduler.base.Query;
@@ -81,6 +82,9 @@ public class TaskSchedulerImplTest extends EasyMockTest {
       Offers.makeOffer("OFFER_A", "HOST_A"),
       IHostAttributes.build(new HostAttributes().setMode(MaintenanceMode.NONE)));
 
+  private static final GroupKey GROUP_A = new GroupKey(TASK_A.getAssignedTask().getTask());
+  private static final GroupKey GROUP_B = new GroupKey(TASK_B.getAssignedTask().getTask());
+
   private StorageTestUtil storageUtil;
   private StateManager stateManager;
   private TaskAssigner assigner;
@@ -171,13 +175,13 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     assertFalse(scheduler.schedule("a"));
     assertFalse(scheduler.schedule("b"));
 
-    assignAndAssert(Result.FAILURE, OFFER, firstAssignment);
+    assignAndAssert(Result.FAILURE, GROUP_B, OFFER, firstAssignment);
 
     clock.advance(reservationDuration);
 
     assertTrue(scheduler.schedule("b"));
 
-    assignAndAssert(Result.SUCCESS, OFFER, secondAssignment);
+    assignAndAssert(Result.SUCCESS, GROUP_B, OFFER, secondAssignment);
   }
 
   @Test
@@ -210,11 +214,11 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     control.replay();
     assertFalse(scheduler.schedule("a"));
     assertTrue(scheduler.schedule("a"));
-    assignAndAssert(Result.SUCCESS, OFFER, firstAssignment);
+    assignAndAssert(Result.SUCCESS, GROUP_A, OFFER, firstAssignment);
     eventSink.post(TaskStateChange.transition(TASK_A, PENDING));
     clock.advance(halfReservationDuration);
     assertTrue(scheduler.schedule("b"));
-    assignAndAssert(Result.SUCCESS, OFFER, secondAssignment);
+    assignAndAssert(Result.SUCCESS, GROUP_B, OFFER, secondAssignment);
   }
 
   @Test
@@ -237,7 +241,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     clock.advance(halfReservationDuration);
     assertTrue(scheduler.schedule("a"));
 
-    assignAndAssert(Result.SUCCESS, OFFER, assignment);
+    assignAndAssert(Result.SUCCESS, GROUP_A, OFFER, assignment);
   }
 
   @Test
@@ -263,7 +267,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     // Task is killed by user before it is scheduled
     eventSink.post(TaskStateChange.transition(TASK_A, PENDING));
     assertTrue(scheduler.schedule("b"));
-    assignAndAssert(Result.SUCCESS, OFFER, assignment);
+    assignAndAssert(Result.SUCCESS, GROUP_B, OFFER, assignment);
   }
 
   @Test
@@ -287,7 +291,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     // We don't act on the reservation made by b because we want to see timeout behaviour.
     clock.advance(reservationDuration);
     assertTrue(scheduler.schedule("a"));
-    assignAndAssert(Result.SUCCESS, OFFER, assignment);
+    assignAndAssert(Result.SUCCESS, GROUP_A, OFFER, assignment);
   }
 
   @Test
@@ -323,7 +327,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     control.replay();
 
     assertTrue(scheduler.schedule(Tasks.id(taskA)));
-    assignAndAssert(Result.SUCCESS, OFFER, assignment);
+    assignAndAssert(Result.SUCCESS, GROUP_A, OFFER, assignment);
   }
 
   private static IScheduledTask makeTask(String taskId) {
@@ -340,22 +344,26 @@ public class TaskSchedulerImplTest extends EasyMockTest {
 
   private static class AssignmentCapture {
     public Capture<Function<HostOffer, Assignment>> assigner = createCapture();
+    public Capture<GroupKey> groupKey = createCapture();
   }
 
   private AssignmentCapture expectLaunchAttempt(boolean taskLaunched)
       throws OfferManager.LaunchException {
 
     AssignmentCapture capture = new AssignmentCapture();
-    expect(offerManager.launchFirst(capture(capture.assigner))).andReturn(taskLaunched);
+    expect(offerManager.launchFirst(capture(capture.assigner), capture(capture.groupKey)))
+        .andReturn(taskLaunched);
     return capture;
   }
 
   private void assignAndAssert(
       Result result,
+      GroupKey groupKey,
       HostOffer offer,
       AssignmentCapture capture) {
 
     assertEquals(result, capture.assigner.getValue().apply(offer).getResult());
+    assertEquals(groupKey, capture.groupKey.getValue());
   }
 
   private void expectActiveJobFetch(IScheduledTask taskInJob) {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d93b863c/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
index b6d4d8e..87bc531 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
@@ -44,6 +44,7 @@ import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.async.OfferManager.OfferManagerImpl;
 import org.apache.aurora.scheduler.async.OfferManager.OfferReturnDelay;
+import org.apache.aurora.scheduler.async.TaskGroups.GroupKey;
 import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl;
 import org.apache.aurora.scheduler.async.preemptor.Preemptor;
 import org.apache.aurora.scheduler.base.Query;
@@ -66,6 +67,7 @@ import org.apache.aurora.scheduler.storage.Storage.StorageException;
 import org.apache.aurora.scheduler.storage.TaskStore;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.aurora.scheduler.storage.mem.MemStorage;
 import org.apache.mesos.Protos.OfferID;
 import org.apache.mesos.Protos.SlaveID;
@@ -449,7 +451,7 @@ public class TaskSchedulerTest extends EasyMockTest {
     replayAndCreateScheduler();
 
     offerManager.addOffer(OFFER_A);
-    offerManager.launchFirst(offerAcceptor);
+    offerManager.launchFirst(offerAcceptor, new GroupKey(ITaskConfig.build(new TaskConfig())));
     offerExpirationCapture.getValue().run();
   }
 


Mime
View raw message