aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject [2/2] aurora git commit: Centralizing offer/task matching in TaskAssigner.
Date Mon, 03 Aug 2015 18:47:15 GMT
Centralizing offer/task matching in TaskAssigner.

Bugs closed: AURORA-1416

Reviewed at https://reviews.apache.org/r/37001/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/fb032506
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/fb032506
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/fb032506

Branch: refs/heads/master
Commit: fb032506529894628d9e0f85a0ded095c938bf49
Parents: 1c0086f
Author: Maxim Khutornenko <maxim@apache.org>
Authored: Mon Aug 3 11:46:58 2015 -0700
Committer: Maxim Khutornenko <maxim@apache.org>
Committed: Mon Aug 3 11:46:58 2015 -0700

----------------------------------------------------------------------
 .../aurora/benchmark/SchedulingBenchmarks.java  |   5 +-
 .../benchmark/fakes/FakeOfferManager.java       |  20 +-
 .../aurora/scheduler/offers/OfferManager.java   | 148 ++--
 .../scheduler/scheduling/SchedulingModule.java  |   2 +-
 .../scheduler/scheduling/TaskScheduler.java     | 101 +--
 .../aurora/scheduler/state/TaskAssigner.java    | 220 +++---
 .../scheduler/offers/OfferManagerImplTest.java  | 164 +++--
 .../scheduling/TaskSchedulerImplTest.java       | 216 +++---
 .../scheduler/scheduling/TaskSchedulerTest.java | 671 -------------------
 .../scheduler/state/TaskAssignerImplTest.java   | 122 +++-
 10 files changed, 487 insertions(+), 1182 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/fb032506/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
index 5bc73d5..d75f090 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
@@ -38,6 +38,7 @@ import org.apache.aurora.benchmark.fakes.FakeRescheduleCalculator;
 import org.apache.aurora.benchmark.fakes.FakeStatsProvider;
 import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.TaskIdGenerator;
