aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wfar...@apache.org
Subject [3/3] git commit: Add a controller for job updates.
Date Tue, 16 Sep 2014 21:31:59 GMT
Add a controller for job updates.

Bugs closed: AURORA-613

Reviewed at https://reviews.apache.org/r/25529/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/406d0fb7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/406d0fb7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/406d0fb7

Branch: refs/heads/master
Commit: 406d0fb769f70eba697123778838079bc15acea4
Parents: c8ce580
Author: Bill Farner <wfarner@apache.org>
Authored: Tue Sep 16 14:28:48 2014 -0700
Committer: Bill Farner <wfarner@apache.org>
Committed: Tue Sep 16 14:28:48 2014 -0700

----------------------------------------------------------------------
 build.gradle                                    |   1 -
 .../scheduler/async/RescheduleCalculator.java   |   6 +-
 .../aurora/scheduler/base/InstanceKeys.java     |  54 ++
 .../org/apache/aurora/scheduler/base/Jobs.java  |   7 +-
 .../org/apache/aurora/scheduler/base/Query.java |   6 +-
 .../org/apache/aurora/scheduler/base/Tasks.java |  12 -
 .../aurora/scheduler/state/LockManagerImpl.java |   4 +-
 .../scheduler/storage/ForwardingStore.java      |   4 +-
 .../scheduler/storage/JobUpdateStore.java       |   6 +-
 .../scheduler/storage/db/DBJobUpdateStore.java  |   4 +-
 .../thrift/SchedulerThriftInterface.java        |  19 +-
 .../scheduler/updater/InstanceAction.java       |  37 +
 .../updater/InstanceActionHandler.java          | 130 ++++
 .../scheduler/updater/InstanceUpdater.java      |  21 +-
 .../scheduler/updater/JobUpdateController.java  |  27 +-
 .../updater/JobUpdateControllerImpl.java        | 472 +++++++++++-
 .../updater/JobUpdateEventSubscriber.java       |  91 +--
 .../updater/JobUpdateStateMachine.java          |  59 +-
 .../scheduler/updater/OneWayJobUpdater.java     | 178 +++--
 .../updater/OneWayJobUpdaterFactory.java        | 176 -----
 .../updater/UpdateConfigurationException.java   |  23 +
 .../aurora/scheduler/updater/UpdateFactory.java | 225 ++++++
 .../aurora/scheduler/updater/UpdaterModule.java |  38 +-
 .../updater/strategy/ActiveLimitedStrategy.java |   7 +-
 .../updater/strategy/BatchStrategy.java         |   5 +-
 .../updater/strategy/QueueStrategy.java         |   6 +-
 .../thrift/org/apache/aurora/gen/api.thrift     |   7 +
 .../storage/db/DBJobUpdateStoreTest.java        |  30 +-
 .../thrift/SchedulerThriftInterfaceTest.java    |  26 +-
 .../aurora/scheduler/updater/AddTaskTest.java   |  61 ++
 .../aurora/scheduler/updater/EnumsTest.java     |  62 ++
 .../updater/FakeScheduledExecutor.java          |  87 +++
 .../scheduler/updater/InstanceUpdaterTest.java  |   4 +-
 .../updater/JobUpdateEventSubscriberTest.java   |  65 +-
 .../updater/JobUpdateStateMachineTest.java      |   5 +-
 .../aurora/scheduler/updater/JobUpdaterIT.java  | 753 +++++++++++++++++++
 .../updater/OneWayJobUpdateControllerTest.java  | 232 ------
 .../OneWayJobUpdaterFactoryImplTest.java        | 110 ---
 .../scheduler/updater/OneWayJobUpdaterTest.java | 268 +++++++
 .../updater/UpdateFactoryImplTest.java          | 105 +++
 .../updater/strategy/BatchStrategyTest.java     |  15 +-
 .../updater/strategy/QueueStrategyTest.java     |  17 +-
 42 files changed, 2656 insertions(+), 809 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/406d0fb7/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index e92b27d..e519121 100644
--- a/build.gradle
+++ b/build.gradle
@@ -259,7 +259,6 @@ def codeQualityTasks = [
     Checkstyle,
     FindBugs,
     nl.javadude.gradle.plugins.license.License,
-    JacocoReport,
     Pmd
 ]
 codeQualityTasks.each {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/406d0fb7/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java b/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java
index aaa3513..ca54c9a 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java
@@ -20,6 +20,7 @@ import java.util.logging.Logger;
 
 import javax.inject.Inject;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
@@ -112,12 +113,13 @@ public interface RescheduleCalculator {
       }
     };
 
