aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wfar...@apache.org
Subject [2/3] Add a controller for job updates.
Date Tue, 16 Sep 2014 21:31:58 GMT
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/406d0fb7/src/main/java/org/apache/aurora/scheduler/updater/UpdateFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/UpdateFactory.java b/src/main/java/org/apache/aurora/scheduler/updater/UpdateFactory.java
new file mode 100644
index 0000000..82f2b6d
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/updater/UpdateFactory.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.updater;
+
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.collect.DiscreteDomain;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableRangeSet;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Ordering;
+import com.google.common.collect.Range;
+import com.google.common.collect.Sets;
+import com.google.inject.Inject;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.util.Clock;
+
+import org.apache.aurora.gen.JobUpdateStatus;
+import org.apache.aurora.scheduler.storage.entities.IInstanceTaskConfig;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateConfiguration;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateSettings;
+import org.apache.aurora.scheduler.storage.entities.IRange;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.apache.aurora.scheduler.updater.strategy.QueueStrategy;
+import org.apache.aurora.scheduler.updater.strategy.UpdateStrategy;
+
+import static java.util.Objects.requireNonNull;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * A factory that produces job updaters based on a job update configuration.
+ * <p>
+ * TODO(wfarner): Use AssistedInject to inject this (github.com/google/guice/wiki/AssistedInject).
+ */
+interface UpdateFactory {
+
+  /**
+   * Creates a one-way job updater that will execute the job update configuration in the direction
+   * specified by {@code rollingForward}.
+   *
+   * @param configuration Configuration to act on.
+   * @param rollingForward {@code true} if this is a job update, {@code false} if it is a rollback.
+   * @return An updater that will execute the job update as specified in the
+   *         {@code configuration}.
+   * @throws UpdateConfigurationException If the provided configuration cannot be used.
+   */
+  Update newUpdate(
+      IJobUpdateConfiguration configuration,
+      boolean rollingForward) throws UpdateConfigurationException;
+
+  class UpdateFactoryImpl implements UpdateFactory {
+    private final Clock clock;
+
+    @Inject
+    UpdateFactoryImpl(Clock clock) {
+      this.clock = requireNonNull(clock);
+    }
+
+    @Override
+    public Update newUpdate(
+        IJobUpdateConfiguration configuration,
+        boolean rollingForward) throws UpdateConfigurationException {
+
+      requireNonNull(configuration);
+      IJobUpdateSettings settings = configuration.getSettings();
+      checkArgument(
+          settings.getMaxWaitToInstanceRunningMs() > 0,
+          "Max wait to running must be positive.");
+      checkArgument(
+          settings.getMinWaitInInstanceRunningMs() > 0,
+          "Min wait in running must be positive.");
+      checkArgument(
+          settings.getUpdateGroupSize() > 0,
+          "Update group size must be positive.");
+      checkArgument(
+          configuration.getInstanceCount() > 0,
+          "Instance count must be positive.");
+
+      Set<Integer> instances;
+      Range<Integer> updateConfigurationInstances =
+          Range.closedOpen(0, configuration.getInstanceCount());
+      if (settings.getUpdateOnlyTheseInstances().isEmpty()) {
+        Set<Integer> newInstanceIds =
+            ImmutableRangeSet.of(updateConfigurationInstances).asSet(DiscreteDomain.integers());
+
+        // In a full job update, the working set is the union of instance IDs before and after.
+        instances =  ImmutableSet.copyOf(
+            Sets.union(expandInstanceIds(configuration.getOldTaskConfigs()), newInstanceIds));
+      } else {
+        instances = rangesToInstanceIds(settings.getUpdateOnlyTheseInstances());
+
+        // TODO(wfarner): Move this check out to SchedulerThriftInterface, and remove the
+        // UpdateConfigurationException from this method's signature.
+        if (!updateConfigurationInstances.containsAll(instances)) {
+          throw new UpdateConfigurationException(
+              "When updating specific instances, "
+                  + "all specified instances must be in the update configuration.");
+        }
+      }
+
+      ImmutableMap.Builder<Integer, StateEvaluator<Optional<IScheduledTask>>> evaluators =
+          ImmutableMap.builder();
+      for (int instanceId : instances) {
+        Optional<ITaskConfig> desiredState;
+        if (rollingForward) {
+          desiredState = updateConfigurationInstances.contains(instanceId)
+              ? Optional.of(configuration.getNewTaskConfig())
+              : Optional.<ITaskConfig>absent();
+        } else {
+          desiredState = getConfig(instanceId, configuration.getOldTaskConfigs());
+        }
+
+        evaluators.put(
+            instanceId,
+            new InstanceUpdater(
+                desiredState,
+                settings.getMaxPerInstanceFailures(),
+                Amount.of((long) settings.getMinWaitInInstanceRunningMs(), Time.MILLISECONDS),
+                Amount.of((long) settings.getMaxWaitToInstanceRunningMs(), Time.MILLISECONDS),
+                clock));
+      }
+
+      // TODO(wfarner): Add the batch_completion flag to JobUpdateSettings and pick correct
+      // strategy.
+      Ordering<Integer> updateOrder = rollingForward
+          ? Ordering.<Integer>natural()
+          : Ordering.<Integer>natural().reverse();
+
+      UpdateStrategy<Integer> strategy =
+          new QueueStrategy<>(updateOrder, settings.getUpdateGroupSize());
+
+      return new Update(
+          new OneWayJobUpdater<>(
+              strategy,
+              settings.getMaxFailedInstances(),
+              evaluators.build()),
+          rollingForward ? JobUpdateStatus.ROLLED_FORWARD : JobUpdateStatus.ROLLED_BACK,
+          rollingForward ? JobUpdateStatus.ROLLING_BACK : JobUpdateStatus.FAILED);
+    }
+
+    private static Range<Integer> toRange(IRange range) {
+      return Range.closed(range.getFirst(), range.getLast());
+    }
+
+    private static Set<Integer> rangesToInstanceIds(Set<IRange> ranges) {
+      ImmutableRangeSet.Builder<Integer> instanceIds = ImmutableRangeSet.builder();
+      for (IRange range : ranges) {
+        instanceIds.add(toRange(range));
+      }
+
+      return instanceIds.build().asSet(DiscreteDomain.integers());
+    }
+
+    @VisibleForTesting
+    static Set<Integer> expandInstanceIds(Set<IInstanceTaskConfig> instanceGroups) {
+      ImmutableRangeSet.Builder<Integer> instanceIds = ImmutableRangeSet.builder();
+      for (IInstanceTaskConfig group : instanceGroups) {
+        for (IRange range : group.getInstances()) {
+          instanceIds.add(toRange(range));
+        }
+      }
+
+      return instanceIds.build().asSet(DiscreteDomain.integers());
+    }
+
+    private static Optional<ITaskConfig> getConfig(
+        int id,
+        Set<IInstanceTaskConfig> instanceGroups) {
+
+      for (IInstanceTaskConfig group : instanceGroups) {
+        for (IRange range : group.getInstances()) {
+          if (toRange(range).contains(id)) {
+            return Optional.of(group.getTask());
+          }
+        }
+      }
+
+      return Optional.absent();
+    }
+  }
+
+  class Update {
+    private final OneWayJobUpdater<Integer, Optional<IScheduledTask>> updater;
+    private final JobUpdateStatus successStatus;
+    private final JobUpdateStatus failureStatus;
+
+    public Update(
+        OneWayJobUpdater<Integer, Optional<IScheduledTask>> updater,
+        JobUpdateStatus successStatus,
+        JobUpdateStatus failureStatus) {
+
+      this.updater = requireNonNull(updater);
+      this.successStatus = requireNonNull(successStatus);
+      this.failureStatus = requireNonNull(failureStatus);
+    }
+
+    public OneWayJobUpdater<Integer, Optional<IScheduledTask>> getUpdater() {
+      return updater;
+    }
+
+    public JobUpdateStatus getSuccessStatus() {
+      return successStatus;
+    }
+
+    public JobUpdateStatus getFailureStatus() {
+      return failureStatus;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/406d0fb7/src/main/java/org/apache/aurora/scheduler/updater/UpdaterModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/UpdaterModule.java b/src/main/java/org/apache/aurora/scheduler/updater/UpdaterModule.java
index 028cb07..5733da3 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/UpdaterModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/UpdaterModule.java
@@ -13,18 +13,52 @@
  */
 package org.apache.aurora.scheduler.updater;
 
+import java.util.Objects;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
 import javax.inject.Singleton;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.inject.AbstractModule;
+import com.google.inject.PrivateModule;
+
+import org.apache.aurora.scheduler.events.PubsubEventModule;
 
 /**
  * Binding module for scheduling logic and higher-level state management.
  */
 public class UpdaterModule extends AbstractModule {
 
+  private final ScheduledExecutorService executor;
+
+  public UpdaterModule() {
+    this(Executors.newSingleThreadScheduledExecutor(
+        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("updater-%d").build()));
+  }
+
+  @VisibleForTesting
+  UpdaterModule(ScheduledExecutorService executor) {
+    this.executor = Objects.requireNonNull(executor);
+  }
+
   @Override
   protected void configure() {
-    bind(JobUpdateController.class).to(JobUpdateControllerImpl.class);
-    bind(JobUpdateControllerImpl.class).in(Singleton.class);
+    install(new PrivateModule() {
+      @Override
+      protected void configure() {
+        bind(ScheduledExecutorService.class).toInstance(executor);
+        bind(UpdateFactory.class).to(UpdateFactory.UpdateFactoryImpl.class);
+        bind(UpdateFactory.UpdateFactoryImpl.class).in(Singleton.class);
+        bind(JobUpdateController.class).to(JobUpdateControllerImpl.class);
+        bind(JobUpdateControllerImpl.class).in(Singleton.class);
+        expose(JobUpdateController.class);
+        bind(JobUpdateEventSubscriber.class);
+        expose(JobUpdateEventSubscriber.class);
+      }
+    });
+
+    PubsubEventModule.bindSubscriber(binder(), JobUpdateEventSubscriber.class);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/406d0fb7/src/main/java/org/apache/aurora/scheduler/updater/strategy/ActiveLimitedStrategy.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/strategy/ActiveLimitedStrategy.java b/src/main/java/org/apache/aurora/scheduler/updater/strategy/ActiveLimitedStrategy.java
index 8298ea2..855ea9c 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/strategy/ActiveLimitedStrategy.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/strategy/ActiveLimitedStrategy.java
@@ -13,6 +13,7 @@
  */
 package org.apache.aurora.scheduler.updater.strategy;
 
+import java.util.Objects;
 import java.util.Set;
 
 import com.google.common.base.Preconditions;
@@ -25,6 +26,7 @@ import com.google.common.collect.Ordering;
  * @param <T> Instance type.
  */
 abstract class ActiveLimitedStrategy<T extends Comparable<T>> implements UpdateStrategy<T> {
+  private final Ordering<T> ordering;
   protected final int maxActive;
 
   /**
@@ -33,7 +35,8 @@ abstract class ActiveLimitedStrategy<T extends Comparable<T>> implements UpdateS
    * @param maxActive Maximum number of values to return from
    * {@link #getNextGroup(java.util.Set, java.util.Set)}.
    */
-  protected ActiveLimitedStrategy(int maxActive) {
+  protected ActiveLimitedStrategy(Ordering<T> ordering, int maxActive) {
+    this.ordering = Objects.requireNonNull(ordering);
     Preconditions.checkArgument(maxActive > 0);
     this.maxActive = maxActive;
   }
@@ -41,7 +44,7 @@ abstract class ActiveLimitedStrategy<T extends Comparable<T>> implements UpdateS
   @Override
   public final Set<T> getNextGroup(Set<T> idle, Set<T> active) {
     return FluentIterable
-        .from(Ordering.natural().sortedCopy(doGetNextGroup(idle, active)))
+        .from(ordering.sortedCopy(doGetNextGroup(idle, active)))
         .limit(Math.max(0, maxActive - active.size()))
         .toSet();
   }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/406d0fb7/src/main/java/org/apache/aurora/scheduler/updater/strategy/BatchStrategy.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/strategy/BatchStrategy.java b/src/main/java/org/apache/aurora/scheduler/updater/strategy/BatchStrategy.java
index a3e666e..67d595b 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/strategy/BatchStrategy.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/strategy/BatchStrategy.java
@@ -16,6 +16,7 @@ package org.apache.aurora.scheduler.updater.strategy;
 import java.util.Set;
 
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Ordering;
 
 /**
  * An update strategy that will only add more work when the current active group is empty.
@@ -29,8 +30,8 @@ public class BatchStrategy<T extends Comparable<T>> extends ActiveLimitedStrateg
    *
    * @param maxActive The maximum number of active entries.
    */
-  public BatchStrategy(int maxActive) {
-    super(maxActive);
+  public BatchStrategy(Ordering<T> ordering, int maxActive) {
+    super(ordering, maxActive);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/406d0fb7/src/main/java/org/apache/aurora/scheduler/updater/strategy/QueueStrategy.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/strategy/QueueStrategy.java b/src/main/java/org/apache/aurora/scheduler/updater/strategy/QueueStrategy.java
index 0cf3683..01e11d0 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/strategy/QueueStrategy.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/strategy/QueueStrategy.java
@@ -15,6 +15,8 @@ package org.apache.aurora.scheduler.updater.strategy;
 
 import java.util.Set;
 
+import com.google.common.collect.Ordering;
+
 /**
  * An update strategy that attempts to keep the in-progress queue full at all times, with an upper
  * bound.
@@ -22,8 +24,8 @@ import java.util.Set;
  * @param <T> Instance type.
  */
 public class QueueStrategy<T extends Comparable<T>> extends ActiveLimitedStrategy<T> {
-  public QueueStrategy(int maxActive) {
-    super(maxActive);
+  public QueueStrategy(Ordering<T> ordering, int maxActive) {
+    super(ordering, maxActive);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/406d0fb7/src/main/thrift/org/apache/aurora/gen/api.thrift
----------------------------------------------------------------------
diff --git a/src/main/thrift/org/apache/aurora/gen/api.thrift b/src/main/thrift/org/apache/aurora/gen/api.thrift
index 8396e3d..3d0beea 100644
--- a/src/main/thrift/org/apache/aurora/gen/api.thrift
+++ b/src/main/thrift/org/apache/aurora/gen/api.thrift
@@ -534,6 +534,13 @@ enum JobUpdateStatus {
 
   /** Unknown error during update. */
   ERROR = 7
+
+  /**
+   * Update failed to complete.
+   * This can happen if failure thresholds are met while rolling forward, but rollback is disabled,
+   * or if failure thresholds are met when rolling back.
+   */
+  FAILED = 8
 }
 
 /** Job update actions that can be applied to job instances. */

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/406d0fb7/src/test/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStoreTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStoreTest.java
index af644a9..e09caa6 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStoreTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStoreTest.java
@@ -61,8 +61,6 @@ import org.junit.Test;
 
 import static org.apache.aurora.gen.JobUpdateAction.INSTANCE_ADDED;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
 
 public class DBJobUpdateStoreTest {
 
@@ -367,30 +365,40 @@ public class DBJobUpdateStoreTest {
     saveUpdate(makeJobUpdate(JOB, "update2"), "lock2");
   }
 
+  private static final Optional<String> NO_TOKEN = Optional.absent();
+
   @Test
-  public void testIsActive() {
+  public void testGetLockToken() {
     storage.write(new MutateWork.NoResult.Quiet() {
       @Override
       public void execute(MutableStoreProvider storeProvider) {
         final IJobUpdate update1 = makeJobUpdate(JobKeys.from("role", "env", "name1"), "update1");
         final IJobUpdate update2 = makeJobUpdate(JobKeys.from("role", "env", "name2"), "update2");
         saveUpdate(update1, "lock1");
-        assertTrue(storeProvider.getJobUpdateStore().isActive("update1"));
-        assertFalse(storeProvider.getJobUpdateStore().isActive("update2"));
+        assertEquals(
+            Optional.of("lock1"),
+            storeProvider.getJobUpdateStore().getLockToken("update1"));
+        assertEquals(NO_TOKEN, storeProvider.getJobUpdateStore().getLockToken("update2"));
 
         saveUpdate(update2, "lock2");
-        assertTrue(storeProvider.getJobUpdateStore().isActive("update1"));
-        assertTrue(storeProvider.getJobUpdateStore().isActive("update2"));
+        assertEquals(
+            Optional.of("lock1"),
+            storeProvider.getJobUpdateStore().getLockToken("update1"));
+        assertEquals(
+            Optional.of("lock2"),
+            storeProvider.getJobUpdateStore().getLockToken("update2"));
 
         storeProvider.getLockStore().removeLock(
             makeLock(update1.getSummary().getJobKey(), "lock1").getKey());
-        assertFalse(storeProvider.getJobUpdateStore().isActive("update1"));
-        assertTrue(storeProvider.getJobUpdateStore().isActive("update2"));
+        assertEquals(NO_TOKEN, storeProvider.getJobUpdateStore().getLockToken("update1"));
+        assertEquals(
+            Optional.of("lock2"),
+            storeProvider.getJobUpdateStore().getLockToken("update2"));
 
         storeProvider.getLockStore().removeLock(
             makeLock(update2.getSummary().getJobKey(), "lock2").getKey());
-        assertFalse(storeProvider.getJobUpdateStore().isActive("update1"));
-        assertFalse(storeProvider.getJobUpdateStore().isActive("update2"));
+        assertEquals(NO_TOKEN, storeProvider.getJobUpdateStore().getLockToken("update1"));
+        assertEquals(NO_TOKEN, storeProvider.getJobUpdateStore().getLockToken("update2"));
       }
     });
   }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/406d0fb7/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
index 820a968..7d4dd37 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
@@ -117,6 +117,7 @@ import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
 import org.apache.aurora.scheduler.storage.backup.Recovery;
 import org.apache.aurora.scheduler.storage.backup.StorageBackup;
+import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
@@ -265,7 +266,6 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
         new InvocationHandler() {
           @Override
           public Object invoke(Object o, Method method, Object[] args) throws Throwable {
-            System.out.println("Invoking " + method);
             Response response = (Response) method.invoke(realThrift, args);
             assertTrue(response.isSetResponseCode());
             assertNotNull(response.getDetails());
@@ -1083,7 +1083,8 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
         0);
 
     expectAuth(ROOT, true);
-    storageUtil.expectTaskFetch(Query.instanceScoped(instance).active(), storedTask);
+    storageUtil.expectTaskFetch(
+        Query.instanceScoped(IInstanceKey.build(instance)).active(), storedTask);
 
     control.replay();
 
@@ -1111,7 +1112,8 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
         0);
 
     expectAuth(ROOT, true);
-    storageUtil.expectTaskFetch(Query.instanceScoped(instanceKey).active(), storedTask);
+    storageUtil.expectTaskFetch(
+        Query.instanceScoped(IInstanceKey.build(instanceKey)).active(), storedTask);
     expect(storageUtil.taskStore.unsafeModifyInPlace(
         taskId,
         ITaskConfig.build(ConfigurationManager.applyDefaultsIfUnset(modifiedConfig.newBuilder()))))
@@ -2003,7 +2005,6 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   public void testStartUpdate() throws Exception {
     expectAuth(ROLE, true);
     expect(cronJobManager.hasJob(JOB_KEY)).andReturn(false);
-    expect(lockManager.acquireLock(LOCK_KEY, USER)).andReturn(LOCK);
 
     IScheduledTask oldTask1 = buildScheduledTask(0, 5);
     IScheduledTask oldTask2 = buildScheduledTask(1, 5);
@@ -2032,7 +2033,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
         oldTask6,
         oldTask7);
 
-    jobUpdateController.start(update, LOCK.getToken());
+    jobUpdateController.start(update, USER);
 
     control.replay();
 
@@ -2070,22 +2071,9 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   }
 
   @Test
-  public void testStartUpdateFailsLockValidation() throws Exception {
-    JobUpdateRequest request = buildJobUpdateRequest(populatedTask());
-    expectAuth(ROLE, true);
-    expect(cronJobManager.hasJob(JOB_KEY)).andReturn(false);
-    expect(lockManager.acquireLock(LOCK_KEY, USER)).andThrow(new LockException("lock failed"));
-
-    control.replay();
-
-    assertResponse(LOCK_ERROR, thrift.startJobUpdate(request, SESSION));
-  }
-
-  @Test
   public void testStartUpdateFailsInController() throws Exception {
     expectAuth(ROLE, true);
     expect(cronJobManager.hasJob(JOB_KEY)).andReturn(false);
-    expect(lockManager.acquireLock(LOCK_KEY, USER)).andReturn(LOCK);
 
     IScheduledTask oldTask = buildScheduledTask(0, 5);
     ITaskConfig newTask = buildScheduledTask(0, 8).getAssignedTask().getTask();
@@ -2097,7 +2085,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
     expect(uuidGenerator.createNew()).andReturn(UU_ID);
 
     storageUtil.expectTaskFetch(Query.unscoped().byJob(JOB_KEY).active(), oldTask);
-    jobUpdateController.start(update, LOCK.getToken());
+    jobUpdateController.start(update, USER);
     expectLastCall().andThrow(new UpdateStateException("failed"));
 
     control.replay();

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/406d0fb7/src/test/java/org/apache/aurora/scheduler/updater/AddTaskTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/AddTaskTest.java b/src/test/java/org/apache/aurora/scheduler/updater/AddTaskTest.java
new file mode 100644
index 0000000..1b8e5c2
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/updater/AddTaskTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.updater;
+
+import com.twitter.common.testing.easymock.EasyMockTest;
+
+import org.apache.aurora.gen.InstanceKey;
+import org.apache.aurora.gen.JobUpdateConfiguration;
+import org.apache.aurora.gen.JobUpdateSettings;
+import org.apache.aurora.gen.JobUpdateStatus;
+import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.storage.TaskStore;
+import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateConfiguration;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AddTaskTest extends EasyMockTest {
+  private static final IJobUpdateConfiguration CONFIG = IJobUpdateConfiguration.build(
+      new JobUpdateConfiguration()
+          .setSettings(
+              new JobUpdateSettings()
+                  .setMinWaitInInstanceRunningMs(1000)));
+  private static final IInstanceKey INSTANCE =
+      IInstanceKey.build(new InstanceKey(JobKeys.from("role", "env", "job").newBuilder(), 0));
+
+  private TaskStore taskStore;
+  private StateManager stateManager;
+  private InstanceActionHandler handler;
+
+  @Before
+  public void setUp() {
+    stateManager = createMock(StateManager.class);
+    taskStore = createMock(TaskStore.class);
+    handler = new InstanceActionHandler.AddTask();
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testInstanceNotFound() throws Exception {
+    control.replay();
+
+    handler.getReevaluationDelay(
+        INSTANCE,
+        CONFIG,
+        taskStore,
+        stateManager,
+        JobUpdateStatus.ROLLING_BACK);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/406d0fb7/src/test/java/org/apache/aurora/scheduler/updater/EnumsTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/EnumsTest.java b/src/test/java/org/apache/aurora/scheduler/updater/EnumsTest.java
new file mode 100644
index 0000000..defdf6e
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/updater/EnumsTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.updater;
+
+import org.junit.Test;
+
+import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.MonitorAction;
+import static org.apache.aurora.scheduler.updater.OneWayJobUpdater.InstanceUpdateStatus;
+import static org.apache.aurora.scheduler.updater.OneWayJobUpdater.OneWayStatus;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for auto-generated functions in enums, to achieve 100% test coverage.
+ */
+public class EnumsTest {
+
+  @Test
+  public void testInstanceAction() {
+    for (InstanceAction value : InstanceAction.values()) {
+      assertEquals(value, InstanceAction.valueOf(value.toString()));
+    }
+  }
+
+  @Test
+  public void testInstanceUpdateStatus() {
+    for (InstanceUpdateStatus value : InstanceUpdateStatus.values()) {
+      assertEquals(value, InstanceUpdateStatus.valueOf(value.toString()));
+    }
+  }
+
+  @Test
+  public void testOneWayStatus() {
+    for (OneWayStatus value : OneWayStatus.values()) {
+      assertEquals(value, OneWayStatus.valueOf(value.toString()));
+    }
+  }
+
+  @Test
+  public void testStateEvaluatorResult() {
+    for (StateEvaluator.Result value : StateEvaluator.Result.values()) {
+      assertEquals(value, StateEvaluator.Result.valueOf(value.toString()));
+    }
+  }
+
+  @Test
+  public void testMonitorAction() {
+    for (MonitorAction value : MonitorAction.values()) {
+      assertEquals(value, MonitorAction.valueOf(value.toString()));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/406d0fb7/src/test/java/org/apache/aurora/scheduler/updater/FakeScheduledExecutor.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/FakeScheduledExecutor.java b/src/test/java/org/apache/aurora/scheduler/updater/FakeScheduledExecutor.java
new file mode 100644
index 0000000..e35fe23
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/updater/FakeScheduledExecutor.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.updater;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.twitter.common.collections.Pair;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.util.testing.FakeClock;
+
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+
+import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * A simulated scheduled executor that records scheduled work and executes it when the clock is
+ * advanced past their execution time.
+ */
+class FakeScheduledExecutor extends FakeClock {
+
+  private final List<Pair<Long, Runnable>> deferredWork = Lists.newArrayList();
+
+  FakeScheduledExecutor(ScheduledExecutorService mock) {
+    mock.schedule(
+        EasyMock.<Runnable>anyObject(),
+        EasyMock.anyLong(),
+        EasyMock.eq(TimeUnit.MILLISECONDS));
+    expectLastCall().andAnswer(new IAnswer<ScheduledFuture<?>>() {
+      @Override
+      public ScheduledFuture<?> answer() {
+        Object[] args = EasyMock.getCurrentArguments();
+        Runnable work = (Runnable) args[0];
+        long millis = (Long) args[1];
+        Preconditions.checkArgument(millis > 0);
+        deferredWork.add(Pair.of(nowMillis() + millis, work));
+        return null;
+      }
+    }).anyTimes();
+  }
+
+  @Override
+  public void setNowMillis(long nowMillis) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void advance(Amount<Long, Time> period) {
+    super.advance(period);
+    Iterator<Pair<Long, Runnable>> entries = deferredWork.iterator();
+    List<Runnable> toExecute = Lists.newArrayList();
+    while (entries.hasNext()) {
+      Pair<Long, Runnable> next = entries.next();
+      if (next.getFirst() <= nowMillis()) {
+        entries.remove();
+        toExecute.add(next.getSecond());
+      }
+    }
+    for (Runnable work : toExecute) {
+      work.run();
+    }
+  }
+
+  void assertEmpty() {
+    assertEquals(ImmutableList.<Pair<Long, Runnable>>of(), deferredWork);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/406d0fb7/src/test/java/org/apache/aurora/scheduler/updater/InstanceUpdaterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/InstanceUpdaterTest.java b/src/test/java/org/apache/aurora/scheduler/updater/InstanceUpdaterTest.java
index f38e6a3..4db0080 100644
--- a/src/test/java/org/apache/aurora/scheduler/updater/InstanceUpdaterTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/updater/InstanceUpdaterTest.java
@@ -185,8 +185,8 @@ public class InstanceUpdaterTest {
     TestFixture f = new TestFixture(NO_CONFIG, 1);
     f.setActualState(OLD);
     f.evaluate(KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE, RUNNING);
-    f.evaluate(EVALUATE_ON_STATE_CHANGE, KILLING, FINISHED);
-    f.setActualStateAbsent();
+    f.evaluate(EVALUATE_ON_STATE_CHANGE, KILLING);
+    f.evaluate(SUCCEEDED, FINISHED);
     f.evaluateCurrentState(SUCCEEDED);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/406d0fb7/src/test/java/org/apache/aurora/scheduler/updater/JobUpdateEventSubscriberTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdateEventSubscriberTest.java b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdateEventSubscriberTest.java
index 4127442..5242a43 100644
--- a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdateEventSubscriberTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdateEventSubscriberTest.java
@@ -13,7 +13,6 @@
  */
 package org.apache.aurora.scheduler.updater;
 
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.eventbus.EventBus;
 import com.twitter.common.testing.easymock.EasyMockTest;
@@ -21,56 +20,55 @@ import com.twitter.common.testing.easymock.EasyMockTest;
 import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.gen.Identity;
 import org.apache.aurora.gen.InstanceKey;
-import org.apache.aurora.gen.JobUpdateSummary;
+import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
 import org.junit.Before;
 import org.junit.Test;
 
 import static org.apache.aurora.scheduler.events.PubsubEvent.SchedulerActive;
 import static org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import static org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
-import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
 
 public class JobUpdateEventSubscriberTest extends EasyMockTest {
 
   private static final IJobKey JOB_A = JobKeys.from("role", "env", "name");
 
   private static final IScheduledTask TASK_A = IScheduledTask.build(
-      new ScheduledTask().setAssignedTask(
-          new AssignedTask()
-              .setInstanceId(5)
-              .setTask(new TaskConfig()
-                  .setOwner(new Identity().setRole(JOB_A.getRole()))
-                  .setEnvironment(JOB_A.getEnvironment())
-                  .setJobName(JOB_A.getName()))));
+      new ScheduledTask()
+          .setStatus(ScheduleStatus.PENDING)
+          .setAssignedTask(
+              new AssignedTask()
+                  .setInstanceId(5)
+                  .setTask(new TaskConfig()
+                      .setOwner(new Identity().setRole(JOB_A.getRole()))
+                      .setEnvironment(JOB_A.getEnvironment())
+                      .setJobName(JOB_A.getName()))));
   private static final IInstanceKey INSTANCE_A = IInstanceKey.build(
-      new InstanceKey(JOB_A.newBuilder(), TASK_A.getAssignedTask().getInstanceId()));
+      new InstanceKey()
+          .setJobKey(JOB_A.newBuilder())
+          .setInstanceId(TASK_A.getAssignedTask().getInstanceId()));
 
-  private StorageTestUtil storageUtil;
   private JobUpdateController updater;
 
   private EventBus eventBus;
 
   @Before
   public void setUp() {
-    storageUtil = new StorageTestUtil(this);
-    storageUtil.expectOperations();
     updater = createMock(JobUpdateController.class);
 
     eventBus = new EventBus();
-    eventBus.register(new JobUpdateEventSubscriber(updater, storageUtil.storage));
+    eventBus.register(new JobUpdateEventSubscriber(updater));
   }
 
   @Test
   public void testStateChange() throws Exception {
-    updater.instanceChangedState(INSTANCE_A);
+    updater.instanceChangedState(TASK_A);
 
     control.replay();
 
@@ -79,22 +77,16 @@ public class JobUpdateEventSubscriberTest extends EasyMockTest {
 
   @Test
   public void testDeleted() throws Exception {
-    updater.instanceChangedState(INSTANCE_A);
+    updater.instanceDeleted(INSTANCE_A);
 
     control.replay();
 
     eventBus.post(new TasksDeleted(ImmutableSet.of(TASK_A)));
   }
 
-  private static final IJobUpdateSummary SUMMARY = IJobUpdateSummary.build(new JobUpdateSummary()
-      .setJobKey(JOB_A.newBuilder()));
-
   @Test
   public void testSchedulerStartup() throws Exception {
-    expect(storageUtil.jobUpdateStore.fetchJobUpdateSummaries(
-        JobUpdateEventSubscriber.ACTIVE_QUERY)).andReturn(ImmutableList.of(SUMMARY));
-
-    updater.systemResume(JOB_A);
+    updater.systemResume();
 
     control.replay();
 
@@ -102,12 +94,27 @@ public class JobUpdateEventSubscriberTest extends EasyMockTest {
   }
 
   @Test
-  public void testSchedulerStartupNoUpdates() throws Exception {
-    expect(storageUtil.jobUpdateStore.fetchJobUpdateSummaries(
-        JobUpdateEventSubscriber.ACTIVE_QUERY)).andReturn(ImmutableList.<IJobUpdateSummary>of());
+  public void testHandlesExceptions() throws Exception {
+    updater.systemResume();
+    expectLastCall().andThrow(new RuntimeException());
+    updater.instanceChangedState(TASK_A);
+    expectLastCall().andThrow(new RuntimeException());
+    updater.instanceDeleted(INSTANCE_A);
+    expectLastCall().andThrow(new RuntimeException());
 
     control.replay();
 
     eventBus.post(new SchedulerActive());
+    eventBus.post(TaskStateChange.initialized(TASK_A));
+    eventBus.post(new TasksDeleted(ImmutableSet.of(TASK_A)));
+  }
+
+  @Test
+  public void testIgnoresPrunedTasks() throws Exception {
+    control.replay();
+
+    IScheduledTask task =
+        IScheduledTask.build(TASK_A.newBuilder().setStatus(ScheduleStatus.FAILED));
+    eventBus.post(new TasksDeleted(ImmutableSet.of(task)));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/406d0fb7/src/test/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachineTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachineTest.java b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachineTest.java
index 6eed641..7e41843 100644
--- a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachineTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachineTest.java
@@ -23,6 +23,7 @@ import org.junit.Test;
 
 import static org.apache.aurora.gen.JobUpdateStatus.ABORTED;
 import static org.apache.aurora.gen.JobUpdateStatus.ERROR;
+import static org.apache.aurora.gen.JobUpdateStatus.FAILED;
 import static org.apache.aurora.gen.JobUpdateStatus.ROLLED_BACK;
 import static org.apache.aurora.gen.JobUpdateStatus.ROLLED_FORWARD;
 import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_BACK;
@@ -49,6 +50,7 @@ public class JobUpdateStateMachineTest {
           .put(Pair.of(ROLLING_BACK, ROLLED_BACK), STOP_WATCHING)
           .put(Pair.of(ROLLING_BACK, ABORTED), STOP_WATCHING)
           .put(Pair.of(ROLLING_BACK, ERROR), STOP_WATCHING)
+          .put(Pair.of(ROLLING_BACK, FAILED), STOP_WATCHING)
           .put(Pair.of(ROLL_FORWARD_PAUSED, ROLLING_FORWARD), ROLL_FORWARD)
           .put(Pair.of(ROLL_FORWARD_PAUSED, ABORTED), STOP_WATCHING)
           .put(Pair.of(ROLL_FORWARD_PAUSED, ERROR), STOP_WATCHING)
@@ -64,7 +66,8 @@ public class JobUpdateStateMachineTest {
         Pair<JobUpdateStatus, JobUpdateStatus> key = Pair.of(from, to);
         MonitorAction expected = EXPECTED.get(key);
         try {
-          MonitorAction actual = JobUpdateStateMachine.transition(from, to);
+          JobUpdateStateMachine.assertTransitionAllowed(from, to);
+          MonitorAction actual = JobUpdateStateMachine.getActionForStatus(to);
           if (expected == null) {
             fail("Transition " + key + " should have been disallowed, but got result " + actual);
           }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/406d0fb7/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
new file mode 100644
index 0000000..0e15a79
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
@@ -0,0 +1,753 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.updater;
+
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.eventbus.EventBus;
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.twitter.common.inject.Bindings;
+import com.twitter.common.inject.Bindings.KeyFactory;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.Stats;
+import com.twitter.common.testing.easymock.EasyMockTest;
+import com.twitter.common.util.Clock;
+import com.twitter.common.util.TruncatedBinaryBackoff;
+
+import org.apache.aurora.gen.ExecutorConfig;
+import org.apache.aurora.gen.Identity;
+import org.apache.aurora.gen.InstanceTaskConfig;
+import org.apache.aurora.gen.JobUpdate;
+import org.apache.aurora.gen.JobUpdateConfiguration;
+import org.apache.aurora.gen.JobUpdateEvent;
+import org.apache.aurora.gen.JobUpdateSettings;
+import org.apache.aurora.gen.JobUpdateStatus;
+import org.apache.aurora.gen.JobUpdateSummary;
+import org.apache.aurora.gen.LockKey;
+import org.apache.aurora.gen.Range;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.Driver;
+import org.apache.aurora.scheduler.TaskIdGenerator;
+import org.apache.aurora.scheduler.TaskIdGenerator.TaskIdGeneratorImpl;
+import org.apache.aurora.scheduler.async.RescheduleCalculator;
+import org.apache.aurora.scheduler.async.RescheduleCalculator.RescheduleCalculatorImpl;
+import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.state.LockManager;
+import org.apache.aurora.scheduler.state.LockManagerImpl;
+import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.state.StateManagerImpl;
+import org.apache.aurora.scheduler.state.UUIDGenerator;
+import org.apache.aurora.scheduler.state.UUIDGenerator.UUIDGeneratorImpl;
+import org.apache.aurora.scheduler.storage.JobUpdateStore;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
+import org.apache.aurora.scheduler.storage.Storage.Work;
+import org.apache.aurora.scheduler.storage.db.DbModule;
+import org.apache.aurora.scheduler.storage.entities.IInstanceTaskConfig;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
+import org.apache.aurora.scheduler.storage.entities.ILock;
+import org.apache.aurora.scheduler.storage.entities.ILockKey;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.apache.aurora.scheduler.storage.mem.MemStorage.Delegated;
+import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
+import org.easymock.EasyMock;
+import org.easymock.IExpectationSetters;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.JobUpdateStatus.ABORTED;
+import static org.apache.aurora.gen.JobUpdateStatus.ERROR;
+import static org.apache.aurora.gen.JobUpdateStatus.ROLLED_BACK;
+import static org.apache.aurora.gen.JobUpdateStatus.ROLLED_FORWARD;
+import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_BACK;
+import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_FORWARD;
+import static org.apache.aurora.gen.JobUpdateStatus.ROLL_BACK_PAUSED;
+import static org.apache.aurora.gen.JobUpdateStatus.ROLL_FORWARD_PAUSED;
+import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
+import static org.apache.aurora.gen.ScheduleStatus.FAILED;
+import static org.apache.aurora.gen.ScheduleStatus.KILLED;
+import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.gen.ScheduleStatus.STARTING;
+import static org.apache.aurora.scheduler.events.PubsubEvent.SchedulerActive;
+import static org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
+import static org.apache.aurora.scheduler.updater.JobUpdateControllerImpl.queryByUpdateId;
+import static org.apache.aurora.scheduler.updater.UpdateFactory.UpdateFactoryImpl.expandInstanceIds;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class JobUpdaterIT extends EasyMockTest {
+
+  private static final String UPDATE_ID = "update_id";
+  private static final String USER = "user";
+  private static final IJobKey JOB = JobKeys.from("role", "env", "job1");
+  private static final Amount<Long, Time> RUNNING_TIMEOUT = Amount.of(1000L, Time.MILLISECONDS);
+  private static final Amount<Long, Time> WATCH_TIMEOUT = Amount.of(2000L, Time.MILLISECONDS);
+  private static final Amount<Long, Time> FLAPPING_THRESHOLD = Amount.of(1L, Time.MILLISECONDS);
+  private static final Amount<Long, Time> ONE_DAY = Amount.of(1L, Time.DAYS);
+  private static final ITaskConfig OLD_CONFIG =
+      ITaskConfig.build(makeTaskConfig().setExecutorConfig(new ExecutorConfig().setName("new")));
+  private static final ITaskConfig NEW_CONFIG = ITaskConfig.build(makeTaskConfig());
+
+  private FakeScheduledExecutor clock;
+  private JobUpdateController updater;
+  private Driver driver;
+  private EventBus eventBus;
+  private Storage storage;
+  private LockManager lockManager;
+  private StateManager stateManager;
+  private JobUpdateEventSubscriber subscriber;
+
+  @Before
+  public void setUp() {
+    // Avoid console spam due to stats registered multiple times.
+    Stats.flush();
+    final ScheduledExecutorService executor = createMock(ScheduledExecutorService.class);
+    clock = new FakeScheduledExecutor(executor);
+    driver = createMock(Driver.class);
+    eventBus = new EventBus();
+
+    Injector injector = Guice.createInjector(
+        new UpdaterModule(executor),
+        DbModule.testModule(Bindings.annotatedKeyFactory(Delegated.class)),
+        new MemStorageModule(KeyFactory.PLAIN),
+        new AbstractModule() {
+          @Override
+          protected void configure() {
+            bind(Clock.class).toInstance(clock);
+            bind(StateManager.class).to(StateManagerImpl.class);
+            bind(Driver.class).toInstance(driver);
+            bind(TaskIdGenerator.class).to(TaskIdGeneratorImpl.class);
+            bind(RescheduleCalculator.class).to(RescheduleCalculatorImpl.class);
+            bind(RescheduleCalculatorImpl.RescheduleCalculatorSettings.class)
+                .toInstance(new RescheduleCalculatorImpl.RescheduleCalculatorSettings(
+                    new TruncatedBinaryBackoff(
+                        Amount.of(1L, Time.SECONDS), Amount.of(1L, Time.MINUTES)),
+                    FLAPPING_THRESHOLD,
+                    Amount.of(1, Time.MINUTES)));
+            bind(EventSink.class).toInstance(new EventSink() {
+              @Override
+              public void post(PubsubEvent event) {
+                eventBus.post(event);
+              }
+            });
+            bind(LockManager.class).to(LockManagerImpl.class);
+            bind(UUIDGenerator.class).to(UUIDGeneratorImpl.class);
+          }
+        });
+    updater = injector.getInstance(JobUpdateController.class);
+    storage = injector.getInstance(Storage.class);
+    storage.prepare();
+    lockManager = injector.getInstance(LockManager.class);
+    stateManager = injector.getInstance(StateManager.class);
+    eventBus.register(injector.getInstance(JobUpdateEventSubscriber.class));
+    subscriber = injector.getInstance(JobUpdateEventSubscriber.class);
+  }
+
+  @After
+  public void validateExitState() {
+    clock.assertEmpty();
+    assertEquals(ImmutableList.<ILock>of(), ImmutableList.copyOf(lockManager.getLocks()));
+  }
+
+  @Test(expected = UpdateStateException.class)
+  public void testJobLocked() throws Exception {
+    control.replay();
+
+    ILock lock = lockManager.acquireLock(ILockKey.build(LockKey.job(JOB.newBuilder())), USER);
+    try {
+      updater.start(makeJobUpdate(makeInstanceConfig(0, 0, NEW_CONFIG)), USER);
+    } finally {
+      lockManager.releaseLock(lock);
+    }
+  }
+
+  private String getTaskId(IJobKey job, int instanceId) {
+    return Tasks.id(Iterables.getOnlyElement(
+        Storage.Util.consistentFetchTasks(
+            storage,
+            Query.instanceScoped(job, instanceId).active())));
+  }
+
+  private void changeState(
+      IJobKey job,
+      int instanceId,
+      ScheduleStatus status,
+      ScheduleStatus... statuses) {
+
+    for (ScheduleStatus s
+        : ImmutableList.<ScheduleStatus>builder().add(status).add(statuses).build()) {
+
+      assertTrue(stateManager.changeState(
+          getTaskId(job, instanceId),
+          Optional.<ScheduleStatus>absent(),
+          s,
+          Optional.<String>absent()));
+    }
+  }
+
+  private void assertUpdateStatus(JobUpdateStatus expected) {
+    JobUpdateStatus status = storage.consistentRead(new Work.Quiet<JobUpdateStatus>() {
+      @Override
+      public JobUpdateStatus apply(StoreProvider storeProvider) {
+        IJobUpdateSummary summary = Iterables.getOnlyElement(
+            storeProvider.getJobUpdateStore().fetchJobUpdateSummaries(queryByUpdateId(UPDATE_ID)));
+        return summary.getState().getStatus();
+      }
+    });
+    assertEquals(expected, status);
+  }
+
+  private IExpectationSetters<String> expectTaskKilled() {
+    driver.killTask(EasyMock.<String>anyObject());
+    return expectLastCall();
+  }
+
+  private void insertInitialTasks(IJobUpdate update) {
+    for (IInstanceTaskConfig config : update.getConfiguration().getOldTaskConfigs()) {
+      stateManager.insertPendingTasks(config.getTask(), expandInstanceIds(ImmutableSet.of(config)));
+    }
+  }
+
+  private void assertJobState(IJobKey job, Map<Integer, ITaskConfig> expected) {
+    Iterable<IScheduledTask> tasks =
+        Storage.Util.consistentFetchTasks(storage, Query.jobScoped(job).active());
+
+    Map<Integer, IScheduledTask> tasksByInstance =
+        Maps.uniqueIndex(tasks, Tasks.SCHEDULED_TO_INSTANCE_ID);
+    assertEquals(
+        expected,
+        ImmutableMap.copyOf(Maps.transformValues(tasksByInstance, Tasks.SCHEDULED_TO_INFO)));
+  }
+
+  @Test
+  public void testSuccessfulUpdate() throws Exception {
+    expectTaskKilled();
+
+    control.replay();
+
+    IJobUpdate update = makeJobUpdate(
+        // No-op - task is already matching the new config.
+        makeInstanceConfig(0, 0, NEW_CONFIG),
+        // Task needing update.
+        makeInstanceConfig(2, 2, OLD_CONFIG));
+    insertInitialTasks(update);
+
+    changeState(JOB, 0, ASSIGNED, STARTING, RUNNING);
+    changeState(JOB, 2, ASSIGNED, STARTING, RUNNING);
+    clock.advance(WATCH_TIMEOUT);
+
+    // Instance 1 is added
+    updater.start(update, USER);
+    assertUpdateStatus(ROLLING_FORWARD);
+    changeState(JOB, 1, ASSIGNED, STARTING, RUNNING);
+
+    // Updates may be paused for arbitrarily-long amounts of time, and the updater should not
+    // take action while paused.
+    updater.pause(JOB);
+    updater.pause(JOB);  // Pausing again is a no-op.
+    assertUpdateStatus(ROLL_FORWARD_PAUSED);
+    clock.advance(ONE_DAY);
+    changeState(JOB, 1, FAILED, ASSIGNED, STARTING, RUNNING);
+    changeState(JOB, 2, FAILED, ASSIGNED, STARTING, RUNNING);
+    clock.advance(WATCH_TIMEOUT);
+    updater.resume(JOB);
+    assertUpdateStatus(ROLLING_FORWARD);
+
+    // A task outside the scope of the update should be ignored by the updater.
+    stateManager.insertPendingTasks(NEW_CONFIG, ImmutableSet.of(100));
+
+    // Instance 2 is updated
+    changeState(JOB, 2, KILLED, ASSIGNED, STARTING, RUNNING);
+    clock.advance(WATCH_TIMEOUT);
+
+    assertUpdateStatus(ROLLED_FORWARD);
+    assertJobState(
+        JOB,
+        ImmutableMap.of(0, NEW_CONFIG, 1, NEW_CONFIG, 2, NEW_CONFIG, 100, NEW_CONFIG));
+  }
+
+  @Test
+  public void testUpdateSpecificInstances() throws Exception {
+    expectTaskKilled();
+
+    control.replay();
+
+    JobUpdate builder =
+        setInstanceCount(makeJobUpdate(makeInstanceConfig(0, 1, OLD_CONFIG)), 1).newBuilder();
+    builder.getConfiguration().getSettings().setUpdateOnlyTheseInstances(
+        ImmutableSet.of(new Range(0, 0)));
+    IJobUpdate update = IJobUpdate.build(builder);
+    insertInitialTasks(update);
+
+    changeState(JOB, 0, ASSIGNED, STARTING, RUNNING);
+    changeState(JOB, 1, ASSIGNED, STARTING, RUNNING);
+    clock.advance(WATCH_TIMEOUT);
+
+    // Instance 0 is updated
+    updater.start(update, USER);
+    changeState(JOB, 0, KILLED, ASSIGNED, STARTING, RUNNING);
+    clock.advance(WATCH_TIMEOUT);
+
+    assertUpdateStatus(ROLLED_FORWARD);
+    assertJobState(
+        JOB,
+        ImmutableMap.of(0, NEW_CONFIG, 1, OLD_CONFIG));
+  }
+
+  @Test
+  public void testRollback() throws Exception {
+    expectTaskKilled().times(4);
+
+    control.replay();
+
+    IJobUpdate update = makeJobUpdate(
+            makeInstanceConfig(0, 0, OLD_CONFIG),
+            makeInstanceConfig(2, 3, OLD_CONFIG));
+    insertInitialTasks(update);
+
+    changeState(JOB, 0, ASSIGNED, STARTING, RUNNING);
+    changeState(JOB, 2, ASSIGNED, STARTING, RUNNING);
+    changeState(JOB, 3, ASSIGNED, STARTING, RUNNING);
+    clock.advance(WATCH_TIMEOUT);
+
+    // Instance 0 is updated.
+    updater.start(update, USER);
+    assertUpdateStatus(ROLLING_FORWARD);
+    changeState(JOB, 0, KILLED, ASSIGNED, STARTING, RUNNING);
+    clock.advance(WATCH_TIMEOUT);
+
+    // Instance 1 is added.
+    changeState(JOB, 1, ASSIGNED, STARTING, RUNNING);
+    clock.advance(WATCH_TIMEOUT);
+
+    // Instance 2 is updated, but fails.
+    changeState(JOB, 2, KILLED, ASSIGNED, STARTING, RUNNING);
+    clock.advance(FLAPPING_THRESHOLD);
+    changeState(JOB, 2, FAILED);
+
+    // Instance 2 is rolled back.
+    assertUpdateStatus(ROLLING_BACK);
+    changeState(JOB, 2, ASSIGNED, STARTING, RUNNING);
+    clock.advance(WATCH_TIMEOUT);
+
+    // A rollback may be paused.
+    updater.pause(JOB);
+    assertUpdateStatus(ROLL_BACK_PAUSED);
+    clock.advance(ONE_DAY);
+    updater.resume(JOB);
+    assertUpdateStatus(ROLLING_BACK);
+
+    // Instance 1 is removed.
+    changeState(JOB, 1, KILLED);
+    clock.advance(WATCH_TIMEOUT);
+
+    // Instance 0 is rolled back.
+    changeState(JOB, 0, KILLED, ASSIGNED, STARTING, RUNNING);
+    clock.advance(WATCH_TIMEOUT);
+
+    assertUpdateStatus(ROLLED_BACK);
+    assertJobState(JOB, ImmutableMap.of(0, OLD_CONFIG, 2, OLD_CONFIG, 3, OLD_CONFIG));
+  }
+
+  @Test
+  public void testAbort() throws Exception {
+    expectTaskKilled();
+
+    control.replay();
+
+    IJobUpdate update = makeJobUpdate(makeInstanceConfig(0, 2, OLD_CONFIG));
+    insertInitialTasks(update);
+
+    changeState(JOB, 0, ASSIGNED, STARTING, RUNNING);
+    changeState(JOB, 2, ASSIGNED, STARTING, RUNNING);
+    clock.advance(WATCH_TIMEOUT);
+
+    // Instance 0 is updated
+    updater.start(update, USER);
+    changeState(JOB, 0, KILLED, ASSIGNED, STARTING, RUNNING);
+    clock.advance(WATCH_TIMEOUT);
+
+    updater.abort(JOB);
+    assertUpdateStatus(ABORTED);
+    clock.advance(WATCH_TIMEOUT);
+    assertJobState(JOB, ImmutableMap.of(0, NEW_CONFIG, 1, NEW_CONFIG, 2, OLD_CONFIG));
+  }
+
+  @Test
+  public void testRollbackFailed() throws Exception {
+    expectTaskKilled().times(2);
+
+    control.replay();
+
+    IJobUpdate update = makeJobUpdate(
+        makeInstanceConfig(0, 1, OLD_CONFIG));
+    insertInitialTasks(update);
+
+    changeState(JOB, 0, ASSIGNED, STARTING, RUNNING);
+    changeState(JOB, 1, ASSIGNED, STARTING, RUNNING);
+    clock.advance(WATCH_TIMEOUT);
+
+    // Instance 0 is updated.
+    updater.start(update, USER);
+    assertUpdateStatus(ROLLING_FORWARD);
+    changeState(JOB, 0, KILLED, ASSIGNED, STARTING, RUNNING);
+    clock.advance(WATCH_TIMEOUT);
+
+    // Instance 1 is updated, but fails.
+    changeState(JOB, 1, KILLED, ASSIGNED, STARTING, RUNNING);
+    clock.advance(FLAPPING_THRESHOLD);
+    changeState(JOB, 1, FAILED);
+
+    // Instance 1 is rolled back, but fails.
+    assertUpdateStatus(ROLLING_BACK);
+    changeState(JOB, 1, ASSIGNED, STARTING, RUNNING);
+    clock.advance(FLAPPING_THRESHOLD);
+    changeState(JOB, 1, FAILED);
+
+    assertUpdateStatus(JobUpdateStatus.FAILED);
+    clock.advance(WATCH_TIMEOUT);
+    assertJobState(JOB, ImmutableMap.of(0, NEW_CONFIG, 1, OLD_CONFIG));
+  }
+
+  @Test
+  public void testLostLock() throws Exception {
+    expectTaskKilled();
+
+    control.replay();
+
+    IJobUpdate update = makeJobUpdate(
+        makeInstanceConfig(0, 1, OLD_CONFIG));
+    insertInitialTasks(update);
+
+    changeState(JOB, 0, ASSIGNED, STARTING, RUNNING);
+    changeState(JOB, 1, ASSIGNED, STARTING, RUNNING);
+    clock.advance(WATCH_TIMEOUT);
+
+    // Instance 0 is updated.
+    updater.start(update, USER);
+    for (ILock lock : lockManager.getLocks()) {
+      lockManager.releaseLock(lock);
+    }
+    clock.advance(RUNNING_TIMEOUT);
+    assertUpdateStatus(ERROR);
+  }
+
+  private void expectInvalid(JobUpdate update)
+      throws UpdateStateException, UpdateConfigurationException {
+
+    try {
+      updater.start(IJobUpdate.build(update), USER);
+      fail();
+    } catch (IllegalArgumentException e) {
+      // Expected.
+    }
+  }
+
+  @Test
+  public void testStartInvalidUpdate() throws Exception {
+    control.replay();
+
+    JobUpdate update = makeJobUpdate().newBuilder();
+    update.getConfiguration().getSettings().setUpdateGroupSize(-1);
+    expectInvalid(update);
+
+    update = makeJobUpdate().newBuilder();
+    update.getConfiguration().getSettings().setMaxWaitToInstanceRunningMs(0);
+    expectInvalid(update);
+
+    update = makeJobUpdate().newBuilder();
+    update.getConfiguration().getSettings().setMinWaitInInstanceRunningMs(0);
+    expectInvalid(update);
+
+    update = makeJobUpdate().newBuilder();
+    update.getConfiguration().setInstanceCount(0);
+    expectInvalid(update);
+
+    update = makeJobUpdate().newBuilder();
+    update.getConfiguration().getSettings().addToUpdateOnlyTheseInstances(new Range(0, 100));
+    try {
+      updater.start(IJobUpdate.build(update), USER);
+      fail();
+    } catch (UpdateConfigurationException e) {
+      // Expected.
+    }
+  }
+
+  @Test
+  public void testConfigurationPolicyChange() throws Exception {
+    // Simulates a change in input validation after a job update has been persisted.
+
+    expectTaskKilled().times(2);
+
+    control.replay();
+
+    final IJobUpdate update =
+        setInstanceCount(makeJobUpdate(makeInstanceConfig(0, 1, OLD_CONFIG)), 2);
+    insertInitialTasks(update);
+
+    changeState(JOB, 0, ASSIGNED, STARTING, RUNNING);
+    changeState(JOB, 1, ASSIGNED, STARTING, RUNNING);
+    clock.advance(WATCH_TIMEOUT);
+
+    // Instance 0 is updated
+    updater.start(update, USER);
+    assertUpdateStatus(ROLLING_FORWARD);
+
+    storage.write(new NoResult.Quiet() {
+      @Override
+      protected void execute(Storage.MutableStoreProvider storeProvider) {
+        JobUpdateStore.Mutable store = storeProvider.getJobUpdateStore();
+        store.deleteAllUpdatesAndEvents();
+
+        JobUpdate builder = update.newBuilder();
+        builder.getConfiguration().setInstanceCount(0);
+        for (ILock lock : lockManager.getLocks()) {
+          lockManager.releaseLock(lock);
+        }
+        saveJobUpdate(store, IJobUpdate.build(builder), ROLLING_FORWARD);
+      }
+    });
+
+    changeState(JOB, 0, KILLED, ASSIGNED, STARTING, RUNNING);
+    clock.advance(WATCH_TIMEOUT);
+
+    // Instance 1 is updated, but fails.
+    changeState(JOB, 1, KILLED, ASSIGNED, STARTING, RUNNING, FAILED);
+    clock.advance(WATCH_TIMEOUT);
+
+    assertUpdateStatus(ERROR);
+  }
+
+  private ILock saveJobUpdate(
+      JobUpdateStore.Mutable store,
+      IJobUpdate update,
+      JobUpdateStatus status) {
+
+    ILock lock;
+    try {
+      lock = lockManager.acquireLock(
+          ILockKey.build(LockKey.job(update.getSummary().getJobKey().newBuilder())), USER);
+    } catch (LockManager.LockException e) {
+      throw Throwables.propagate(e);
+    }
+
+    store.saveJobUpdate(update, lock.getToken());
+    store.saveJobUpdateEvent(
+        IJobUpdateEvent.build(
+            new JobUpdateEvent()
+                .setStatus(status)
+                .setTimestampMs(clock.nowMillis())),
+        update.getSummary().getUpdateId());
+    return lock;
+  }
+
+  @Test
+  public void testRecoverFromStorage() throws Exception {
+    expectTaskKilled().times(2);
+
+    control.replay();
+
+    final IJobUpdate update =
+        setInstanceCount(makeJobUpdate(makeInstanceConfig(0, 1, OLD_CONFIG)), 2);
+    insertInitialTasks(update);
+    changeState(JOB, 0, ASSIGNED, STARTING, RUNNING);
+    changeState(JOB, 1, ASSIGNED, STARTING, RUNNING);
+    clock.advance(ONE_DAY);
+
+    storage.write(new NoResult.Quiet() {
+      @Override
+      protected void execute(Storage.MutableStoreProvider storeProvider) {
+        saveJobUpdate(storeProvider.getJobUpdateStore(), update, ROLLING_FORWARD);
+      }
+    });
+
+    eventBus.post(new SchedulerActive());
+
+    // Instance 0 is updated.
+    changeState(JOB, 0, KILLED, ASSIGNED, STARTING, RUNNING);
+    clock.advance(WATCH_TIMEOUT);
+
+    // Instance 1 is updated.
+    changeState(JOB, 1, KILLED, ASSIGNED, STARTING, RUNNING);
+    clock.advance(WATCH_TIMEOUT);
+
+    assertUpdateStatus(ROLLED_FORWARD);
+  }
+
+  @Test
+  public void testSystemResumeNoLock() throws Exception {
+    control.replay();
+
+    final IJobUpdate update =
+        setInstanceCount(makeJobUpdate(makeInstanceConfig(0, 1, OLD_CONFIG)), 0);
+
+    storage.write(new NoResult.Quiet() {
+      @Override
+      protected void execute(Storage.MutableStoreProvider storeProvider) {
+        ILock lock = saveJobUpdate(storeProvider.getJobUpdateStore(), update, ROLLING_FORWARD);
+        lockManager.releaseLock(lock);
+      }
+    });
+
+    eventBus.post(new SchedulerActive());
+    assertUpdateStatus(ERROR);
+  }
+
+  @Test
+  public void testNoopUpdate() throws Exception {
+    control.replay();
+
+    final IJobUpdate update = makeJobUpdate(makeInstanceConfig(0, 2, NEW_CONFIG));
+    insertInitialTasks(update);
+    changeState(JOB, 0, ASSIGNED, STARTING, RUNNING);
+    changeState(JOB, 1, ASSIGNED, STARTING, RUNNING);
+    changeState(JOB, 2, ASSIGNED, STARTING, RUNNING);
+    clock.advance(ONE_DAY);
+    updater.start(update, USER);
+    assertUpdateStatus(ROLLED_FORWARD);
+  }
+
+  @Test
+  public void testImmediatelyFailedUpdate() throws Exception {
+    // An update where the new configuration is running, but entirely stuck in pending.
+    control.replay();
+
+    final IJobUpdate update = makeJobUpdate(makeInstanceConfig(0, 2, NEW_CONFIG));
+    insertInitialTasks(update);
+    clock.advance(ONE_DAY);
+    updater.start(update, USER);
+    assertUpdateStatus(JobUpdateStatus.FAILED);
+  }
+
+  @Test
+  public void testStuckTask() throws Exception {
+    expectTaskKilled().times(3);
+
+    control.replay();
+
+    IJobUpdate update = setInstanceCount(makeJobUpdate(makeInstanceConfig(0, 1, OLD_CONFIG)), 2);
+    insertInitialTasks(update);
+    changeState(JOB, 0, ASSIGNED, STARTING, RUNNING);
+    changeState(JOB, 1, ASSIGNED, STARTING, RUNNING);
+
+    // Instance 0 is updated.
+    updater.start(update, USER);
+    assertUpdateStatus(ROLLING_FORWARD);
+    changeState(JOB, 0, KILLED, ASSIGNED, STARTING, RUNNING);
+    clock.advance(WATCH_TIMEOUT);
+
+    // Instance 1 is stuck in PENDING.
+    changeState(JOB, 1, KILLED);
+    clock.advance(RUNNING_TIMEOUT);
+    assertUpdateStatus(ROLLING_BACK);
+
+    // Instance 1 is reverted.
+    changeState(JOB, 1, ASSIGNED, STARTING, RUNNING);
+    clock.advance(WATCH_TIMEOUT);
+
+    // Instance 0 is reverted.
+    changeState(JOB, 0, KILLED, ASSIGNED, STARTING, RUNNING);
+    clock.advance(WATCH_TIMEOUT);
+
+    assertUpdateStatus(ROLLED_BACK);
+    assertJobState(JOB, ImmutableMap.of(0, OLD_CONFIG, 1, OLD_CONFIG));
+  }
+
+  @Test
+  public void testBadPubsubUpdate() {
+    control.replay();
+
+    subscriber.taskChangedState(
+        PubsubEvent.TaskStateChange.transition(IScheduledTask.build(new ScheduledTask()), RUNNING));
+  }
+
+  @Test(expected = UpdateStateException.class)
+  public void testPauseUnknownUpdate() throws Exception {
+    control.replay();
+
+    updater.pause(JOB);
+  }
+
+  private static IJobUpdateSummary makeUpdateSummary() {
+    return IJobUpdateSummary.build(new JobUpdateSummary()
+        .setUser("user")
+        .setJobKey(JOB.newBuilder())
+        .setUpdateId(UPDATE_ID));
+  }
+
+  private static IJobUpdate makeJobUpdate(IInstanceTaskConfig... configs) {
+    JobUpdate builder = new JobUpdate()
+        .setSummary(makeUpdateSummary().newBuilder())
+        .setConfiguration(new JobUpdateConfiguration()
+            .setNewTaskConfig(NEW_CONFIG.newBuilder())
+            .setInstanceCount(3)
+            .setSettings(new JobUpdateSettings()
+                .setUpdateGroupSize(1)
+                .setMaxWaitToInstanceRunningMs(RUNNING_TIMEOUT.as(Time.MILLISECONDS).intValue())
+                .setMinWaitInInstanceRunningMs(WATCH_TIMEOUT.as(Time.MILLISECONDS).intValue())
+                .setUpdateOnlyTheseInstances(ImmutableSet.<Range>of())));
+
+    for (IInstanceTaskConfig config : configs) {
+      builder.getConfiguration().addToOldTaskConfigs(config.newBuilder());
+    }
+
+    return IJobUpdate.build(builder);
+  }
+
+  private static IJobUpdate setInstanceCount(IJobUpdate update, int instanceCount) {
+    JobUpdate builder = update.newBuilder();
+    builder.getConfiguration().setInstanceCount(instanceCount);
+    return IJobUpdate.build(builder);
+  }
+
+  private static IInstanceTaskConfig makeInstanceConfig(int start, int end, ITaskConfig config) {
+    return IInstanceTaskConfig.build(new InstanceTaskConfig()
+        .setInstances(ImmutableSet.of(new Range(start, end)))
+        .setTask(config.newBuilder()));
+  }
+
+  private static TaskConfig makeTaskConfig() {
+    return new TaskConfig()
+        .setJobName(JOB.getName())
+        .setEnvironment(JOB.getEnvironment())
+        .setOwner(new Identity(JOB.getRole(), "user"))
+        .setIsService(true)
+        .setExecutorConfig(new ExecutorConfig().setName("old"))
+        .setNumCpus(1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/406d0fb7/src/test/java/org/apache/aurora/scheduler/updater/OneWayJobUpdateControllerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/OneWayJobUpdateControllerTest.java b/src/test/java/org/apache/aurora/scheduler/updater/OneWayJobUpdateControllerTest.java
deleted file mode 100644
index f0b6835..0000000
--- a/src/test/java/org/apache/aurora/scheduler/updater/OneWayJobUpdateControllerTest.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.updater;
-
-import java.util.Map;
-import java.util.Set;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.twitter.common.testing.easymock.EasyMockTest;
-
-import org.apache.aurora.scheduler.updater.strategy.UpdateStrategy;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.aurora.scheduler.updater.OneWayJobUpdater.EvaluationResult;
-import static org.apache.aurora.scheduler.updater.OneWayJobUpdater.InstanceAction;
-import static org.apache.aurora.scheduler.updater.OneWayJobUpdater.OneWayStatus;
-import static org.apache.aurora.scheduler.updater.StateEvaluator.Result;
-import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.EVALUATE_AFTER_MIN_RUNNING_MS;
-import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.EVALUATE_ON_STATE_CHANGE;
-import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.FAILED;
-import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE;
-import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE;
-import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.SUCCEEDED;
-import static org.easymock.EasyMock.expect;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-public class OneWayJobUpdateControllerTest extends EasyMockTest {
-  private static final Set<Integer> EMPTY = ImmutableSet.of();
-  private static final Map<Integer, InstanceAction> NO_ACTIONS = ImmutableMap.of();
-
-  private UpdateStrategy<Integer> strategy;
-  private StateEvaluator<String> instance0;
-  private StateEvaluator<String> instance1;
-  private StateEvaluator<String> instance2;
-  private StateEvaluator<String> instance3;
-  private Map<Integer, StateEvaluator<String>> allInstances;
-  private InstanceStateProvider<Integer, String> stateProvider;
-
-  private OneWayJobUpdater<Integer, String> jobUpdater;
-
-  @Before
-  public void setUp() {
-    strategy = createMock(new Clazz<UpdateStrategy<Integer>>() { });
-    instance0 = createMock(new Clazz<StateEvaluator<String>>() { });
-    instance1 = createMock(new Clazz<StateEvaluator<String>>() { });
-    instance2 = createMock(new Clazz<StateEvaluator<String>>() { });
-    instance3 = createMock(new Clazz<StateEvaluator<String>>() { });
-    allInstances = ImmutableMap.of(
-        0, instance0,
-        1, instance1,
-        2, instance2,
-        3, instance3);
-    stateProvider = createMock(new Clazz<InstanceStateProvider<Integer, String>>() { });
-  }
-
-  private void evaluate(OneWayStatus expectedStatus, Map<Integer, InstanceAction> expectedActions) {
-    assertEquals(
-        new EvaluationResult<>(expectedStatus, expectedActions),
-        jobUpdater.evaluate(ImmutableSet.<Integer>of(), stateProvider));
-  }
-
-  private void evaluate(
-      int instanceId,
-      OneWayStatus expectedStatus,
-      Map<Integer, InstanceAction> expectedActions) {
-
-    assertEquals(
-        new EvaluationResult<>(expectedStatus, expectedActions),
-        jobUpdater.evaluate(ImmutableSet.of(instanceId), stateProvider));
-  }
-
-  private void expectEvaluate(
-      int instanceId,
-      StateEvaluator<String> instanceMock,
-      String state,
-      Result result) {
-
-    expect(stateProvider.getState(instanceId)).andReturn(state);
-    expect(instanceMock.evaluate(state)).andReturn(result);
-  }
-
-  @Test
-  public void testSuccessfulUpdate() {
-    expect(strategy.getNextGroup(ImmutableSet.of(0, 1, 2, 3), EMPTY))
-        .andReturn(ImmutableSet.of(0, 2));
-    String s0 = "0";
-    String s1 = "1";
-    String s2 = "2";
-    String s3 = "3";
-    expectEvaluate(
-        0,
-        instance0,
-        s0,
-        KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE);
-    expectEvaluate(
-        2,
-        instance2,
-        s2,
-        REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE);
-
-    expectEvaluate(0, instance0, s0, EVALUATE_ON_STATE_CHANGE);
-    expect(strategy.getNextGroup(ImmutableSet.of(1, 3), ImmutableSet.of(0, 2))).andReturn(EMPTY);
-    expectEvaluate(0, instance0, s0, SUCCEEDED);
-    expect(strategy.getNextGroup(ImmutableSet.of(1, 3), ImmutableSet.of(2))).andReturn(EMPTY);
-    expectEvaluate(2, instance2, s2, SUCCEEDED);
-    expect(strategy.getNextGroup(ImmutableSet.of(1, 3), EMPTY))
-        .andReturn(ImmutableSet.of(1, 3));
-    expectEvaluate(
-        1,
-        instance1,
-        s1,
-        SUCCEEDED);
-    expectEvaluate(
-        3,
-        instance3,
-        s3,
-        EVALUATE_AFTER_MIN_RUNNING_MS);
-    expectEvaluate(3, instance3, s3, SUCCEEDED);
-
-    control.replay();
-
-    jobUpdater = new OneWayJobUpdater<>(strategy, 0, allInstances);
-
-    evaluate(
-        OneWayStatus.WORKING,
-        ImmutableMap.of(
-            0, InstanceAction.KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE,
-            2, InstanceAction.REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE));
-    evaluate(
-        0,
-        OneWayStatus.WORKING,
-        ImmutableMap.of(0, InstanceAction.EVALUATE_ON_STATE_CHANGE));
-    evaluate(
-        0,
-        OneWayStatus.WORKING,
-        NO_ACTIONS);
-    evaluate(
-        2,
-        OneWayStatus.WORKING,
-        ImmutableMap.of(
-            3, InstanceAction.EVALUATE_AFTER_MIN_RUNNING_MS));
-    evaluate(
-        3,
-        OneWayStatus.SUCCEEDED,
-        NO_ACTIONS);
-  }
-
-  @Test
-  public void testFailedUpdate() {
-    expect(strategy.getNextGroup(ImmutableSet.of(0, 1, 2, 3), EMPTY))
-        .andReturn(ImmutableSet.of(0, 1));
-    String s0 = "0";
-    String s1 = "1";
-    expectEvaluate(
-        0,
-        instance0,
-        s0,
-        FAILED);
-    expectEvaluate(
-        1,
-        instance1,
-        s1,
-        KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE);
-
-    control.replay();
-
-    jobUpdater = new OneWayJobUpdater<>(strategy, 0, allInstances);
-
-    evaluate(
-        OneWayStatus.FAILED,
-        ImmutableMap.of(
-            1, InstanceAction.KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE));
-
-    // The updater should now reject further attempts to evaluate.
-    try {
-      jobUpdater.evaluate(ImmutableSet.<Integer>of(), stateProvider);
-      fail();
-    } catch (IllegalStateException e) {
-      // Expected.
-    }
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void testBadInput() {
-    control.replay();
-
-    new OneWayJobUpdater<>(strategy, 0, ImmutableMap.<Integer, StateEvaluator<String>>of());
-  }
-
-  @Test
-  public void testEvaluateCompletedInstance() {
-    expect(strategy.getNextGroup(ImmutableSet.of(0, 1, 2, 3), EMPTY))
-        .andReturn(ImmutableSet.of(0));
-    expect(strategy.getNextGroup(ImmutableSet.of(1, 2, 3), EMPTY))
-        .andReturn(ImmutableSet.<Integer>of());
-    String s0 = "0";
-    expectEvaluate(
-        0,
-        instance0,
-        s0,
-        SUCCEEDED);
-
-    control.replay();
-
-    jobUpdater = new OneWayJobUpdater<>(strategy, 0, allInstances);
-
-    evaluate(
-        OneWayStatus.WORKING,
-        NO_ACTIONS);
-
-    // Instance 0 is already considered finished, so any further notifications of its state will
-    // no-op.
-    evaluate(
-        0,
-        OneWayStatus.WORKING,
-        NO_ACTIONS);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/406d0fb7/src/test/java/org/apache/aurora/scheduler/updater/OneWayJobUpdaterFactoryImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/OneWayJobUpdaterFactoryImplTest.java b/src/test/java/org/apache/aurora/scheduler/updater/OneWayJobUpdaterFactoryImplTest.java
deleted file mode 100644
index ae65462..0000000
--- a/src/test/java/org/apache/aurora/scheduler/updater/OneWayJobUpdaterFactoryImplTest.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.updater;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableSet;
-import com.twitter.common.util.testing.FakeClock;
-
-import org.apache.aurora.gen.InstanceTaskConfig;
-import org.apache.aurora.gen.JobUpdateConfiguration;
-import org.apache.aurora.gen.JobUpdateSettings;
-import org.apache.aurora.gen.Range;
-import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateConfiguration;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.aurora.scheduler.updater.OneWayJobUpdaterFactory.UpdateConfigurationException;
-import static org.junit.Assert.assertEquals;
-
-/**
- * This test can't exercise much functionality of the output from the factory without duplicating
- * test behavior in the job updater integration test.  So instead, we test only some basic behavior.
- */
-public class OneWayJobUpdaterFactoryImplTest {
-
-  private static final IJobUpdateConfiguration CONFIG = IJobUpdateConfiguration.build(
-      new JobUpdateConfiguration()
-          .setNewTaskConfig(new TaskConfig())
-          .setInstanceCount(3)
-          .setOldTaskConfigs(ImmutableSet.of(new InstanceTaskConfig()
-              .setInstances(ImmutableSet.of(new Range(1, 2)))
-              .setTask(new TaskConfig())))
-          .setSettings(new JobUpdateSettings()
-              .setMaxFailedInstances(1)
-              .setMaxPerInstanceFailures(1)
-              .setMaxWaitToInstanceRunningMs(100)
-              .setMinWaitInInstanceRunningMs(100)
-              .setUpdateGroupSize(2)
-              .setUpdateOnlyTheseInstances(ImmutableSet.<Range>of())));
-
-  private OneWayJobUpdaterFactory factory;
-
-  @Before
-  public void setUp() {
-    factory = new OneWayJobUpdaterFactory.OneWayJobUpdaterFactoryImpl(new FakeClock());
-  }
-
-  @Test
-  public void testRollingForward() throws Exception  {
-    OneWayJobUpdater<Integer, Optional<IScheduledTask>> update = factory.newUpdate(CONFIG, true);
-    assertEquals(ImmutableSet.of(0, 1, 2), update.getInstances());
-  }
-
-  @Test
-  public void testRollingBack() throws Exception {
-    OneWayJobUpdater<Integer, Optional<IScheduledTask>> update = factory.newUpdate(CONFIG, false);
-    assertEquals(ImmutableSet.of(0, 1, 2), update.getInstances());
-  }
-
-  @Test
-  public void testRollForwardSpecificInstances() throws Exception {
-    JobUpdateConfiguration config = CONFIG.newBuilder();
-    config.getSettings().setUpdateOnlyTheseInstances(ImmutableSet.of(new Range(1, 2)));
-
-    OneWayJobUpdater<Integer, Optional<IScheduledTask>> update =
-        factory.newUpdate(IJobUpdateConfiguration.build(config), true);
-    assertEquals(ImmutableSet.of(1, 2), update.getInstances());
-  }
-
-  @Test
-  public void testRollBackSpecificInstances() throws Exception {
-    JobUpdateConfiguration config = CONFIG.newBuilder();
-    config.getSettings().setUpdateOnlyTheseInstances(ImmutableSet.of(new Range(1, 2)));
-
-    OneWayJobUpdater<Integer, Optional<IScheduledTask>> update =
-        factory.newUpdate(IJobUpdateConfiguration.build(config), false);
-    assertEquals(ImmutableSet.of(1, 2), update.getInstances());
-  }
-
-  @Test(expected = UpdateConfigurationException.class)
-  public void testInvalidConfiguration() throws Exception {
-    JobUpdateConfiguration config = CONFIG.newBuilder();
-    config.getSettings().setUpdateOnlyTheseInstances(ImmutableSet.of(new Range(10, 10)));
-
-    factory.newUpdate(IJobUpdateConfiguration.build(config), true);
-  }
-
-  @Test
-  public void testUpdateRemovesInstance() throws Exception {
-    JobUpdateConfiguration config = CONFIG.newBuilder();
-    config.setInstanceCount(2);
-
-    OneWayJobUpdater<Integer, Optional<IScheduledTask>> update =
-        factory.newUpdate(IJobUpdateConfiguration.build(config), true);
-    assertEquals(ImmutableSet.of(0, 1, 2), update.getInstances());
-  }
-}


Mime
View raw message