aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject incubator-aurora git commit: Offer filtering for static vetoes. Part 1 of 4: TaskAssigner.
Date Fri, 13 Feb 2015 00:42:52 GMT
Repository: incubator-aurora
Updated Branches:
  refs/heads/master 253521699 -> f3473a3d9


Offer filtering for static vetoes. Part 1 of 4: TaskAssigner.

Bugs closed: AURORA-909

Reviewed at https://reviews.apache.org/r/30888/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/f3473a3d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/f3473a3d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/f3473a3d

Branch: refs/heads/master
Commit: f3473a3d99b7e6764e3dbaeaddaacd7d2a1cf188
Parents: 2535216
Author: Maxim Khutornenko <maxim@apache.org>
Authored: Thu Feb 12 16:40:23 2015 -0800
Committer: Maxim Khutornenko <maxim@apache.org>
Committed: Thu Feb 12 16:40:23 2015 -0800

----------------------------------------------------------------------
 .../aurora/scheduler/async/OfferQueue.java      |  61 +++++-----
 .../aurora/scheduler/async/TaskScheduler.java   |  16 +--
 .../aurora/scheduler/state/TaskAssigner.java    | 116 ++++++++++++++++++-
 .../scheduler/async/OfferQueueImplTest.java     |   8 +-
 .../scheduler/async/TaskSchedulerImplTest.java  |  60 ++++++----
 .../scheduler/async/TaskSchedulerTest.java      |  31 ++---
 .../scheduler/state/TaskAssignerImplTest.java   |   6 +-
 7 files changed, 211 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f3473a3d/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java b/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java
index f663838..332338b 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java
@@ -41,10 +41,10 @@ import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
 import org.apache.aurora.scheduler.mesos.Driver;
+import org.apache.aurora.scheduler.state.TaskAssigner.Assignment;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.mesos.Protos.OfferID;
 import org.apache.mesos.Protos.SlaveID;
-import org.apache.mesos.Protos.TaskInfo;
 
 import static java.util.Objects.requireNonNull;
 