-    static class RescheduleCalculatorSettings {
+    @VisibleForTesting
+    public static class RescheduleCalculatorSettings {
       private final BackoffStrategy flappingTaskBackoff;
       private final Amount<Long, Time> flappingTaskThreashold;
       private final Amount<Integer, Time>  maxStartupRescheduleDelay;
 
-      RescheduleCalculatorSettings(
+      public RescheduleCalculatorSettings(
           BackoffStrategy flappingTaskBackoff,
           Amount<Long, Time> flappingTaskThreashold,
           Amount<Integer, Time> maxStartupRescheduleDelay) {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/406d0fb7/src/main/java/org/apache/aurora/scheduler/base/InstanceKeys.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/InstanceKeys.java b/src/main/java/org/apache/aurora/scheduler/base/InstanceKeys.java
new file mode 100644
index 0000000..b12ac83
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/base/InstanceKeys.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.base;
+
+import java.util.Objects;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.aurora.gen.InstanceKey;
+import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+
+/**
+ * Utility functions for {@link IInstanceKey instance keys}.
+ */
+public final class InstanceKeys {
+  private InstanceKeys() {
+    // Utility class.
+  }
+
+  /**
+   * Creates an instance key from a job and instance ID.
+   *
+   * @param job Job key.
+   * @param instanceId Instance id.
+   * @return Instance key for the given job and instance ID.
+   */
+  public static IInstanceKey from(IJobKey job, int instanceId) {
+    Objects.requireNonNull(job);
+    Preconditions.checkArgument(instanceId >= 0);
+    return IInstanceKey.build(new InstanceKey(job.newBuilder(), instanceId));
+  }
+
+  /**
+   * Creates a human-friendly string for an instance key.
+   *
+   * @param instance Instance key.
+   * @return String representation of the instance key.
+   */
+  public static String toString(IInstanceKey instance) {
+    return JobKeys.canonicalString(instance.getJobKey()) + "/" + instance.getInstanceId();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/406d0fb7/src/main/java/org/apache/aurora/scheduler/base/Jobs.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/Jobs.java b/src/main/java/org/apache/aurora/scheduler/base/Jobs.java
index 744d2b6..9ba83fa 100644
--- a/src/main/java/org/apache/aurora/scheduler/base/Jobs.java
+++ b/src/main/java/org/apache/aurora/scheduler/base/Jobs.java
@@ -17,6 +17,7 @@ import org.apache.aurora.gen.JobStats;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.scheduler.storage.entities.IJobStats;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskEvent;
 
 /**
  * Convenience methods related to jobs.
@@ -27,6 +28,10 @@ public final class Jobs {
     // Utility class.
   }
 
+  private static ITaskEvent getSecondToLatestEvent(IScheduledTask task) {
+    return task.getTaskEvents().get(task.getTaskEvents().size() - 2);
+  }
+
   /**
    * For a given collection of tasks compute statistics based on the state of the task.
    *
@@ -39,7 +44,7 @@ public final class Jobs {
       ScheduleStatus status = task.getStatus();
       if (status == ScheduleStatus.SANDBOX_DELETED) {
         // SANDBOX_DELETED must be preceded by the real terminal state.
-        updateStats(stats, Tasks.getSecondToLatestEvent(task).getStatus());
+        updateStats(stats, getSecondToLatestEvent(task).getStatus());
       } else {
         updateStats(stats, status);
       }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/406d0fb7/src/main/java/org/apache/aurora/scheduler/base/Query.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/Query.java b/src/main/java/org/apache/aurora/scheduler/base/Query.java
index d8572bb..eded7a5 100644
--- a/src/main/java/org/apache/aurora/scheduler/base/Query.java
+++ b/src/main/java/org/apache/aurora/scheduler/base/Query.java
@@ -24,9 +24,9 @@ import com.google.common.collect.Iterables;
 import com.google.common.primitives.Ints;
 
 import org.apache.aurora.gen.Identity;
-import org.apache.aurora.gen.InstanceKey;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.TaskQuery;
+import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 
 import static java.util.Objects.requireNonNull;
@@ -96,8 +96,8 @@ public final class Query {
     return unscoped().byJobKeys(jobKeys);
   }
 
-  public static Builder instanceScoped(InstanceKey instanceKey) {
-    return instanceScoped(IJobKey.build(instanceKey.getJobKey()), instanceKey.getInstanceId());
+  public static Builder instanceScoped(IInstanceKey instanceKey) {
+    return instanceScoped(instanceKey.getJobKey(), instanceKey.getInstanceId());
   }
 
   public static Builder instanceScoped(IJobKey jobKey, int instanceId, int... instanceIds) {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/406d0fb7/src/main/java/org/apache/aurora/scheduler/base/Tasks.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/Tasks.java b/src/main/java/org/apache/aurora/scheduler/base/Tasks.java
index 1be1637..9664c3b 100644
--- a/src/main/java/org/apache/aurora/scheduler/base/Tasks.java
+++ b/src/main/java/org/apache/aurora/scheduler/base/Tasks.java
@@ -211,14 +211,6 @@ public final class Tasks {
     return Maps.uniqueIndex(tasks, SCHEDULED_TO_ID);
   }
 
-  public static String getRole(IScheduledTask task) {
-    return task.getAssignedTask().getTask().getOwner().getRole();
-  }
-
-  public static String getJob(IScheduledTask task) {
-    return task.getAssignedTask().getTask().getJobName();
-  }
-
   /**
    * Get the latest active task or the latest inactive task if no active task exists.
    *
@@ -234,10 +226,6 @@ public final class Tasks {
         .max(tasks);
   }
 
-  public static ITaskEvent getSecondToLatestEvent(IScheduledTask task) {
-    return task.getTaskEvents().get(task.getTaskEvents().size() - 2);
-  }
-
   public static ITaskEvent getLatestEvent(IScheduledTask task) {
     return Iterables.getLast(task.getTaskEvents());
   }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/406d0fb7/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java
index f9521f9..f167290 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java
@@ -17,6 +17,7 @@ import java.util.Date;
 
 import javax.inject.Inject;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.twitter.common.util.Clock;
 
@@ -38,7 +39,8 @@ import static java.util.Objects.requireNonNull;
  * Implements lock-related primitives required to provide mutual exclusion guarantees
  * to the critical Scheduler state-mutating operations.
  */
-class LockManagerImpl implements LockManager {
+@VisibleForTesting
+public class LockManagerImpl implements LockManager {
   private final Storage storage;
   private final Clock clock;
   private final UUIDGenerator tokenGenerator;

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/406d0fb7/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java b/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java
index 6a97f36..5ac42b6 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java
@@ -168,7 +168,7 @@ public class ForwardingStore implements
   }
 
   @Override
-  public boolean isActive(String updateId) {
-    return jobUpdateStore.isActive(updateId);
+  public Optional<String> getLockToken(String updateId) {
+    return jobUpdateStore.getLockToken(updateId);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/406d0fb7/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java b/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java
index c520e13..dfaadc8 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java
@@ -74,12 +74,12 @@ public interface JobUpdateStore {
   Set<StoredJobUpdateDetails> fetchAllJobUpdateDetails();
 
   /**
-   * Determines whether an update ID represents a currently-active job update.
+   * Gets the lock token associated with a job update.
    *
    * @param updateId Job update ID.
-   * @return {@code true} if this update has exclusive access to the job, otherwise {@code false}.
+   * @return the token associated with the update id, if it exists.
    */
-  boolean isActive(String updateId);
+  Optional<String> getLockToken(String updateId);
 
   interface Mutable extends JobUpdateStore {
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/406d0fb7/src/main/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStore.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStore.java
index 87b773b..0737f92 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStore.java
@@ -157,10 +157,10 @@ public class DBJobUpdateStore implements JobUpdateStore.Mutable {
   }
 
   @Override
-  public boolean isActive(String updateId) {
+  public Optional<String> getLockToken(String updateId) {
     // We assume here that cascading deletes will cause a lock-update associative row to disappear
     // when the lock is invalidated.  This further assumes that a lock row is deleted when a lock
     // is no longer valid.
-    return detailsMapper.selectLockToken(updateId) != null;
+    return Optional.fromNullable(detailsMapper.selectLockToken(updateId));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/406d0fb7/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
index dc0169a..d4b8141 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
@@ -151,6 +151,7 @@ import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.aurora.scheduler.thrift.auth.DecoratedThrift;
 import org.apache.aurora.scheduler.thrift.auth.Requires;
 import org.apache.aurora.scheduler.updater.JobUpdateController;
+import org.apache.aurora.scheduler.updater.UpdateConfigurationException;
 import org.apache.aurora.scheduler.updater.UpdateStateException;
 import org.apache.commons.lang.StringUtils;
 import org.apache.thrift.TException;
@@ -1362,10 +1363,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
   }
 
   @Override
-  public Response startJobUpdate(
-      JobUpdateRequest mutableRequest,
-      SessionKey session) {
-
+  public Response startJobUpdate(JobUpdateRequest mutableRequest, SessionKey session) {
     // TODO(maxim): validate JobUpdateRequest fields.
     requireNonNull(mutableRequest);
     requireNonNull(session);
@@ -1398,15 +1396,6 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
     return storage.write(new MutateWork.Quiet<Response>() {
       @Override
       public Response apply(MutableStoreProvider storeProvider) {
-        final ILock lock;
-        try {
-          lock = lockManager.acquireLock(
-              ILockKey.build(LockKey.job(request.getJobKey().newBuilder())),
-              context.getIdentity());
-        } catch (LockException e) {
-          return addMessage(response, LOCK_ERROR, e);
-        }
-
         // TODO(maxim): Wire in task limits and quota checks from SchedulerCore.
 
         String updateId = uuidGenerator.createNew().toString();
@@ -1423,9 +1412,9 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
                 .setOldTaskConfigs(buildOldTaskConfigs(request.getJobKey(), storeProvider))));
 
         try {
-          jobUpdateController.start(update, lock.getToken());
+          jobUpdateController.start(update, context.getIdentity());
           return okResponse(Result.startJobUpdateResult(new StartJobUpdateResult(updateId)));
-        } catch (UpdateStateException e) {
+        } catch (UpdateStateException | UpdateConfigurationException e) {
           return addMessage(response, INVALID_REQUEST, e);
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/406d0fb7/src/main/java/org/apache/aurora/scheduler/updater/InstanceAction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/InstanceAction.java b/src/main/java/org/apache/aurora/scheduler/updater/InstanceAction.java
new file mode 100644
index 0000000..3774c85
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/updater/InstanceAction.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.updater;
+
+import com.google.common.base.Optional;
+
+import static org.apache.aurora.scheduler.updater.InstanceActionHandler.AddTask;
+import static org.apache.aurora.scheduler.updater.InstanceActionHandler.KillTask;
+import static org.apache.aurora.scheduler.updater.InstanceActionHandler.WatchRunningTask;
+
+enum InstanceAction {
+  KILL_TASK(Optional.<InstanceActionHandler>of(new KillTask())),
+  ADD_TASK(Optional.<InstanceActionHandler>of(new AddTask())),
+  WATCH_TASK(Optional.<InstanceActionHandler>of(new WatchRunningTask())),
+  AWAIT_STATE_CHANGE(Optional.<InstanceActionHandler>absent());
+
+  private final Optional<InstanceActionHandler> handler;
+
+  private InstanceAction(Optional<InstanceActionHandler> handler) {
+    this.handler = handler;
+  }
+
+  public Optional<InstanceActionHandler> getHandler() {
+    return handler;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/406d0fb7/src/main/java/org/apache/aurora/scheduler/updater/InstanceActionHandler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/InstanceActionHandler.java b/src/main/java/org/apache/aurora/scheduler/updater/InstanceActionHandler.java
new file mode 100644
index 0000000..2538291
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/updater/InstanceActionHandler.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.updater;
+
+import java.util.logging.Logger;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Range;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+
+import org.apache.aurora.gen.JobUpdateStatus;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.storage.TaskStore;
+import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
+import org.apache.aurora.scheduler.storage.entities.IInstanceTaskConfig;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateConfiguration;
+import org.apache.aurora.scheduler.storage.entities.IRange;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+
+import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_FORWARD;
+
+interface InstanceActionHandler {
+
+  Amount<Long, Time> getReevaluationDelay(
+      IInstanceKey instance,
+      IJobUpdateConfiguration updateConfig,
+      TaskStore taskStore,
+      StateManager stateManager,
+      JobUpdateStatus status);
+
+  Logger LOG = Logger.getLogger(InstanceActionHandler.class.getName());
+
+  class AddTask implements InstanceActionHandler {
+    private static ITaskConfig getTargetConfig(
+        IJobUpdateConfiguration configuration,
+        boolean rollingForward,
+        int instanceId) {
+
+      if (rollingForward) {
+        return configuration.getNewTaskConfig();
+      } else {
+        for (IInstanceTaskConfig config : configuration.getOldTaskConfigs()) {
+          for (IRange range : config.getInstances()) {
+            if (Range.closed(range.getFirst(), range.getLast()).contains(instanceId)) {
+              return config.getTask();
+            }
+          }
+        }
+
+        throw new IllegalStateException("Failed to find instance " + instanceId);
+      }
+    }
+
+    @Override
+    public Amount<Long, Time> getReevaluationDelay(
+        IInstanceKey instance,
+        IJobUpdateConfiguration updateConfig,
+        TaskStore taskStore,
+        StateManager stateManager,
+        JobUpdateStatus status) {
+
+      // TODO(wfarner): This skips quota validation.  Either check quota here, or augment
+      // quota checking to take updates into consideration (AURORA-686).
+      LOG.info("Adding instance " + instance + " while " + status);
+      ITaskConfig replacement = getTargetConfig(
+          updateConfig,
+          status == ROLLING_FORWARD,
+          instance.getInstanceId());
+      stateManager.insertPendingTasks(replacement, ImmutableSet.of(instance.getInstanceId()));
+      return  Amount.of(
+          (long) updateConfig.getSettings().getMaxWaitToInstanceRunningMs(),
+          Time.MILLISECONDS);
+    }
+  }
+
+  class KillTask implements InstanceActionHandler {
+    @Override
+    public Amount<Long, Time> getReevaluationDelay(
+        IInstanceKey instance,
+        IJobUpdateConfiguration updateConfig,
+        TaskStore taskStore,
+        StateManager stateManager,
+        JobUpdateStatus status) {
+
+      String taskId = Tasks.id(Iterables.getOnlyElement(
+          taskStore.fetchTasks(Query.instanceScoped(instance).active())));
+      LOG.info("Killing " + instance + " while " + status);
+      stateManager.changeState(
+          taskId,
+          Optional.<ScheduleStatus>absent(),
+          ScheduleStatus.KILLING,
+          Optional.of("Killed for job update."));
+      return Amount.of(
+          (long) updateConfig.getSettings().getMaxWaitToInstanceRunningMs(),
+          Time.MILLISECONDS);
+    }
+  }
+
+  class WatchRunningTask implements InstanceActionHandler {
+    @Override
+    public Amount<Long, Time> getReevaluationDelay(
+        IInstanceKey instance,
+        IJobUpdateConfiguration updateConfig,
+        TaskStore taskStore,
+        StateManager stateManager,
+        JobUpdateStatus status) {
+
+      return Amount.of(
+          (long) updateConfig.getSettings().getMinWaitInInstanceRunningMs(),
+          Time.MILLISECONDS);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/406d0fb7/src/main/java/org/apache/aurora/scheduler/updater/InstanceUpdater.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/InstanceUpdater.java b/src/main/java/org/apache/aurora/scheduler/updater/InstanceUpdater.java
index d725bc3..dc11abd 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/InstanceUpdater.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/InstanceUpdater.java
@@ -32,6 +32,7 @@ import org.apache.aurora.scheduler.storage.entities.ITaskEvent;
 
 import static java.util.Objects.requireNonNull;
 
+import static org.apache.aurora.gen.ScheduleStatus.KILLING;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
 import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.EVALUATE_AFTER_MIN_RUNNING_MS;
 import static org.apache.aurora.scheduler.updater.StateEvaluator.Result.EVALUATE_ON_STATE_CHANGE;
@@ -94,20 +95,26 @@ class InstanceUpdater implements StateEvaluator<Optional<IScheduledTask>> {
     return millisSince(earliestNonRunningEvent) >= maxNonRunningTime.as(Time.MILLISECONDS);
   }
 
-  private boolean permanentlyKilled(IScheduledTask task) {
-    return Iterables.any(
-        task.getTaskEvents(),
-        Predicates.compose(Predicates.equalTo(ScheduleStatus.KILLING), Tasks.TASK_EVENT_TO_STATUS));
+  private static boolean isPermanentlyKilled(IScheduledTask task) {
+    boolean wasKilling =
+        Iterables.any(
+            task.getTaskEvents(),
+            Predicates.compose(Predicates.equalTo(KILLING), Tasks.TASK_EVENT_TO_STATUS));
+    return task.getStatus() != KILLING && wasKilling;
   }
 
   private static boolean isKillable(ScheduleStatus status) {
-    return Tasks.isActive(status) && status != ScheduleStatus.KILLING;
+    return Tasks.isActive(status) && status != KILLING;
+  }
+
+  private static boolean isTaskPresent(Optional<IScheduledTask> task) {
+    return task.isPresent() && !isPermanentlyKilled(task.get());
   }
 
   @Override
   public synchronized StateEvaluator.Result evaluate(Optional<IScheduledTask> actualState) {
     boolean desiredPresent = desiredState.isPresent();
-    boolean actualPresent = actualState.isPresent();
+    boolean actualPresent = isTaskPresent(actualState);
 
     if (desiredPresent && actualPresent) {
       // The update is changing the task configuration.
@@ -167,7 +174,7 @@ class InstanceUpdater implements StateEvaluator<Optional<IScheduledTask>> {
       if (isKillable(status)) {
         // Task is active, kill it.
         return StateEvaluator.Result.KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE;
-      } else if (Tasks.isTerminated(status) && permanentlyKilled(actualState)) {
+      } else if (Tasks.isTerminated(status) && isPermanentlyKilled(actualState)) {
         // The old task has exited, it is now safe to add the new one.
         return StateEvaluator.Result.REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE;
       }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/406d0fb7/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateController.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateController.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateController.java
index 39bdca0..8f9e6d6 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateController.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateController.java
@@ -16,6 +16,7 @@ package org.apache.aurora.scheduler.updater;
 import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 
 /**
  * A controller that exposes commands to initiate and modify active job updates.
@@ -26,12 +27,13 @@ public interface JobUpdateController {
    * Initiates an update.
    *
    * @param update Instructions for what job to update, and how to update it.
-   * @param lockToken UUID identifying the lock associated with this update.
+   * @param updatingUser User initiating the update.
    * @throws UpdateStateException If the update cannot be started, for example if the instructions
    *                              are invalid, or if there is already an in-progress update for the
    *                              job.
    */
-  void start(IJobUpdate update, String lockToken) throws UpdateStateException;
+  void start(IJobUpdate update, String updatingUser)
+      throws UpdateStateException, UpdateConfigurationException;
 
   /**
    * Pauses an in-progress update.
@@ -70,18 +72,21 @@ public interface JobUpdateController {
    * Notifies the updater that the state of an instance has changed. A state change could also mean
    * deletion.
    *
-   * @param instance Identifier fo the instance whose state has changed.
+   * @param updatedTask The latest state for the task that changed.
    */
-  void instanceChangedState(IInstanceKey instance);
+  void instanceChangedState(IScheduledTask updatedTask);
 
   /**
-   * Restores an active update for a job that has been halted due to the scheduler restarting.
-   * This is distinct from {@link #resume(IJobKey)} in that it does not change the state of the
-   * update, but resumes after a restart of the scheduler process.
+   * Notifies the updater that an instance was deleted.
    *
-   * @param job Job to resume.
-   * @throws UpdateStateException If the update cannot resume, such as if the update is already
-   *                              active.
+   * @param instance Identifier of the deleted instance.
    */
-  void systemResume(IJobKey job) throws UpdateStateException;
+  void instanceDeleted(IInstanceKey instance);
+
+  /**
+   * Restores active updates that have been halted due to the scheduler restarting.
+   * This is distinct from {@link #resume(IJobKey)} in that it does not change the state of
+   * updates, but resumes after a restart of the scheduler process.
+   */
+  void systemResume();
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/406d0fb7/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
index 3f542ce..159b09e 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
@@ -13,41 +13,489 @@
  */
 package org.apache.aurora.scheduler.updater;
 
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.logging.Logger;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.inject.Inject;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.util.Clock;
+
+import org.apache.aurora.gen.JobUpdateEvent;
+import org.apache.aurora.gen.JobUpdateQuery;
+import org.apache.aurora.gen.JobUpdateStatus;
+import org.apache.aurora.gen.Lock;
+import org.apache.aurora.gen.LockKey;
+import org.apache.aurora.scheduler.base.InstanceKeys;
+import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.state.LockManager;
+import org.apache.aurora.scheduler.state.LockManager.LockException;
+import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.storage.JobUpdateStore;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.TaskStore;
 import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
+import org.apache.aurora.scheduler.storage.entities.ILock;
+import org.apache.aurora.scheduler.storage.entities.ILockKey;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+
+import static java.util.Objects.requireNonNull;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import static org.apache.aurora.gen.JobUpdateStatus.ABORTED;
+import static org.apache.aurora.gen.JobUpdateStatus.ERROR;
+import static org.apache.aurora.gen.JobUpdateStatus.ROLLED_BACK;
+import static org.apache.aurora.gen.JobUpdateStatus.ROLLED_FORWARD;
+import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_BACK;
+import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_FORWARD;
+import static org.apache.aurora.gen.JobUpdateStatus.ROLL_BACK_PAUSED;
+import static org.apache.aurora.gen.JobUpdateStatus.ROLL_FORWARD_PAUSED;
+import static org.apache.aurora.scheduler.storage.Storage.MutateWork;
+import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.ACTIVE_QUERY;
+import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.MonitorAction;
+import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.MonitorAction.ROLL_BACK;
+import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.MonitorAction.ROLL_FORWARD;
+import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.MonitorAction.STOP_WATCHING;
+import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.assertTransitionAllowed;
+import static org.apache.aurora.scheduler.updater.OneWayJobUpdater.EvaluationResult;
+import static org.apache.aurora.scheduler.updater.OneWayJobUpdater.OneWayStatus;
+import static org.apache.aurora.scheduler.updater.OneWayJobUpdater.OneWayStatus.SUCCEEDED;
 
 /**
- * TODO(wfarner): Implement this as part of AURORA-610.
+ * Implementation of an updater that orchestrates the process of gradually updating the
+ * configuration of tasks in a job.
+ * <p>
+ * TODO(wfarner): Consider using AbstractIdleService here.
  */
 class JobUpdateControllerImpl implements JobUpdateController {
-  @Override
-  public void start(IJobUpdate update, String lockToken) throws UpdateStateException {
-    throw new UnsupportedOperationException("Not yet implemented.");
+  private static final Logger LOG = Logger.getLogger(JobUpdateControllerImpl.class.getName());
+
+  private final UpdateFactory updateFactory;
+  private final LockManager lockManager;
+  private final Storage storage;
+  private final ScheduledExecutorService executor;
+  private final StateManager stateManager;
+  private final Clock clock;
+
+  // Currently-active updaters. An active updater is one that is rolling forward or back. Paused
+  // and completed updates are represented only in storage, not here.
+  private final Map<IJobKey, UpdateFactory.Update> updates =
+      Collections.synchronizedMap(Maps.<IJobKey, UpdateFactory.Update>newHashMap());
+
+  @Inject
+  JobUpdateControllerImpl(
+      UpdateFactory updateFactory,
+      LockManager lockManager,
+      Storage storage,
+      ScheduledExecutorService executor,
+      StateManager stateManager,
+      Clock clock) {
+
+    this.updateFactory = requireNonNull(updateFactory);
+    this.lockManager = requireNonNull(lockManager);
+    this.storage = requireNonNull(storage);
+    this.executor = requireNonNull(executor);
+    this.stateManager = requireNonNull(stateManager);
+    this.clock = requireNonNull(clock);
   }
 
   @Override
-  public void pause(IJobKey job) throws UpdateStateException {
-    throw new UnsupportedOperationException("Not yet implemented.");
+  public void start(final IJobUpdate update, final String updatingUser)
+      throws UpdateStateException, UpdateConfigurationException {
+
+    requireNonNull(update);
+    requireNonNull(updatingUser);
+
+    // Validate the update configuration by making sure we can create an updater for it.
+    updateFactory.newUpdate(update.getConfiguration(), true);
+
+    storage.write(new MutateWork.NoResult<UpdateStateException>() {
+      @Override
+      protected void execute(Storage.MutableStoreProvider storeProvider)
+          throws UpdateStateException {
+
+        IJobUpdateSummary summary = update.getSummary();
+        IJobKey job = summary.getJobKey();
+        ILock lock;
+        try {
+          lock =
+              lockManager.acquireLock(ILockKey.build(LockKey.job(job.newBuilder())), updatingUser);
+        } catch (LockException e) {
+          throw new UpdateStateException(e.getMessage(), e);
+        }
+
+        storeProvider.getJobUpdateStore().saveJobUpdate(update, lock.getToken());
+        recordAndChangeJobUpdateStatus(
+            storeProvider.getJobUpdateStore(),
+            storeProvider.getTaskStore(),
+            summary.getUpdateId(),
+            job,
+            ROLLING_FORWARD);
+      }
+    });
   }
 
   @Override
+  public void pause(final IJobKey job) throws UpdateStateException {
+    requireNonNull(job);
+    unscopedChangeUpdateStatus(job, JobUpdateStateMachine.GET_PAUSE_STATE);
+  }
+
   public void resume(IJobKey job) throws UpdateStateException {
-    throw new UnsupportedOperationException("Not yet implemented.");
+    requireNonNull(job);
+    unscopedChangeUpdateStatus(job, JobUpdateStateMachine.GET_RESUME_STATE);
   }
 
   @Override
   public void abort(IJobKey job) throws UpdateStateException {
-    throw new UnsupportedOperationException("Not yet implemented.");
+    requireNonNull(job);
+    unscopedChangeUpdateStatus(job, new Function<JobUpdateStatus, JobUpdateStatus>() {
+      @Override
+      public JobUpdateStatus apply(JobUpdateStatus input) {
+        return ABORTED;
+      }
+    });
+  }
+
+  @Override
+  public void systemResume() {
+    storage.write(new MutateWork.NoResult.Quiet() {
+      @Override
+      protected void execute(Storage.MutableStoreProvider storeProvider) {
+        for (IJobUpdateSummary summary
+            : storeProvider.getJobUpdateStore().fetchJobUpdateSummaries(ACTIVE_QUERY)) {
+
+          LOG.info("Automatically resuming update " + JobKeys.canonicalString(summary.getJobKey()));
+          changeJobUpdateStatus(
+              storeProvider.getJobUpdateStore(),
+              storeProvider.getTaskStore(),
+              summary.getUpdateId(),
+              summary.getJobKey(),
+              summary.getState().getStatus(),
+              false);
+        }
+      }
+    });
   }
 
   @Override
-  public void instanceChangedState(IInstanceKey instance) {
-    throw new UnsupportedOperationException("Not yet implemented.");
+  public void instanceChangedState(final IScheduledTask updatedTask) {
+    instanceChanged(
+        InstanceKeys.from(
+            JobKeys.from(updatedTask.getAssignedTask().getTask()),
+            updatedTask.getAssignedTask().getInstanceId()),
+        Optional.of(updatedTask));
   }
 
   @Override
-  public void systemResume(IJobKey job) throws UpdateStateException {
-    throw new UnsupportedOperationException("Not yet implemented.");
+  public void instanceDeleted(IInstanceKey instance) {
+    // This is primarily used to detect when an instance was stuck in PENDING and killed, which
+    // results in deletion.
+    instanceChanged(instance, Optional.<IScheduledTask>absent());
+  }
+
+  private void instanceChanged(final IInstanceKey instance, final Optional<IScheduledTask> state) {
+    storage.write(new MutateWork.NoResult.Quiet() {
+      @Override
+      protected void execute(Storage.MutableStoreProvider storeProvider) {
+        IJobKey job = instance.getJobKey();
+        UpdateFactory.Update update = updates.get(job);
+        if (update != null) {
+          if (update.getUpdater().containsInstance(instance.getInstanceId())) {
+            LOG.info("Forwarding task change for " + InstanceKeys.toString(instance));
+            evaluateUpdater(
+                storeProvider.getJobUpdateStore(),
+                storeProvider.getTaskStore(),
+                update,
+                getOnlyMatch(storeProvider.getJobUpdateStore(), queryByJob(job)),
+                ImmutableMap.of(instance.getInstanceId(), state));
+          } else {
+            LOG.info("Instance " + instance + " is not part of active update for "
+                + JobKeys.canonicalString(job));
+          }
+        }
+      }
+    });
+  }
+
+  private IJobUpdateSummary getOnlyMatch(JobUpdateStore store, IJobUpdateQuery query) {
+    return Iterables.getOnlyElement(store.fetchJobUpdateSummaries(query));
+  }
+
+  @VisibleForTesting
+  static IJobUpdateQuery queryByJob(IJobKey job) {
+    return IJobUpdateQuery.build(new JobUpdateQuery()
+        .setJobKey(job.newBuilder())
+        .setUpdateStatuses(ImmutableSet.of(
+            ROLLING_FORWARD,
+            ROLLING_BACK,
+            ROLL_FORWARD_PAUSED,
+            ROLL_BACK_PAUSED)));
+  }
+
+  @VisibleForTesting
+  static IJobUpdateQuery queryByUpdateId(String updateId) {
+    return IJobUpdateQuery.build(new JobUpdateQuery()
+        .setUpdateId(updateId));
+  }
+
+  /**
+   * Changes the state of an update, without the 'scope' of an update ID.  This should only be used
+   * when responding to outside inputs that are inherently un-scoped, such as a user action or task
+   * state change.
+   *
+   * @param job Job whose update state should be changed.
+   * @param stateChange State change computation, based on the current state of the update.
+   * @throws UpdateStateException If no active update exists for the provided {@code job}, or
+   *                              if the proposed state transition is not allowed.
+   */
+  private void unscopedChangeUpdateStatus(
+      final IJobKey job,
+      final Function<JobUpdateStatus, JobUpdateStatus> stateChange)
+      throws UpdateStateException {
+
+    storage.write(new MutateWork.NoResult<UpdateStateException>() {
+      @Override
+      protected void execute(Storage.MutableStoreProvider storeProvider)
+          throws UpdateStateException {
+
+        IJobUpdateSummary update = Iterables.getOnlyElement(
+            storeProvider.getJobUpdateStore().fetchJobUpdateSummaries(queryByJob(job)), null);
+        if (update == null) {
+          throw new UpdateStateException("There is no active update for " + job);
+        }
+
+        JobUpdateStatus status = update.getState().getStatus();
+        JobUpdateStatus newStatus = requireNonNull(stateChange.apply(status));
+        changeUpdateStatus(
+            storeProvider.getJobUpdateStore(),
+            storeProvider.getTaskStore(),
+            update,
+            newStatus);
+      }
+    });
+  }
+
+  private void changeUpdateStatus(
+      JobUpdateStore.Mutable updateStore,
+      TaskStore taskStore,
+      IJobUpdateSummary updateSummary,
+      JobUpdateStatus newStatus) {
+
+    if (updateSummary.getState().getStatus() == newStatus) {
+      return;
+    }
+
+    assertTransitionAllowed(updateSummary.getState().getStatus(), newStatus);
+    recordAndChangeJobUpdateStatus(
+        updateStore,
+        taskStore,
+        updateSummary.getUpdateId(),
+        updateSummary.getJobKey(),
+        newStatus);
+  }
+
+  private void recordAndChangeJobUpdateStatus(
+      JobUpdateStore.Mutable updateStore,
+      TaskStore taskStore,
+      String updateId,
+      IJobKey job,
+      JobUpdateStatus status) {
+
+    changeJobUpdateStatus(updateStore, taskStore, updateId, job, status, true);
+  }
+
+  private static final Set<JobUpdateStatus> UNLOCK_STATES = ImmutableSet.of(
+      ROLLED_FORWARD,
+      ROLLED_BACK,
+      ABORTED,
+      JobUpdateStatus.FAILED,
+      ERROR
+  );
+
+  private void changeJobUpdateStatus(
+      JobUpdateStore.Mutable updateStore,
+      TaskStore taskStore,
+      String updateId,
+      IJobKey job,
+      JobUpdateStatus newStatus,
+      boolean recordChange) {
+
+    JobUpdateStatus status;
+    boolean record;
+
+    Optional<String> updateLock = updateStore.getLockToken(updateId);
+    if (updateLock.isPresent()) {
+      status = newStatus;
+      record = recordChange;
+    } else {
+      LOG.severe("Update " + updateId + " does not have a lock");
+      status = ERROR;
+      record = true;
+    }
+
+    LOG.info(String.format(
+        "Update %s %s is now in state %s", JobKeys.canonicalString(job), updateId, status));
+    if (record) {
+      updateStore.saveJobUpdateEvent(
+          IJobUpdateEvent.build(new JobUpdateEvent()
+              .setStatus(status)
+              .setTimestampMs(clock.nowMillis())),
+          updateId);
+    }
+
+    if (UNLOCK_STATES.contains(status) && updateLock.isPresent()) {
+      lockManager.releaseLock(ILock.build(new Lock()
+          .setKey(LockKey.job(job.newBuilder()))
+          .setToken(updateLock.get())));
+    }
+
+    MonitorAction action = JobUpdateStateMachine.getActionForStatus(status);
+    if (action == STOP_WATCHING) {
+      updates.remove(job);
+    } else if (action == ROLL_FORWARD || action == ROLL_BACK) {
+      if (action == ROLL_BACK) {
+        updates.remove(job);
+      } else {
+        checkState(!updates.containsKey(job), "Updater already exists for " + job);
+      }
+
+      IJobUpdate jobUpdate = updateStore.fetchJobUpdate(updateId).get();
+      UpdateFactory.Update update;
+      try {
+        update = updateFactory.newUpdate(jobUpdate.getConfiguration(), action == ROLL_FORWARD);
+      } catch (UpdateConfigurationException | RuntimeException e) {
+        changeJobUpdateStatus(updateStore, taskStore, updateId, job, ERROR, true);
+        return;
+      }
+      updates.put(job, update);
+      evaluateUpdater(
+          updateStore,
+          taskStore,
+          update,
+          jobUpdate.getSummary(),
+          ImmutableMap.<Integer, Optional<IScheduledTask>>of());
+    }
+  }
+
+  private static Optional<IScheduledTask> getActiveInstance(
+      TaskStore taskStore,
+      IJobKey job,
+      int instanceId) {
+
+    return Optional.fromNullable(Iterables.getOnlyElement(
+        taskStore.fetchTasks(Query.instanceScoped(job, instanceId).active()), null));
+  }
+
+  private void evaluateUpdater(
+      JobUpdateStore.Mutable updateStore,
+      final TaskStore taskStore,
+      final UpdateFactory.Update update,
+      IJobUpdateSummary summary,
+      Map<Integer, Optional<IScheduledTask>> changedInstance) {
+
+    JobUpdateStatus updaterStatus = summary.getState().getStatus();
+    final IJobKey job = summary.getJobKey();
+
+    if (!updateStore.getLockToken(summary.getUpdateId()).isPresent()) {
+      recordAndChangeJobUpdateStatus(
+          updateStore,
+          taskStore,
+          summary.getUpdateId(),
+          job,
+          ERROR);
+      return;
+    }
+
+    InstanceStateProvider<Integer, Optional<IScheduledTask>> stateProvider =
+        new InstanceStateProvider<Integer, Optional<IScheduledTask>>() {
+          @Override
+          public Optional<IScheduledTask> getState(Integer instanceId) {
+            return getActiveInstance(taskStore, job, instanceId);
+          }
+        };
+
+    EvaluationResult<Integer> result = update.getUpdater().evaluate(changedInstance, stateProvider);
+    LOG.info(JobKeys.canonicalString(job) + " evaluation result: " + result);
+    OneWayStatus status = result.getStatus();
+    if (status == SUCCEEDED || status == OneWayStatus.FAILED) {
+      Preconditions.checkArgument(
+          result.getInstanceActions().isEmpty(),
+          "A terminal state should not specify actions.");
+
+      if (status == SUCCEEDED) {
+        changeUpdateStatus(updateStore, taskStore, summary, update.getSuccessStatus());
+      } else {
+        changeUpdateStatus(updateStore, taskStore, summary, update.getFailureStatus());
+      }
+    } else if (result.getInstanceActions().isEmpty()) {
+      LOG.info("No actions to perform at this time for update of " + job);
+    } else {
+      for (Map.Entry<Integer, InstanceAction> entry : result.getInstanceActions().entrySet()) {
+        IInstanceKey instance = InstanceKeys.from(job, entry.getKey());
+        Optional<InstanceActionHandler> handler = entry.getValue().getHandler();
+        if (handler.isPresent()) {
+          Amount<Long, Time> reevaluateDelay = handler.get().getReevaluationDelay(
+              instance,
+              updateStore.fetchJobUpdateConfiguration(summary.getUpdateId()).get(),
+              taskStore,
+              stateManager,
+              updaterStatus);
+          executor.schedule(
+              getDeferredEvaluator(instance, summary.getUpdateId()),
+              reevaluateDelay.getValue(),
+              reevaluateDelay.getUnit().getTimeUnit());
+        }
+      }
+    }
+  }
+
+  private Runnable getDeferredEvaluator(final IInstanceKey instance, final String updateId) {
+    return new Runnable() {
+      @Override
+      public void run() {
+        storage.write(new MutateWork.NoResult.Quiet() {
+          @Override
+          protected void execute(Storage.MutableStoreProvider storeProvider) {
+            IJobUpdateSummary summary =
+                getOnlyMatch(storeProvider.getJobUpdateStore(), queryByUpdateId(updateId));
+            JobUpdateStatus status = summary.getState().getStatus();
+            // Suppress this evaluation if the updater is not currently active.
+            if (JobUpdateStateMachine.isActive(status)) {
+              UpdateFactory.Update update = updates.get(instance.getJobKey());
+              evaluateUpdater(
+                  storeProvider.getJobUpdateStore(),
+                  storeProvider.getTaskStore(),
+                  update,
+                  summary,
+                  ImmutableMap.of(
+                      instance.getInstanceId(),
+                      getActiveInstance(
+                          storeProvider.getTaskStore(),
+                          instance.getJobKey(),
+                          instance.getInstanceId())));
+            }
+          }
+        });
+      }
+    };
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/406d0fb7/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateEventSubscriber.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateEventSubscriber.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateEventSubscriber.java
index 7be792f..49d8b7a 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateEventSubscriber.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateEventSubscriber.java
@@ -13,29 +13,21 @@
  */
 package org.apache.aurora.scheduler.updater;
 
-import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.eventbus.Subscribe;
 import com.google.inject.Inject;
+import com.twitter.common.stats.Stats;
 
-import org.apache.aurora.gen.InstanceKey;
-import org.apache.aurora.gen.JobUpdateQuery;
+import org.apache.aurora.scheduler.base.InstanceKeys;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.PubsubEvent;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 
 import static java.util.Objects.requireNonNull;
 
-import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_BACK;
-import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_FORWARD;
 import static org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import static org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
 
@@ -43,60 +35,55 @@ import static org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
  * A pubsub event subscriber that forwards status updates to the job update controller.
  */
 class JobUpdateEventSubscriber implements PubsubEvent.EventSubscriber {
+  private static final Logger LOG = Logger.getLogger(JobUpdateEventSubscriber.class.getName());
+
+  private static final AtomicLong RECOVERY_ERRORS = Stats.exportLong("job_update_recovery_errors");
+  private static final AtomicLong DELETE_ERRORS = Stats.exportLong("job_update_delete_errors");
+  private static final AtomicLong STATE_CHANGE_ERRORS =
+      Stats.exportLong("job_update_state_change_errors");
+
   private final JobUpdateController controller;
-  private final Storage storage;
 
   @Inject
-  JobUpdateEventSubscriber(JobUpdateController controller, Storage storage) {
+  JobUpdateEventSubscriber(JobUpdateController controller) {
     this.controller = requireNonNull(controller);
-    this.storage = requireNonNull(storage);
   }
 
-  private static final Function<IScheduledTask, IInstanceKey> TASK_TO_INSTANCE_KEY =
-      new Function<IScheduledTask, IInstanceKey>() {
-        @Override
-        public IInstanceKey apply(IScheduledTask task) {
-          return IInstanceKey.build(
-              new InstanceKey()
-                  .setJobKey(Tasks.SCHEDULED_TO_JOB_KEY.apply(task).newBuilder())
-                  .setInstanceId(Tasks.SCHEDULED_TO_INSTANCE_ID.apply(task)));
-        }
-      };
-
   @Subscribe
   public synchronized void taskChangedState(TaskStateChange change) {
-    controller.instanceChangedState(TASK_TO_INSTANCE_KEY.apply(change.getTask()));
+    try {
+      controller.instanceChangedState(change.getTask());
+    } catch (RuntimeException e) {
+      LOG.log(Level.SEVERE, "Failed to handle state change: " + e, e);
+      STATE_CHANGE_ERRORS.incrementAndGet();
+    }
   }
 
   @Subscribe
   public synchronized void tasksDeleted(TasksDeleted event) {
-    Set<IInstanceKey> instances = FluentIterable.from(event.getTasks())
-        .transform(TASK_TO_INSTANCE_KEY)
-        .toSet();
-    for (IInstanceKey instance : instances) {
-      controller.instanceChangedState(instance);
+    for (IScheduledTask task : event.getTasks()) {
+      // Ignore pruned tasks, since they are irrelevant to updates.
+      try {
+        if (!Tasks.isTerminated(task.getStatus())) {
+          controller.instanceDeleted(
+              InstanceKeys.from(
+                  Tasks.SCHEDULED_TO_JOB_KEY.apply(task),
+                  task.getAssignedTask().getInstanceId()));
+        }
+      } catch (RuntimeException e) {
+        LOG.log(Level.SEVERE, "Failed to handle instance deletion: " + e, e);
+        DELETE_ERRORS.incrementAndGet();
+      }
     }
   }
 
-  @VisibleForTesting
-  static final IJobUpdateQuery ACTIVE_QUERY = IJobUpdateQuery.build(
-      new JobUpdateQuery().setUpdateStatuses(ImmutableSet.of(ROLLING_FORWARD, ROLLING_BACK)));
-
   @Subscribe
-  public synchronized void schedulerActive(PubsubEvent.SchedulerActive event)
-      throws UpdateStateException {
-
-    storage.write(new Storage.MutateWork.NoResult<UpdateStateException>() {
-      @Override
-      protected void execute(Storage.MutableStoreProvider storeProvider)
-          throws UpdateStateException {
-
-        for (IJobUpdateSummary summary
-            : storeProvider.getJobUpdateStore().fetchJobUpdateSummaries(ACTIVE_QUERY)) {
-
-          controller.systemResume(summary.getJobKey());
-        }
-      }
-    });
+  public synchronized void schedulerActive(PubsubEvent.SchedulerActive event) {
+    try {
+      controller.systemResume();
+    } catch (RuntimeException e) {
+      LOG.log(Level.SEVERE, "Failed to resume job updates: " + e, e);
+      RECOVERY_ERRORS.incrementAndGet();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/406d0fb7/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachine.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachine.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachine.java
index 037602d..5dec82a 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachine.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateStateMachine.java
@@ -15,15 +15,22 @@ package org.apache.aurora.scheduler.updater;
 
 import java.util.Map;
 
+import com.google.common.base.Function;
 import com.google.common.base.Optional;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Multimap;
 
+import org.apache.aurora.gen.JobUpdateQuery;
 import org.apache.aurora.gen.JobUpdateStatus;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery;
 
 import static org.apache.aurora.gen.JobUpdateStatus.ABORTED;
 import static org.apache.aurora.gen.JobUpdateStatus.ERROR;
+import static org.apache.aurora.gen.JobUpdateStatus.FAILED;
 import static org.apache.aurora.gen.JobUpdateStatus.ROLLED_BACK;
 import static org.apache.aurora.gen.JobUpdateStatus.ROLLED_FORWARD;
 import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_BACK;
@@ -51,7 +58,7 @@ final class JobUpdateStateMachine {
               ROLLED_FORWARD,
               ABORTED,
               ERROR)
-          .putAll(ROLLING_BACK, ROLL_BACK_PAUSED, ROLLED_BACK, ABORTED, ERROR)
+          .putAll(ROLLING_BACK, ROLL_BACK_PAUSED, ROLLED_BACK, ABORTED, ERROR, FAILED)
           .putAll(ROLL_FORWARD_PAUSED, ROLLING_FORWARD, ABORTED, ERROR)
           .putAll(ROLL_BACK_PAUSED, ROLLING_BACK, ABORTED, ERROR)
           .build();
@@ -62,20 +69,64 @@ final class JobUpdateStateMachine {
           .put(ROLLING_BACK, ROLL_BACK)
           .build();
 
+  private static final BiMap<JobUpdateStatus, JobUpdateStatus> ACTIVE_TO_PAUSED_STATES =
+      ImmutableBiMap.of(
+          ROLLING_FORWARD, ROLL_FORWARD_PAUSED,
+          ROLLING_BACK, ROLL_BACK_PAUSED);
+
+  static final IJobUpdateQuery ACTIVE_QUERY = IJobUpdateQuery.build(
+      new JobUpdateQuery()
+          .setUpdateStatuses(ImmutableSet.copyOf(ACTIVE_TO_PAUSED_STATES.keySet())));
+
+  private static final Map<JobUpdateStatus, JobUpdateStatus> PAUSE_BEHAVIOR =
+      ImmutableMap.<JobUpdateStatus, JobUpdateStatus>builder()
+          .putAll(ACTIVE_TO_PAUSED_STATES)
+          .put(ROLL_FORWARD_PAUSED, ROLL_FORWARD_PAUSED)
+          .put(ROLL_BACK_PAUSED, ROLL_BACK_PAUSED)
+          .build();
+
+  static final Function<JobUpdateStatus, JobUpdateStatus> GET_PAUSE_STATE =
+      new Function<JobUpdateStatus, JobUpdateStatus>() {
+        @Override
+        public JobUpdateStatus apply(JobUpdateStatus status) {
+          return PAUSE_BEHAVIOR.get(status);
+        }
+      };
+
+  private static final Map<JobUpdateStatus, JobUpdateStatus> RESUME_BEHAVIOR =
+      ImmutableMap.<JobUpdateStatus, JobUpdateStatus>builder()
+          .putAll(ACTIVE_TO_PAUSED_STATES.inverse())
+          .put(ROLLING_FORWARD, ROLLING_FORWARD)
+          .put(ROLLING_BACK, ROLLING_BACK)
+          .build();
+
+  static final Function<JobUpdateStatus, JobUpdateStatus> GET_RESUME_STATE =
+      new Function<JobUpdateStatus, JobUpdateStatus>() {
+        @Override
+        public JobUpdateStatus apply(JobUpdateStatus status) {
+          return RESUME_BEHAVIOR.get(status);
+        }
+      };
+
   /**
    * Determines the action to take in response to a status change on a job update.
    *
    * @param from Starting state.
    * @param to Desired target state.
-   * @return The action to perform when moving from {@code from} to {@code to}.
    * @throws IllegalStateException if the requested transition is not allowed.
    */
-  static MonitorAction transition(JobUpdateStatus from, JobUpdateStatus to) {
+  static void assertTransitionAllowed(JobUpdateStatus from, JobUpdateStatus to) {
     if (!ALLOWED_TRANSITIONS.containsEntry(from, to)) {
       throw new IllegalStateException("Cannot transition update from " + from + " to " + to);
     }
+  }
+
+  static MonitorAction getActionForStatus(JobUpdateStatus status) {
+    return Optional.fromNullable(ACTIONS.get(status)).or(MonitorAction.STOP_WATCHING);
+  }
 
-    return Optional.fromNullable(ACTIONS.get(to)).or(MonitorAction.STOP_WATCHING);
+  static boolean isActive(JobUpdateStatus status) {
+    return ACTIVE_TO_PAUSED_STATES.keySet().contains(status);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/406d0fb7/src/main/java/org/apache/aurora/scheduler/updater/OneWayJobUpdater.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/OneWayJobUpdater.java b/src/main/java/org/apache/aurora/scheduler/updater/OneWayJobUpdater.java
index d6a1e4b..5a9af39 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/OneWayJobUpdater.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/OneWayJobUpdater.java
@@ -16,6 +16,7 @@ package org.apache.aurora.scheduler.updater;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.logging.Logger;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
@@ -25,6 +26,7 @@ import com.google.common.base.Predicates;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Maps.EntryTransformer;
 import com.twitter.common.util.StateMachine;
 
 import org.apache.aurora.scheduler.updater.strategy.UpdateStrategy;
@@ -44,6 +46,8 @@ import static org.apache.aurora.scheduler.updater.StateEvaluator.Result;
  * @param <T> Instance data type.
  */
 class OneWayJobUpdater<K, T> {
+  private static final Logger LOG = Logger.getLogger(OneWayJobUpdater.class.getName());
+
   private final UpdateStrategy<K> strategy;
   private final int maxFailedInstances;
   private final Map<K, InstanceUpdate<T>> instances;
@@ -52,8 +56,8 @@ class OneWayJobUpdater<K, T> {
           .initialState(OneWayStatus.IDLE)
           .addState(OneWayStatus.IDLE, OneWayStatus.WORKING)
           .addState(OneWayStatus.WORKING, OneWayStatus.SUCCEEDED, OneWayStatus.FAILED)
-          .addState(OneWayStatus.SUCCEEDED)
-          .addState(OneWayStatus.FAILED)
+          .addState(OneWayStatus.SUCCEEDED, OneWayStatus.SUCCEEDED)
+          .addState(OneWayStatus.FAILED, OneWayStatus.FAILED)
           .throwOnBadTransition(true)
           .build();
 
@@ -74,12 +78,12 @@ class OneWayJobUpdater<K, T> {
     this.maxFailedInstances = maxFailedInstances;
     checkArgument(!instanceEvaluators.isEmpty());
 
-    this.instances = ImmutableMap.copyOf(Maps.transformValues(
+    this.instances = ImmutableMap.copyOf(Maps.transformEntries(
         instanceEvaluators,
-        new Function<StateEvaluator<T>, InstanceUpdate<T>>() {
+        new EntryTransformer<K, StateEvaluator<T>, InstanceUpdate<T>>() {
           @Override
-          public InstanceUpdate<T> apply(StateEvaluator<T> evaluator) {
-            return new InstanceUpdate<>(evaluator);
+          public InstanceUpdate<T> transformEntry(K key, StateEvaluator<T> value) {
+            return new InstanceUpdate<>("Instance " + key, value);
           }
         }));
   }
@@ -92,29 +96,28 @@ class OneWayJobUpdater<K, T> {
         }
       };
 
-  private static <K, T> Map<K, InstanceUpdate<T>> filterByStatus(
+  private static <K, T> Set<K> filterByStatus(
       Map<K, InstanceUpdate<T>> instances,
       InstanceUpdateStatus status) {
 
-    return ImmutableMap.copyOf(
-        Maps.filterValues(instances, Predicates.compose(Predicates.equalTo(status), GET_STATE)));
+    return ImmutableSet.copyOf(
+        Maps.filterValues(instances, Predicates.compose(Predicates.equalTo(status), GET_STATE))
+            .keySet());
   }
 
   private static Optional<InstanceAction> resultToAction(Result result) {
     switch (result) {
       case EVALUATE_ON_STATE_CHANGE:
-        return Optional.of(InstanceAction.EVALUATE_ON_STATE_CHANGE);
+        return Optional.of(InstanceAction.AWAIT_STATE_CHANGE);
       case REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE:
-        return Optional.of(InstanceAction.REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE);
+        return Optional.of(InstanceAction.ADD_TASK);
       case KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE:
-        return Optional.of(InstanceAction.KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE);
+        return Optional.of(InstanceAction.KILL_TASK);
       case EVALUATE_AFTER_MIN_RUNNING_MS:
-        return Optional.of(InstanceAction.EVALUATE_AFTER_MIN_RUNNING_MS);
+        return Optional.of(InstanceAction.WATCH_TASK);
       default:
-        break;
+        return Optional.absent();
     }
-
-    return Optional.absent();
   }
 
   @VisibleForTesting
@@ -123,9 +126,20 @@ class OneWayJobUpdater<K, T> {
   }
 
   /**
+   * Checks whether an instance is in scope for this update.
+   *
+   * @param instanceId Instance id to check.
+   * @return {@code true} if the instance is part of the update, as originally specified by
+   *         {@code instanceEvaluators}.
+   */
+  boolean containsInstance(K instanceId) {
+    return instances.containsKey(instanceId);
+  }
+
+  /**
    * Performs an evaluation of the job.  An evaluation would normally be triggered to initiate the
    * update, as a result of a state change relevant to the update, or due to a
-   * {@link InstanceAction#EVALUATE_AFTER_MIN_RUNNING_MS requested} instance re-evaluation.
+   * {@link InstanceAction#WATCH_TASK requested} instance re-evaluation.
    *
    * @param instancesNeedingUpdate Instances triggering the event, if any.
    * @param stateProvider Provider to fetch state of instances, and pass to
@@ -136,7 +150,7 @@ class OneWayJobUpdater<K, T> {
    *         {@link OneWayStatus#WORKING working} state, as indicated by a previous evaluation.
    */
   synchronized EvaluationResult<K> evaluate(
-      Set<K> instancesNeedingUpdate,
+      Map<K, T> instancesNeedingUpdate,
       InstanceStateProvider<K, T> stateProvider) {
 
     if (stateMachine.getState() == OneWayStatus.IDLE) {
@@ -148,30 +162,31 @@ class OneWayJobUpdater<K, T> {
 
     // Call order is important here: update on-demand instances, evaluate new instances, compute
     // job update state.
-    Map<K, InstanceAction> actions = ImmutableMap.<K, InstanceAction>builder()
+    ImmutableMap.Builder<K, InstanceAction> actions = ImmutableMap.<K, InstanceAction>builder()
         // Re-evaluate instances that are in need of update.
-        .putAll(evaluateInstances(instancesNeedingUpdate, stateProvider))
-        // If ready to begin updating more instances, evaluate those as well.
-        .putAll(startNextInstanceGroup(stateProvider))
-        .build();
+        .putAll(evaluateInstances(instancesNeedingUpdate));
 
-    return new EvaluationResult<K>(computeJobUpdateStatus(), actions);
-  }
+    if (computeJobUpdateStatus() == OneWayStatus.WORKING) {
+      // If ready to begin updating more instances, evaluate those as well.
+      actions.putAll(startNextInstanceGroup(stateProvider));
+    }
 
-  private Map<K, InstanceAction> evaluateInstances(
-      Set<K> instanceIds,
-      InstanceStateProvider<K, T> stateProvider) {
+    return new EvaluationResult<K>(computeJobUpdateStatus(), actions.build());
+  }
 
+  private Map<K, InstanceAction> evaluateInstances(Map<K, T> updatedInstances) {
     ImmutableMap.Builder<K, InstanceAction> actions = ImmutableMap.builder();
-    for (K instanceId : instanceIds) {
+    for (Map.Entry<K, T> entry : updatedInstances.entrySet()) {
+      K instanceId = entry.getKey();
       InstanceUpdate<T> update = instances.get(instanceId);
       // Suppress state changes for updates that are not in-progress.
       if (update.getState() == InstanceUpdateStatus.WORKING) {
-        Optional<InstanceAction> action =
-            resultToAction(update.evaluate(stateProvider.getState(instanceId)));
+        Optional<InstanceAction> action = resultToAction(update.evaluate(entry.getValue()));
         if (action.isPresent()) {
           actions.put(instanceId, action.get());
         }
+      } else {
+        LOG.info("Ignoring state change for instance outside working set: " + instanceId);
       }
     }
 
@@ -179,29 +194,40 @@ class OneWayJobUpdater<K, T> {
   }
 
   private Map<K, InstanceAction> startNextInstanceGroup(InstanceStateProvider<K, T> stateProvider) {
-    ImmutableMap.Builder<K, InstanceAction> actions = ImmutableMap.builder();
-
-    Map<K, InstanceUpdate<T>> idle = filterByStatus(instances, InstanceUpdateStatus.IDLE);
-    if (!idle.isEmpty()) {
-      Map<K, InstanceUpdate<T>> working =
-          filterByStatus(instances, InstanceUpdateStatus.WORKING);
-      for (K instance : strategy.getNextGroup(idle.keySet(), working.keySet())) {
-        Result result = instances.get(instance).evaluate(stateProvider.getState(instance));
-        Optional<InstanceAction> action = resultToAction(result);
-        if (action.isPresent()) {
-          actions.put(instance, action.get());
+    Set<K> idle = filterByStatus(instances, InstanceUpdateStatus.IDLE);
+    if (idle.isEmpty()) {
+      return ImmutableMap.of();
+    } else {
+      ImmutableMap.Builder<K, InstanceAction> builder = ImmutableMap.builder();
+      Set<K> working = filterByStatus(instances, InstanceUpdateStatus.WORKING);
+      Set<K> nextGroup = strategy.getNextGroup(idle, working);
+      if (!nextGroup.isEmpty()) {
+        for (K instance : nextGroup) {
+          Result result = instances.get(instance).evaluate(stateProvider.getState(instance));
+          Optional<InstanceAction> action = resultToAction(result);
+          if (action.isPresent()) {
+            builder.put(instance, action.get());
+          }
         }
+        LOG.info("Updated working set for update to "
+            + filterByStatus(instances, InstanceUpdateStatus.WORKING));
       }
-    }
 
-    return actions.build();
+      Map<K, InstanceAction> actions = builder.build();
+      if (!idle.isEmpty() && working.isEmpty() && actions.isEmpty()) {
+        // There are no in-flight instances and no actions, so nothing is left to initiate more
+        // work on this job. Try to find more work, or converge.
+        return startNextInstanceGroup(stateProvider);
+      } else {
+        return actions;
+      }
+    }
   }
 
   private OneWayStatus computeJobUpdateStatus() {
-    Map<K, InstanceUpdate<T>> idle = filterByStatus(instances, InstanceUpdateStatus.IDLE);
-    Map<K, InstanceUpdate<T>> working =
-        filterByStatus(instances, InstanceUpdateStatus.WORKING);
-    Map<K, InstanceUpdate<T>> failed = filterByStatus(instances, InstanceUpdateStatus.FAILED);
+    Set<K> idle = filterByStatus(instances, InstanceUpdateStatus.IDLE);
+    Set<K> working = filterByStatus(instances, InstanceUpdateStatus.WORKING);
+    Set<K> failed = filterByStatus(instances, InstanceUpdateStatus.FAILED);
     // TODO(wfarner): This needs to be updated to support rollback.
     if (failed.size() > maxFailedInstances) {
       stateMachine.transition(OneWayStatus.FAILED);
@@ -217,21 +243,22 @@ class OneWayJobUpdater<K, T> {
    */
   private static class InstanceUpdate<T> {
     private final StateEvaluator<T> evaluator;
-    private final StateMachine<InstanceUpdateStatus> stateMachine =
-        StateMachine.<InstanceUpdateStatus>builder("instance_update")
-            .initialState(InstanceUpdateStatus.IDLE)
-            .addState(InstanceUpdateStatus.IDLE, InstanceUpdateStatus.WORKING)
-            .addState(
-                InstanceUpdateStatus.WORKING,
-                InstanceUpdateStatus.SUCCEEDED,
-                InstanceUpdateStatus.FAILED)
-            .addState(InstanceUpdateStatus.SUCCEEDED)
-            .addState(InstanceUpdateStatus.FAILED)
-            .throwOnBadTransition(true)
-            .build();
-
-    InstanceUpdate(StateEvaluator<T> evaluator) {
+    private final StateMachine<InstanceUpdateStatus> stateMachine;
+
+    InstanceUpdate(String name, StateEvaluator<T> evaluator) {
       this.evaluator = requireNonNull(evaluator);
+      stateMachine = StateMachine.<InstanceUpdateStatus>builder(name)
+          .initialState(InstanceUpdateStatus.IDLE)
+          .addState(InstanceUpdateStatus.IDLE, InstanceUpdateStatus.WORKING)
+          .addState(
+              InstanceUpdateStatus.WORKING,
+              InstanceUpdateStatus.SUCCEEDED,
+              InstanceUpdateStatus.FAILED)
+          .addState(InstanceUpdateStatus.SUCCEEDED)
+          .addState(InstanceUpdateStatus.FAILED)
+          .throwOnBadTransition(true)
+          .logTransitions()
+          .build();
     }
 
     InstanceUpdateStatus getState() {
@@ -253,7 +280,8 @@ class OneWayJobUpdater<K, T> {
     }
   }
 
-  private enum InstanceUpdateStatus {
+  @VisibleForTesting
+  enum InstanceUpdateStatus {
     IDLE,
     WORKING,
     SUCCEEDED,
@@ -271,29 +299,19 @@ class OneWayJobUpdater<K, T> {
   }
 
   /**
-   * Action that should be performed by the caller to converge towards the desired update state.
-   */
-  enum InstanceAction {
-    EVALUATE_ON_STATE_CHANGE,
-    REPLACE_TASK_AND_EVALUATE_ON_STATE_CHANGE,
-    KILL_TASK_AND_EVALUATE_ON_STATE_CHANGE,
-    EVALUATE_AFTER_MIN_RUNNING_MS
-  }
-
-  /**
    * Result of an evaluation round.
    */
   static class EvaluationResult<K> {
-    private final OneWayStatus jobStatus;
+    private final OneWayStatus status;
     private final Map<K, InstanceAction> instanceActions;
 
     EvaluationResult(OneWayStatus jobStatus, Map<K, InstanceAction> instanceActions) {
-      this.jobStatus = requireNonNull(jobStatus);
+      this.status = requireNonNull(jobStatus);
       this.instanceActions = requireNonNull(instanceActions);
     }
 
-    public OneWayStatus getJobStatus() {
-      return jobStatus;
+    public OneWayStatus getStatus() {
+      return status;
     }
 
     public Map<K, InstanceAction> getInstanceActions() {
@@ -307,19 +325,19 @@ class OneWayJobUpdater<K, T> {
       }
       @SuppressWarnings("unchecked")
       EvaluationResult<K> other = (EvaluationResult<K>) obj;
-      return other.getJobStatus().equals(this.getJobStatus())
+      return other.getStatus().equals(this.getStatus())
           && other.getInstanceActions().equals(this.getInstanceActions());
     }
 
     @Override
     public int hashCode() {
-      return Objects.hash(getJobStatus(), getInstanceActions());
+      return Objects.hash(getStatus(), getInstanceActions());
     }
 
     @Override
     public String toString() {
       return com.google.common.base.Objects.toStringHelper(this)
-          .add("jobStatus", getJobStatus())
+          .add("status", getStatus())
           .add("instanceActions", getInstanceActions())
           .toString();
     }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/406d0fb7/src/main/java/org/apache/aurora/scheduler/updater/OneWayJobUpdaterFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/OneWayJobUpdaterFactory.java b/src/main/java/org/apache/aurora/scheduler/updater/OneWayJobUpdaterFactory.java
deleted file mode 100644
index 80baa7f..0000000
--- a/src/main/java/org/apache/aurora/scheduler/updater/OneWayJobUpdaterFactory.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.updater;
-
-import java.util.Set;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.DiscreteDomain;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableRangeSet;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Range;
-import com.google.common.collect.Sets;
-import com.google.inject.Inject;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.util.Clock;
-
-import org.apache.aurora.scheduler.storage.entities.IInstanceTaskConfig;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateConfiguration;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateSettings;
-import org.apache.aurora.scheduler.storage.entities.IRange;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.apache.aurora.scheduler.updater.strategy.QueueStrategy;
-import org.apache.aurora.scheduler.updater.strategy.UpdateStrategy;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * A factory that produces one-way job updaters based on a job update configuration.
- */
-interface OneWayJobUpdaterFactory {
-
-  /**
-   * Creates a one-way job updater that will execute the job update configuration in the direction
-   * specified by {@code rollingForward}.
-   *
-   * @param configuration Configuration to act on.
-   * @param rollingForward {@code true} if this is a job update, {@code false} if it is a rollback.
-   * @return A one-way updater that will execute the job update as specified in the
-   *         {@code configuration}.
-   * @throws UpdateConfigurationException If the provided configuration cannot be used.
-   */
-  OneWayJobUpdater<Integer, Optional<IScheduledTask>> newUpdate(
-      IJobUpdateConfiguration configuration,
-      boolean rollingForward) throws UpdateConfigurationException;
-
-  /**
-   * Thrown when an invalid job update configuration is encountered.
-   */
-  class UpdateConfigurationException extends Exception {
-    UpdateConfigurationException(String msg) {
-      super(msg);
-    }
-  }
-
-  class OneWayJobUpdaterFactoryImpl implements OneWayJobUpdaterFactory {
-    private final Clock clock;
-
-    @Inject
-    OneWayJobUpdaterFactoryImpl(Clock clock) {
-      this.clock = requireNonNull(clock);
-    }
-
-    @Override
-    public OneWayJobUpdater<Integer, Optional<IScheduledTask>> newUpdate(
-        IJobUpdateConfiguration configuration,
-        boolean rollingForward) throws UpdateConfigurationException {
-
-      requireNonNull(configuration);
-
-      Set<Integer> instances;
-      IJobUpdateSettings settings = configuration.getSettings();
-      Range<Integer> updateConfigurationInstances =
-          Range.closedOpen(0, configuration.getInstanceCount());
-      if (settings.getUpdateOnlyTheseInstances().isEmpty()) {
-        Set<Integer> newInstanceIds =
-            ImmutableRangeSet.of(updateConfigurationInstances).asSet(DiscreteDomain.integers());
-
-        // In a full job update, the working set is the union of instance IDs before and after.
-        instances =  ImmutableSet.copyOf(
-            Sets.union(expandInstanceIds(configuration.getOldTaskConfigs()), newInstanceIds));
-      } else {
-        instances = rangesToInstanceIds(settings.getUpdateOnlyTheseInstances());
-
-        if (!updateConfigurationInstances.containsAll(instances)) {
-          throw new UpdateConfigurationException(
-              "When updating specific instances, "
-                  + "all specified instances must be in the update configuration.");
-        }
-      }
-
-      ImmutableMap.Builder<Integer, StateEvaluator<Optional<IScheduledTask>>> evaluators =
-          ImmutableMap.builder();
-      for (int instanceId : instances) {
-        Optional<ITaskConfig> desiredState;
-        if (rollingForward) {
-          desiredState = updateConfigurationInstances.contains(instanceId)
-              ? Optional.of(configuration.getNewTaskConfig())
-              : Optional.<ITaskConfig>absent();
-        } else {
-          desiredState = getConfig(instanceId, configuration.getOldTaskConfigs());
-        }
-
-        evaluators.put(
-            instanceId,
-            new InstanceUpdater(
-                desiredState,
-                settings.getMaxPerInstanceFailures(),
-                Amount.of((long) settings.getMinWaitInInstanceRunningMs(), Time.MILLISECONDS),
-                Amount.of((long) settings.getMaxWaitToInstanceRunningMs(), Time.MILLISECONDS),
-                clock));
-      }
-
-      // TODO(wfarner): Add the batch_completion flag to JobUpdateSettings and pick correct
-      // strategy.
-      UpdateStrategy<Integer> strategy = new QueueStrategy<>(settings.getUpdateGroupSize());
-
-      return new OneWayJobUpdater<>(
-          strategy,
-          settings.getMaxFailedInstances(),
-          evaluators.build());
-    }
-
-    private static Range<Integer> toRange(IRange range) {
-      return Range.closed(range.getFirst(), range.getLast());
-    }
-
-    private static Set<Integer> rangesToInstanceIds(Set<IRange> ranges) {
-      ImmutableRangeSet.Builder<Integer> instanceIds = ImmutableRangeSet.builder();
-      for (IRange range : ranges) {
-        instanceIds.add(toRange(range));
-      }
-
-      return instanceIds.build().asSet(DiscreteDomain.integers());
-    }
-
-    private static Set<Integer> expandInstanceIds(Set<IInstanceTaskConfig> instanceGroups) {
-      ImmutableRangeSet.Builder<Integer> instanceIds = ImmutableRangeSet.builder();
-      for (IInstanceTaskConfig group : instanceGroups) {
-        for (IRange range : group.getInstances()) {
-          instanceIds.add(toRange(range));
-        }
-      }
-
-      return instanceIds.build().asSet(DiscreteDomain.integers());
-    }
-
-    private static Optional<ITaskConfig> getConfig(
-        int id,
-        Set<IInstanceTaskConfig> instanceGroups) {
-
-      for (IInstanceTaskConfig group : instanceGroups) {
-        for (IRange range : group.getInstances()) {
-          if (toRange(range).contains(id)) {
-            return Optional.of(group.getTask());
-          }
-        }
-      }
-
-      return Optional.absent();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/406d0fb7/src/main/java/org/apache/aurora/scheduler/updater/UpdateConfigurationException.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/UpdateConfigurationException.java b/src/main/java/org/apache/aurora/scheduler/updater/UpdateConfigurationException.java
new file mode 100644
index 0000000..6e11ad1
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/updater/UpdateConfigurationException.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.updater;
+
+/**
+ * Thrown when an invalid job update configuration is encountered.
+ */
+public class UpdateConfigurationException extends Exception {
+  public UpdateConfigurationException(String msg) {
+    super(msg);
+  }
+}


Mime
View raw message