aurora-commits mailing list archives

From dmclaugh...@apache.org
Subject aurora git commit: Add best-effort update affinity into the Scheduler.
Date Sat, 06 May 2017 02:12:15 GMT
Repository: aurora
Updated Branches:
  refs/heads/master 8a6e01c34 -> 85fed6ba9


Add best-effort update affinity into the Scheduler.

Reviewed at https://reviews.apache.org/r/58259/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/85fed6ba
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/85fed6ba
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/85fed6ba

Branch: refs/heads/master
Commit: 85fed6ba96d9e33ee819e05b700ffc3598b9a7d2
Parents: 8a6e01c
Author: David McLaughlin <david@dmclaughlin.com>
Authored: Fri May 5 19:11:57 2017 -0700
Committer: David McLaughlin <dmclaughlin@twitter.com>
Committed: Fri May 5 19:11:57 2017 -0700

----------------------------------------------------------------------
 RELEASE-NOTES.md                                |   4 +
 docs/operations/configuration.md                |  35 +++
 .../aurora/scheduler/base/TaskTestUtil.java     |  37 +++-
 .../aurora/scheduler/resources/ResourceBag.java |  11 +
 .../scheduler/scheduling/TaskScheduler.java     |   2 +-
 .../aurora/scheduler/state/TaskAssigner.java    | 217 ++++++++++++++-----
 .../scheduler/updater/InstanceAction.java       |   3 +-
 .../updater/InstanceActionHandler.java          |  13 ++
 .../scheduler/updater/InstanceUpdater.java      |  15 +-
 .../updater/JobUpdateControllerImpl.java        |   4 +
 .../scheduler/updater/StateEvaluator.java       |   2 +
 .../scheduler/updater/UpdateAgentReserver.java  | 149 +++++++++++++
 .../aurora/scheduler/updater/UpdaterModule.java |  35 ++-
 .../scheduler/resources/ResourceBagTest.java    |  15 ++
 .../scheduling/TaskSchedulerImplTest.java       |   4 +-
 .../scheduler/state/TaskAssignerImplTest.java   | 144 +++++++++++-
 .../aurora/scheduler/updater/AddTaskTest.java   |   5 +
 .../scheduler/updater/InstanceUpdaterTest.java  |  32 ++-
 .../aurora/scheduler/updater/JobUpdaterIT.java  |   2 +-
 .../aurora/scheduler/updater/KillTaskTest.java  |  37 +++-
 .../updater/NullAgentReserverTest.java          |  38 ++++
 .../updater/UpdateAgentReserverImplTest.java    | 105 +++++++++
 22 files changed, 825 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/85fed6ba/RELEASE-NOTES.md
----------------------------------------------------------------------
diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index 1ee0d01..4e930fb 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -17,6 +17,10 @@
   `V0_MESOS` drivers.
 - Add observer command line options to control the resource collection interval
   for observed tasks. See [here](docs/reference/observer-configuration.md) for details.
+- Added support for reserving agents during job updates, which can substantially reduce update times
+  in clusters with high contention for resources. Disabled by default, but can be enabled with
+  enable_update_affinity option, and the reservation timeout can be controlled via
+  update_affinity_reservation_hold_time.
 
 0.17.0
 ======

http://git-wip-us.apache.org/repos/asf/aurora/blob/85fed6ba/docs/operations/configuration.md
----------------------------------------------------------------------
diff --git a/docs/operations/configuration.md b/docs/operations/configuration.md
index f0581ea..85a6fab 100644
--- a/docs/operations/configuration.md
+++ b/docs/operations/configuration.md
@@ -302,3 +302,38 @@ Increasing executor overhead on an existing cluster, whether it be for custom ex
 will result in degraded preemption performance until all tasks which began life with the previous
 executor configuration with less overhead are preempted/restarted.
 
+## Controlling MTTA via Update Affinity
+
+When there is high resource contention in your cluster you may experience noticeably elevated job update
+times, as well as high task churn across the cluster. This is due to Aurora's first-fit scheduling
+algorithm. To alleviate this, you can enable update affinity where the Scheduler will make a best-effort
+attempt to reuse the same agent for the updated task (so long as the resources for the job are not being
+increased).
+
+To enable this in the Scheduler, you can set the following options:
+
+    --enable_update_affinity=true
+    --update_affinity_reservation_hold_time=3mins
+
+You will need to tune the hold time to match the behavior you see in your cluster. If you have extremely
+high update throughput, you might have to extend it as processing updates could easily add significant
+delays between scheduling attempts. You may also have to tune scheduling parameters to achieve the
+throughput you need in your cluster. Some relevant settings (with defaults) are:
+
+    --max_schedule_attempts_per_sec=40
+    --initial_schedule_penalty=1secs
+    --max_schedule_penalty=1mins
+    --scheduling_max_batch_size=3
+    --max_tasks_per_schedule_attempt=5
+
+There are metrics exposed by the Scheduler which can provide guidance on where the bottleneck is.
+Example metrics to look at:
+
+    - schedule_attempts_blocks (if this number is greater than 0, then task throughput is hitting
+                                limits controlled by --max_schedule_attempts_per_sec)
+    - scheduled_task_penalty_* (metrics around scheduling penalties for tasks, if the numbers here are high
+                                then you could have high contention for resources)
+
+Most likely you'll run into limits with the number of update instances that can be processed per minute
+before you run into any other limits. So if your total work done per minute starts to exceed 2k instances,
+you may need to extend the update_affinity_reservation_hold_time.
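
Editor's illustration (not part of the commit): the hold time matters because a reservation is only useful if the replacement task gets a scheduling attempt before the reservation lapses. The sketch below models that with a plain map and explicit timestamps. The class name, method signatures, and string instance keys are all hypothetical; the real UpdateAgentReserver added by this commit may be implemented quite differently.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical model of an agent reservation that lapses after a hold time. */
class ReservationHoldSketch {
  private final Duration holdTime;
  // Instance key -> reserved agent ID (assumed representation).
  private final Map<String, String> agentByInstance = new HashMap<>();
  // Instance key -> absolute expiry time in milliseconds.
  private final Map<String, Long> expiryByInstance = new HashMap<>();

  ReservationHoldSketch(Duration holdTime) {
    this.holdTime = holdTime;
  }

  /** Records that instanceKey should be rescheduled on agentId until the hold time passes. */
  void reserve(String agentId, String instanceKey, long nowMs) {
    agentByInstance.put(instanceKey, agentId);
    expiryByInstance.put(instanceKey, nowMs + holdTime.toMillis());
  }

  /** Returns the reserved agent, or empty once the reservation has expired. */
  Optional<String> getAgent(String instanceKey, long nowMs) {
    Long expiry = expiryByInstance.get(instanceKey);
    if (expiry == null || nowMs >= expiry) {
      // Expired or never reserved: the task falls back to regular first-fit scheduling.
      agentByInstance.remove(instanceKey);
      expiryByInstance.remove(instanceKey);
      return Optional.empty();
    }
    return Optional.of(agentByInstance.get(instanceKey));
  }
}
```

With a 3 minute hold time, a lookup at 179,999 ms still finds the agent, while a lookup at 180,000 ms does not. If update processing delays scheduling attempts past that point, the affinity is lost, which is the case the documentation above addresses by extending the hold time.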

http://git-wip-us.apache.org/repos/asf/aurora/blob/85fed6ba/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java b/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
index f0b148c..186fa1b 100644
--- a/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
+++ b/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
@@ -16,6 +16,7 @@ package org.apache.aurora.scheduler.base;
 import java.util.Map;
 import java.util.Set;
 
+import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableMultimap;
@@ -155,7 +156,37 @@ public final class TaskTestUtil {
     return makeTask(id, makeConfig(job));
   }
 
+  public static IScheduledTask makeTask(String id, IJobKey job, int instanceId) {
+    return makeTask(id, makeConfig(job), instanceId, Optional.absent());
+  }
+
+  public static IScheduledTask makeTask(String id, IJobKey job, int instanceId, String agentId) {
+    return makeTask(id, makeConfig(job), instanceId, Optional.of(agentId));
+  }
+
   public static IScheduledTask makeTask(String id, ITaskConfig config) {
+    return makeTask(id, config, 2);
+  }
+
+  public static IScheduledTask makeTask(String id, ITaskConfig config, int instanceId) {
+    return makeTask(id, config, instanceId, Optional.absent());
+  }
+
+  public static IScheduledTask makeTask(
+      String id,
+      ITaskConfig config,
+      int instanceId,
+      Optional<String> agentId) {
+
+    AssignedTask assignedTask = new AssignedTask()
+        .setInstanceId(instanceId)
+        .setTaskId(id)
+        .setAssignedPorts(ImmutableMap.of("http", 1000))
+        .setTask(config.newBuilder());
+    if (agentId.isPresent()) {
+      assignedTask.setSlaveId(agentId.get());
+    }
+
     return IScheduledTask.build(new ScheduledTask()
         .setStatus(ScheduleStatus.ASSIGNED)
         .setTaskEvents(ImmutableList.of(
@@ -167,11 +198,7 @@ public final class TaskTestUtil {
                 .setScheduler("scheduler2")))
         .setAncestorId("ancestor")
         .setFailureCount(3)
-        .setAssignedTask(new AssignedTask()
-            .setInstanceId(2)
-            .setTaskId(id)
-            .setAssignedPorts(ImmutableMap.of("http", 1000))
-            .setTask(config.newBuilder())));
+        .setAssignedTask(assignedTask));
   }
 
   public static IScheduledTask addStateTransition(

http://git-wip-us.apache.org/repos/asf/aurora/blob/85fed6ba/src/main/java/org/apache/aurora/scheduler/resources/ResourceBag.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/resources/ResourceBag.java b/src/main/java/org/apache/aurora/scheduler/resources/ResourceBag.java
index 211f1fc..d5db81b 100644
--- a/src/main/java/org/apache/aurora/scheduler/resources/ResourceBag.java
+++ b/src/main/java/org/apache/aurora/scheduler/resources/ResourceBag.java
@@ -178,6 +178,17 @@ public class ResourceBag {
   }
 
   /**
+   * Verifies whether another bag would be able to fit into this one.
+   *
+   * @param other Other bag to try and fit.
+   * @return Whether or not the bag fits.
+   */
+  public boolean greaterThanOrEqualTo(ResourceBag other) {
+    // Subtract the subject and check if any of the resources have a negative value.
+    return subtract(other).filter(IS_NEGATIVE).getResourceVectors().isEmpty();
+  }
+
+  /**
    * Applies {@code operator} to this and {@code other} resource values. Any missing resource type
    * values on either side will get substituted with 0.0 before applying the operator.
    *
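
Editor's illustration (not part of the commit): the fit check above subtracts the other bag and looks for negative components. A minimal sketch of the same idea over plain maps follows, assuming resources are keyed by a type name and that a missing type counts as 0.0, as the operator Javadoc above describes. BagFitSketch and the string keys are hypothetical stand-ins for ResourceBag and ResourceType.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for the subtract-and-check-negatives fit test. */
class BagFitSketch {
  /** Returns true if every resource in other is covered by the amount in self. */
  static boolean greaterThanOrEqualTo(Map<String, Double> self, Map<String, Double> other) {
    // Consider every resource type mentioned on either side.
    Set<String> types = new HashSet<>(self.keySet());
    types.addAll(other.keySet());
    for (String type : types) {
      // A missing type is treated as 0.0 before subtracting.
      double remaining = self.getOrDefault(type, 0.0) - other.getOrDefault(type, 0.0);
      if (remaining < 0.0) {
        return false;
      }
    }
    return true;
  }
}
```

Under these assumptions, a bag of 2 CPUs and 1024 MB of RAM fits a request for 1 CPU and 1024 MB, but not a request that raises RAM to 2048 MB or adds a resource type the bag does not hold at all.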

http://git-wip-us.apache.org/repos/asf/aurora/blob/85fed6ba/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
index 203f62b..0002b0c 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
@@ -180,7 +180,7 @@ public interface TaskScheduler extends EventSubscriber {
               task,
               bagFromResources(task.getResources()).add(overhead), aggregate),
           TaskGroupKey.from(task),
-          assignableTaskMap.keySet(),
+          assignedTasks,
           reservations.asMap());
 
       attemptsFired.addAndGet(assignableTaskMap.size());

http://git-wip-us.apache.org/repos/asf/aurora/blob/85fed6ba/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
index a177b30..7c531af 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
@@ -30,6 +30,7 @@ import org.apache.aurora.common.stats.StatsProvider;
 import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.TierInfo;
 import org.apache.aurora.scheduler.TierManager;
+import org.apache.aurora.scheduler.base.InstanceKeys;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
@@ -41,6 +42,9 @@ import org.apache.aurora.scheduler.offers.OfferManager;
 import org.apache.aurora.scheduler.resources.ResourceManager;
 import org.apache.aurora.scheduler.resources.ResourceType;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
+import org.apache.aurora.scheduler.updater.UpdateAgentReserver;
+import org.apache.mesos.v1.Protos;
 import org.apache.mesos.v1.Protos.TaskInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -63,16 +67,16 @@ public interface TaskAssigner {
    * @param storeProvider Storage provider.
    * @param resourceRequest The request for resources being scheduled.
    * @param groupKey Task group key.
-   * @param taskIds Task IDs to assign.
-   * @param slaveReservations Slave reservations.
+   * @param tasks Tasks to assign.
+   * @param preemptionReservations Slave reservations.
    * @return Successfully assigned task IDs.
    */
   Set<String> maybeAssign(
       MutableStoreProvider storeProvider,
       ResourceRequest resourceRequest,
       TaskGroupKey groupKey,
-      Iterable<String> taskIds,
-      Map<String, TaskGroupKey> slaveReservations);
+      Iterable<IAssignedTask> tasks,
+      Map<String, TaskGroupKey> preemptionReservations);
 
   class TaskAssignerImpl implements TaskAssigner {
     private static final Logger LOG = LoggerFactory.getLogger(TaskAssignerImpl.class);
@@ -93,6 +97,7 @@ public interface TaskAssigner {
     private final MesosTaskFactory taskFactory;
     private final OfferManager offerManager;
     private final TierManager tierManager;
+    private final UpdateAgentReserver updateAgentReserver;
 
     @Inject
     public TaskAssignerImpl(
@@ -101,6 +106,7 @@ public interface TaskAssigner {
         MesosTaskFactory taskFactory,
         OfferManager offerManager,
         TierManager tierManager,
+        UpdateAgentReserver updateAgentReserver,
         StatsProvider statsProvider) {
 
       this.stateManager = requireNonNull(stateManager);
@@ -110,6 +116,7 @@ public interface TaskAssigner {
       this.tierManager = requireNonNull(tierManager);
       this.launchFailures = statsProvider.makeCounter(ASSIGNER_LAUNCH_FAILURES);
       this.evaluatedOffers = statsProvider.makeCounter(ASSIGNER_EVALUATED_OFFERS);
+      this.updateAgentReserver = requireNonNull(updateAgentReserver);
     }
 
     @VisibleForTesting
@@ -141,82 +148,180 @@ public interface TaskAssigner {
       return taskFactory.createFrom(assigned, offer);
     }
 
+    private boolean evaluateOffer(
+        MutableStoreProvider storeProvider,
+        TierInfo tierInfo,
+        ResourceRequest resourceRequest,
+        TaskGroupKey groupKey,
+        IAssignedTask task,
+        HostOffer offer,
+        ImmutableSet.Builder<String> assignmentResult) throws OfferManager.LaunchException {
+
+      String taskId = task.getTaskId();
+      Set<Veto> vetoes = filter.filter(
+          new UnusedResource(
+              offer.getResourceBag(tierInfo),
+              offer.getAttributes(),
+              offer.getUnavailabilityStart()),
+          resourceRequest);
+
+      if (vetoes.isEmpty()) {
+        TaskInfo taskInfo = assign(
+            storeProvider,
+            offer.getOffer(),
+            taskId);
+        resourceRequest.getJobState().updateAttributeAggregate(offer.getAttributes());
+
+        try {
+          offerManager.launchTask(offer.getOffer().getId(), taskInfo);
+          assignmentResult.add(taskId);
+          return true;
+        } catch (OfferManager.LaunchException e) {
+          LOG.warn("Failed to launch task.", e);
+          launchFailures.incrementAndGet();
+
+          // The attempt to schedule the task failed, so we need to backpedal on the
+          // assignment.
+          // It is in the LOST state and a new task will move to PENDING to replace it.
+          // Should the state change fail due to storage issues, that's okay.  The task will
+          // time out in the ASSIGNED state and be moved to LOST.
+          stateManager.changeState(
+              storeProvider,
+              taskId,
+              Optional.of(PENDING),
+              LOST,
+              LAUNCH_FAILED_MSG);
+          throw e;
+        }
+      } else {
+        if (Veto.identifyGroup(vetoes) == VetoGroup.STATIC) {
+          // Never attempt to match this offer/groupKey pair again.
+          offerManager.banOffer(offer.getOffer().getId(), groupKey);
+        }
+        LOG.debug("Agent {} vetoed task {}: {}", offer.getOffer().getHostname(), taskId, vetoes);
+      }
+      return false;
+    }
+
+    private Iterable<IAssignedTask> maybeAssignReserved(
+        Iterable<IAssignedTask> tasks,
+        MutableStoreProvider storeProvider,
+        TierInfo tierInfo,
+        ResourceRequest resourceRequest,
+        TaskGroupKey groupKey,
+        ImmutableSet.Builder<String> assignmentResult) {
+
+      if (!updateAgentReserver.hasReservations(groupKey)) {
+        return tasks;
+      }
+
+      // Data structure to record which tasks should be excluded from the regular (non-reserved)
+      // scheduling loop. This is important because we release reservations once they are used,
+      // so we need to record them separately to avoid them being double-scheduled.
+      ImmutableSet.Builder<IInstanceKey> excludeBuilder = ImmutableSet.builder();
+
+      for (IAssignedTask task: tasks) {
+        IInstanceKey key = InstanceKeys.from(task.getTask().getJob(), task.getInstanceId());
+        Optional<String> maybeAgentId = updateAgentReserver.getAgent(key);
+        if (maybeAgentId.isPresent()) {
+          excludeBuilder.add(key);
+          Optional<HostOffer> offer = offerManager.getOffer(
+              Protos.AgentID.newBuilder().setValue(maybeAgentId.get()).build());
+          if (offer.isPresent()) {
+            try {
+              // The offer can still be vetoed because of changed constraints, or because the
+              // Scheduler hasn't been updated by Mesos yet...
+              if (evaluateOffer(
+                  storeProvider,
+                  tierInfo,
+                  resourceRequest,
+                  groupKey,
+                  task,
+                  offer.get(),
+                  assignmentResult)) {
+
+                LOG.info("Used update reservation for {} on {}", key, maybeAgentId.get());
+                updateAgentReserver.release(maybeAgentId.get(), key);
+              } else {
+                LOG.info(
+                    "Tried to reuse offer on {} for {}, but was not ready yet.",
+                    maybeAgentId.get(),
+                    key);
+              }
+            } catch (OfferManager.LaunchException e) {
+              updateAgentReserver.release(maybeAgentId.get(), key);
+            }
+          }
+        }
+      }
+
+      // Return only the tasks that didn't have reservations. Offers on agents that were reserved
+      // might not have been seen by Aurora yet, so we need to wait until the reservation expires
+      // before giving up and falling back to the first-fit algorithm.
+      Set<IInstanceKey> toBeExcluded = excludeBuilder.build();
+      return Iterables.filter(tasks, t -> !toBeExcluded.contains(
+          InstanceKeys.from(t.getTask().getJob(), t.getInstanceId())));
+    }
+
     @Timed("assigner_maybe_assign")
     @Override
     public Set<String> maybeAssign(
         MutableStoreProvider storeProvider,
         ResourceRequest resourceRequest,
         TaskGroupKey groupKey,
-        Iterable<String> taskIds,
-        Map<String, TaskGroupKey> slaveReservations) {
+        Iterable<IAssignedTask> tasks,
+        Map<String, TaskGroupKey> preemptionReservations) {
 
-      if (Iterables.isEmpty(taskIds)) {
+      if (Iterables.isEmpty(tasks)) {
         return ImmutableSet.of();
       }
 
       TierInfo tierInfo = tierManager.getTier(groupKey.getTask());
       ImmutableSet.Builder<String> assignmentResult = ImmutableSet.builder();
-      Iterator<String> remainingTasks = taskIds.iterator();
-      String taskId = remainingTasks.next();
 
-      for (HostOffer offer : offerManager.getOffers(groupKey)) {
-        evaluatedOffers.incrementAndGet();
+      Iterable<IAssignedTask> nonReservedTasks = maybeAssignReserved(
+          tasks,
+          storeProvider,
+          tierInfo,
+          resourceRequest,
+          groupKey,
+          assignmentResult);
 
-        Optional<TaskGroupKey> reservedGroup = Optional.fromNullable(
-            slaveReservations.get(offer.getOffer().getAgentId().getValue()));
+      Iterator<IAssignedTask> remainingTasks = nonReservedTasks.iterator();
+      // Make sure we still have tasks to process after reservations are processed.
+      if (remainingTasks.hasNext()) {
+        IAssignedTask task = remainingTasks.next();
+        for (HostOffer offer : offerManager.getOffers(groupKey)) {
+          evaluatedOffers.incrementAndGet();
 
-        if (reservedGroup.isPresent() && !reservedGroup.get().equals(groupKey)) {
-          // This slave is reserved for a different task group -> skip.
-          continue;
-        }
+          String agentId = offer.getOffer().getAgentId().getValue();
 
-        Set<Veto> vetoes = filter.filter(
-            new UnusedResource(
-                offer.getResourceBag(tierInfo),
-                offer.getAttributes(),
-                offer.getUnavailabilityStart()),
-            resourceRequest);
+          Optional<TaskGroupKey> reservedGroup = Optional.fromNullable(
+              preemptionReservations.get(agentId));
 
-        if (vetoes.isEmpty()) {
-          TaskInfo taskInfo = assign(
-              storeProvider,
-              offer.getOffer(),
-              taskId);
+          if (reservedGroup.isPresent() && !reservedGroup.get().equals(groupKey)) {
+            // This slave is reserved for a different task group -> skip.
+            continue;
+          }
 
-          resourceRequest.getJobState().updateAttributeAggregate(offer.getAttributes());
+          if (!updateAgentReserver.getReservations(agentId).isEmpty()) {
+            // This agent has been reserved for an update in-progress, skip.
+            continue;
+          }
 
           try {
-            offerManager.launchTask(offer.getOffer().getId(), taskInfo);
-            assignmentResult.add(taskId);
-
-            if (remainingTasks.hasNext()) {
-              taskId = remainingTasks.next();
-            } else {
-              break;
+            boolean offerUsed = evaluateOffer(
+                storeProvider, tierInfo, resourceRequest, groupKey, task, offer, assignmentResult);
+            if (offerUsed) {
+              if (remainingTasks.hasNext()) {
+                task = remainingTasks.next();
+              } else {
+                break;
+              }
             }
           } catch (OfferManager.LaunchException e) {
-            LOG.warn("Failed to launch task.", e);
-            launchFailures.incrementAndGet();
-
-            // The attempt to schedule the task failed, so we need to backpedal on the
-            // assignment.
-            // It is in the LOST state and a new task will move to PENDING to replace it.
-            // Should the state change fail due to storage issues, that's okay.  The task will
-            // time out in the ASSIGNED state and be moved to LOST.
-            stateManager.changeState(
-                storeProvider,
-                taskId,
-                Optional.of(PENDING),
-                LOST,
-                LAUNCH_FAILED_MSG);
             break;
           }
-        } else {
-          if (Veto.identifyGroup(vetoes) == VetoGroup.STATIC) {
-            // Never attempt to match this offer/groupKey pair again.
-            offerManager.banOffer(offer.getOffer().getId(), groupKey);
-          }
-          LOG.debug("Agent {} vetoed task {}: {}", offer.getOffer().getHostname(), taskId, vetoes);
         }
       }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/85fed6ba/src/main/java/org/apache/aurora/scheduler/updater/InstanceAction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/InstanceAction.java b/src/main/java/org/apache/aurora/scheduler/updater/InstanceAction.java
index b4cd01b..9210757 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/InstanceAction.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/InstanceAction.java
@@ -20,7 +20,8 @@ import static org.apache.aurora.scheduler.updater.InstanceActionHandler.KillTask
 import static org.apache.aurora.scheduler.updater.InstanceActionHandler.WatchRunningTask;
 
 enum InstanceAction {
-  KILL_TASK(Optional.of(new KillTask())),
+  KILL_TASK_AND_RESERVE(Optional.of(new KillTask(true))),
+  KILL_TASK(Optional.of(new KillTask(false))),
   // TODO(wfarner): Build this action into the scheduler state machine instead.  Rather than
   // killing a task and re-recreating it, choose the updated or rolled-back task when we are
   // deciding to reschedule the task.

http://git-wip-us.apache.org/repos/asf/aurora/blob/85fed6ba/src/main/java/org/apache/aurora/scheduler/updater/InstanceActionHandler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/InstanceActionHandler.java b/src/main/java/org/apache/aurora/scheduler/updater/InstanceActionHandler.java
index f25dc0c..a39e871 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/InstanceActionHandler.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/InstanceActionHandler.java
@@ -45,6 +45,7 @@ interface InstanceActionHandler {
       IJobUpdateInstructions instructions,
       MutableStoreProvider storeProvider,
       StateManager stateManager,
+      UpdateAgentReserver reserver,
       JobUpdateStatus status,
       IJobUpdateKey key);
 
@@ -86,6 +87,7 @@ interface InstanceActionHandler {
         IJobUpdateInstructions instructions,
         MutableStoreProvider storeProvider,
         StateManager stateManager,
+        UpdateAgentReserver reserver,
         JobUpdateStatus status,
         IJobUpdateKey key) {
 
@@ -111,12 +113,19 @@ interface InstanceActionHandler {
   }
 
   class KillTask implements InstanceActionHandler {
+    private final boolean reserveForReplacement;
+
+    KillTask(boolean reserveForReplacement) {
+      this.reserveForReplacement = reserveForReplacement;
+    }
+
     @Override
     public Optional<Amount<Long, Time>> getReevaluationDelay(
         IInstanceKey instance,
         IJobUpdateInstructions instructions,
         MutableStoreProvider storeProvider,
         StateManager stateManager,
+        UpdateAgentReserver reserver,
         JobUpdateStatus status,
         IJobUpdateKey key) {
 
@@ -129,6 +138,9 @@ interface InstanceActionHandler {
             Optional.absent(),
             ScheduleStatus.KILLING,
             Optional.of("Killed for job update " + key.getId()));
+        if (reserveForReplacement && task.get().getAssignedTask().isSetSlaveId()) {
+          reserver.reserve(task.get().getAssignedTask().getSlaveId(), instance);
+        }
       } else {
         // Due to async event processing it's possible to have a race between task event
         // and its deletion from the store. This is a perfectly valid case.
@@ -146,6 +158,7 @@ interface InstanceActionHandler {
         IJobUpdateInstructions instructions,
         MutableStoreProvider storeProvider,
         StateManager stateManager,
+        UpdateAgentReserver reserver,
         JobUpdateStatus status,
         IJobUpdateKey key) {
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/85fed6ba/src/main/java/org/apache/aurora/scheduler/updater/InstanceUpdater.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/InstanceUpdater.java b/src/main/java/org/apache/aurora/scheduler/updater/InstanceUpdater.java
index c129896..282ead4 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/InstanceUpdater.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/InstanceUpdater.java
@@ -33,10 +33,12 @@ import static java.util.Objects.requireNonNull;
 
 import static org.apache.aurora.gen.ScheduleStatus.KILLING;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.scheduler.resources.ResourceManager.bagFromResources;
 import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.EVALUATE_AFTER_MIN_RUNNING_MS;
 import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.EVALUATE_ON_STATE_CHANGE;
 import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.FAILED_TERMINATED;
 import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE;
+import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.KILL_TASK_WITH_RESERVATION_AND_EVALUATE_ON_STATE_CHANGE;
 import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE;
 import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.SUCCEEDED;
 
@@ -116,6 +118,11 @@ class InstanceUpdater implements StateEvaluator<Optional<IScheduledTask>> {
     return observedFailures > toleratedFailures;
   }
 
+  private boolean resourceFits(ITaskConfig desired, ITaskConfig existing) {
+    return bagFromResources(existing.getResources())
+        .greaterThanOrEqualTo(bagFromResources(desired.getResources()));
+  }
+
   private StateEvaluator.Result handleActualAndDesiredPresent(IScheduledTask actualState) {
     Preconditions.checkState(desiredState.isPresent());
     Preconditions.checkArgument(!actualState.getTaskEvents().isEmpty());
@@ -144,7 +151,13 @@ class InstanceUpdater implements StateEvaluator<Optional<IScheduledTask>> {
       // This is not the configuration that we would like to run.
       if (isKillable(status)) {
         // Task is active, kill it.
-        return KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE;
+        if (resourceFits(desiredState.get(), actualState.getAssignedTask().getTask())) {
+          // If the desired task fits into the existing offer, we reserve the offer.
+          return KILL_TASK_WITH_RESERVATION_AND_EVALUATE_ON_STATE_CHANGE;
+        } else {
+          // The resource requirements have increased, force fresh scheduling attempt.
+          return KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE;
+        }
       } else if (Tasks.isTerminated(status) && isPermanentlyKilled(actualState)) {
         // The old task has exited, it is now safe to add the new one.
         return REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE;

http://git-wip-us.apache.org/repos/asf/aurora/blob/85fed6ba/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
index e141124..5420235 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
@@ -125,6 +125,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
   private final PulseHandler pulseHandler;
   private final Lifecycle lifecycle;
   private final TaskEventBatchWorker batchWorker;
+  private final UpdateAgentReserver updateAgentReserver;
 
   // Currently-active updaters. An active updater is one that is rolling forward or back. Paused
   // and completed updates are represented only in storage, not here.
@@ -138,6 +139,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
       Storage storage,
       ScheduledExecutorService executor,
       StateManager stateManager,
+      UpdateAgentReserver updateAgentReserver,
       Clock clock,
       Lifecycle lifecycle,
       TaskEventBatchWorker batchWorker) {
@@ -151,6 +153,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
     this.lifecycle = requireNonNull(lifecycle);
     this.batchWorker = requireNonNull(batchWorker);
     this.pulseHandler = new PulseHandler(clock);
+    this.updateAgentReserver = requireNonNull(updateAgentReserver);
   }
 
   @Override
@@ -691,6 +694,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
                 instructions,
                 storeProvider,
                 stateManager,
+                updateAgentReserver,
                 updaterStatus,
                 key);
             if (reevaluateDelay.isPresent()) {

http://git-wip-us.apache.org/repos/asf/aurora/blob/85fed6ba/src/main/java/org/apache/aurora/scheduler/updater/StateEvaluator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/StateEvaluator.java b/src/main/java/org/apache/aurora/scheduler/updater/StateEvaluator.java
index c95943d..06a7695 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/StateEvaluator.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/StateEvaluator.java
@@ -53,6 +53,8 @@ interface StateEvaluator<T> {
     EVALUATE_ON_STATE_CHANGE(Optional.of(InstanceAction.AWAIT_STATE_CHANGE), NO_FAILURE),
     REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE(Optional.of(InstanceAction.ADD_TASK), NO_FAILURE),
     KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE(Optional.of(InstanceAction.KILL_TASK), NO_FAILURE),
+    KILL_TASK_WITH_RESERVATION_AND_EVALUATE_ON_STATE_CHANGE(
+        Optional.of(InstanceAction.KILL_TASK_AND_RESERVE), NO_FAILURE),
     EVALUATE_AFTER_MIN_RUNNING_MS(Optional.of(InstanceAction.WATCH_TASK), NO_FAILURE),
     SUCCEEDED(Optional.absent(), NO_FAILURE),
     FAILED_TERMINATED(Optional.absent(), Optional.of(Failure.EXITED));

http://git-wip-us.apache.org/repos/asf/aurora/blob/85fed6ba/src/main/java/org/apache/aurora/scheduler/updater/UpdateAgentReserver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/UpdateAgentReserver.java b/src/main/java/org/apache/aurora/scheduler/updater/UpdateAgentReserver.java
new file mode 100644
index 0000000..fca4b4e
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/updater/UpdateAgentReserver.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.updater;
+
+import java.util.Set;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.Inject;
+
+import org.apache.aurora.scheduler.base.TaskGroupKey;
+import org.apache.aurora.scheduler.preemptor.BiCache;
+import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Reserves agents for instances being updated. Multiple instance keys can be registered against
+ * a single agent.
+ */
+public interface UpdateAgentReserver {
+
+  /**
+   * Reserves the agent id for the given key. Should behave like a multi-map under the hood.
+   *
+   * @param agentId The agent id to reserve.
+   * @param key The instance key that will use the reservation.
+   */
+  void reserve(String agentId, IInstanceKey key);
+
+  /**
+   * Releases the reservation on an agent id for the given key.
+   *
+   * @param agentId The agent id to release the reservation on.
+   * @param key The instance key that should be removed.
+   */
+  void release(String agentId, IInstanceKey key);
+
+  /**
+   * Returns the agent id associated with the given instance key.
+   *
+   * @param key The instance key to look up.
+   * @return An optional agent id string.
+   */
+  Optional<String> getAgent(IInstanceKey key);
+
+  /**
+   * Get all reservations for a given agent id. Useful for skipping over the agent between the
+   * reserve/release window.
+   *
+   * @param agentId The agent id to look up reservations for.
+   * @return A set of keys reserved for that agent.
+   */
+  Set<IInstanceKey> getReservations(String agentId);
+
+  /**
+   * Check if the agent reserver has any reservations for the provided key.
+   *
+   * @param groupKey The key to check.
+   * @return True if there are reservations against any instances in that key.
+   */
+  boolean hasReservations(TaskGroupKey groupKey);
+
+  /**
+   * Implementation of the update reserver backed by a BiCache (the same mechanism we use for
+   * preemption). This means it will expire reservations that haven't been explicitly released
+   * after the configured timeout.
+   */
+  class UpdateAgentReserverImpl implements UpdateAgentReserver {
+    private static final Logger LOG = LoggerFactory.getLogger(UpdateAgentReserverImpl.class);
+
+    private final BiCache<IInstanceKey, String> cache;
+
+    @Inject
+    UpdateAgentReserverImpl(BiCache<IInstanceKey, String> cache) {
+      this.cache = requireNonNull(cache);
+    }
+
+    public void reserve(String agentId, IInstanceKey key) {
+      LOG.info("Reserving {} for {}", agentId, key);
+      cache.put(key, agentId);
+    }
+
+    public void release(String agentId, IInstanceKey key) {
+      LOG.info("Releasing reservation on {} for {}", agentId, key);
+      cache.remove(key, agentId);
+    }
+
+    public Set<IInstanceKey> getReservations(String agentId) {
+      return cache.getByValue(agentId);
+    }
+
+    @Override
+    public boolean hasReservations(TaskGroupKey groupKey) {
+      return cache.asMap().entrySet().stream()
+          .filter(entry -> entry.getKey().getJobKey().equals(groupKey.getTask().getJob()))
+          .findFirst()
+          .isPresent();
+    }
+
+    @Override
+    public Optional<String> getAgent(IInstanceKey key) {
+      return cache.get(key);
+    }
+  }
+
+  /**
+   * Used to effectively disable reservations.
+   */
+  class NullAgentReserver implements UpdateAgentReserver {
+    @Override
+    public void reserve(String agentId, IInstanceKey key) {
+      // noop
+    }
+
+    @Override
+    public void release(String agentId, IInstanceKey key) {
+      // noop
+    }
+
+    @Override
+    public Optional<String> getAgent(IInstanceKey key) {
+      return Optional.absent();
+    }
+
+    @Override
+    public Set<IInstanceKey> getReservations(String agentId) {
+      return ImmutableSet.of();
+    }
+
+    @Override
+    public boolean hasReservations(TaskGroupKey groupKey) {
+      return false;
+    }
+  }
+}
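
Note on the interface above: the contract is a map from instance key to agent id, with a reverse lookup from agent id to all keys reserved on it. The following is a minimal in-memory sketch of that contract, not the BiCache-backed implementation in the commit. `InMemoryReserver` is a hypothetical name, instance keys are simplified to plain strings, `java.util.Optional` stands in for the Guava `Optional` used in the diff, and reservations never expire here. Making `release` a no-op when the key is reserved on a different agent is a choice of this sketch, not something the diff confirms.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for UpdateAgentReserver; instance keys are plain strings here.
final class InMemoryReserver {
  // instance key -> agent id. Several keys may point at the same agent.
  private final Map<String, String> agentByKey = new HashMap<>();

  void reserve(String agentId, String key) {
    agentByKey.put(key, agentId);
  }

  // Removes the mapping only if the key is currently reserved on that agent.
  void release(String agentId, String key) {
    agentByKey.remove(key, agentId);
  }

  Optional<String> getAgent(String key) {
    return Optional.ofNullable(agentByKey.get(key));
  }

  // Reverse lookup: every key currently reserved on the given agent.
  Set<String> getReservations(String agentId) {
    Set<String> keys = new TreeSet<>();
    for (Map.Entry<String, String> entry : agentByKey.entrySet()) {
      if (entry.getValue().equals(agentId)) {
        keys.add(entry.getKey());
      }
    }
    return keys;
  }
}
```

The reverse lookup is a linear scan for brevity; an implementation expecting many reservations would likely keep a second index from agent id to keys.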

http://git-wip-us.apache.org/repos/asf/aurora/blob/85fed6ba/src/main/java/org/apache/aurora/scheduler/updater/UpdaterModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/UpdaterModule.java b/src/main/java/org/apache/aurora/scheduler/updater/UpdaterModule.java
index 13cbdad..219bb4e 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/UpdaterModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/UpdaterModule.java
@@ -21,10 +21,19 @@ import javax.inject.Singleton;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.inject.AbstractModule;
 import com.google.inject.PrivateModule;
+import com.google.inject.TypeLiteral;
 
+import org.apache.aurora.common.args.Arg;
+import org.apache.aurora.common.args.CmdLine;
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
 import org.apache.aurora.scheduler.SchedulerServicesModule;
 import org.apache.aurora.scheduler.base.AsyncUtil;
+import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.events.PubsubEventModule;
+import org.apache.aurora.scheduler.preemptor.BiCache;
+import org.apache.aurora.scheduler.preemptor.BiCache.BiCacheSettings;
+import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,14 +44,24 @@ public class UpdaterModule extends AbstractModule {
   private static final Logger LOG = LoggerFactory.getLogger(UpdaterModule.class);
 
   private final ScheduledExecutorService executor;
+  private final boolean enableAffinity;
+
+  @CmdLine(name = "enable_update_affinity", help = "Enable best-effort affinity of task updates.")
+  private static final Arg<Boolean> ENABLE_AFFINITY = Arg.create(false);
+
+  @CmdLine(name = "update_affinity_reservation_hold_time",
+      help = "How long to wait for a reserved agent to reoffer freed up resources.")
+  private static final Arg<Amount<Long, Time>> AFFINITY_EXPIRATION =
+      Arg.create(Amount.of(3L, Time.MINUTES));
 
   public UpdaterModule() {
-    this(AsyncUtil.singleThreadLoggingScheduledExecutor("updater-%d", LOG));
+    this(AsyncUtil.singleThreadLoggingScheduledExecutor("updater-%d", LOG), ENABLE_AFFINITY.get());
   }
 
   @VisibleForTesting
-  UpdaterModule(ScheduledExecutorService executor) {
+  UpdaterModule(ScheduledExecutorService executor, boolean enableAffinity) {
     this.executor = Objects.requireNonNull(executor);
+    this.enableAffinity = enableAffinity;
   }
 
   @Override
@@ -50,6 +69,18 @@ public class UpdaterModule extends AbstractModule {
     install(new PrivateModule() {
       @Override
       protected void configure() {
+        if (enableAffinity) {
+          bind(BiCacheSettings.class).toInstance(
+              new BiCacheSettings(AFFINITY_EXPIRATION.get(), "update_affinity_cache_size"));
+          bind(new TypeLiteral<BiCache<IInstanceKey, TaskGroupKey>>() { }).in(Singleton.class);
+          bind(UpdateAgentReserver.class).to(UpdateAgentReserver.UpdateAgentReserverImpl.class);
+          bind(UpdateAgentReserver.UpdateAgentReserverImpl.class).in(Singleton.class);
+        } else {
+          bind(UpdateAgentReserver.class).to(UpdateAgentReserver.NullAgentReserver.class);
+          bind(UpdateAgentReserver.NullAgentReserver.class).in(Singleton.class);
+        }
+        expose(UpdateAgentReserver.class);
+
         bind(ScheduledExecutorService.class).toInstance(executor);
         bind(UpdateFactory.class).to(UpdateFactory.UpdateFactoryImpl.class);
         bind(UpdateFactory.UpdateFactoryImpl.class).in(Singleton.class);

http://git-wip-us.apache.org/repos/asf/aurora/blob/85fed6ba/src/test/java/org/apache/aurora/scheduler/resources/ResourceBagTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/resources/ResourceBagTest.java b/src/test/java/org/apache/aurora/scheduler/resources/ResourceBagTest.java
index c163826..c8d4fbb 100644
--- a/src/test/java/org/apache/aurora/scheduler/resources/ResourceBagTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/resources/ResourceBagTest.java
@@ -30,6 +30,8 @@ import static org.apache.aurora.scheduler.resources.ResourceType.DISK_MB;
 import static org.apache.aurora.scheduler.resources.ResourceType.PORTS;
 import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 public class ResourceBagTest {
   @Test
@@ -89,4 +91,17 @@ public class ResourceBagTest {
         new ResourceBag(ImmutableMap.of(CPUS, -1.0)),
         bag(-1.0, 128, 1024).filter(IS_NEGATIVE));
   }
+
+  @Test
+  public void testGreaterThanOrEqualTo() {
+    assertTrue(LARGE.greaterThanOrEqualTo(SMALL));
+    assertTrue(LARGE.greaterThanOrEqualTo(LARGE));
+    assertFalse(SMALL.greaterThanOrEqualTo(LARGE));
+    assertTrue(
+        new ResourceBag(ImmutableMap.of(CPUS, 1.0, RAM_MB, 132768.0))
+            .greaterThanOrEqualTo(new ResourceBag(ImmutableMap.of(CPUS, 1.0))));
+    assertFalse(
+        new ResourceBag(ImmutableMap.of(CPUS, 1.0))
+            .greaterThanOrEqualTo(new ResourceBag(ImmutableMap.of(CPUS, 1.0, RAM_MB, 132768.0))));
+  }
 }
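
Note on testGreaterThanOrEqualTo above: the assertions imply that a resource missing from one bag counts as zero, so a CPU-only bag is not greater than or equal to a bag that also asks for RAM. A minimal sketch of a comparison with that behaviour follows. `BagCompareSketch` and the string resource names are hypothetical; the real method lives in ResourceBag.java, which is not shown in this part of the diff, and may be written differently.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class BagCompareSketch {
  // True when `left` has at least as much of every resource named in either bag.
  // A resource absent from a bag is treated as 0.0 (an assumption of this sketch).
  static boolean greaterThanOrEqualTo(Map<String, Double> left, Map<String, Double> right) {
    Set<String> names = new HashSet<>(left.keySet());
    names.addAll(right.keySet());
    for (String name : names) {
      if (left.getOrDefault(name, 0.0) < right.getOrDefault(name, 0.0)) {
        return false;
      }
    }
    return true;
  }
}
```

This kind of check fits the InstanceUpdater comment "The resource requirements have increased, force fresh scheduling attempt": a reservation is only worth holding when the old task's resources are enough for the new config.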

http://git-wip-us.apache.org/repos/asf/aurora/blob/85fed6ba/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
index fa1a8178..09f443b 100644
--- a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
@@ -145,7 +145,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
         storageUtil.mutableStoreProvider,
         new ResourceRequest(task.getAssignedTask().getTask(), bag(task), empty()),
         TaskGroupKey.from(task.getAssignedTask().getTask()),
-        ImmutableSet.of(Tasks.id(task)),
+        ImmutableSet.of(task.getAssignedTask()),
         reservationMap));
   }
 
@@ -329,7 +329,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
         EasyMock.anyObject(),
         eq(new ResourceRequest(taskA.getAssignedTask().getTask(), bag(taskA), empty())),
         eq(TaskGroupKey.from(taskA.getAssignedTask().getTask())),
-        eq(SINGLE_TASK),
+        eq(ImmutableSet.of(taskA.getAssignedTask())),
         eq(NO_RESERVATION))).andReturn(SCHEDULED_RESULT);
 
     control.replay();

http://git-wip-us.apache.org/repos/asf/aurora/blob/85fed6ba/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
index cf2d25e..11835dc 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
@@ -28,6 +28,7 @@ import org.apache.aurora.gen.JobKey;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.TierManager;
+import org.apache.aurora.scheduler.base.InstanceKeys;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.filter.AttributeAggregate;
@@ -41,9 +42,11 @@ import org.apache.aurora.scheduler.resources.ResourceBag;
 import org.apache.aurora.scheduler.state.TaskAssigner.TaskAssignerImpl;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.aurora.scheduler.testing.FakeStatsProvider;
+import org.apache.aurora.scheduler.updater.UpdateAgentReserver;
 import org.apache.mesos.v1.Protos.AgentID;
 import org.apache.mesos.v1.Protos.FrameworkID;
 import org.apache.mesos.v1.Protos.OfferID;
@@ -72,6 +75,7 @@ import static org.apache.aurora.scheduler.state.TaskAssigner.TaskAssignerImpl.LA
 import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import static org.apache.mesos.v1.Protos.Offer;
 import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.anyString;
 import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
@@ -95,6 +99,8 @@ public class TaskAssignerImplTest extends EasyMockTest {
       .setTaskId(TaskID.newBuilder().setValue(Tasks.id(TASK)))
       .setAgentId(MESOS_OFFER.getAgentId())
       .build();
+  private static final IInstanceKey INSTANCE_KEY =
+      InstanceKeys.from(JOB, TASK.getAssignedTask().getInstanceId());
   private static final Map<String, TaskGroupKey> NO_RESERVATION = ImmutableMap.of();
   private static final UnusedResource UNUSED = new UnusedResource(
       bagFromMesosResources(MESOS_OFFER.getResourcesList()),
@@ -125,6 +131,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
   private TaskAssignerImpl assigner;
   private TierManager tierManager;
   private FakeStatsProvider statsProvider;
+  private UpdateAgentReserver updateAgentReserver;
 
   @Before
   public void setUp() throws Exception {
@@ -134,6 +141,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
     stateManager = createMock(StateManager.class);
     offerManager = createMock(OfferManager.class);
     tierManager = createMock(TierManager.class);
+    updateAgentReserver = createMock(UpdateAgentReserver.class);
     statsProvider = new FakeStatsProvider();
     assigner = new TaskAssignerImpl(
         stateManager,
@@ -141,6 +149,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
         taskFactory,
         offerManager,
         tierManager,
+        updateAgentReserver,
         statsProvider);
     resourceRequest = new ResourceRequest(
         TASK.getAssignedTask().getTask(),
@@ -159,6 +168,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
 
   @Test
   public void testAssignPartialNoVetoes() throws Exception {
+    expectNoUpdateReservations(1);
     expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
     offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
     expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
@@ -177,7 +187,10 @@ public class TaskAssignerImplTest extends EasyMockTest {
             storeProvider,
             new ResourceRequest(TASK.getAssignedTask().getTask(), ResourceBag.EMPTY, aggregate),
             TaskGroupKey.from(TASK.getAssignedTask().getTask()),
-            ImmutableSet.of(Tasks.id(TASK), "id2", "id3"),
+            ImmutableSet.of(
+                TASK.getAssignedTask(),
+                makeTask("id2", JOB).getAssignedTask(),
+                makeTask("id3", JOB).getAssignedTask()),
             ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
     assertNotEquals(empty(), aggregate);
     assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
@@ -185,6 +198,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
 
   @Test
   public void testAssignVetoesWithStaticBan() throws Exception {
+    expectNoUpdateReservations(1);
     expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
     offerManager.banOffer(MESOS_OFFER.getId(), GROUP_KEY);
     expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
@@ -200,13 +214,14 @@ public class TaskAssignerImplTest extends EasyMockTest {
             storeProvider,
             resourceRequest,
             TaskGroupKey.from(TASK.getAssignedTask().getTask()),
-            ImmutableSet.of(Tasks.id(TASK)),
+            ImmutableSet.of(TASK.getAssignedTask()),
             NO_RESERVATION));
     assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
   }
 
   @Test
   public void testAssignVetoesWithNoStaticBan() throws Exception {
+    expectNoUpdateReservations(1);
     expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
     expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
     expect(filter.filter(UNUSED, resourceRequest))
@@ -221,13 +236,14 @@ public class TaskAssignerImplTest extends EasyMockTest {
             storeProvider,
             resourceRequest,
             TaskGroupKey.from(TASK.getAssignedTask().getTask()),
-            ImmutableSet.of(Tasks.id(TASK)),
+            ImmutableSet.of(TASK.getAssignedTask()),
             NO_RESERVATION));
     assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
   }
 
   @Test
   public void testAssignmentClearedOnError() throws Exception {
+    expectNoUpdateReservations(1);
     expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER, OFFER_2));
     offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
     expectLastCall().andThrow(new OfferManager.LaunchException("expected"));
@@ -255,7 +271,10 @@ public class TaskAssignerImplTest extends EasyMockTest {
             storeProvider,
             resourceRequest,
             TaskGroupKey.from(TASK.getAssignedTask().getTask()),
-            ImmutableSet.of(Tasks.id(TASK), "id2", "id3"),
+            ImmutableSet.of(
+                TASK.getAssignedTask(),
+                makeTask("id2", JOB).getAssignedTask(),
+                makeTask("id3", JOB).getAssignedTask()),
             NO_RESERVATION));
     assertEquals(1L, statsProvider.getLongValue(ASSIGNER_LAUNCH_FAILURES));
     assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
@@ -263,6 +282,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
 
   @Test
   public void testAssignmentSkippedForReservedSlave() throws Exception {
+    expectNoUpdateReservations(0);
     expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
     expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
 
@@ -275,7 +295,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
             storeProvider,
             resourceRequest,
             TaskGroupKey.from(TASK.getAssignedTask().getTask()),
-            ImmutableSet.of(Tasks.id(TASK)),
+            ImmutableSet.of(TASK.getAssignedTask()),
             ImmutableMap.of(SLAVE_ID, TaskGroupKey.from(
                 ITaskConfig.build(new TaskConfig().setJob(new JobKey("other", "e", "n")))))));
     assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
@@ -286,6 +306,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
     // Ensures slave/task reservation relationship is only enforced in slave->task direction
     // and permissive in task->slave direction. In other words, a task with a slave reservation
     // should still be tried against other unreserved slaves.
+    expectNoUpdateReservations(1);
     expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER_2, OFFER));
     expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
     expect(filter.filter(
@@ -307,7 +328,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
             storeProvider,
             resourceRequest,
             TaskGroupKey.from(TASK.getAssignedTask().getTask()),
-            ImmutableSet.of(Tasks.id(TASK)),
+            ImmutableSet.of(TASK.getAssignedTask()),
             ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
     assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
   }
@@ -329,6 +350,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
             .build(),
         IHostAttributes.build(new HostAttributes()));
 
+    expectNoUpdateReservations(2);
     expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(mismatched, OFFER));
     expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
     expect(filter.filter(
@@ -358,7 +380,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
             storeProvider,
             resourceRequest,
             TaskGroupKey.from(TASK.getAssignedTask().getTask()),
-            ImmutableSet.of(Tasks.id(TASK)),
+            ImmutableSet.of(TASK.getAssignedTask()),
             ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
     assertEquals(2L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
   }
@@ -375,6 +397,107 @@ public class TaskAssignerImplTest extends EasyMockTest {
         assigner.mapAndAssignResources(MESOS_OFFER, IAssignedTask.build(builder)));
   }
 
+  @Test
+  public void testAssignToReservedAgent() throws Exception {
+    expect(updateAgentReserver.hasReservations(GROUP_KEY)).andReturn(true);
+    expect(updateAgentReserver.getAgent(INSTANCE_KEY)).andReturn(Optional.of(SLAVE_ID));
+    updateAgentReserver.release(SLAVE_ID, INSTANCE_KEY);
+    expect(offerManager.getOffer(MESOS_OFFER.getAgentId())).andReturn(Optional.of(OFFER));
+    expect(filter.filter(UNUSED, resourceRequest)).andReturn(ImmutableSet.of());
+    expectAssignTask(MESOS_OFFER);
+    offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
+    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
+
+    expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER))
+        .andReturn(TASK_INFO);
+
+    control.replay();
+
+    AttributeAggregate aggregate = empty();
+    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
+    assertEquals(
+        ImmutableSet.of(Tasks.id(TASK)),
+        assigner.maybeAssign(
+            storeProvider,
+            new ResourceRequest(TASK.getAssignedTask().getTask(), ResourceBag.EMPTY, aggregate),
+            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+            ImmutableSet.of(
+                TASK.getAssignedTask()),
+            ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
+    assertNotEquals(empty(), aggregate);
+    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
+  }
+
+  @Test
+  public void testAssignReservedAgentWhenOfferNotReady() throws Exception {
+    expect(updateAgentReserver.hasReservations(GROUP_KEY)).andReturn(true);
+    expect(updateAgentReserver.getAgent(INSTANCE_KEY)).andReturn(Optional.of(SLAVE_ID));
+    expect(offerManager.getOffer(MESOS_OFFER.getAgentId())).andReturn(Optional.of(OFFER));
+    expect(filter.filter(UNUSED, resourceRequest))
+        .andReturn(ImmutableSet.of(Veto.insufficientResources("cpu", 1)));
+    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
+    offerManager.banOffer(MESOS_OFFER.getId(), GROUP_KEY);
+    expectLastCall();
+
+    control.replay();
+
+    AttributeAggregate aggregate = empty();
+    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
+    assertEquals(
+        ImmutableSet.of(),
+        assigner.maybeAssign(
+            storeProvider,
+            new ResourceRequest(TASK.getAssignedTask().getTask(), ResourceBag.EMPTY, aggregate),
+            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+            ImmutableSet.of(TASK.getAssignedTask()),
+            ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
+    assertEquals(empty(), aggregate);
+    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
+  }
+
+  @Test
+  public void testAssignWithMixOfReservedAndNotReserved() throws Exception {
+    AttributeAggregate aggregate = empty();
+    ResourceRequest resources = new ResourceRequest(
+        TASK.getAssignedTask().getTask(), ResourceBag.EMPTY, aggregate);
+    expect(updateAgentReserver.hasReservations(GROUP_KEY)).andReturn(true);
+    expect(updateAgentReserver.getAgent(INSTANCE_KEY)).andReturn(Optional.of(SLAVE_ID));
+    updateAgentReserver.release(SLAVE_ID, INSTANCE_KEY);
+    expect(offerManager.getOffer(MESOS_OFFER.getAgentId())).andReturn(Optional.of(OFFER));
+    expect(filter.filter(UNUSED, resourceRequest)).andReturn(ImmutableSet.of());
+    expectAssignTask(MESOS_OFFER);
+    offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
+    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
+
+    expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER))
+        .andReturn(TASK_INFO);
+
+    // Normal scheduling loop for the remaining task...
+    expect(updateAgentReserver.getAgent(InstanceKeys.from(JOB, 9999))).andReturn(Optional.absent());
+    expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
+    expect(updateAgentReserver.getReservations(OFFER.getOffer().getAgentId().getValue()))
+        .andReturn(ImmutableSet.of());
+    expect(filter.filter(UNUSED, resources))
+        .andReturn(ImmutableSet.of(Veto.constraintMismatch("lol")));
+    offerManager.banOffer(MESOS_OFFER.getId(), GROUP_KEY);
+
+    control.replay();
+
+    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
+    assertEquals(
+        ImmutableSet.of(Tasks.id(TASK)),
+        assigner.maybeAssign(
+            storeProvider,
+            resources,
+            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+            ImmutableSet.of(
+                TASK.getAssignedTask(),
+                makeTask("another-task", JOB, 9999).getAssignedTask()),
+            ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
+    assertNotEquals(empty(), aggregate);
+    assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
+  }
+
   private void expectAssignTask(Offer offer) {
     expect(stateManager.assignTask(
         eq(storeProvider),
@@ -383,4 +506,11 @@ public class TaskAssignerImplTest extends EasyMockTest {
         eq(offer.getAgentId()),
         anyObject())).andReturn(TASK.getAssignedTask());
   }
+
+  private void expectNoUpdateReservations(int offers) {
+    expect(updateAgentReserver.hasReservations(anyObject())).andReturn(false);
+    for (int i = 0; i < offers; i++) {
+      expect(updateAgentReserver.getReservations(anyString())).andReturn(ImmutableSet.of());
+    }
+  }
 }
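
Note on the three new TaskAssignerImplTest cases above: read together, they suggest a two-pass flow. A task holding a reservation is tried only against its reserved agent and stays pending if that offer does not fit; tasks without a reservation go through the normal offer loop. The sketch below illustrates that reading under heavy simplification. `AffinityAssignSketch`, the single CPU dimension, and the plain-map data model are all hypothetical, and skipping agents that still carry reservations in the second pass is inferred from the `getReservations` javadoc rather than from TaskAssigner.java, which is not part of this section.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class AffinityAssignSketch {
  /**
   * @param taskIds tasks waiting to be scheduled
   * @param cpusPerTask CPUs each task needs (single resource dimension for brevity)
   * @param reservedAgentByTask task id -> reserved agent id; entries are removed once used
   * @param freeCpusByAgent agent id -> CPUs currently on offer; mutated as tasks are placed
   * @return task id -> agent id for every task placed in this pass
   */
  static Map<String, String> assign(
      List<String> taskIds,
      double cpusPerTask,
      Map<String, String> reservedAgentByTask,
      Map<String, Double> freeCpusByAgent) {

    Map<String, String> placed = new LinkedHashMap<>();
    List<String> unreserved = new ArrayList<>();

    // Pass 1: a task with a reservation is only tried against its reserved agent.
    for (String task : taskIds) {
      String agent = reservedAgentByTask.get(task);
      if (agent == null) {
        unreserved.add(task);
        continue;
      }
      Double free = freeCpusByAgent.get(agent);
      if (free != null && free >= cpusPerTask) {
        freeCpusByAgent.put(agent, free - cpusPerTask);
        reservedAgentByTask.remove(task); // release the reservation once it is used
        placed.put(task, agent);
      }
      // Otherwise the task stays pending and keeps its reservation for a later pass.
    }

    // Pass 2: normal loop for the rest, skipping agents that are still reserved.
    for (String task : unreserved) {
      for (Map.Entry<String, Double> offer : freeCpusByAgent.entrySet()) {
        if (reservedAgentByTask.containsValue(offer.getKey())) {
          continue;
        }
        if (offer.getValue() >= cpusPerTask) {
          offer.setValue(offer.getValue() - cpusPerTask);
          placed.put(task, offer.getKey());
          break;
        }
      }
    }
    return placed;
  }
}
```

In the real scheduler a reservation left pending would eventually expire after the hold time set by the update_affinity_reservation_hold_time flag; the sketch has no notion of time.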

http://git-wip-us.apache.org/repos/asf/aurora/blob/85fed6ba/src/test/java/org/apache/aurora/scheduler/updater/AddTaskTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/AddTaskTest.java b/src/test/java/org/apache/aurora/scheduler/updater/AddTaskTest.java
index b2c4c66..43f857d 100644
--- a/src/test/java/org/apache/aurora/scheduler/updater/AddTaskTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/updater/AddTaskTest.java
@@ -54,12 +54,14 @@ public class AddTaskTest extends EasyMockTest {
   private StorageTestUtil storageUtil;
   private StateManager stateManager;
   private InstanceActionHandler handler;
+  private UpdateAgentReserver updateAgentReserver;
 
   @Before
   public void setUp() {
     storageUtil = new StorageTestUtil(this);
     storageUtil.expectOperations();
     stateManager = createMock(StateManager.class);
+    updateAgentReserver = createMock(UpdateAgentReserver.class);
     handler = new InstanceActionHandler.AddTask();
   }
 
@@ -79,6 +81,7 @@ public class AddTaskTest extends EasyMockTest {
         INSTRUCTIONS,
         storageUtil.mutableStoreProvider,
         stateManager,
+        updateAgentReserver,
         JobUpdateStatus.ROLLING_FORWARD,
         UPDATE_ID);
   }
@@ -96,6 +99,7 @@ public class AddTaskTest extends EasyMockTest {
         INSTRUCTIONS,
         storageUtil.mutableStoreProvider,
         stateManager,
+        updateAgentReserver,
         JobUpdateStatus.ROLLING_FORWARD,
         UPDATE_ID);
   }
@@ -111,6 +115,7 @@ public class AddTaskTest extends EasyMockTest {
         INSTRUCTIONS,
         storageUtil.mutableStoreProvider,
         stateManager,
+        updateAgentReserver,
         JobUpdateStatus.ROLLING_BACK,
         UPDATE_ID);
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/85fed6ba/src/test/java/org/apache/aurora/scheduler/updater/InstanceUpdaterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/InstanceUpdaterTest.java b/src/test/java/org/apache/aurora/scheduler/updater/InstanceUpdaterTest.java
index df1f839..8832308 100644
--- a/src/test/java/org/apache/aurora/scheduler/updater/InstanceUpdaterTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/updater/InstanceUpdaterTest.java
@@ -46,6 +46,7 @@ import static org.apache.aurora.scheduler.updater.StateEvaluator.Result;
 import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.EVALUATE_AFTER_MIN_RUNNING_MS;
 import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.EVALUATE_ON_STATE_CHANGE;
 import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE;
+import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.KILL_TASK_WITH_RESERVATION_AND_EVALUATE_ON_STATE_CHANGE;
 import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE;
 import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.SUCCEEDED;
 import static org.junit.Assert.assertEquals;
@@ -56,7 +57,10 @@ public class InstanceUpdaterTest {
   private static final ITaskConfig OLD = ITaskConfig.build(new TaskConfig()
           .setResources(ImmutableSet.of(numCpus(1.0))));
   private static final ITaskConfig NEW = ITaskConfig.build(new TaskConfig()
-          .setResources(ImmutableSet.of(numCpus(2.0))));
+          .setProduction(true)
+          .setResources(ImmutableSet.of(numCpus(1.0))));
+  private static final ITaskConfig NEW_EXTRA_RESOURCES = ITaskConfig.build(new TaskConfig()
+      .setResources(ImmutableSet.of(numCpus(2.0))));
 
   private static final Amount<Long, Time> MIN_RUNNING_TIME = Amount.of(1L, Time.MINUTES);
   private static final Amount<Long, Time> A_LONG_TIME = Amount.of(1L, Time.DAYS);
@@ -121,7 +125,7 @@ public class InstanceUpdaterTest {
   public void testSuccessfulUpdate() {
     TestFixture f = new TestFixture(NEW, 1);
     f.setActualState(OLD);
-    f.evaluate(KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE, RUNNING);
+    f.evaluate(KILL_TASK_WITH_RESERVATION_AND_EVALUATE_ON_STATE_CHANGE, RUNNING);
     f.evaluate(EVALUATE_ON_STATE_CHANGE, KILLING);
     f.evaluate(REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE, FINISHED);
     f.setActualState(NEW);
@@ -132,10 +136,24 @@ public class InstanceUpdaterTest {
   }
 
   @Test
+  public void testUpdateWithResourceChange() {
+    TestFixture f = new TestFixture(NEW_EXTRA_RESOURCES, 1);
+    f.setActualState(OLD);
+    f.evaluate(KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE, RUNNING);
+    f.evaluate(EVALUATE_ON_STATE_CHANGE, KILLING);
+    f.evaluate(REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE, FINISHED);
+    f.setActualState(NEW_EXTRA_RESOURCES);
+    f.evaluate(EVALUATE_ON_STATE_CHANGE, PENDING, ASSIGNED, STARTING);
+    f.evaluate(EVALUATE_AFTER_MIN_RUNNING_MS, RUNNING);
+    f.advanceTime(MIN_RUNNING_TIME);
+    f.evaluateCurrentState(SUCCEEDED);
+  }
+
+  @Test
   public void testUpdateRetryOnTaskExit() {
     TestFixture f = new TestFixture(NEW, 1);
     f.setActualState(OLD);
-    f.evaluate(KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE, RUNNING);
+    f.evaluate(KILL_TASK_WITH_RESERVATION_AND_EVALUATE_ON_STATE_CHANGE, RUNNING);
     f.evaluate(EVALUATE_ON_STATE_CHANGE, KILLING);
     f.evaluate(REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE, FINISHED);
     f.setActualState(NEW);
@@ -152,7 +170,7 @@ public class InstanceUpdaterTest {
   public void testUpdateRetryFailure() {
     TestFixture f = new TestFixture(NEW, 0);
     f.setActualState(OLD);
-    f.evaluate(KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE, RUNNING);
+    f.evaluate(KILL_TASK_WITH_RESERVATION_AND_EVALUATE_ON_STATE_CHANGE, RUNNING);
     f.evaluate(EVALUATE_ON_STATE_CHANGE, KILLING);
     f.evaluate(REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE, FINISHED);
     f.setActualState(NEW);
@@ -203,7 +221,7 @@ public class InstanceUpdaterTest {
   public void testStuckInPending() {
     TestFixture f = new TestFixture(NEW, 1);
     f.setActualState(OLD);
-    f.evaluate(KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE, RUNNING);
+    f.evaluate(KILL_TASK_WITH_RESERVATION_AND_EVALUATE_ON_STATE_CHANGE, RUNNING);
     f.evaluate(EVALUATE_ON_STATE_CHANGE, KILLING);
     f.evaluate(REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE, FINISHED);
     f.setActualState(NEW);
@@ -216,7 +234,7 @@ public class InstanceUpdaterTest {
   public void testSlowToKill() {
     TestFixture f = new TestFixture(NEW, 1);
     f.setActualState(OLD);
-    f.evaluate(KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE, RUNNING);
+    f.evaluate(KILL_TASK_WITH_RESERVATION_AND_EVALUATE_ON_STATE_CHANGE, RUNNING);
     f.evaluate(EVALUATE_ON_STATE_CHANGE, KILLING);
     f.evaluate(REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE, FINISHED);
     f.setActualState(NEW);
@@ -264,7 +282,7 @@ public class InstanceUpdaterTest {
 
     // Task did not pass through KILLING, therefore will be rescheduled.
     f.evaluate(EVALUATE_ON_STATE_CHANGE, FINISHED);
-    f.evaluate(KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE, PENDING);
+    f.evaluate(KILL_TASK_WITH_RESERVATION_AND_EVALUATE_ON_STATE_CHANGE, PENDING);
     f.setActualStateAbsent();
     f.evaluateCurrentState(REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE);
     f.setActualState(NEW);

http://git-wip-us.apache.org/repos/asf/aurora/blob/85fed6ba/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
index 30b44f8..290385d 100644
--- a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
@@ -180,7 +180,7 @@ public class JobUpdaterIT extends EasyMockTest {
     TaskEventBatchWorker batchWorker = createMock(TaskEventBatchWorker.class);
 
     Injector injector = Guice.createInjector(
-        new UpdaterModule(executor),
+        new UpdaterModule(executor, true),
         DbModule.testModuleWithWorkQueue(),
         new AbstractModule() {
           @Override

http://git-wip-us.apache.org/repos/asf/aurora/blob/85fed6ba/src/test/java/org/apache/aurora/scheduler/updater/KillTaskTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/KillTaskTest.java b/src/test/java/org/apache/aurora/scheduler/updater/KillTaskTest.java
index 833fd62..61f66b9 100644
--- a/src/test/java/org/apache/aurora/scheduler/updater/KillTaskTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/updater/KillTaskTest.java
@@ -31,11 +31,13 @@ import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
 import org.junit.Before;
 import org.junit.Test;
 
 import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
 
 public class KillTaskTest extends EasyMockTest {
   private static final IJobUpdateInstructions INSTRUCTIONS = IJobUpdateInstructions.build(
@@ -52,13 +54,15 @@ public class KillTaskTest extends EasyMockTest {
   private StorageTestUtil storageUtil;
   private StateManager stateManager;
   private InstanceActionHandler handler;
+  private UpdateAgentReserver updateAgentReserver;
 
   @Before
   public void setUp() {
     storageUtil = new StorageTestUtil(this);
     storageUtil.expectOperations();
     stateManager = createMock(StateManager.class);
-    handler = new InstanceActionHandler.KillTask();
+    updateAgentReserver = createMock(UpdateAgentReserver.class);
+    handler = new InstanceActionHandler.KillTask(false);
   }
 
   @Test
@@ -83,6 +87,36 @@ public class KillTaskTest extends EasyMockTest {
         INSTRUCTIONS,
         storageUtil.mutableStoreProvider,
         stateManager,
+        updateAgentReserver,
+        JobUpdateStatus.ROLLING_BACK,
+        UPDATE_ID);
+  }
+
+  @Test
+  public void testKillForUpdateReservesAgentForInstance() throws Exception {
+    String id = "task_id";
+    IScheduledTask task = TaskTestUtil.makeTask(id, INSTANCE.getJobKey(), 1, "agent01");
+    storageUtil.expectTaskFetch(Query.instanceScoped(INSTANCE).active(), task);
+
+    expect(stateManager.changeState(
+        storageUtil.mutableStoreProvider,
+        id,
+        Optional.absent(),
+        ScheduleStatus.KILLING,
+        Optional.of("Killed for job update " + UPDATE_ID.getId())))
+        .andReturn(StateChangeResult.SUCCESS);
+
+    updateAgentReserver.reserve(task.getAssignedTask().getSlaveId(), INSTANCE);
+    expectLastCall();
+
+    control.replay();
+
+    new InstanceActionHandler.KillTask(true).getReevaluationDelay(
+        INSTANCE,
+        INSTRUCTIONS,
+        storageUtil.mutableStoreProvider,
+        stateManager,
+        updateAgentReserver,
         JobUpdateStatus.ROLLING_BACK,
         UPDATE_ID);
   }
@@ -98,6 +132,7 @@ public class KillTaskTest extends EasyMockTest {
         INSTRUCTIONS,
         storageUtil.mutableStoreProvider,
         stateManager,
+        updateAgentReserver,
         JobUpdateStatus.ROLLING_BACK,
         UPDATE_ID);
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/85fed6ba/src/test/java/org/apache/aurora/scheduler/updater/NullAgentReserverTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/NullAgentReserverTest.java b/src/test/java/org/apache/aurora/scheduler/updater/NullAgentReserverTest.java
new file mode 100644
index 0000000..d8563b8
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/updater/NullAgentReserverTest.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.updater;
+
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.scheduler.base.InstanceKeys;
+import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
+import org.apache.aurora.scheduler.updater.UpdateAgentReserver.NullAgentReserver;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class NullAgentReserverTest extends EasyMockTest {
+  private static final IInstanceKey INSTANCE_KEY =
+      InstanceKeys.from(JobKeys.from("role", "env", "name"), 1);
+
+  @Test
+  public void testNullReserver() {
+    control.replay();
+    NullAgentReserver reserver = new NullAgentReserver();
+    reserver.reserve("test", INSTANCE_KEY);
+    assertFalse(reserver.getAgent(INSTANCE_KEY).isPresent());
+    assertTrue(reserver.getReservations("test").isEmpty());
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/85fed6ba/src/test/java/org/apache/aurora/scheduler/updater/UpdateAgentReserverImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/UpdateAgentReserverImplTest.java b/src/test/java/org/apache/aurora/scheduler/updater/UpdateAgentReserverImplTest.java
new file mode 100644
index 0000000..1bc2a77
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/updater/UpdateAgentReserverImplTest.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.updater;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.base.InstanceKeys;
+import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.base.TaskGroupKey;
+import org.apache.aurora.scheduler.preemptor.BiCache;
+import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.apache.aurora.scheduler.updater.UpdateAgentReserver.UpdateAgentReserverImpl;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class UpdateAgentReserverImplTest extends EasyMockTest {
+  private UpdateAgentReserver reserver;
+  private BiCache<IInstanceKey, String> cache;
+
+  private static final String AGENT_ID = "agent";
+  private static final IInstanceKey INSTANCE_KEY =
+      InstanceKeys.from(JobKeys.from("role", "env", "name"), 1);
+
+  private TaskGroupKey getTaskGroup(IInstanceKey key) {
+    return TaskGroupKey.from(ITaskConfig.build(
+        new TaskConfig()
+            .setJob(key.getJobKey().newBuilder())
+            .setNumCpus(1.0)
+            .setRamMb(1L)
+            .setDiskMb(1L)));
+  }
+
+  @Before
+  public void setUp() {
+    cache = createMock(new Clazz<BiCache<IInstanceKey, String>>() { });
+    reserver = new UpdateAgentReserverImpl(cache);
+  }
+
+  @Test
+  public void testReserve() {
+    cache.put(INSTANCE_KEY, AGENT_ID);
+    expectLastCall();
+    control.replay();
+    reserver.reserve(AGENT_ID, INSTANCE_KEY);
+  }
+
+  @Test
+  public void testRelease() {
+    cache.remove(INSTANCE_KEY, AGENT_ID);
+    expectLastCall();
+    control.replay();
+    reserver.release(AGENT_ID, INSTANCE_KEY);
+  }
+
+  @Test
+  public void testGetReservations() {
+    expect(cache.getByValue(AGENT_ID)).andReturn(ImmutableSet.of(INSTANCE_KEY));
+    control.replay();
+    assertEquals(ImmutableSet.of(INSTANCE_KEY), reserver.getReservations(AGENT_ID));
+  }
+
+  @Test
+  public void testHasReservations() {
+    IInstanceKey instanceKey2 = InstanceKeys.from(JobKeys.from("role", "env", "name"), 2);
+    IInstanceKey instanceKey3 = InstanceKeys.from(JobKeys.from("role2", "env2", "name2"), 1);
+    expect(cache.asMap())
+        .andReturn(ImmutableMap.of(
+            INSTANCE_KEY,
+            AGENT_ID,
+            instanceKey2,
+            AGENT_ID,
+            instanceKey3,
+            "different-agent")).anyTimes();
+    control.replay();
+    assertTrue(reserver.hasReservations(getTaskGroup(INSTANCE_KEY)));
+    assertTrue(reserver.hasReservations(getTaskGroup(instanceKey2)));
+    assertTrue(reserver.hasReservations(getTaskGroup(instanceKey3)));
+    assertTrue(reserver.hasReservations(
+        getTaskGroup(InstanceKeys.from(JobKeys.from("role", "env", "name"), 3))));
+    assertFalse(reserver.hasReservations(
+        getTaskGroup(InstanceKeys.from(JobKeys.from("not", "in", "map"), 1))));
+  }
+
+}