@@ -75,7 +75,7 @@ public interface OfferQueue extends EventSubscriber {
   void cancelOffer(OfferID offer);
 
   /**
-   * Launches the first task that satisfies the {@code acceptor} by returning a {@link TaskInfo}.
+   * Launches the first task that satisfies the {@code acceptor} by returning a {@link Assignment}.
    *
    * @param acceptor Function that determines if an offer is accepted.
    * @return {@code true} if the task was launched, {@code false} if no offers satisfied
the
@@ -83,7 +83,7 @@ public interface OfferQueue extends EventSubscriber {
    * @throws LaunchException If the acceptor accepted an offer, but there was an error launching
the
    *                         task.
    */
-  boolean launchFirst(Function<HostOffer, Optional<TaskInfo>> acceptor) throws
LaunchException;
+  boolean launchFirst(Function<HostOffer, Assignment> acceptor) throws LaunchException;
 
   /**
    * Notifies the offer queue that a host's attributes have changed.
@@ -290,7 +290,7 @@ public interface OfferQueue extends EventSubscriber {
 
     @Timed("offer_queue_launch_first")
     @Override
-    public boolean launchFirst(Function<HostOffer, Optional<TaskInfo>> acceptor)
+    public boolean launchFirst(Function<HostOffer, Assignment> acceptor)
         throws LaunchException {
 
       // It's important that this method is not called concurrently - doing so would open
up the
@@ -308,33 +308,36 @@ public interface OfferQueue extends EventSubscriber {
     @Timed("offer_queue_accept_offer")
     protected boolean acceptOffer(
         HostOffer offer,
-        Function<HostOffer, Optional<TaskInfo>> acceptor) throws LaunchException
{
-
-      Optional<TaskInfo> assignment = acceptor.apply(offer);
-      if (assignment.isPresent()) {
-        // Guard against an offer being removed after we grabbed it from the iterator.
-        // If that happens, the offer will not exist in hostOffers, and we can immediately
-        // send it back to LOST for quick reschedule.
-        // Removing while iterating counts on the use of a weakly-consistent iterator being
used,
-        // which is a feature of ConcurrentSkipListSet.
-        if (hostOffers.remove(offer.getOffer().getId())) {
-          try {
-            driver.launchTask(offer.getOffer().getId(), assignment.get());
-            return true;
-          } catch (IllegalStateException e) {
-            // TODO(William Farner): Catch only the checked exception produced by Driver
-            // once it changes from throwing IllegalStateException when the driver is not
yet
-            // registered.
-            throw new LaunchException("Failed to launch task.", e);
+        Function<HostOffer, Assignment> acceptor) throws LaunchException {
+
+      Assignment assignment = acceptor.apply(offer);
+      switch (assignment.getResult()) {
+
+        case SUCCESS:
+          // Guard against an offer being removed after we grabbed it from the iterator.
+          // If that happens, the offer will not exist in hostOffers, and we can immediately
+          // send it back to LOST for quick reschedule.
+          // Removing while iterating counts on the use of a weakly-consistent iterator being
used,
+          // which is a feature of ConcurrentSkipListSet.
+          if (hostOffers.remove(offer.getOffer().getId())) {
+            try {
+              driver.launchTask(offer.getOffer().getId(), assignment.getTaskInfo().get());
+              return true;
+            } catch (IllegalStateException e) {
+              // TODO(William Farner): Catch only the checked exception produced by Driver
+              // once it changes from throwing IllegalStateException when the driver is not
yet
+              // registered.
+              throw new LaunchException("Failed to launch task.", e);
+            }
+          } else {
+            offerRaces.incrementAndGet();
+            throw new LaunchException(
+                "Accepted offer no longer exists in offer queue, likely data race.");
           }
-        } else {
-          offerRaces.incrementAndGet();
-          throw new LaunchException(
-              "Accepted offer no longer exists in offer queue, likely data race.");
-        }
+        case FAILURE:
+        default:
+          return false;
       }
-
-      return false;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f3473a3d/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
index ce47ff1..ced3bde 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
@@ -51,6 +51,7 @@ import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.state.TaskAssigner;
+import org.apache.aurora.scheduler.state.TaskAssigner.Assignment;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork;
@@ -59,7 +60,6 @@ import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.mesos.Protos.SlaveID;
-import org.apache.mesos.Protos.TaskInfo;
 
 import static java.lang.annotation.ElementType.FIELD;
 import static java.lang.annotation.ElementType.METHOD;
@@ -131,16 +131,16 @@ public interface TaskScheduler extends EventSubscriber {
       this.reservations = new Reservations(statsProvider, reservationDuration, clock);
     }
 
-    private Function<HostOffer, Optional<TaskInfo>> getAssignerFunction(
+    private Function<HostOffer, Assignment> getAssignerFunction(
         final MutableStoreProvider storeProvider,
         final ResourceRequest resourceRequest) {
 
       // TODO(wfarner): Turn this into Predicate<Offer>, and in the caller, find the
first match
       // and perform the assignment at the very end.  This will allow us to use optimistic
locking
       // at the top of the stack and avoid holding the write lock for too long.
-      return new Function<HostOffer, Optional<TaskInfo>>() {
+      return new Function<HostOffer, Assignment>() {
         @Override
-        public Optional<TaskInfo> apply(HostOffer offer) {
+        public Assignment apply(HostOffer offer) {
           Optional<String> reservedTaskId =
               reservations.getSlaveReservation(offer.getOffer().getSlaveId());
           if (reservedTaskId.isPresent()) {
@@ -149,7 +149,7 @@ public interface TaskScheduler extends EventSubscriber {
               return assigner.maybeAssign(storeProvider, offer, resourceRequest);
             } else {
               // Slave is reserved for another task.
-              return Optional.absent();
+              return Assignment.failure();
             }
           } else {
             // Slave is not reserved.
@@ -215,8 +215,10 @@ public interface TaskScheduler extends EventSubscriber {
       } else {
         AttributeAggregate aggregate = getJobState(store, task.getJob());
         try {
-          ResourceRequest resourceRequest = new ResourceRequest(task, taskId, aggregate);
-          if (!offerQueue.launchFirst(getAssignerFunction(store, resourceRequest))) {
+          boolean launched = offerQueue.launchFirst(
+              getAssignerFunction(store, new ResourceRequest(task, taskId, aggregate)));
+
+          if (!launched) {
             // Task could not be scheduled.
             maybePreemptFor(taskId, aggregate);
             attemptsNoMatch.incrementAndGet();

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f3473a3d/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
index e1c2974..5a0f7dd 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
@@ -18,7 +18,10 @@ import java.util.logging.Logger;
 
 import javax.inject.Inject;
 
+import com.google.common.base.Objects;
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+import com.twitter.common.base.MorePreconditions;
 
 import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.ResourceSlot;
@@ -41,16 +44,117 @@ import static org.apache.mesos.Protos.Offer;
  */
 public interface TaskAssigner {
 
+  final class Assignment {
+
+    public enum Result {
+      /**
+       * Assignment successful.
+       */
+      SUCCESS,
+
+      /**
+       * Assignment failed.
+       */
+      FAILURE
+    }
+
+    private static final Optional<TaskInfo> NO_TASK_INFO = Optional.absent();
+    private static final ImmutableSet<Veto> NO_VETOES = ImmutableSet.of();
+    private final Optional<TaskInfo> taskInfo;
+    private final Set<Veto> vetoes;
+
+    private Assignment(Optional<TaskInfo> taskInfo, Set<Veto> vetoes) {
+      this.taskInfo = taskInfo;
+      this.vetoes = vetoes;
+    }
+
+    /**
+     * Creates a successful assignment instance.
+     *
+     * @param taskInfo {@link TaskInfo} to launch.
+     * @return A successful {@link Assignment}.
+     */
+    public static Assignment success(TaskInfo taskInfo) {
+      return new Assignment(Optional.of(taskInfo), NO_VETOES);
+    }
+
+    /**
+     * Creates a failed assignment instance with a set of {@link Veto} applied.
+     *
+     * @param vetoes Set of {@link Veto} instances issued for the failed offer/task match.
+     * @return A failed {@link Assignment}.
+     */
+    public static Assignment failure(Set<Veto> vetoes) {
+      return new Assignment(NO_TASK_INFO, MorePreconditions.checkNotBlank(vetoes));
+    }
+
+    /**
+     * Creates a failed assignment instance.
+     *
+     * @return A failed {@link Assignment}.
+     */
+    public static Assignment failure() {
+      return new Assignment(NO_TASK_INFO, NO_VETOES);
+    }
+
+    /**
+     * Generates the {@link Result} based on the assignment details.
+     *
+     * @return An assignment {@link Result}.
+     */
+    public Result getResult() {
+      if (taskInfo.isPresent()) {
+        return Result.SUCCESS;
+      }
+
+      return Result.FAILURE;
+    }
+
+    /**
+     * A {@link TaskInfo} to launch.
+     *
+     * @return Optional of {@link TaskInfo}.
+     */
+    public Optional<TaskInfo> getTaskInfo() {
+      return taskInfo;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (!(o instanceof Assignment)) {
+        return false;
+      }
+
+      Assignment other = (Assignment) o;
+
+      return Objects.equal(taskInfo, other.taskInfo)
+          && Objects.equal(vetoes, other.vetoes);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(taskInfo, vetoes);
+    }
+
+    @Override
+    public String toString() {
+      return Objects.toStringHelper(this)
+          .add("taskInfo", taskInfo)
+          .add("vetoes", vetoes)
+          .toString();
+    }
+  }
+
   /**
    * Tries to match a task against an offer.  If a match is found, the assigner should
-   * make the appropriate changes to the task and provide a non-empty result.
+   * make the appropriate changes to the task and provide an {@link Assignment} result.
    *
    * @param storeProvider Storage provider.
    * @param offer The resource offer.
    * @param resourceRequest The request for resources being scheduled.
-   * @return Instructions for launching the task if matching and assignment were successful.
+   * @return {@link Assignment} with assignment result.
    */
-  Optional<TaskInfo> maybeAssign(
+  Assignment maybeAssign(
       MutableStoreProvider storeProvider,
       HostOffer offer,
       ResourceRequest resourceRequest);
@@ -93,7 +197,7 @@ public interface TaskAssigner {
     }
 
     @Override
-    public Optional<TaskInfo> maybeAssign(
+    public Assignment maybeAssign(
         MutableStoreProvider storeProvider,
         HostOffer offer,
         ResourceRequest resourceRequest) {
@@ -102,7 +206,7 @@ public interface TaskAssigner {
           new UnusedResource(ResourceSlot.from(offer.getOffer()), offer.getAttributes()),
           resourceRequest);
       if (vetoes.isEmpty()) {
-        return Optional.of(assign(
+        return Assignment.success(assign(
             storeProvider,
             offer.getOffer(),
             resourceRequest.getNumRequestedPorts(),
@@ -110,7 +214,7 @@ public interface TaskAssigner {
       } else {
         LOG.fine("Slave " + offer.getOffer().getHostname()
             + " vetoed task " + resourceRequest.getTaskId() + ": " + vetoes);
-        return Optional.absent();
+        return Assignment.failure(vetoes);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f3473a3d/src/test/java/org/apache/aurora/scheduler/async/OfferQueueImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/OfferQueueImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/OfferQueueImplTest.java
index 4cf602a..2b5dc49 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/OfferQueueImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/OfferQueueImplTest.java
@@ -16,7 +16,6 @@ package org.apache.aurora.scheduler.async;
 import java.util.concurrent.ScheduledExecutorService;
 
 import com.google.common.base.Function;
-import com.google.common.base.Optional;
 import com.google.common.testing.TearDown;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
@@ -29,6 +28,7 @@ import org.apache.aurora.scheduler.async.OfferQueue.OfferQueueImpl;
 import org.apache.aurora.scheduler.async.OfferQueue.OfferReturnDelay;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
 import org.apache.aurora.scheduler.mesos.Driver;
+import org.apache.aurora.scheduler.state.TaskAssigner.Assignment;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.testing.FakeScheduledExecutor;
 import org.apache.mesos.Protos.TaskInfo;
@@ -59,7 +59,7 @@ public class OfferQueueImplTest extends EasyMockTest {
 
   private Driver driver;
   private FakeScheduledExecutor clock;
-  private Function<HostOffer, Optional<TaskInfo>> offerAcceptor;
+  private Function<HostOffer, Assignment> offerAcceptor;
   private OfferQueueImpl offerQueue;
 
   @Before
@@ -74,7 +74,7 @@ public class OfferQueueImplTest extends EasyMockTest {
         clock.assertEmpty();
       }
     });
-    offerAcceptor = createMock(new Clazz<Function<HostOffer, Optional<TaskInfo>>>()
{ });
+    offerAcceptor = createMock(new Clazz<Function<HostOffer, Assignment>>() {
});
     OfferReturnDelay returnDelay = new OfferReturnDelay() {
       @Override
       public Amount<Long, Time> get() {
@@ -92,7 +92,7 @@ public class OfferQueueImplTest extends EasyMockTest {
     HostOffer offerC = setMode(OFFER_C, DRAINING);
 
     TaskInfo task = TaskInfo.getDefaultInstance();
-    expect(offerAcceptor.apply(OFFER_B)).andReturn(Optional.of(task));
+    expect(offerAcceptor.apply(OFFER_B)).andReturn(Assignment.success(task));
     driver.launchTask(OFFER_B.getOffer().getId(), task);
 
     driver.declineOffer(offerA.getOffer().getId());

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f3473a3d/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
index 5647349..d0e1193 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
@@ -48,6 +48,8 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
 import org.apache.aurora.scheduler.state.PubsubTestUtil;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.state.TaskAssigner;
+import org.apache.aurora.scheduler.state.TaskAssigner.Assignment;
+import org.apache.aurora.scheduler.state.TaskAssigner.Assignment.Result;
 import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
@@ -140,7 +142,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
         storageUtil.mutableStoreProvider,
         OFFER,
         new ResourceRequest(task.getAssignedTask().getTask(), Tasks.id(task), emptyJob)))
-        .andReturn(Optional.of(TaskInfo.getDefaultInstance()));
+        .andReturn(Assignment.success(TaskInfo.getDefaultInstance()));
   }
 
   @Test
@@ -156,12 +158,12 @@ public class TaskSchedulerImplTest extends EasyMockTest {
 
     expectTaskStillPendingQuery(TASK_B);
     expectActiveJobFetch(TASK_B);
-    Capture<Function<HostOffer, Optional<TaskInfo>>> firstAssignment =
expectLaunchAttempt(false);
+    AssignmentCapture firstAssignment = expectLaunchAttempt(false);
     expect(preemptor.findPreemptionSlotFor("b", emptyJob)).andReturn(Optional.<String>absent());
 
     expectTaskStillPendingQuery(TASK_B);
     expectActiveJobFetch(TASK_B);
-    Capture<Function<HostOffer, Optional<TaskInfo>>> secondAssignment =
expectLaunchAttempt(true);
+    AssignmentCapture secondAssignment = expectLaunchAttempt(true);
     expectAssigned(TASK_B);
 
     control.replay();
@@ -169,13 +171,13 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     assertFalse(scheduler.schedule("a"));
     assertFalse(scheduler.schedule("b"));
 
-    assertEquals(Optional.<TaskInfo>absent(), firstAssignment.getValue().apply(OFFER));
+    assignAndAssert(Result.FAILURE, OFFER, firstAssignment);
 
     clock.advance(reservationDuration);
 
     assertTrue(scheduler.schedule("b"));
 
-    assertEquals(true, secondAssignment.getValue().apply(OFFER).isPresent());
+    assignAndAssert(Result.SUCCESS, OFFER, secondAssignment);
   }
 
   @Test
@@ -191,28 +193,28 @@ public class TaskSchedulerImplTest extends EasyMockTest {
 
     expectTaskStillPendingQuery(TASK_A);
     expectActiveJobFetch(TASK_A);
-    Capture<Function<HostOffer, Optional<TaskInfo>>> firstAssignment =
expectLaunchAttempt(true);
+    AssignmentCapture firstAssignment = expectLaunchAttempt(true);
     expectAssigned(TASK_A);
 
     expectTaskStillPendingQuery(TASK_B);
     expectActiveJobFetch(TASK_B);
 
-    Capture<Function<HostOffer, Optional<TaskInfo>>> secondAssignment =
expectLaunchAttempt(true);
+    AssignmentCapture secondAssignment = expectLaunchAttempt(true);
 
     expect(assigner.maybeAssign(
         storageUtil.mutableStoreProvider,
         OFFER,
         new ResourceRequest(TASK_B.getAssignedTask().getTask(), Tasks.id(TASK_B), emptyJob)))
-        .andReturn(Optional.of(TaskInfo.getDefaultInstance()));
+        .andReturn(Assignment.success(TaskInfo.getDefaultInstance()));
 
     control.replay();
     assertFalse(scheduler.schedule("a"));
     assertTrue(scheduler.schedule("a"));
-    firstAssignment.getValue().apply(OFFER);
+    assignAndAssert(Result.SUCCESS, OFFER, firstAssignment);
     eventSink.post(TaskStateChange.transition(TASK_A, PENDING));
     clock.advance(halfReservationDuration);
     assertTrue(scheduler.schedule("b"));
-    secondAssignment.getValue().apply(OFFER);
+    assignAndAssert(Result.SUCCESS, OFFER, secondAssignment);
   }
 
   @Test
@@ -227,7 +229,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
 
     expectTaskStillPendingQuery(TASK_A);
     expectActiveJobFetch(TASK_A);
-    Capture<Function<HostOffer, Optional<TaskInfo>>> firstAssignment =
expectLaunchAttempt(true);
+    AssignmentCapture assignment = expectLaunchAttempt(true);
     expectAssigned(TASK_A);
 
     control.replay();
@@ -235,7 +237,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     clock.advance(halfReservationDuration);
     assertTrue(scheduler.schedule("a"));
 
-    firstAssignment.getValue().apply(OFFER);
+    assignAndAssert(Result.SUCCESS, OFFER, assignment);
   }
 
   @Test
@@ -252,7 +254,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
 
     expectTaskStillPendingQuery(TASK_B);
     expectActiveJobFetch(TASK_B);
-    Capture<Function<HostOffer, Optional<TaskInfo>>> assignment = expectLaunchAttempt(true);
+    AssignmentCapture assignment = expectLaunchAttempt(true);
     expectAssigned(TASK_B);
 
     control.replay();
@@ -261,7 +263,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     // Task is killed by user before it is scheduled
     eventSink.post(TaskStateChange.transition(TASK_A, PENDING));
     assertTrue(scheduler.schedule("b"));
-    assignment.getValue().apply(OFFER);
+    assignAndAssert(Result.SUCCESS, OFFER, assignment);
   }
 
   @Test
@@ -277,7 +279,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
 
     expectTaskStillPendingQuery(TASK_A);
     expectActiveJobFetch(TASK_A);
-    Capture<Function<HostOffer, Optional<TaskInfo>>> firstAssignment =
expectLaunchAttempt(true);
+    AssignmentCapture assignment = expectLaunchAttempt(true);
     expectAssigned(TASK_A);
 
     control.replay();
@@ -285,7 +287,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     // We don't act on the reservation made by b because we want to see timeout behaviour.
     clock.advance(reservationDuration);
     assertTrue(scheduler.schedule("a"));
-    firstAssignment.getValue().apply(OFFER);
+    assignAndAssert(Result.SUCCESS, OFFER, assignment);
   }
 
   @Test
@@ -311,17 +313,17 @@ public class TaskSchedulerImplTest extends EasyMockTest {
       }
     });
 
-    Capture<Function<HostOffer, Optional<TaskInfo>>> assignment = expectLaunchAttempt(true);
+    AssignmentCapture assignment = expectLaunchAttempt(true);
     expect(assigner.maybeAssign(
         EasyMock.<MutableStoreProvider>anyObject(),
         eq(OFFER),
         eq(new ResourceRequest(taskA.getAssignedTask().getTask(), Tasks.id(taskA), emptyJob))))
-        .andReturn(Optional.of(TaskInfo.getDefaultInstance()));
+        .andReturn(Assignment.success(TaskInfo.getDefaultInstance()));
 
     control.replay();
 
     assertTrue(scheduler.schedule(Tasks.id(taskA)));
-    assignment.getValue().apply(OFFER);
+    assignAndAssert(Result.SUCCESS, OFFER, assignment);
   }
 
   private static IScheduledTask makeTask(String taskId) {
@@ -336,11 +338,23 @@ public class TaskSchedulerImplTest extends EasyMockTest {
                 .setEnvironment("env-" + taskId))));
   }
 
-  private Capture<Function<HostOffer, Optional<TaskInfo>>> expectLaunchAttempt(boolean
taskLaunched)
+  private static class AssignmentCapture {
+    public Capture<Function<HostOffer, Assignment>> assigner = createCapture();
+  }
+
+  private AssignmentCapture expectLaunchAttempt(boolean taskLaunched)
       throws OfferQueue.LaunchException {
-        Capture<Function<HostOffer, Optional<TaskInfo>>> assignment = createCapture();
-        expect(offerQueue.launchFirst(capture(assignment))).andReturn(taskLaunched);
-        return assignment;
+        AssignmentCapture capture = new AssignmentCapture();
+        expect(offerQueue.launchFirst(capture(capture.assigner))).andReturn(taskLaunched);
+        return capture;
+  }
+
+  private void assignAndAssert(
+      Result result,
+      HostOffer offer,
+      AssignmentCapture capture) {
+
+    assertEquals(result, capture.assigner.getValue().apply(offer).getResult());
   }
 
   private void expectActiveJobFetch(IScheduledTask taskInJob) {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f3473a3d/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
index 6cc1323..74e3133 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
@@ -57,6 +57,7 @@ import org.apache.aurora.scheduler.mesos.Driver;
 import org.apache.aurora.scheduler.state.MaintenanceController;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.state.TaskAssigner;
+import org.apache.aurora.scheduler.state.TaskAssigner.Assignment;
 import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
@@ -292,7 +293,7 @@ public class TaskSchedulerTest extends EasyMockTest {
     timeoutCapture.getValue().run();
   }
 
-  private IExpectationSetters<Optional<TaskInfo>> expectMaybeAssign(
+  private IExpectationSetters<Assignment> expectMaybeAssign(
       HostOffer offer,
       IScheduledTask task,
       AttributeAggregate jobAggregate) {
@@ -312,11 +313,11 @@ public class TaskSchedulerTest extends EasyMockTest {
     TaskInfo mesosTask = makeTaskInfo(task);
 
     Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
-    expectMaybeAssign(OFFER_A, task, emptyJob).andReturn(Optional.<TaskInfo>absent());
+    expectMaybeAssign(OFFER_A, task, emptyJob).andReturn(Assignment.failure());
     expect(preemptor.findPreemptionSlotFor("a", emptyJob)).andReturn(Optional.<String>absent());
 
     Capture<Runnable> timeoutCapture2 = expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS,
10);
-    expectMaybeAssign(OFFER_A, task, emptyJob).andReturn(Optional.of(mesosTask));
+    expectMaybeAssign(OFFER_A, task, emptyJob).andReturn(Assignment.success(mesosTask));
     driver.launchTask(OFFER_A.getOffer().getId(), mesosTask);
 
     Capture<Runnable> timeoutCapture3 = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
@@ -347,7 +348,7 @@ public class TaskSchedulerTest extends EasyMockTest {
     Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
     expectAnyMaintenanceCalls();
     expectOfferDeclineIn(10);
-    expectMaybeAssign(OFFER_A, task, emptyJob).andReturn(Optional.of(mesosTask));
+    expectMaybeAssign(OFFER_A, task, emptyJob).andReturn(Assignment.success(mesosTask));
     driver.launchTask(OFFER_A.getOffer().getId(), mesosTask);
     expectLastCall().andThrow(new IllegalStateException("Driver not ready."));
     expect(stateManager.changeState(
@@ -380,7 +381,7 @@ public class TaskSchedulerTest extends EasyMockTest {
     expectMaybeAssign(OFFER_A, task, emptyJob).andThrow(new StorageException("Injected failure."));
 
     Capture<Runnable> timeoutCapture2 = expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS,
10);
-    expectMaybeAssign(OFFER_A, task, emptyJob).andReturn(Optional.of(mesosTask));
+    expectMaybeAssign(OFFER_A, task, emptyJob).andReturn(Assignment.success(mesosTask));
     driver.launchTask(OFFER_A.getOffer().getId(), mesosTask);
     expectLastCall();
 
@@ -399,7 +400,7 @@ public class TaskSchedulerTest extends EasyMockTest {
     Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
     Capture<Runnable> offerExpirationCapture = expectOfferDeclineIn(10);
     expectAnyMaintenanceCalls();
-    expectMaybeAssign(OFFER_A, task, emptyJob).andReturn(Optional.<TaskInfo>absent());
+    expectMaybeAssign(OFFER_A, task, emptyJob).andReturn(Assignment.failure());
     Capture<Runnable> timeoutCapture2 = expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS,
10);
     expect(preemptor.findPreemptionSlotFor("a", emptyJob)).andReturn(Optional.<String>absent());
     driver.declineOffer(OFFER_A.getOffer().getId());
@@ -439,10 +440,10 @@ public class TaskSchedulerTest extends EasyMockTest {
     expectAnyMaintenanceCalls();
     Capture<Runnable> offerExpirationCapture = expectOfferDeclineIn(10);
 
-    Function<HostOffer, Optional<TaskInfo>> offerAcceptor =
-        createMock(new Clazz<Function<HostOffer, Optional<TaskInfo>>>()
{ });
+    Function<HostOffer, Assignment> offerAcceptor =
+        createMock(new Clazz<Function<HostOffer, Assignment>>() { });
     final TaskInfo taskInfo = TaskInfo.getDefaultInstance();
-    expect(offerAcceptor.apply(OFFER_A)).andReturn(Optional.of(taskInfo));
+    expect(offerAcceptor.apply(OFFER_A)).andReturn(Assignment.success(taskInfo));
     driver.launchTask(OFFER_A.getOffer().getId(), taskInfo);
 
     replayAndCreateScheduler();
@@ -461,13 +462,13 @@ public class TaskSchedulerTest extends EasyMockTest {
 
     IScheduledTask taskA = makeTask("A", PENDING);
     TaskInfo mesosTaskA = makeTaskInfo(taskA);
-    expectMaybeAssign(OFFER_A, taskA, emptyJob).andReturn(Optional.of(mesosTaskA));
+    expectMaybeAssign(OFFER_A, taskA, emptyJob).andReturn(Assignment.success(mesosTaskA));
     driver.launchTask(OFFER_A.getOffer().getId(), mesosTaskA);
     Capture<Runnable> captureA = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
 
     IScheduledTask taskB = makeTask("B", PENDING);
     TaskInfo mesosTaskB = makeTaskInfo(taskB);
-    expectMaybeAssign(OFFER_B, taskB, emptyJob).andReturn(Optional.of(mesosTaskB));
+    expectMaybeAssign(OFFER_B, taskB, emptyJob).andReturn(Assignment.success(mesosTaskB));
     driver.launchTask(OFFER_B.getOffer().getId(), mesosTaskB);
     Capture<Runnable> captureB = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
 
@@ -493,7 +494,7 @@ public class TaskSchedulerTest extends EasyMockTest {
 
     IScheduledTask taskA = makeTask("A", PENDING);
     TaskInfo mesosTaskA = makeTaskInfo(taskA);
-    expectMaybeAssign(OFFER_B, taskA, emptyJob).andReturn(Optional.of(mesosTaskA));
+    expectMaybeAssign(OFFER_B, taskA, emptyJob).andReturn(Assignment.success(mesosTaskA));
     driver.launchTask(OFFER_B.getOffer().getId(), mesosTaskA);
     Capture<Runnable> captureA = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
 
@@ -502,7 +503,7 @@ public class TaskSchedulerTest extends EasyMockTest {
     HostOffer updatedOfferC = new HostOffer(
         OFFER_C.getOffer(),
         IHostAttributes.build(OFFER_C.getAttributes().newBuilder().setMode(NONE)));
-    expectMaybeAssign(updatedOfferC, taskB, emptyJob).andReturn(Optional.of(mesosTaskB));
+    expectMaybeAssign(updatedOfferC, taskB, emptyJob).andReturn(Assignment.success(mesosTaskB));
     driver.launchTask(OFFER_C.getOffer().getId(), mesosTaskB);
     Capture<Runnable> captureB = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
 
@@ -532,7 +533,7 @@ public class TaskSchedulerTest extends EasyMockTest {
         EasyMock.<MutableStoreProvider>anyObject(),
         EasyMock.<HostOffer>anyObject(),
         capture(request)))
-        .andReturn(Optional.of(mesosTask));
+        .andReturn(Assignment.success(mesosTask));
     driver.launchTask(EasyMock.<OfferID>anyObject(), eq(mesosTask));
     return request;
   }
@@ -599,7 +600,7 @@ public class TaskSchedulerTest extends EasyMockTest {
     final IScheduledTask task = makeTask("a", PENDING);
 
     Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
-    expectMaybeAssign(OFFER_A, task, emptyJob).andReturn(Optional.<TaskInfo>absent());
+    expectMaybeAssign(OFFER_A, task, emptyJob).andReturn(Assignment.failure());
     expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 20);
     expect(preemptor.findPreemptionSlotFor("a", emptyJob)).andReturn(Optional.<String>absent());
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/f3473a3d/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
index 411a55a..06a1903 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
@@ -13,7 +13,6 @@
  */
 package org.apache.aurora.scheduler.state;
 
-import com.google.common.base.Optional;
 import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableSet;
 import com.twitter.common.testing.easymock.EasyMockTest;
@@ -32,6 +31,7 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import org.apache.aurora.scheduler.mesos.MesosTaskFactory;
+import org.apache.aurora.scheduler.state.TaskAssigner.Assignment;
 import org.apache.aurora.scheduler.state.TaskAssigner.TaskAssignerImpl;
 import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
@@ -120,7 +120,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
     control.replay();
 
     assertEquals(
-        Optional.of(TASK_INFO),
+        Assignment.success(TASK_INFO),
         assigner.maybeAssign(
             storeProvider,
             OFFER,
@@ -137,7 +137,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
     control.replay();
 
     assertEquals(
-        Optional.<TaskInfo>absent(),
+        Assignment.failure(ImmutableSet.of(Veto.constraintMismatch("denied"))),
         assigner.maybeAssign(
             storeProvider,
             OFFER,


Mime
View raw message