+import org.apache.aurora.scheduler.async.AsyncModule;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
@@ -117,7 +118,9 @@ public class SchedulingBenchmarks {
           new PrivateModule() {
             @Override
             protected void configure() {
-              bind(ScheduledExecutorService.class).toInstance(executor);
+              bind(ScheduledExecutorService.class)
+                  .annotatedWith(AsyncModule.AsyncExecutor.class)
+                  .toInstance(executor);
               bind(OfferManager.class).to(OfferManager.OfferManagerImpl.class);
               bind(OfferManager.OfferManagerImpl.class).in(Singleton.class);
               bind(OfferManager.OfferReturnDelay.class).toInstance(

http://git-wip-us.apache.org/repos/asf/aurora/blob/fb032506/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java
index f413301..fbd24ea 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java
@@ -13,14 +13,12 @@
  */
 package org.apache.aurora.benchmark.fakes;
 
-import com.google.common.base.Function;
 import com.google.common.base.Optional;
 
 import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.events.PubsubEvent;
 import org.apache.aurora.scheduler.offers.OfferManager;
-import org.apache.aurora.scheduler.state.TaskAssigner;
 import org.apache.mesos.Protos;
 
 public class FakeOfferManager implements OfferManager {
@@ -30,15 +28,23 @@ public class FakeOfferManager implements OfferManager {
   }
 
   @Override
-  public void cancelOffer(Protos.OfferID offer) {
+  public void cancelOffer(Protos.OfferID offerId) {
     // no-op
   }
 
   @Override
-  public boolean launchFirst(
-      Function<HostOffer, TaskAssigner.Assignment> acceptor,
-      TaskGroupKey groupKey) throws LaunchException {
-    return false;
+  public void launchTask(Protos.OfferID offerId, Protos.TaskInfo taskInfo) throws LaunchException {
+    // no-op
+  }
+
+  @Override
+  public void banOffer(Protos.OfferID offerId, TaskGroupKey groupKey) {
+    // no-op
+  }
+
+  @Override
+  public Iterable<HostOffer> getOffers(TaskGroupKey groupKey) {
+    return null;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/aurora/blob/fb032506/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
index 14bf265..4b8a55f 100644
--- a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
@@ -20,7 +20,6 @@ import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import javax.inject.Inject;
@@ -29,6 +28,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Supplier;
+import com.google.common.collect.FluentIterable;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
@@ -47,8 +47,8 @@ import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
 import org.apache.aurora.scheduler.mesos.Driver;
-import org.apache.aurora.scheduler.state.TaskAssigner.Assignment;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.mesos.Protos;
 import org.apache.mesos.Protos.OfferID;
 import org.apache.mesos.Protos.SlaveID;
 
@@ -76,22 +76,27 @@ public interface OfferManager extends EventSubscriber {
    * Invalidates an offer.  This indicates that the scheduler should not attempt to match any
    * tasks against the offer.
    *
-   * @param offer Canceled offer.
+   * @param offerId Canceled offer.
    */
-  void cancelOffer(OfferID offer);
+  void cancelOffer(OfferID offerId);
 
   /**
-   * Launches the first task that satisfies the {@code acceptor} by returning a {@link Assignment}.
+   * Exclude an offer that results in a static mismatch from further attempts to match against all
+   * tasks from the same group.
    *
-   * @param acceptor Function that determines if an offer is accepted.
-   * @param groupKey Task group key.
-   * @return {@code true} if the task was launched, {@code false} if no offers satisfied the
-   *         {@code acceptor}.
-   * @throws LaunchException If the acceptor accepted an offer, but there was an error launching the
-   *                         task.
+   * @param offerId Offer ID to exclude for the given {@code groupKey}.
+   * @param groupKey Task group key to exclude.
    */
-  boolean launchFirst(Function<HostOffer, Assignment> acceptor, TaskGroupKey groupKey)
-      throws LaunchException;
+  void banOffer(OfferID offerId, TaskGroupKey groupKey);
+
+  /**
+   * Launches the task matched against the offer.
+   *
+   * @param offerId Matched offer ID.
+   * @param task Matched task info.
+   * @throws LaunchException If there was an error launching the task.
+   */
+  void launchTask(OfferID offerId, Protos.TaskInfo task) throws LaunchException;
 
   /**
    * Notifies the offer queue that a host's attributes have changed.
@@ -108,6 +113,14 @@ public interface OfferManager extends EventSubscriber {
   Iterable<HostOffer> getOffers();
 
   /**
+   * Gets all offers that are not statically banned for the given {@code groupKey}.
+   *
+   * @param groupKey Task group key to check offers for.
+   * @return A snapshot of all offers eligible for the given {@code groupKey}.
+   */
+  Iterable<HostOffer> getOffers(TaskGroupKey groupKey);
+
+  /**
    * Gets an offer for the given slave ID.
    *
    * @param slaveId Slave ID to get offer for.
@@ -127,7 +140,8 @@ public interface OfferManager extends EventSubscriber {
    * Thrown when there was an unexpected failure trying to launch a task.
    */
   class LaunchException extends Exception {
-    LaunchException(String msg) {
+    @VisibleForTesting
+    public LaunchException(String msg) {
       super(msg);
     }
 
@@ -218,6 +232,11 @@ public interface OfferManager extends EventSubscriber {
     }
 
     @Override
+    public Iterable<HostOffer> getOffers(TaskGroupKey groupKey) {
+      return hostOffers.getWeaklyConsistentOffers(groupKey);
+    }
+
+    @Override
     public Optional<HostOffer> getOffer(SlaveID slaveId) {
       return hostOffers.get(slaveId);
     }
@@ -268,7 +287,7 @@ public interface OfferManager extends EventSubscriber {
       private final Map<String, HostOffer> offersByHost = Maps.newHashMap();
       // TODO(maxim): Expose via a debug endpoint. AURORA-1136.
       // Keep track of offer->groupKey mappings that will never be matched to avoid redundant
-      // scheduling attempts. See Assignment.Result for more details on static ban.
+      // scheduling attempts. See VetoGroup for more details on static ban.
       private final Multimap<OfferID, TaskGroupKey> staticallyBannedOffers = HashMultimap.create();
 
       HostOffers() {
@@ -304,7 +323,7 @@ public interface OfferManager extends EventSubscriber {
         if (offer != null) {
           // Remove and re-add a host's offer to re-sort based on its new hostStatus
           remove(offer.getOffer().getId());
-          add(new HostOffer(offer.getOffer(),  attributes));
+          add(new HostOffer(offer.getOffer(), attributes));
         }
       }
 
@@ -312,27 +331,14 @@ public interface OfferManager extends EventSubscriber {
         return Iterables.unmodifiableIterable(offers);
       }
 
-      synchronized boolean isStaticallyBanned(HostOffer offer, TaskGroupKey groupKey) {
-        boolean result = staticallyBannedOffers.containsEntry(offer.getOffer().getId(), groupKey);
-        if (LOG.isLoggable(Level.FINE)) {
-          LOG.fine(String.format(
-              "Host offer %s is statically banned for %s: %s",
-              offer,
-              groupKey,
-              result));
-        }
-        return result;
+      synchronized Iterable<HostOffer> getWeaklyConsistentOffers(TaskGroupKey groupKey) {
+        return Iterables.unmodifiableIterable(FluentIterable.from(offers).filter(
+            e -> !staticallyBannedOffers.containsEntry(e.getOffer().getId(), groupKey)));
       }
 
-      synchronized void addStaticGroupBan(HostOffer offer, TaskGroupKey groupKey) {
-        OfferID offerId = offer.getOffer().getId();
+      synchronized void addStaticGroupBan(OfferID offerId, TaskGroupKey groupKey) {
         if (offersById.containsKey(offerId)) {
           staticallyBannedOffers.put(offerId, groupKey);
-
-          if (LOG.isLoggable(Level.FINE)) {
-            LOG.fine(
-                String.format("Adding static ban for offer: %s, groupKey: %s", offer, groupKey));
-          }
         }
       }
 
@@ -345,63 +351,31 @@ public interface OfferManager extends EventSubscriber {
       }
     }
 
-    @Timed("offer_queue_launch_first")
     @Override
-    public boolean launchFirst(Function<HostOffer, Assignment> acceptor, TaskGroupKey groupKey)
-        throws LaunchException {
-
-      // It's important that this method is not called concurrently - doing so would open up the
-      // possibility of a race between the same offers being accepted by different threads.
-
-      for (HostOffer offer : hostOffers.getWeaklyConsistentOffers()) {
-        if (!hostOffers.isStaticallyBanned(offer, groupKey)
-            && acceptOffer(offer, acceptor, groupKey)) {
-          return true;
-        }
-      }
-
-      return false;
+    public void banOffer(OfferID offerId, TaskGroupKey groupKey) {
+      hostOffers.addStaticGroupBan(offerId, groupKey);
     }
 
-    @Timed("offer_queue_accept_offer")
-    protected boolean acceptOffer(
-        HostOffer offer,
-        Function<HostOffer, Assignment> acceptor,
-        TaskGroupKey groupKey) throws LaunchException {
-
-      Assignment assignment = acceptor.apply(offer);
-      switch (assignment.getResult()) {
-
-        case SUCCESS:
-          // Guard against an offer being removed after we grabbed it from the iterator.
-          // If that happens, the offer will not exist in hostOffers, and we can immediately
-          // send it back to LOST for quick reschedule.
-          // Removing while iterating counts on the use of a weakly-consistent iterator being used,
-          // which is a feature of ConcurrentSkipListSet.
-          if (hostOffers.remove(offer.getOffer().getId())) {
-            try {
-              driver.launchTask(offer.getOffer().getId(), assignment.getTaskInfo().get());
-              return true;
-            } catch (IllegalStateException e) {
-              // TODO(William Farner): Catch only the checked exception produced by Driver
-              // once it changes from throwing IllegalStateException when the driver is not yet
-              // registered.
-              throw new LaunchException("Failed to launch task.", e);
-            }
-          } else {
-            offerRaces.incrementAndGet();
-            throw new LaunchException(
-                "Accepted offer no longer exists in offer queue, likely data race.");
-          }
-
-        case FAILURE_STATIC_MISMATCH:
-          // Exclude an offer that results in a static mismatch from further attempts to match
-          // against all tasks from the same group.
-          hostOffers.addStaticGroupBan(offer, groupKey);
-          return false;
-
-        default:
-          return false;
+    @Timed("offer_manager_launch_task")
+    @Override
+    public void launchTask(OfferID offerId, Protos.TaskInfo task) throws LaunchException {
+      // Guard against an offer being removed after we grabbed it from the iterator.
+      // If that happens, the offer will not exist in hostOffers, and we can immediately
+      // send it back to LOST for quick reschedule.
+      // Removing while iterating counts on the use of a weakly-consistent iterator being used,
+      // which is a feature of ConcurrentSkipListSet.
+      if (hostOffers.remove(offerId)) {
+        try {
+          driver.launchTask(offerId, task);
+        } catch (IllegalStateException e) {
+          // TODO(William Farner): Catch only the checked exception produced by Driver
+          // once it changes from throwing IllegalStateException when the driver is not yet
+          // registered.
+          throw new LaunchException("Failed to launch task.", e);
+        }
+      } else {
+        offerRaces.incrementAndGet();
+        throw new LaunchException("Offer no longer exists in offer queue, likely data race.");
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/fb032506/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java b/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java
index c7a1a46..b9dccc6 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java
@@ -112,7 +112,7 @@ public class SchedulingModule extends AbstractModule {
     install(new PrivateModule() {
       @Override
       protected void configure() {
-        bind(new TypeLiteral<BiCache<String, TaskGroupKey>>() { }).in(Singleton.class);
+        bind(new TypeLiteral<BiCache<TaskGroupKey, String>>() { }).in(Singleton.class);
         bind(BiCache.BiCacheSettings.class).toInstance(
             new BiCache.BiCacheSettings(RESERVATION_DURATION.get(), "reservation_cache_size"));
         bind(TaskScheduler.class).to(TaskScheduler.TaskSchedulerImpl.class);

http://git-wip-us.apache.org/repos/asf/aurora/blob/fb032506/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
index d4bd529..0f0bfca 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
@@ -23,26 +23,21 @@ import javax.inject.Inject;
 import javax.inject.Qualifier;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.collect.Iterables;
 import com.google.common.eventbus.Subscribe;
 import com.twitter.common.inject.TimedInterceptor.Timed;
 import com.twitter.common.stats.Stats;
 
-import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
-import org.apache.aurora.scheduler.offers.OfferManager;
 import org.apache.aurora.scheduler.preemptor.BiCache;
 import org.apache.aurora.scheduler.preemptor.Preemptor;
-import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.state.TaskAssigner;
-import org.apache.aurora.scheduler.state.TaskAssigner.Assignment;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork;
@@ -56,7 +51,6 @@ import static java.lang.annotation.ElementType.PARAMETER;
 import static java.lang.annotation.RetentionPolicy.RUNTIME;
 import static java.util.Objects.requireNonNull;
 
-import static org.apache.aurora.gen.ScheduleStatus.LOST;
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 
 /**
@@ -91,11 +85,9 @@ public interface TaskScheduler extends EventSubscriber {
     private static final Logger LOG = Logger.getLogger(TaskSchedulerImpl.class.getName());
 
     private final Storage storage;
-    private final StateManager stateManager;
     private final TaskAssigner assigner;
-    private final OfferManager offerManager;
     private final Preemptor preemptor;
-    private final BiCache<String, TaskGroupKey> reservations;
+    private final BiCache<TaskGroupKey, String> reservations;
 
     private final AtomicLong attemptsFired = Stats.exportLong("schedule_attempts_fired");
     private final AtomicLong attemptsFailed = Stats.exportLong("schedule_attempts_failed");
@@ -104,54 +96,16 @@ public interface TaskScheduler extends EventSubscriber {
     @Inject
     TaskSchedulerImpl(
         Storage storage,
-        StateManager stateManager,
         TaskAssigner assigner,
-        OfferManager offerManager,
         Preemptor preemptor,
-        BiCache<String, TaskGroupKey> reservations) {
+        BiCache<TaskGroupKey, String> reservations) {
 
       this.storage = requireNonNull(storage);
-      this.stateManager = requireNonNull(stateManager);
       this.assigner = requireNonNull(assigner);
-      this.offerManager = requireNonNull(offerManager);
       this.preemptor = requireNonNull(preemptor);
       this.reservations = requireNonNull(reservations);
     }
 
-    private Function<HostOffer, Assignment> getAssignerFunction(
-        final MutableStoreProvider storeProvider,
-        final ResourceRequest resourceRequest,
-        final String taskId) {
-
-      // TODO(wfarner): Turn this into Predicate<Offer>, and in the caller, find the first match
-      // and perform the assignment at the very end.  This will allow us to use optimistic locking
-      // at the top of the stack and avoid holding the write lock for too long.
-      return new Function<HostOffer, Assignment>() {
-        @Override
-        public Assignment apply(HostOffer offer) {
-          Optional<TaskGroupKey> reservation =
-              reservations.get(offer.getOffer().getSlaveId().getValue());
-
-          if (reservation.isPresent()) {
-            if (TaskGroupKey.from(resourceRequest.getTask()).equals(reservation.get())) {
-              // Slave is reserved to satisfy this task group.
-              return assigner.maybeAssign(storeProvider, offer, resourceRequest, taskId);
-            } else {
-              // Slave is reserved for another task.
-              return Assignment.failure();
-            }
-          } else {
-            // Slave is not reserved.
-            return assigner.maybeAssign(storeProvider, offer, resourceRequest, taskId);
-          }
-        }
-      };
-    }
-
-    @VisibleForTesting
-    static final Optional<String> LAUNCH_FAILED_MSG =
-        Optional.of("Unknown exception attempting to schedule task.");
-
     @Timed("task_schedule_attempt")
     @Override
     public boolean schedule(final String taskId) {
@@ -186,35 +140,22 @@ public interface TaskScheduler extends EventSubscriber {
       } else {
         ITaskConfig task = assignedTask.getTask();
         AttributeAggregate aggregate = AttributeAggregate.getJobActiveState(store, task.getJob());
-        try {
-          boolean launched = offerManager.launchFirst(
-              getAssignerFunction(store, new ResourceRequest(task, aggregate), taskId),
-              TaskGroupKey.from(task));
-
-          if (!launched) {
-            // Task could not be scheduled.
-            // TODO(maxim): Now that preemption slots are searched asynchronously, consider
-            // retrying a launch attempt within the current scheduling round IFF a reservation is
-            // available.
-            maybePreemptFor(assignedTask, aggregate, store);
-            attemptsNoMatch.incrementAndGet();
-            return false;
-          }
-        } catch (OfferManager.LaunchException e) {
-          LOG.log(Level.WARNING, "Failed to launch task.", e);
-          attemptsFailed.incrementAndGet();
-
-          // The attempt to schedule the task failed, so we need to backpedal on the
-          // assignment.
-          // It is in the LOST state and a new task will move to PENDING to replace it.
-          // Should the state change fail due to storage issues, that's okay.  The task will
-          // time out in the ASSIGNED state and be moved to LOST.
-          stateManager.changeState(
-              store,
-              taskId,
-              Optional.of(PENDING),
-              LOST,
-              LAUNCH_FAILED_MSG);
+
+        boolean launched = assigner.maybeAssign(
+            store,
+            new ResourceRequest(task, aggregate),
+            TaskGroupKey.from(task),
+            taskId,
+            reservations.get(TaskGroupKey.from(task)));
+
+        if (!launched) {
+          // Task could not be scheduled.
+          // TODO(maxim): Now that preemption slots are searched asynchronously, consider
+          // retrying a launch attempt within the current scheduling round IFF a reservation is
+          // available.
+          maybePreemptFor(assignedTask, aggregate, store);
+          attemptsNoMatch.incrementAndGet();
+          return false;
         }
       }
 
@@ -226,12 +167,12 @@ public interface TaskScheduler extends EventSubscriber {
         AttributeAggregate jobState,
         MutableStoreProvider storeProvider) {
 
-      if (!reservations.getByValue(TaskGroupKey.from(task.getTask())).isEmpty()) {
+      if (reservations.get(TaskGroupKey.from(task.getTask())).isPresent()) {
         return;
       }
       Optional<String> slaveId = preemptor.attemptPreemptionFor(task, jobState, storeProvider);
       if (slaveId.isPresent()) {
-        reservations.put(slaveId.get(), TaskGroupKey.from(task.getTask()));
+        reservations.put(TaskGroupKey.from(task.getTask()), slaveId.get());
       }
     }
 
@@ -240,7 +181,7 @@ public interface TaskScheduler extends EventSubscriber {
       if (Optional.of(PENDING).equals(stateChangeEvent.getOldState())) {
         IAssignedTask assigned = stateChangeEvent.getTask().getAssignedTask();
         if (assigned.getSlaveId() != null) {
-          reservations.remove(assigned.getSlaveId(), TaskGroupKey.from(assigned.getTask()));
+          reservations.remove(TaskGroupKey.from(assigned.getTask()), assigned.getSlaveId());
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/aurora/blob/fb032506/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
index 3acb45a..0e32990 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
@@ -16,20 +16,22 @@ package org.apache.aurora.scheduler.state;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import javax.inject.Inject;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
-import com.google.common.base.Objects;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableSet;
-import com.twitter.common.base.MorePreconditions;
+import com.twitter.common.stats.Stats;
 
 import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.ResourceSlot;
+import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.configuration.Resources;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
@@ -37,161 +39,64 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.VetoGroup;
 import org.apache.aurora.scheduler.mesos.MesosTaskFactory;
+import org.apache.aurora.scheduler.offers.OfferManager;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.mesos.Protos.TaskInfo;
 
 import static java.util.Objects.requireNonNull;
 
+import static org.apache.aurora.gen.ScheduleStatus.LOST;
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import static org.apache.mesos.Protos.Offer;
 
 /**
- * Responsible for matching a task against an offer.
+ * Responsible for matching a task against an offer and launching it.
  */
 public interface TaskAssigner {
-
-  final class Assignment {
-
-    public enum Result {
-      /**
-       * Assignment successful.
-       */
-      SUCCESS,
-
-      /**
-       * Assignment failed.
-       */
-      FAILURE,
-
-      /**
-       * Assignment failed with static mismatch (i.e. all {@link Veto} instances group
-       * as {@link VetoGroup}).
-       * @see VetoGroup#STATIC
-       */
-      FAILURE_STATIC_MISMATCH,
-    }
-
-    private static final Optional<TaskInfo> NO_TASK_INFO = Optional.absent();
-    private static final ImmutableSet<Veto> NO_VETOES = ImmutableSet.of();
-    private final Optional<TaskInfo> taskInfo;
-    private final Set<Veto> vetoes;
-
-    private Assignment(Optional<TaskInfo> taskInfo, Set<Veto> vetoes) {
-      this.taskInfo = taskInfo;
-      this.vetoes = vetoes;
-    }
-
-    /**
-     * Creates a successful assignment instance.
-     *
-     * @param taskInfo {@link TaskInfo} to launch.
-     * @return A successful {@link Assignment}.
-     */
-    public static Assignment success(TaskInfo taskInfo) {
-      return new Assignment(Optional.of(taskInfo), NO_VETOES);
-    }
-
-    /**
-     * Creates a failed assignment instance with a set of {@link Veto} applied.
-     *
-     * @param vetoes Set of {@link Veto} instances issued for the failed offer/task match.
-     * @return A failed {@link Assignment}.
-     */
-    public static Assignment failure(Set<Veto> vetoes) {
-      return new Assignment(NO_TASK_INFO, MorePreconditions.checkNotBlank(vetoes));
-    }
-
-    /**
-     * Creates a failed assignment instance.
-     *
-     * @return A failed {@link Assignment}.
-     */
-    public static Assignment failure() {
-      return new Assignment(NO_TASK_INFO, NO_VETOES);
-    }
-
-    /**
-     * Generates the {@link Result} based on the assignment details.
-     *
-     * @return An assignment {@link Result}.
-     */
-    public Result getResult() {
-      if (taskInfo.isPresent()) {
-        return Result.SUCCESS;
-      }
-
-      return Veto.identifyGroup(vetoes) == VetoGroup.STATIC
-          ? Result.FAILURE_STATIC_MISMATCH
-          : Result.FAILURE;
-    }
-
-    /**
-     * A {@link TaskInfo} to launch.
-     *
-     * @return Optional of {@link TaskInfo}.
-     */
-    public Optional<TaskInfo> getTaskInfo() {
-      return taskInfo;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (!(o instanceof Assignment)) {
-        return false;
-      }
-
-      Assignment other = (Assignment) o;
-
-      return Objects.equal(taskInfo, other.taskInfo)
-          && Objects.equal(vetoes, other.vetoes);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hashCode(taskInfo, vetoes);
-    }
-
-    @Override
-    public String toString() {
-      return Objects.toStringHelper(this)
-          .add("taskInfo", taskInfo)
-          .add("vetoes", vetoes)
-          .toString();
-    }
-  }
-
   /**
-   * Tries to match a task against an offer.  If a match is found, the assigner should
-   * make the appropriate changes to the task and provide an {@link Assignment} result.
+   * Tries to match a task against an offer.  If a match is found, the assigner makes the
+   * appropriate changes to the task and requests task launch.
    *
    * @param storeProvider Storage provider.
-   * @param offer The resource offer.
    * @param resourceRequest The request for resources being scheduled.
+   * @param groupKey Task group key.
    * @param taskId Task id to assign.
-   * @return {@link Assignment} with assignment result.
+   * @param slaveReservation Slave reservation for a given {@code groupKey}.
+   * @return Assignment result.
    */
-  Assignment maybeAssign(
+  boolean maybeAssign(
       MutableStoreProvider storeProvider,
-      HostOffer offer,
       ResourceRequest resourceRequest,
-      String taskId);
+      TaskGroupKey groupKey,
+      String taskId,
+      Optional<String> slaveReservation);
 
   class TaskAssignerImpl implements TaskAssigner {
     private static final Logger LOG = Logger.getLogger(TaskAssignerImpl.class.getName());
 
+    @VisibleForTesting
+    static final Optional<String> LAUNCH_FAILED_MSG =
+        Optional.of("Unknown exception attempting to schedule task.");
+
+    private final AtomicLong launchFailures = Stats.exportLong("assigner_launch_failures");
+
     private final StateManager stateManager;
     private final SchedulingFilter filter;
     private final MesosTaskFactory taskFactory;
+    private final OfferManager offerManager;
 
     @Inject
     public TaskAssignerImpl(
         StateManager stateManager,
         SchedulingFilter filter,
-        MesosTaskFactory taskFactory) {
+        MesosTaskFactory taskFactory,
+        OfferManager offerManager) {
 
       this.stateManager = requireNonNull(stateManager);
       this.filter = requireNonNull(filter);
       this.taskFactory = requireNonNull(taskFactory);
+      this.offerManager = requireNonNull(offerManager);
     }
 
     private TaskInfo assign(
@@ -225,26 +130,61 @@ public interface TaskAssigner {
     }
 
     @Override
-    public Assignment maybeAssign(
+    public boolean maybeAssign(
         MutableStoreProvider storeProvider,
-        HostOffer offer,
         ResourceRequest resourceRequest,
-        String taskId) {
-
-      Set<Veto> vetoes = filter.filter(
-          new UnusedResource(ResourceSlot.from(offer.getOffer()), offer.getAttributes()),
-          resourceRequest);
-      if (vetoes.isEmpty()) {
-        return Assignment.success(assign(
-            storeProvider,
-            offer.getOffer(),
-            resourceRequest.getRequestedPorts(),
-            taskId));
-      } else {
-        LOG.fine("Slave " + offer.getOffer().getHostname()
-            + " vetoed task " + taskId + ": " + vetoes);
-        return Assignment.failure(vetoes);
+        TaskGroupKey groupKey,
+        String taskId,
+        Optional<String> slaveReservation) {
+
+      for (HostOffer offer : offerManager.getOffers(groupKey)) {
+        if (slaveReservation.isPresent()
+            && !slaveReservation.get().equals(offer.getOffer().getSlaveId().getValue())) {
+          // Task group has a slave reserved but this offer is for a different slave -> skip.
+          continue;
+        }
+        Set<Veto> vetoes = filter.filter(
+            new UnusedResource(ResourceSlot.from(offer.getOffer()), offer.getAttributes()),
+            resourceRequest);
+        if (vetoes.isEmpty()) {
+          TaskInfo taskInfo = assign(
+              storeProvider,
+              offer.getOffer(),
+              resourceRequest.getRequestedPorts(),
+              taskId);
+
+          try {
+            offerManager.launchTask(offer.getOffer().getId(), taskInfo);
+            return true;
+          } catch (OfferManager.LaunchException e) {
+            LOG.log(Level.WARNING, "Failed to launch task.", e);
+            launchFailures.incrementAndGet();
+
+            // The attempt to schedule the task failed, so we need to backpedal on the
+            // assignment.
+            // It is in the LOST state and a new task will move to PENDING to replace it.
+            // Should the state change fail due to storage issues, that's okay.  The task will
+            // time out in the ASSIGNED state and be moved to LOST.
+            stateManager.changeState(
+                storeProvider,
+                taskId,
+                Optional.of(PENDING),
+                LOST,
+                LAUNCH_FAILED_MSG);
+            return false;
+          }
+        } else {
+          if (Veto.identifyGroup(vetoes) == VetoGroup.STATIC) {
+            // Never attempt to match this offer/groupKey pair again.
+            offerManager.banOffer(offer.getOffer().getId(), groupKey);
+          }
+
+          LOG.fine("Slave " + offer.getOffer().getHostname()
+              + " vetoed task " + taskId + ": " + vetoes);
+          return false;
+        }
       }
+      return false;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/fb032506/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
index 04be32e..088a4a6 100644
--- a/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
@@ -16,7 +16,7 @@ package org.apache.aurora.scheduler.offers;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.logging.Level;
 
-import com.google.common.base.Function;
+import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.testing.TearDown;
@@ -31,32 +31,34 @@ import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
+import org.apache.aurora.scheduler.events.PubsubEvent.HostAttributesChanged;
 import org.apache.aurora.scheduler.mesos.Driver;
 import org.apache.aurora.scheduler.offers.OfferManager.OfferManagerImpl;
 import org.apache.aurora.scheduler.offers.OfferManager.OfferReturnDelay;
-import org.apache.aurora.scheduler.state.TaskAssigner.Assignment;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.aurora.scheduler.testing.FakeScheduledExecutor;
+import org.apache.mesos.Protos;
 import org.apache.mesos.Protos.TaskInfo;
 import org.junit.Before;
 import org.junit.Test;
 
 import static org.apache.aurora.gen.MaintenanceMode.DRAINING;
 import static org.apache.aurora.gen.MaintenanceMode.NONE;
-import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class OfferManagerImplTest extends EasyMockTest {
 
   private static final Amount<Long, Time> RETURN_DELAY = Amount.of(1L, Time.DAYS);
   private static final String HOST_A = "HOST_A";
+  private static final IHostAttributes HOST_ATTRIBUTES_A =
+      IHostAttributes.build(new HostAttributes().setMode(NONE).setHost(HOST_A));
   private static final HostOffer OFFER_A = new HostOffer(
       Offers.makeOffer("OFFER_A", HOST_A),
-      IHostAttributes.build(new HostAttributes().setMode(NONE)));
+      HOST_ATTRIBUTES_A);
+  private static final Protos.OfferID OFFER_A_ID = OFFER_A.getOffer().getId();
   private static final String HOST_B = "HOST_B";
   private static final HostOffer OFFER_B = new HostOffer(
       Offers.makeOffer("OFFER_B", HOST_B),
@@ -67,10 +69,10 @@ public class OfferManagerImplTest extends EasyMockTest {
       IHostAttributes.build(new HostAttributes().setMode(NONE)));
   private static final TaskGroupKey GROUP_KEY = TaskGroupKey.from(
       ITaskConfig.build(new TaskConfig().setJob(new JobKey("role", "env", "name"))));
+  private static final TaskInfo TASK_INFO = TaskInfo.getDefaultInstance();
 
   private Driver driver;
   private FakeScheduledExecutor clock;
-  private Function<HostOffer, Assignment> offerAcceptor;
   private OfferManagerImpl offerManager;
 
   @Before
@@ -92,7 +94,6 @@ public class OfferManagerImplTest extends EasyMockTest {
         clock.assertEmpty();
       }
     });
-    offerAcceptor = createMock(new Clazz<Function<HostOffer, Assignment>>() { });
     OfferReturnDelay returnDelay = new OfferReturnDelay() {
       @Override
       public Amount<Long, Time> get() {
@@ -109,11 +110,9 @@ public class OfferManagerImplTest extends EasyMockTest {
     HostOffer offerA = setMode(OFFER_A, DRAINING);
     HostOffer offerC = setMode(OFFER_C, DRAINING);
 
-    TaskInfo task = TaskInfo.getDefaultInstance();
-    expect(offerAcceptor.apply(OFFER_B)).andReturn(Assignment.success(task));
-    driver.launchTask(OFFER_B.getOffer().getId(), task);
+    driver.launchTask(OFFER_B.getOffer().getId(), TASK_INFO);
 
-    driver.declineOffer(offerA.getOffer().getId());
+    driver.declineOffer(OFFER_A_ID);
     driver.declineOffer(offerC.getOffer().getId());
 
     control.replay();
@@ -121,98 +120,165 @@ public class OfferManagerImplTest extends EasyMockTest {
     offerManager.addOffer(offerA);
     offerManager.addOffer(OFFER_B);
     offerManager.addOffer(offerC);
-    assertTrue(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
+    assertEquals(
+        ImmutableSet.of(OFFER_B, offerA, offerC),
+        ImmutableSet.copyOf(offerManager.getOffers()));
+    offerManager.launchTask(OFFER_B.getOffer().getId(), TASK_INFO);
     clock.advance(RETURN_DELAY);
   }
 
   @Test
-  public void testGetOffersReturnsAllOffers() throws Exception {
-    expect(offerAcceptor.apply(OFFER_A))
-        .andReturn(Assignment.failure(ImmutableSet.of(Veto.constraintMismatch("denied"))));
+  public void hostAttributeChangeUpdatesOfferSorting() throws Exception {
+    driver.declineOffer(OFFER_A_ID);
+    driver.declineOffer(OFFER_B.getOffer().getId());
 
     control.replay();
 
+    offerManager.hostAttributesChanged(new HostAttributesChanged(HOST_ATTRIBUTES_A));
+
     offerManager.addOffer(OFFER_A);
-    assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
-    assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers()));
+    offerManager.addOffer(OFFER_B);
+    assertEquals(ImmutableSet.of(OFFER_A, OFFER_B), ImmutableSet.copyOf(offerManager.getOffers()));
 
-    offerManager.cancelOffer(OFFER_A.getOffer().getId());
-    assertTrue(Iterables.isEmpty(offerManager.getOffers()));
+    HostOffer offerA = setMode(OFFER_A, DRAINING);
+    offerManager.hostAttributesChanged(new HostAttributesChanged(offerA.getAttributes()));
+    assertEquals(ImmutableSet.of(OFFER_B, offerA), ImmutableSet.copyOf(offerManager.getOffers()));
+
+    offerA = setMode(OFFER_A, NONE);
+    HostOffer offerB = setMode(OFFER_B, DRAINING);
+    offerManager.hostAttributesChanged(new HostAttributesChanged(offerA.getAttributes()));
+    offerManager.hostAttributesChanged(new HostAttributesChanged(offerB.getAttributes()));
+    assertEquals(ImmutableSet.of(OFFER_A, OFFER_B), ImmutableSet.copyOf(offerManager.getOffers()));
 
     clock.advance(RETURN_DELAY);
   }
 
   @Test
-  public void testOfferFilteringDueToStaticBan() throws Exception {
-    expect(offerAcceptor.apply(OFFER_A))
-        .andReturn(Assignment.failure(ImmutableSet.of(Veto.constraintMismatch("denied"))));
+  public void testAddSameSlaveOffer() {
+    driver.declineOffer(OFFER_A_ID);
+    expectLastCall().times(2);
 
-    TaskInfo task = TaskInfo.getDefaultInstance();
-    expect(offerAcceptor.apply(OFFER_B)).andReturn(Assignment.success(task));
-    driver.launchTask(OFFER_B.getOffer().getId(), task);
+    control.replay();
 
-    driver.declineOffer(OFFER_A.getOffer().getId());
+    offerManager.addOffer(OFFER_A);
+    offerManager.addOffer(OFFER_A);
+
+    clock.advance(RETURN_DELAY);
+  }
 
+  @Test
+  public void testGetOffersReturnsAllOffers() throws Exception {
     control.replay();
 
     offerManager.addOffer(OFFER_A);
-    assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
-    // Run again to make sure all offers are banned (via no expectations set).
-    assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
+    assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers()));
 
-    // Add a new offer to accept the task previously banned for OFFER_A.
-    offerManager.addOffer(OFFER_B);
-    assertTrue(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
+    offerManager.cancelOffer(OFFER_A_ID);
+    assertTrue(Iterables.isEmpty(offerManager.getOffers()));
 
     clock.advance(RETURN_DELAY);
   }
 
   @Test
-  public void testStaticBanIsCleared() throws Exception {
-    expect(offerAcceptor.apply(OFFER_A))
-        .andReturn(Assignment.failure(ImmutableSet.of(Veto.insufficientResources("ram", 100))));
+  public void testOfferFilteringDueToStaticBan() throws Exception {
+    driver.declineOffer(OFFER_A_ID);
 
-    TaskInfo task = TaskInfo.getDefaultInstance();
-    expect(offerAcceptor.apply(OFFER_A)).andReturn(Assignment.success(task));
-    driver.launchTask(OFFER_A.getOffer().getId(), task);
+    control.replay();
 
-    expect(offerAcceptor.apply(OFFER_A))
-        .andReturn(Assignment.failure(ImmutableSet.of(Veto.maintenance("draining"))));
+    // Static ban ignored when now offers.
+    offerManager.banOffer(OFFER_A_ID, GROUP_KEY);
+    offerManager.addOffer(OFFER_A);
+    assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers(GROUP_KEY)));
 
-    expect(offerAcceptor.apply(OFFER_A)).andReturn(Assignment.success(task));
-    driver.launchTask(OFFER_A.getOffer().getId(), task);
+    assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers()));
 
-    driver.declineOffer(OFFER_A.getOffer().getId());
+    // Add static ban.
+    offerManager.banOffer(OFFER_A_ID, GROUP_KEY);
+    assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers()));
+    assertTrue(Iterables.isEmpty(offerManager.getOffers(GROUP_KEY)));
+
+    clock.advance(RETURN_DELAY);
+  }
+
+  @Test
+  public void testStaticBanIsClearedOnOfferReturn() throws Exception {
+    driver.declineOffer(OFFER_A_ID);
+    expectLastCall().times(2);
 
     control.replay();
 
     offerManager.addOffer(OFFER_A);
-    assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
+    offerManager.banOffer(OFFER_A_ID, GROUP_KEY);
+    assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers()));
+    assertTrue(Iterables.isEmpty(offerManager.getOffers(GROUP_KEY)));
 
     // Make sure the static ban is cleared when the offers are returned.
     clock.advance(RETURN_DELAY);
     offerManager.addOffer(OFFER_A);
-    assertTrue(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
+    assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers(GROUP_KEY)));
+
+    clock.advance(RETURN_DELAY);
+  }
+
+  @Test
+  public void testStaticBanIsClearedOnDriverDisconnect() throws Exception {
+    driver.declineOffer(OFFER_A_ID);
+
+    control.replay();
 
     offerManager.addOffer(OFFER_A);
-    assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
+    offerManager.banOffer(OFFER_A_ID, GROUP_KEY);
+    assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers()));
+    assertTrue(Iterables.isEmpty(offerManager.getOffers(GROUP_KEY)));
 
     // Make sure the static ban is cleared when driver is disconnected.
     offerManager.driverDisconnected(new DriverDisconnected());
     offerManager.addOffer(OFFER_A);
-    assertTrue(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
+    assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers(GROUP_KEY)));
 
     clock.advance(RETURN_DELAY);
   }
 
   @Test
+  public void getOffer() {
+    driver.declineOffer(OFFER_A_ID);
+
+    control.replay();
+
+    offerManager.addOffer(OFFER_A);
+    assertEquals(Optional.of(OFFER_A), offerManager.getOffer(OFFER_A.getOffer().getSlaveId()));
+    clock.advance(RETURN_DELAY);
+  }
+
+  @Test(expected = OfferManager.LaunchException.class)
+  public void testLaunchTaskDriverThrows() throws OfferManager.LaunchException {
+    driver.launchTask(OFFER_A_ID, TASK_INFO);
+    expectLastCall().andThrow(new IllegalStateException());
+
+    control.replay();
+
+    offerManager.addOffer(OFFER_A);
+
+    try {
+      offerManager.launchTask(OFFER_A_ID, TASK_INFO);
+    } finally {
+      clock.advance(RETURN_DELAY);
+    }
+  }
+
+  @Test(expected = OfferManager.LaunchException.class)
+  public void testLaunchTaskOfferRaceThrows() throws OfferManager.LaunchException {
+    control.replay();
+    offerManager.launchTask(OFFER_A_ID, TASK_INFO);
+  }
+
+  @Test
   public void testFlushOffers() throws Exception {
     control.replay();
 
     offerManager.addOffer(OFFER_A);
     offerManager.addOffer(OFFER_B);
     offerManager.driverDisconnected(new DriverDisconnected());
-    assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY));
     clock.advance(RETURN_DELAY);
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/fb032506/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
index a2e2d4c..350ec6f 100644
--- a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
@@ -24,10 +24,7 @@ import com.twitter.common.stats.StatsProvider;
 import com.twitter.common.testing.easymock.EasyMockTest;
 import com.twitter.common.util.Clock;
 
-import org.apache.aurora.gen.HostAttributes;
-import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
@@ -37,27 +34,19 @@ import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.events.PubsubEventModule;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
-import org.apache.aurora.scheduler.offers.OfferManager;
-import org.apache.aurora.scheduler.offers.Offers;
 import org.apache.aurora.scheduler.preemptor.BiCache;
 import org.apache.aurora.scheduler.preemptor.Preemptor;
 import org.apache.aurora.scheduler.scheduling.TaskScheduler.TaskSchedulerImpl;
 import org.apache.aurora.scheduler.state.PubsubTestUtil;
-import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.state.TaskAssigner;
-import org.apache.aurora.scheduler.state.TaskAssigner.Assignment;
-import org.apache.aurora.scheduler.state.TaskAssigner.Assignment.Result;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork;
 import org.apache.aurora.scheduler.storage.db.DbUtil;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
 import org.apache.aurora.scheduler.testing.FakeStatsProvider;
-import org.apache.mesos.Protos.TaskInfo;
-import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.easymock.IExpectationSetters;
 import org.junit.Before;
@@ -67,10 +56,8 @@ import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
 import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
 import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY;
-import static org.easymock.EasyMock.capture;
 import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
@@ -78,34 +65,22 @@ public class TaskSchedulerImplTest extends EasyMockTest {
 
   private static final IScheduledTask TASK_A =
       TaskTestUtil.makeTask("a", JobKeys.from("a", "a", "a"));
-  private static final IScheduledTask TASK_B =
-      TaskTestUtil.makeTask("b", JobKeys.from("b", "b", "b"));
-  private static final HostOffer OFFER = new HostOffer(
-      Offers.makeOffer("OFFER_A", "HOST_A"),
-      IHostAttributes.build(new HostAttributes().setMode(MaintenanceMode.NONE)));
-
-  private static final String SLAVE_ID = OFFER.getOffer().getSlaveId().getValue();
-
-  private static final TaskGroupKey GROUP_A = TaskGroupKey.from(TASK_A.getAssignedTask().getTask());
-  private static final TaskGroupKey GROUP_B = TaskGroupKey.from(TASK_B.getAssignedTask().getTask());
+  private static final String SLAVE_ID = "HOST_A";
+  private static final Optional<String> NO_RESERVATION = Optional.absent();
 
   private StorageTestUtil storageUtil;
-  private StateManager stateManager;
   private TaskAssigner assigner;
-  private OfferManager offerManager;
   private TaskScheduler scheduler;
   private Preemptor preemptor;
-  private BiCache<String, TaskGroupKey> reservations;
+  private BiCache<TaskGroupKey, String> reservations;
   private EventSink eventSink;
 
   @Before
   public void setUp() throws Exception {
     storageUtil = new StorageTestUtil(this);
-    stateManager = createMock(StateManager.class);
     assigner = createMock(TaskAssigner.class);
-    offerManager = createMock(OfferManager.class);
     preemptor = createMock(Preemptor.class);
-    reservations = createMock(new Clazz<BiCache<String, TaskGroupKey>>() { });
+    reservations = createMock(new Clazz<BiCache<TaskGroupKey, String>>() { });
 
     Injector injector = getInjector(storageUtil.storage);
     scheduler = injector.getInstance(TaskScheduler.class);
@@ -118,11 +93,9 @@ public class TaskSchedulerImplTest extends EasyMockTest {
         new AbstractModule() {
           @Override
           protected void configure() {
-            bind(new TypeLiteral<BiCache<String, TaskGroupKey>>() { }).toInstance(reservations);
+            bind(new TypeLiteral<BiCache<TaskGroupKey, String>>() { }).toInstance(reservations);
             bind(TaskScheduler.class).to(TaskSchedulerImpl.class);
             bind(Preemptor.class).toInstance(preemptor);
-            bind(OfferManager.class).toInstance(offerManager);
-            bind(StateManager.class).toInstance(stateManager);
             bind(TaskAssigner.class).toInstance(assigner);
             bind(Clock.class).toInstance(createMock(Clock.class));
             bind(StatsProvider.class).toInstance(new FakeStatsProvider());
@@ -138,89 +111,100 @@ public class TaskSchedulerImplTest extends EasyMockTest {
         ImmutableSet.of(task));
   }
 
-  private void expectAssigned(IScheduledTask task) {
-    expect(assigner.maybeAssign(
+  private IExpectationSetters<Boolean> expectAssigned(
+      IScheduledTask task,
+      Optional<String> reservation) {
+
+    return expect(assigner.maybeAssign(
         storageUtil.mutableStoreProvider,
-        OFFER,
         new ResourceRequest(task.getAssignedTask().getTask(), EMPTY),
-        Tasks.id(task))).andReturn(Assignment.success(TaskInfo.getDefaultInstance()));
+        TaskGroupKey.from(task.getAssignedTask().getTask()),
+        Tasks.id(task),
+        reservation));
+  }
+
+  @Test
+  public void testSchedule() throws Exception {
+    storageUtil.expectOperations();
+
+    expectReservationCheck(TASK_A);
+    expectTaskStillPendingQuery(TASK_A);
+    expectActiveJobFetch(TASK_A);
+    expectAssigned(TASK_A, NO_RESERVATION).andReturn(true);
+
+    control.replay();
+
+    assertTrue(scheduler.schedule("a"));
+  }
+
+  @Test
+  public void testScheduleNoTask() throws Exception {
+    storageUtil.expectOperations();
+    storageUtil.expectTaskFetch(
+        Query.taskScoped(Tasks.id(TASK_A)).byStatus(PENDING),
+        ImmutableSet.of());
+
+    control.replay();
+
+    assertTrue(scheduler.schedule("a"));
   }
 
   @Test
   public void testReservation() throws Exception {
     storageUtil.expectOperations();
 
+    // No reservation available in preemptor
     expectTaskStillPendingQuery(TASK_A);
     expectActiveJobFetch(TASK_A);
-    expectLaunchAttempt(false);
-    // Reserve "a" with offerA
-    expectReservationCheck(TASK_A);
+    expectAssigned(TASK_A, NO_RESERVATION).andReturn(false);
+    expectReservationCheck(TASK_A).times(2);
+    expectPreemptorCall(TASK_A, NO_RESERVATION);
+
+    // Slave is reserved.
+    expectTaskStillPendingQuery(TASK_A);
+    expectActiveJobFetch(TASK_A);
+    expectAssigned(TASK_A, NO_RESERVATION).andReturn(false);
+    expectReservationCheck(TASK_A).times(2);
     expectPreemptorCall(TASK_A, Optional.of(SLAVE_ID));
-    expectAddReservation(SLAVE_ID, TASK_A);
+    expectAddReservation(TASK_A, SLAVE_ID);
 
     // Use previously created reservation.
     expectTaskStillPendingQuery(TASK_A);
     expectActiveJobFetch(TASK_A);
-    expectGetReservation(SLAVE_ID, TASK_A);
-    expectAssigned(TASK_A);
-    AssignmentCapture assignment = expectLaunchAttempt(true);
+    expectGetReservation(TASK_A, SLAVE_ID);
+    expectAssigned(TASK_A, Optional.of(SLAVE_ID)).andReturn(true);
 
     control.replay();
 
     assertFalse(scheduler.schedule("a"));
+    assertFalse(scheduler.schedule("a"));
     assertTrue(scheduler.schedule("a"));
-    assignAndAssert(Result.SUCCESS, GROUP_A, OFFER, assignment);
   }
 
   @Test
-  public void testReservationExpires() throws Exception {
+  public void testReservationUnusable() throws Exception {
     storageUtil.expectOperations();
 
     expectTaskStillPendingQuery(TASK_A);
     expectActiveJobFetch(TASK_A);
-    expectLaunchAttempt(false);
-    // Reserve "a" with offerA
     expectReservationCheck(TASK_A);
-    expectPreemptorCall(TASK_A, Optional.of(SLAVE_ID));
-    expectAddReservation(SLAVE_ID, TASK_A);
-
-    // First attempt -> reservation is active.
-    expectTaskStillPendingQuery(TASK_B);
-    expectActiveJobFetch(TASK_B);
-    AssignmentCapture firstAssignment = expectLaunchAttempt(false);
-    expectGetReservation(SLAVE_ID, TASK_A);
-    expectReservationCheck(TASK_B);
-    expectPreemptorCall(TASK_B, Optional.absent());
-
-    // Status changed -> reservation removed.
-    reservations.remove(SLAVE_ID, TaskGroupKey.from(TASK_A.getAssignedTask().getTask()));
-
-    // Second attempt -> reservation expires.
-    expectGetNoReservation(SLAVE_ID);
-    expectTaskStillPendingQuery(TASK_B);
-    expectActiveJobFetch(TASK_B);
-    AssignmentCapture secondAssignment = expectLaunchAttempt(true);
-    expectAssigned(TASK_B);
+    expectAssigned(TASK_A, NO_RESERVATION).andReturn(false);
+    expectGetReservation(TASK_A, SLAVE_ID);
 
     control.replay();
 
     assertFalse(scheduler.schedule("a"));
-    assertFalse(scheduler.schedule("b"));
-    assignAndAssert(Result.FAILURE, GROUP_B, OFFER, firstAssignment);
-
-    eventSink.post(TaskStateChange.transition(assign(TASK_A, SLAVE_ID), PENDING));
-    assertTrue(scheduler.schedule("b"));
-    assignAndAssert(Result.SUCCESS, GROUP_B, OFFER, secondAssignment);
   }
 
   @Test
-  public void testReservationUnusable() throws Exception {
+  public void testReservationRemoved() throws Exception {
     storageUtil.expectOperations();
 
     expectTaskStillPendingQuery(TASK_A);
-    expectLaunchAttempt(false);
-    expect(reservations.getByValue(TaskGroupKey.from(TASK_A.getAssignedTask().getTask())))
-        .andReturn(ImmutableSet.of(SLAVE_ID));
+    expectActiveJobFetch(TASK_A);
+    expectReservationCheck(TASK_A);
+    expectAssigned(TASK_A, NO_RESERVATION).andReturn(false);
+    expectGetReservation(TASK_A, SLAVE_ID);
 
     control.replay();
 
@@ -236,17 +220,18 @@ public class TaskSchedulerImplTest extends EasyMockTest {
 
   @Test
   public void testPendingDeletedHandled() throws Exception {
+    reservations.remove(TaskGroupKey.from(TASK_A.getAssignedTask().getTask()), SLAVE_ID);
+
     control.replay();
 
-    IScheduledTask task = IScheduledTask.build(TASK_A.newBuilder().setStatus(PENDING));
-    eventSink.post(TaskStateChange.transition(task, PENDING));
+    ScheduledTask taskBuilder = TASK_A.newBuilder().setStatus(PENDING);
+    taskBuilder.getAssignedTask().setSlaveId(SLAVE_ID);
+    eventSink.post(TaskStateChange.transition(IScheduledTask.build(taskBuilder), PENDING));
   }
 
   @Test
   public void testIgnoresThrottledTasks() throws Exception {
-    // Ensures that tasks in THROTTLED state are not considered part of the active job state passed
-    // to the assigner function.
-
+    // Ensures that tasks in THROTTLED state are not considered part of the active job state.
     Storage memStorage = DbUtil.createStorage();
 
     Injector injector = getInjector(memStorage);
@@ -265,23 +250,31 @@ public class TaskSchedulerImplTest extends EasyMockTest {
       }
     });
 
-    expectGetNoReservation(SLAVE_ID);
-    AssignmentCapture assignment = expectLaunchAttempt(true);
+    expectReservationCheck(TASK_A);
     expect(assigner.maybeAssign(
         EasyMock.anyObject(),
-        eq(OFFER),
         eq(new ResourceRequest(taskA.getAssignedTask().getTask(), EMPTY)),
-        eq(Tasks.id(taskA)))).andReturn(Assignment.success(TaskInfo.getDefaultInstance()));
+        eq(TaskGroupKey.from(taskA.getAssignedTask().getTask())),
+        eq(Tasks.id(taskA)),
+        eq(NO_RESERVATION))).andReturn(true);
 
     control.replay();
 
     assertTrue(scheduler.schedule(Tasks.id(taskA)));
-    assignAndAssert(Result.SUCCESS, GROUP_A, OFFER, assignment);
   }
 
-  private static class AssignmentCapture {
-    public Capture<Function<HostOffer, Assignment>> assigner = createCapture();
-    public Capture<TaskGroupKey> groupKey = createCapture();
+  @Test
+  public void testScheduleThrows() throws Exception {
+    storageUtil.expectOperations();
+
+    expectReservationCheck(TASK_A);
+    expectTaskStillPendingQuery(TASK_A);
+    expectActiveJobFetch(TASK_A);
+    expectAssigned(TASK_A, NO_RESERVATION).andThrow(new IllegalArgumentException("expected"));
+
+    control.replay();
+
+    assertFalse(scheduler.schedule("a"));
   }
 
   private void expectPreemptorCall(IScheduledTask task, Optional<String> result) {
@@ -291,31 +284,6 @@ public class TaskSchedulerImplTest extends EasyMockTest {
         storageUtil.mutableStoreProvider)).andReturn(result);
   }
 
-  private AssignmentCapture expectLaunchAttempt(boolean taskLaunched)
-      throws OfferManager.LaunchException {
-
-    AssignmentCapture capture = new AssignmentCapture();
-    expect(offerManager.launchFirst(capture(capture.assigner), capture(capture.groupKey)))
-        .andReturn(taskLaunched);
-    return capture;
-  }
-
-  private IScheduledTask assign(IScheduledTask task, String slaveId) {
-    ScheduledTask result = task.newBuilder();
-    result.getAssignedTask().setSlaveId(slaveId);
-    return IScheduledTask.build(result);
-  }
-
-  private void assignAndAssert(
-      Result result,
-      TaskGroupKey groupKey,
-      HostOffer offer,
-      AssignmentCapture capture) {
-
-    assertEquals(result, capture.assigner.getValue().apply(offer).getResult());
-    assertEquals(groupKey, capture.groupKey.getValue());
-  }
-
   private void expectActiveJobFetch(IScheduledTask task) {
     storageUtil.expectTaskFetch(
         Query.jobScoped(((Function<IScheduledTask, IJobKey>) Tasks::getJob).apply(task))
@@ -323,21 +291,17 @@ public class TaskSchedulerImplTest extends EasyMockTest {
         ImmutableSet.of());
   }
 
-  private void expectAddReservation(String slaveId, IScheduledTask task) {
-    reservations.put(slaveId, TaskGroupKey.from(task.getAssignedTask().getTask()));
-  }
-
-  private IExpectationSetters<?> expectGetReservation(String slaveId, IScheduledTask task) {
-    return expect(reservations.get(slaveId))
-        .andReturn(Optional.of(TaskGroupKey.from(task.getAssignedTask().getTask())));
+  private void expectAddReservation(IScheduledTask task, String slaveId) {
+    reservations.put(TaskGroupKey.from(task.getAssignedTask().getTask()), slaveId);
   }
 
-  private IExpectationSetters<?> expectGetNoReservation(String slaveId) {
-    return expect(reservations.get(slaveId)).andReturn(Optional.absent());
+  private IExpectationSetters<?> expectGetReservation(IScheduledTask task, String slaveId) {
+    return expect(reservations.get(TaskGroupKey.from(task.getAssignedTask().getTask())))
+        .andReturn(Optional.of(slaveId));
   }
 
   private IExpectationSetters<?> expectReservationCheck(IScheduledTask task) {
-    return expect(reservations.getByValue(TaskGroupKey.from(task.getAssignedTask().getTask())))
-        .andReturn(ImmutableSet.of());
+    return expect(reservations.get(TaskGroupKey.from(task.getAssignedTask().getTask())))
+        .andReturn(Optional.<String>absent());
   }
 }


Mime
View raw message