aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject git commit: Implementing update pause/resume/abort APIs.
Date Tue, 19 Aug 2014 18:38:07 GMT
Repository: incubator-aurora
Updated Branches:
  refs/heads/master a86e70907 -> b667ead4b


Implementing update pause/resume/abort APIs.

Bugs closed: AURORA-649

Reviewed at https://reviews.apache.org/r/24835/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/b667ead4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/b667ead4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/b667ead4

Branch: refs/heads/master
Commit: b667ead4b3f4e0404e5ab8f3733ac2d9f940fa1d
Parents: a86e709
Author: Maxim Khutornenko <maxim@apache.org>
Authored: Tue Aug 19 11:37:32 2014 -0700
Committer: Maxim Khutornenko <maxim@apache.org>
Committed: Tue Aug 19 11:37:32 2014 -0700

----------------------------------------------------------------------
 .../apache/aurora/scheduler/app/AppModule.java  |   2 +
 .../aurora/scheduler/state/JobUpdater.java      |  48 ----
 .../aurora/scheduler/state/JobUpdaterImpl.java  | 149 ------------
 .../aurora/scheduler/state/StateModule.java     |   3 -
 .../aurora/scheduler/state/UUIDGenerator.java   |   2 +-
 .../thrift/SchedulerThriftInterface.java        | 142 ++++++++++--
 .../scheduler/updater/JobUpdateController.java  |   5 +-
 .../updater/JobUpdateControllerImpl.java        |   4 +-
 .../aurora/scheduler/updater/UpdaterModule.java |  30 +++
 .../scheduler/state/JobUpdaterImplTest.java     | 188 ---------------
 .../thrift/SchedulerThriftInterfaceTest.java    | 226 ++++++++++++++++---
 .../aurora/scheduler/thrift/ThriftIT.java       |   6 +-
 12 files changed, 366 insertions(+), 439 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b667ead4/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/AppModule.java b/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
index 61ba4b2..9ec3f41 100644
--- a/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
@@ -55,6 +55,7 @@ import org.apache.aurora.scheduler.sla.SlaModule;
 import org.apache.aurora.scheduler.state.StateModule;
 import org.apache.aurora.scheduler.stats.AsyncStatsModule;
 import org.apache.aurora.scheduler.storage.entities.IServerInfo;
+import org.apache.aurora.scheduler.updater.UpdaterModule;
 import org.apache.mesos.Scheduler;
 import org.apache.zookeeper.data.ACL;
 
@@ -117,6 +118,7 @@ class AppModule extends AbstractModule {
     install(new SchedulerModule());
     install(new StateModule());
     install(new SlaModule());
+    install(new UpdaterModule());
 
     bind(StatsProvider.class).toInstance(Stats.STATS_PROVIDER);
   }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b667ead4/src/main/java/org/apache/aurora/scheduler/state/JobUpdater.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/JobUpdater.java b/src/main/java/org/apache/aurora/scheduler/state/JobUpdater.java
deleted file mode 100644
index eb472e4..0000000
--- a/src/main/java/org/apache/aurora/scheduler/state/JobUpdater.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.state;
-
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateRequest;
-
-/**
- * Coordinates job updates and exposes control primitives (start, pause, resume, abort).
- */
-public interface JobUpdater {
-
-  /**
-   * Saves the job update request and starts the update process.
-   *
-   * @param request Job update request.
-   * @param user User who initiated the update.
-   * @param lockToken Token for the lock held for this update.
-   * @return Saved job update ID.
-   * @throws UpdaterException Throws if update fails to start for any reason.
-   */
-  String startJobUpdate(IJobUpdateRequest request, String user, String lockToken)
-      throws UpdaterException;
-
-  /**
-   * Thrown when job update related operation failed.
-   */
-  class UpdaterException extends Exception {
-
-    public UpdaterException(String message) {
-      super(message);
-    }
-
-    public UpdaterException(String message, Throwable e) {
-      super(message, e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b667ead4/src/main/java/org/apache/aurora/scheduler/state/JobUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/JobUpdaterImpl.java b/src/main/java/org/apache/aurora/scheduler/state/JobUpdaterImpl.java
deleted file mode 100644
index 2c64600..0000000
--- a/src/main/java/org/apache/aurora/scheduler/state/JobUpdaterImpl.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.state;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-
-import javax.inject.Inject;
-
-import com.google.common.base.Function;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
-import com.google.common.collect.Range;
-import com.twitter.common.util.Clock;
-
-import org.apache.aurora.gen.InstanceTaskConfig;
-import org.apache.aurora.gen.JobUpdate;
-import org.apache.aurora.gen.JobUpdateConfiguration;
-import org.apache.aurora.gen.JobUpdateEvent;
-import org.apache.aurora.gen.JobUpdateStatus;
-import org.apache.aurora.gen.JobUpdateSummary;
-import org.apache.aurora.scheduler.base.Numbers;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork;
-import org.apache.aurora.scheduler.storage.Storage.StorageException;
-import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateRequest;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Implementation of the {@link JobUpdater}.
- */
-class JobUpdaterImpl implements JobUpdater {
-
-  private final Storage storage;
-  private final UUIDGenerator uuidGenerator;
-  private final Clock clock;
-
-  @Inject
-  JobUpdaterImpl(Storage storage, Clock clock, UUIDGenerator uuidGenerator) {
-    this.storage = requireNonNull(storage);
-    this.clock = requireNonNull(clock);
-    this.uuidGenerator = requireNonNull(uuidGenerator);
-  }
-
-  @Override
-  public String startJobUpdate(
-      final IJobUpdateRequest request,
-      final String user,
-      final String lockToken) throws UpdaterException {
-
-    return storage.write(new MutateWork<String, UpdaterException>() {
-      @Override
-      public String apply(Storage.MutableStoreProvider storeProvider) throws UpdaterException {
-        String updateId = uuidGenerator.createNew().toString();
-
-        IJobUpdate update = IJobUpdate.build(new JobUpdate()
-            .setSummary(new JobUpdateSummary()
-                .setJobKey(request.getJobKey().newBuilder())
-                .setUpdateId(updateId)
-                .setUser(user))
-            .setConfiguration(new JobUpdateConfiguration()
-                .setSettings(request.getSettings().newBuilder())
-                .setInstanceCount(request.getInstanceCount())
-                .setNewTaskConfig(request.getTaskConfig().newBuilder())
-                .setOldTaskConfigs(buildOldTaskConfigs(request.getJobKey(), storeProvider))));
-
-        IJobUpdateEvent event = IJobUpdateEvent.build(new JobUpdateEvent()
-            .setStatus(JobUpdateStatus.ROLLING_FORWARD)
-            .setTimestampMs(clock.nowMillis()));
-
-        try {
-          storeProvider.getJobUpdateStore().saveJobUpdate(update, lockToken);
-          storeProvider.getJobUpdateStore().saveJobUpdateEvent(event, updateId);
-        } catch (StorageException e) {
-          throw new UpdaterException("Failed to start update.", e);
-        }
-
-        // TODO(maxim): wire in updater logic when ready.
-
-        return updateId;
-      }
-    });
-  }
-
-  private static final Function<Collection<Integer>, Set<Range<Integer>>> TO_RANGES  =
-      new Function<Collection<Integer>, Set<Range<Integer>>>() {
-        @Override
-        public Set<Range<Integer>> apply(Collection<Integer> numbers) {
-          return Numbers.toRanges(numbers);
-        }
-      };
-
-  private Set<InstanceTaskConfig> buildOldTaskConfigs(
-      IJobKey jobKey,
-      StoreProvider storeProvider) {
-
-    Set<IScheduledTask> tasks =
-        storeProvider.getTaskStore().fetchTasks(Query.jobScoped(jobKey).active());
-
-    // Group tasks by their configurations.
-    Multimap<ITaskConfig, IScheduledTask> tasksByConfig =
-        Multimaps.index(tasks, Tasks.SCHEDULED_TO_INFO);
-
-    // Translate tasks into instance IDs.
-    Multimap<ITaskConfig, Integer> instancesByConfig =
-        Multimaps.transformValues(tasksByConfig, Tasks.SCHEDULED_TO_INSTANCE_ID);
-
-    // Reduce instance IDs into contiguous ranges.
-    Map<ITaskConfig, Set<Range<Integer>>> rangesByConfig =
-        Maps.transformValues(instancesByConfig.asMap(), TO_RANGES);
-
-    ImmutableSet.Builder<InstanceTaskConfig> builder = ImmutableSet.builder();
-    for (Map.Entry<ITaskConfig, Set<Range<Integer>>> entry : rangesByConfig.entrySet()) {
-      ImmutableSet.Builder<org.apache.aurora.gen.Range> ranges = ImmutableSet.builder();
-      for (Range<Integer> range : entry.getValue()) {
-        ranges.add(new org.apache.aurora.gen.Range(range.lowerEndpoint(), range.upperEndpoint()));
-      }
-
-      builder.add(new InstanceTaskConfig()
-          .setTask(entry.getKey().newBuilder())
-          .setInstances(ranges.build()));
-    }
-
-    return builder.build();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b667ead4/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateModule.java b/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
index cc1eee4..2c712ef 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
@@ -47,9 +47,6 @@ public class StateModule extends AbstractModule {
     bind(LockManager.class).to(LockManagerImpl.class);
     bind(LockManagerImpl.class).in(Singleton.class);
 
-    bind(JobUpdater.class).to(JobUpdaterImpl.class);
-    bind(JobUpdaterImpl.class).in(Singleton.class);
-
     bindMaintenanceController(binder());
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b667ead4/src/main/java/org/apache/aurora/scheduler/state/UUIDGenerator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/UUIDGenerator.java b/src/main/java/org/apache/aurora/scheduler/state/UUIDGenerator.java
index 3938c30..994a77f 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/UUIDGenerator.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/UUIDGenerator.java
@@ -18,7 +18,7 @@ import java.util.UUID;
 /**
  * Wraps {@link java.util.UUID#randomUUID()} to facilitate unit testing.
  */
-interface UUIDGenerator {
+public interface UUIDGenerator {
   UUID createNew();
 
   class UUIDGeneratorImpl implements UUIDGenerator {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b667ead4/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
index 0d8d75b..487a1c8 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
@@ -42,6 +42,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Multimaps;
+import com.google.common.collect.Range;
 import com.google.common.collect.Sets;
 
 import org.apache.aurora.auth.CapabilityValidator;
@@ -64,13 +65,17 @@ import org.apache.aurora.gen.GetQuotaResult;
 import org.apache.aurora.gen.Hosts;
 import org.apache.aurora.gen.InstanceConfigRewrite;
 import org.apache.aurora.gen.InstanceKey;
+import org.apache.aurora.gen.InstanceTaskConfig;
 import org.apache.aurora.gen.JobConfigRewrite;
 import org.apache.aurora.gen.JobConfiguration;
 import org.apache.aurora.gen.JobKey;
 import org.apache.aurora.gen.JobSummary;
 import org.apache.aurora.gen.JobSummaryResult;
+import org.apache.aurora.gen.JobUpdate;
+import org.apache.aurora.gen.JobUpdateConfiguration;
 import org.apache.aurora.gen.JobUpdateQuery;
 import org.apache.aurora.gen.JobUpdateRequest;
+import org.apache.aurora.gen.JobUpdateSummary;
 import org.apache.aurora.gen.ListBackupsResult;
 import org.apache.aurora.gen.Lock;
 import org.apache.aurora.gen.LockKey;
@@ -96,6 +101,7 @@ import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.gen.TaskQuery;
 import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.Jobs;
+import org.apache.aurora.scheduler.base.Numbers;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.ScheduleException;
 import org.apache.aurora.scheduler.base.Tasks;
@@ -112,13 +118,12 @@ import org.apache.aurora.scheduler.metadata.NearestFit;
 import org.apache.aurora.scheduler.quota.QuotaInfo;
 import org.apache.aurora.scheduler.quota.QuotaManager;
 import org.apache.aurora.scheduler.quota.QuotaManager.QuotaException;
-import org.apache.aurora.scheduler.state.JobUpdater;
-import org.apache.aurora.scheduler.state.JobUpdater.UpdaterException;
 import org.apache.aurora.scheduler.state.LockManager;
 import org.apache.aurora.scheduler.state.LockManager.LockException;
 import org.apache.aurora.scheduler.state.MaintenanceController;
 import org.apache.aurora.scheduler.state.SchedulerCore;
 import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.state.UUIDGenerator;
 import org.apache.aurora.scheduler.storage.JobStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
@@ -130,6 +135,7 @@ import org.apache.aurora.scheduler.storage.backup.StorageBackup;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateRequest;
 import org.apache.aurora.scheduler.storage.entities.ILock;
 import org.apache.aurora.scheduler.storage.entities.ILockKey;
@@ -138,6 +144,8 @@ import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.aurora.scheduler.thrift.auth.DecoratedThrift;
 import org.apache.aurora.scheduler.thrift.auth.Requires;
+import org.apache.aurora.scheduler.updater.JobUpdateController;
+import org.apache.aurora.scheduler.updater.UpdateStateException;
 import org.apache.commons.lang.StringUtils;
 import org.apache.thrift.TException;
 
@@ -188,7 +196,8 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
   private final QuotaManager quotaManager;
   private final NearestFit nearestFit;
   private final StateManager stateManager;
-  private final JobUpdater jobUpdater;
+  private final UUIDGenerator uuidGenerator;
+  private final JobUpdateController jobUpdateController;
 
   @Inject
   SchedulerThriftInterface(
@@ -204,7 +213,8 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
       QuotaManager quotaManager,
       NearestFit nearestFit,
       StateManager stateManager,
-      JobUpdater jobUpdater) {
+      UUIDGenerator uuidGenerator,
+      JobUpdateController jobUpdateController) {
 
     this(storage,
         schedulerCore,
@@ -218,7 +228,8 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
         quotaManager,
         nearestFit,
         stateManager,
-        jobUpdater);
+        uuidGenerator,
+        jobUpdateController);
   }
 
   @VisibleForTesting
@@ -235,7 +246,8 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
       QuotaManager quotaManager,
       NearestFit nearestFit,
       StateManager stateManager,
-      JobUpdater jobUpdater) {
+      UUIDGenerator uuidGenerator,
+      JobUpdateController jobUpdateController) {
 
     this.storage = requireNonNull(storage);
     this.schedulerCore = requireNonNull(schedulerCore);
@@ -249,7 +261,8 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
     this.quotaManager = requireNonNull(quotaManager);
     this.nearestFit = requireNonNull(nearestFit);
     this.stateManager = requireNonNull(stateManager);
-    this.jobUpdater = requireNonNull(jobUpdater);
+    this.uuidGenerator = requireNonNull(uuidGenerator);
+    this.jobUpdateController = requireNonNull(jobUpdateController);
   }
 
   @Override
@@ -1255,6 +1268,48 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
         new GetLocksResult().setLocks(ILock.toBuildersSet(lockManager.getLocks()))));
   }
 
+  private static final Function<Collection<Integer>, Set<Range<Integer>>> TO_RANGES  =
+      new Function<Collection<Integer>, Set<Range<Integer>>>() {
+        @Override
+        public Set<Range<Integer>> apply(Collection<Integer> numbers) {
+          return Numbers.toRanges(numbers);
+        }
+      };
+
+  private static Set<InstanceTaskConfig> buildOldTaskConfigs(
+      IJobKey jobKey,
+      Storage.StoreProvider storeProvider) {
+
+    Set<IScheduledTask> tasks =
+        storeProvider.getTaskStore().fetchTasks(Query.jobScoped(jobKey).active());
+
+    // Group tasks by their configurations.
+    Multimap<ITaskConfig, IScheduledTask> tasksByConfig =
+        Multimaps.index(tasks, Tasks.SCHEDULED_TO_INFO);
+
+    // Translate tasks into instance IDs.
+    Multimap<ITaskConfig, Integer> instancesByConfig =
+        Multimaps.transformValues(tasksByConfig, Tasks.SCHEDULED_TO_INSTANCE_ID);
+
+    // Reduce instance IDs into contiguous ranges.
+    Map<ITaskConfig, Set<Range<Integer>>> rangesByConfig =
+        Maps.transformValues(instancesByConfig.asMap(), TO_RANGES);
+
+    ImmutableSet.Builder<InstanceTaskConfig> builder = ImmutableSet.builder();
+    for (Map.Entry<ITaskConfig, Set<Range<Integer>>> entry : rangesByConfig.entrySet()) {
+      ImmutableSet.Builder<org.apache.aurora.gen.Range> ranges = ImmutableSet.builder();
+      for (Range<Integer> range : entry.getValue()) {
+        ranges.add(new org.apache.aurora.gen.Range(range.lowerEndpoint(), range.upperEndpoint()));
+      }
+
+      builder.add(new InstanceTaskConfig()
+          .setTask(entry.getKey().newBuilder())
+          .setInstances(ranges.build()));
+    }
+
+    return builder.build();
+  }
+
   @Override
   public Response startJobUpdate(
       JobUpdateRequest mutableRequest,
@@ -1286,7 +1341,6 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
     return storage.write(new MutateWork.Quiet<Response>() {
       @Override
       public Response apply(MutableStoreProvider storeProvider) {
-        // TODO(wfarner): Move lock acquisition down into the update controller once introduced.
         final ILock lock;
         try {
           lock = lockManager.acquireLock(
@@ -1298,11 +1352,23 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
 
         // TODO(maxim): Wire in task limits and quota checks from SchedulerCore.
 
+        String updateId = uuidGenerator.createNew().toString();
+
+        IJobUpdate update = IJobUpdate.build(new JobUpdate()
+            .setSummary(new JobUpdateSummary()
+                .setJobKey(request.getJobKey().newBuilder())
+                .setUpdateId(updateId)
+                .setUser(context.getIdentity()))
+            .setConfiguration(new JobUpdateConfiguration()
+                .setSettings(request.getSettings().newBuilder())
+                .setInstanceCount(request.getInstanceCount())
+                .setNewTaskConfig(request.getTaskConfig().newBuilder())
+                .setOldTaskConfigs(buildOldTaskConfigs(request.getJobKey(), storeProvider))));
+
         try {
-          String updateId =
-              jobUpdater.startJobUpdate(request, context.getIdentity(), lock.getToken());
+          jobUpdateController.start(update, lock.getToken());
           return okResponse(Result.startJobUpdateResult(new StartJobUpdateResult(updateId)));
-        } catch (UpdaterException e) {
+        } catch (UpdateStateException e) {
           return addMessage(response, INVALID_REQUEST, e);
         }
       }
@@ -1310,18 +1376,60 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
   }
 
   @Override
-  public Response pauseJobUpdate(JobKey jobKey, SessionKey session) {
-    throw new UnsupportedOperationException("Not implemented");
+  public Response pauseJobUpdate(final JobKey mutableJobKey, final SessionKey session) {
+    return storage.write(new MutateWork.Quiet<Response>() {
+      @Override
+      public Response apply(MutableStoreProvider storeProvider) {
+        try {
+          IJobKey jobKey = JobKeys.assertValid(IJobKey.build(requireNonNull(mutableJobKey)));
+          sessionValidator.checkAuthenticated(session, ImmutableSet.of(jobKey.getRole()));
+          jobUpdateController.pause(jobKey);
+          return okEmptyResponse();
+        } catch (AuthFailedException e) {
+          return addMessage(emptyResponse(), AUTH_FAILED, e);
+        } catch (UpdateStateException e) {
+          return addMessage(emptyResponse(), INVALID_REQUEST, e);
+        }
+      }
+    });
   }
 
   @Override
-  public Response resumeJobUpdate(JobKey jobKey, SessionKey session) {
-    throw new UnsupportedOperationException("Not implemented");
+  public Response resumeJobUpdate(final JobKey mutableJobKey, final SessionKey session) {
+    return storage.write(new MutateWork.Quiet<Response>() {
+      @Override
+      public Response apply(MutableStoreProvider storeProvider) {
+        try {
+          IJobKey jobKey = JobKeys.assertValid(IJobKey.build(requireNonNull(mutableJobKey)));
+          sessionValidator.checkAuthenticated(session, ImmutableSet.of(jobKey.getRole()));
+          jobUpdateController.resume(jobKey);
+          return okEmptyResponse();
+        } catch (AuthFailedException e) {
+          return addMessage(emptyResponse(), AUTH_FAILED, e);
+        } catch (UpdateStateException e) {
+          return addMessage(emptyResponse(), INVALID_REQUEST, e);
+        }
+      }
+    });
   }
 
   @Override
-  public Response abortJobUpdate(JobKey jobKey, SessionKey session) {
-    throw new UnsupportedOperationException("Not implemented");
+  public Response abortJobUpdate(final JobKey mutableJobKey, final SessionKey session) {
+    return storage.write(new MutateWork.Quiet<Response>() {
+      @Override
+      public Response apply(MutableStoreProvider storeProvider) {
+        try {
+          IJobKey jobKey = JobKeys.assertValid(IJobKey.build(requireNonNull(mutableJobKey)));
+          sessionValidator.checkAuthenticated(session, ImmutableSet.of(jobKey.getRole()));
+          jobUpdateController.abort(jobKey);
+          return okEmptyResponse();
+        } catch (AuthFailedException e) {
+          return addMessage(emptyResponse(), AUTH_FAILED, e);
+        } catch (UpdateStateException e) {
+          return addMessage(emptyResponse(), INVALID_REQUEST, e);
+        }
+      }
+    });
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b667ead4/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateController.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateController.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateController.java
index 5eb5117..39bdca0 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateController.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateController.java
@@ -20,17 +20,18 @@ import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
 /**
  * A controller that exposes commands to initiate and modify active job updates.
  */
-interface JobUpdateController {
+public interface JobUpdateController {
 
   /**
    * Initiates an update.
    *
    * @param update Instructions for what job to update, and how to update it.
+   * @param lockToken UUID identifying the lock associated with this update.
    * @throws UpdateStateException If the update cannot be started, for example if the instructions
    *                              are invalid, or if there is already an in-progress update for the
    *                              job.
    */
-  void start(IJobUpdate update) throws UpdateStateException;
+  void start(IJobUpdate update, String lockToken) throws UpdateStateException;
 
   /**
    * Pauses an in-progress update.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b667ead4/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
index 28fd32f..3f542ce 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
@@ -20,9 +20,9 @@ import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
 /**
  * TODO(wfarner): Implement this as part of AURORA-610.
  */
-public class JobUpdateControllerImpl implements JobUpdateController {
+class JobUpdateControllerImpl implements JobUpdateController {
   @Override
-  public void start(IJobUpdate update) throws UpdateStateException {
+  public void start(IJobUpdate update, String lockToken) throws UpdateStateException {
     throw new UnsupportedOperationException("Not yet implemented.");
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b667ead4/src/main/java/org/apache/aurora/scheduler/updater/UpdaterModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/UpdaterModule.java b/src/main/java/org/apache/aurora/scheduler/updater/UpdaterModule.java
new file mode 100644
index 0000000..028cb07
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/updater/UpdaterModule.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.updater;
+
+import javax.inject.Singleton;
+
+import com.google.inject.AbstractModule;
+
+/**
+ * Binding module for scheduling logic and higher-level state management.
+ */
+public class UpdaterModule extends AbstractModule {
+
+  @Override
+  protected void configure() {
+    bind(JobUpdateController.class).to(JobUpdateControllerImpl.class);
+    bind(JobUpdateControllerImpl.class).in(Singleton.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b667ead4/src/test/java/org/apache/aurora/scheduler/state/JobUpdaterImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/JobUpdaterImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/JobUpdaterImplTest.java
deleted file mode 100644
index 90b4e8a..0000000
--- a/src/test/java/org/apache/aurora/scheduler/state/JobUpdaterImplTest.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.state;
-
-import java.util.Map;
-import java.util.UUID;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.twitter.common.testing.easymock.EasyMockTest;
-import com.twitter.common.util.Clock;
-
-import org.apache.aurora.gen.AssignedTask;
-import org.apache.aurora.gen.Identity;
-import org.apache.aurora.gen.InstanceTaskConfig;
-import org.apache.aurora.gen.JobUpdate;
-import org.apache.aurora.gen.JobUpdateConfiguration;
-import org.apache.aurora.gen.JobUpdateEvent;
-import org.apache.aurora.gen.JobUpdateRequest;
-import org.apache.aurora.gen.JobUpdateSettings;
-import org.apache.aurora.gen.JobUpdateStatus;
-import org.apache.aurora.gen.JobUpdateSummary;
-import org.apache.aurora.gen.Range;
-import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.scheduler.base.JobKeys;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.state.JobUpdater.UpdaterException;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateRequest;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.junit.Assert.assertEquals;
-
-public class JobUpdaterImplTest extends EasyMockTest {
-
-  private static final String UPDATE_ID = "82d6d790-3212-11e3-aa6e-0800200c9a74";
-  private static final UUID ID = UUID.fromString(UPDATE_ID);
-  private static final IJobKey JOB = JobKeys.from("role", "env", "name");
-  private static final Identity IDENTITY = new Identity("role", "user");
-  private static final Long TIMESTAMP = 1234L;
-  private static final String LOCK = "lock token";
-
-  private JobUpdater updater;
-  private StorageTestUtil storageUtil;
-  private Clock clock;
-  private UUIDGenerator idGenerator;
-
-  @Before
-  public void setUp() throws Exception {
-    storageUtil = new StorageTestUtil(this);
-    storageUtil.expectOperations();
-
-    clock = createMock(Clock.class);
-    idGenerator = createMock(UUIDGenerator.class);
-
-    updater = new JobUpdaterImpl(storageUtil.storage, clock, idGenerator);
-  }
-
-  @Test
-  public void testSaveUpdate() throws Exception {
-    IScheduledTask oldTask1 = buildScheduledTask(0, 5);
-    IScheduledTask oldTask2 = buildScheduledTask(1, 5);
-    IScheduledTask oldTask3 = buildScheduledTask(2, 7);
-    IScheduledTask oldTask4 = buildScheduledTask(3, 7);
-    IScheduledTask oldTask5 = buildScheduledTask(4, 5);
-    IScheduledTask oldTask6 = buildScheduledTask(5, 5);
-    IScheduledTask oldTask7 = buildScheduledTask(6, 5);
-
-    ITaskConfig newTask = buildScheduledTask(0, 8).getAssignedTask().getTask();
-
-    IJobUpdate update = buildJobUpdate(6, newTask, ImmutableMap.of(
-        oldTask1.getAssignedTask().getTask(), ImmutableSet.of(new Range(0, 1), new Range(4, 6)),
-        oldTask3.getAssignedTask().getTask(), ImmutableSet.of(new Range(2, 3))
-    ));
-
-    expect(idGenerator.createNew()).andReturn(ID);
-    expect(clock.nowMillis()).andReturn(TIMESTAMP);
-
-    storageUtil.expectTaskFetch(
-        Query.unscoped().byJob(JOB).active(),
-        oldTask1,
-        oldTask2,
-        oldTask3,
-        oldTask4,
-        oldTask5,
-        oldTask6,
-        oldTask7);
-
-    storageUtil.updateStore.saveJobUpdate(update, LOCK);
-    storageUtil.updateStore.saveJobUpdateEvent(buildUpdateEvent(), UPDATE_ID);
-
-    control.replay();
-
-    assertEquals(
-        UPDATE_ID,
-        updater.startJobUpdate(buildJobRequest(update), IDENTITY.getUser(), LOCK));
-  }
-
-  @Test(expected = UpdaterException.class)
-  public void testSaveUpdateFails() throws Exception {
-    ITaskConfig newTask = buildScheduledTask(0, 8).getAssignedTask().getTask();
-
-    IJobUpdate update = buildJobUpdate(
-        6,
-        newTask,
-        ImmutableMap.<ITaskConfig, ImmutableSet<Range>>of());
-
-    expect(idGenerator.createNew()).andReturn(ID);
-    expect(clock.nowMillis()).andReturn(TIMESTAMP);
-
-    storageUtil.expectTaskFetch(Query.unscoped().byJob(JOB).active());
-
-    storageUtil.updateStore.saveJobUpdate(update, LOCK);
-    expectLastCall().andThrow(new Storage.StorageException("fail"));
-
-    control.replay();
-
-    updater.startJobUpdate(buildJobRequest(update), IDENTITY.getUser(), LOCK);
-  }
-
-  private static IJobUpdateRequest buildJobRequest(IJobUpdate update) {
-    return IJobUpdateRequest.build(new JobUpdateRequest()
-        .setInstanceCount(update.getConfiguration().getInstanceCount())
-        .setJobKey(update.getSummary().getJobKey().newBuilder())
-        .setSettings(update.getConfiguration().getSettings().newBuilder())
-        .setTaskConfig(update.getConfiguration().getNewTaskConfig().newBuilder()));
-  }
-
-  private static IScheduledTask buildScheduledTask(int instanceId, long ramMb) {
-    return IScheduledTask.build(new ScheduledTask()
-        .setAssignedTask(new AssignedTask()
-            .setInstanceId(instanceId)
-            .setTask(new TaskConfig()
-                .setRamMb(ramMb) // Simulates unique task config.
-                .setOwner(IDENTITY)
-                .setEnvironment(JOB.getEnvironment())
-                .setJobName(JOB.getName()))));
-  }
-
-  private static IJobUpdate buildJobUpdate(
-      int instanceCount,
-      ITaskConfig newConfig,
-      ImmutableMap<ITaskConfig, ImmutableSet<Range>> oldConfigMap) {
-
-    ImmutableSet.Builder<InstanceTaskConfig> builder = ImmutableSet.builder();
-    for (Map.Entry<ITaskConfig, ImmutableSet<Range>> entry : oldConfigMap.entrySet()) {
-      builder.add(new InstanceTaskConfig(entry.getKey().newBuilder(), entry.getValue()));
-    }
-
-    return IJobUpdate.build(new JobUpdate()
-        .setSummary(new JobUpdateSummary()
-            .setJobKey(JOB.newBuilder())
-            .setUpdateId(UPDATE_ID)
-            .setUser(IDENTITY.getUser()))
-        .setConfiguration(new JobUpdateConfiguration()
-            .setSettings(new JobUpdateSettings())
-            .setInstanceCount(instanceCount)
-            .setNewTaskConfig(newConfig.newBuilder())
-            .setOldTaskConfigs(builder.build())));
-  }
-
-  private static IJobUpdateEvent buildUpdateEvent() {
-    return IJobUpdateEvent.build(new JobUpdateEvent()
-        .setStatus(JobUpdateStatus.ROLLING_FORWARD)
-        .setTimestampMs(TIMESTAMP));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b667ead4/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
index 04abeeb..48c8078 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
@@ -18,7 +18,9 @@ import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
 import java.util.Date;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 
 import javax.annotation.Nullable;
 
@@ -36,8 +38,6 @@ import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.Module;
 import com.twitter.common.testing.easymock.EasyMockTest;
-import com.twitter.common.util.Clock;
-import com.twitter.common.util.testing.FakeClock;
 
 import org.apache.aurora.auth.CapabilityValidator;
 import org.apache.aurora.auth.CapabilityValidator.AuditCheck;
@@ -58,18 +58,23 @@ import org.apache.aurora.gen.Hosts;
 import org.apache.aurora.gen.Identity;
 import org.apache.aurora.gen.InstanceConfigRewrite;
 import org.apache.aurora.gen.InstanceKey;
+import org.apache.aurora.gen.InstanceTaskConfig;
 import org.apache.aurora.gen.JobConfigRewrite;
 import org.apache.aurora.gen.JobConfiguration;
 import org.apache.aurora.gen.JobKey;
 import org.apache.aurora.gen.JobStats;
 import org.apache.aurora.gen.JobSummary;
 import org.apache.aurora.gen.JobSummaryResult;
+import org.apache.aurora.gen.JobUpdate;
+import org.apache.aurora.gen.JobUpdateConfiguration;
 import org.apache.aurora.gen.JobUpdateRequest;
 import org.apache.aurora.gen.JobUpdateSettings;
+import org.apache.aurora.gen.JobUpdateSummary;
 import org.apache.aurora.gen.LimitConstraint;
 import org.apache.aurora.gen.Lock;
 import org.apache.aurora.gen.LockKey;
 import org.apache.aurora.gen.PendingReason;
+import org.apache.aurora.gen.Range;
 import org.apache.aurora.gen.ResourceAggregate;
 import org.apache.aurora.gen.Response;
 import org.apache.aurora.gen.ResponseCode;
@@ -101,20 +106,19 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
 import org.apache.aurora.scheduler.metadata.NearestFit;
 import org.apache.aurora.scheduler.quota.QuotaInfo;
 import org.apache.aurora.scheduler.quota.QuotaManager;
-import org.apache.aurora.scheduler.state.JobUpdater;
-import org.apache.aurora.scheduler.state.JobUpdater.UpdaterException;
 import org.apache.aurora.scheduler.state.LockManager;
 import org.apache.aurora.scheduler.state.LockManager.LockException;
 import org.apache.aurora.scheduler.state.MaintenanceController;
 import org.apache.aurora.scheduler.state.SchedulerCore;
 import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.state.UUIDGenerator;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
 import org.apache.aurora.scheduler.storage.backup.Recovery;
 import org.apache.aurora.scheduler.storage.backup.StorageBackup;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateRequest;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
 import org.apache.aurora.scheduler.storage.entities.ILock;
 import org.apache.aurora.scheduler.storage.entities.ILockKey;
 import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
@@ -123,6 +127,8 @@ import org.apache.aurora.scheduler.storage.entities.IServerInfo;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
 import org.apache.aurora.scheduler.thrift.aop.AopModule;
+import org.apache.aurora.scheduler.updater.JobUpdateController;
+import org.apache.aurora.scheduler.updater.UpdateStateException;
 import org.easymock.IExpectationSetters;
 import org.junit.Before;
 import org.junit.Test;
@@ -170,6 +176,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   private static final Lock DEFAULT_LOCK = null;
   private static final String TASK_ID = "task_id";
   private static final String UPDATE_ID = "82d6d790-3212-11e3-aa6e-0800200c9a74";
+  private static final UUID UU_ID = UUID.fromString(UPDATE_ID);
 
   private static final IResourceAggregate QUOTA =
       IResourceAggregate.build(new ResourceAggregate(10.0, 1024, 2048));
@@ -198,7 +205,8 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   private QuotaManager quotaManager;
   private NearestFit nearestFit;
   private StateManager stateManager;
-  private JobUpdater jobUpdater;
+  private UUIDGenerator uuidGenerator;
+  private JobUpdateController jobUpdateController;
 
   @Before
   public void setUp() throws Exception {
@@ -217,13 +225,13 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
     quotaManager = createMock(QuotaManager.class);
     nearestFit = createMock(NearestFit.class);
     stateManager = createMock(StateManager.class);
-    jobUpdater = createMock(JobUpdater.class);
+    uuidGenerator = createMock(UUIDGenerator.class);
+    jobUpdateController = createMock(JobUpdateController.class);
 
     // Use guice and install AuthModule to apply AOP-style auth layer.
     Module testModule = new AbstractModule() {
       @Override
       protected void configure() {
-        bind(Clock.class).toInstance(new FakeClock());
         bind(NonVolatileStorage.class).toInstance(storageUtil.storage);
         bind(SchedulerCore.class).toInstance(scheduler);
         bind(LockManager.class).toInstance(lockManager);
@@ -238,7 +246,8 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
         bind(CronPredictor.class).toInstance(cronPredictor);
         bind(NearestFit.class).toInstance(nearestFit);
         bind(StateManager.class).toInstance(stateManager);
-        bind(JobUpdater.class).toInstance(jobUpdater);
+        bind(UUIDGenerator.class).toInstance(uuidGenerator);
+        bind(JobUpdateController.class).toInstance(jobUpdateController);
       }
     };
     Injector injector = Guice.createInjector(testModule, new AopModule());
@@ -394,6 +403,13 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
     assertResponse(AUTH_FAILED, thrift.createJob(makeJob(), DEFAULT_LOCK, SESSION));
   }
 
+  private IScheduledTask buildScheduledTask(int instanceId, long ramMb) {
+    return IScheduledTask.build(new ScheduledTask()
+        .setAssignedTask(new AssignedTask()
+            .setInstanceId(instanceId)
+            .setTask(populatedTask().setRamMb(ramMb))));
+  }
+
   private IScheduledTask buildScheduledTask() {
     return buildScheduledTask(JOB_NAME, TASK_ID);
   }
@@ -1810,22 +1826,48 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
 
   @Test
   public void testStartUpdate() throws Exception {
-    JobUpdateRequest request = createJobRequest(populatedTask());
     expectAuth(ROLE, true);
     expect(lockManager.acquireLock(LOCK_KEY, USER)).andReturn(LOCK);
-    expect(jobUpdater.startJobUpdate(IJobUpdateRequest.build(request), USER, LOCK.getToken()))
-        .andReturn(UPDATE_ID);
+
+    IScheduledTask oldTask1 = buildScheduledTask(0, 5);
+    IScheduledTask oldTask2 = buildScheduledTask(1, 5);
+    IScheduledTask oldTask3 = buildScheduledTask(2, 7);
+    IScheduledTask oldTask4 = buildScheduledTask(3, 7);
+    IScheduledTask oldTask5 = buildScheduledTask(4, 5);
+    IScheduledTask oldTask6 = buildScheduledTask(5, 5);
+    IScheduledTask oldTask7 = buildScheduledTask(6, 5);
+
+    ITaskConfig newTask = buildScheduledTask(0, 8).getAssignedTask().getTask();
+
+    IJobUpdate update = buildJobUpdate(6, newTask, ImmutableMap.of(
+        oldTask1.getAssignedTask().getTask(), ImmutableSet.of(new Range(0, 1), new Range(4, 6)),
+        oldTask3.getAssignedTask().getTask(), ImmutableSet.of(new Range(2, 3))
+    ));
+
+    expect(uuidGenerator.createNew()).andReturn(UU_ID);
+
+    storageUtil.expectTaskFetch(
+        Query.unscoped().byJob(JOB_KEY).active(),
+        oldTask1,
+        oldTask2,
+        oldTask3,
+        oldTask4,
+        oldTask5,
+        oldTask6,
+        oldTask7);
+
+    jobUpdateController.start(update, LOCK.getToken());
 
     control.replay();
 
     Response response =
-        assertOkResponse(thrift.startJobUpdate(request, SESSION));
+        assertOkResponse(thrift.startJobUpdate(buildJobUpdateRequest(update), SESSION));
     assertEquals(UPDATE_ID, response.getResult().getStartJobUpdateResult().getUpdateId());
   }
 
   @Test
   public void testStartUpdateFailsAuth() throws Exception {
-    JobUpdateRequest request = createJobRequest(populatedTask());
+    JobUpdateRequest request = buildJobUpdateRequest(populatedTask());
     expectAuth(ROLE, false);
 
     control.replay();
@@ -1834,7 +1876,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
 
   @Test
   public void testStartUpdateFailsConfigValidation() throws Exception {
-    JobUpdateRequest request = createJobRequest(populatedTask().setJobName(null));
+    JobUpdateRequest request = buildJobUpdateRequest(populatedTask().setJobName(null));
     expectAuth(ROLE, true);
 
     control.replay();
@@ -1843,7 +1885,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
 
   @Test
   public void testStartUpdateFailsLockValidation() throws Exception {
-    JobUpdateRequest request = createJobRequest(populatedTask());
+    JobUpdateRequest request = buildJobUpdateRequest(populatedTask());
     expectAuth(ROLE, true);
     expect(lockManager.acquireLock(LOCK_KEY, USER)).andThrow(new LockException("lock failed"));
 
@@ -1853,24 +1895,116 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   }
 
   @Test
-  public void testStartUpdateFailsInUpdater() throws Exception {
-    JobUpdateRequest request = createJobRequest(populatedTask());
+  public void testStartUpdateFailsInController() throws Exception {
     expectAuth(ROLE, true);
     expect(lockManager.acquireLock(LOCK_KEY, USER)).andReturn(LOCK);
-    expect(jobUpdater.startJobUpdate(IJobUpdateRequest.build(request), USER, LOCK.getToken()))
-        .andThrow(new UpdaterException("failed update"));
+
+    IScheduledTask oldTask = buildScheduledTask(0, 5);
+    ITaskConfig newTask = buildScheduledTask(0, 8).getAssignedTask().getTask();
+
+    IJobUpdate update = buildJobUpdate(1, newTask, ImmutableMap.of(
+        oldTask.getAssignedTask().getTask(), ImmutableSet.of(new Range(0, 0))
+    ));
+
+    expect(uuidGenerator.createNew()).andReturn(UU_ID);
+
+    storageUtil.expectTaskFetch(Query.unscoped().byJob(JOB_KEY).active(), oldTask);
+    jobUpdateController.start(update, LOCK.getToken());
+    expectLastCall().andThrow(new UpdateStateException("failed"));
 
     control.replay();
 
-    assertResponse(INVALID_REQUEST, thrift.startJobUpdate(request, SESSION));
+    assertResponse(INVALID_REQUEST, thrift.startJobUpdate(buildJobUpdateRequest(update), SESSION));
   }
 
-  private static JobUpdateRequest createJobRequest(TaskConfig config) {
-    return new JobUpdateRequest()
-        .setInstanceCount(5)
-        .setJobKey(JOB_KEY.newBuilder())
-        .setSettings(new JobUpdateSettings())
-        .setTaskConfig(config);
+  @Test
+  public void testPauseJobUpdate() throws Exception {
+    expectAuth(ROLE, true);
+    jobUpdateController.pause(JOB_KEY);
+
+    control.replay();
+
+    assertResponse(OK, thrift.pauseJobUpdate(JOB_KEY.newBuilder(), SESSION));
+  }
+
+  @Test
+  public void testPauseJobUpdateFailsAuth() throws Exception {
+    expectAuth(ROLE, false);
+
+    control.replay();
+
+    assertResponse(AUTH_FAILED, thrift.pauseJobUpdate(JOB_KEY.newBuilder(), SESSION));
+  }
+
+  @Test
+  public void testPauseJobUpdateFailsInController() throws Exception {
+    expectAuth(ROLE, true);
+    jobUpdateController.pause(JOB_KEY);
+    expectLastCall().andThrow(new UpdateStateException("failed"));
+
+    control.replay();
+
+    assertResponse(INVALID_REQUEST, thrift.pauseJobUpdate(JOB_KEY.newBuilder(), SESSION));
+  }
+
+  @Test
+  public void testResumeJobUpdate() throws Exception {
+    expectAuth(ROLE, true);
+    jobUpdateController.resume(JOB_KEY);
+
+    control.replay();
+
+    assertResponse(OK, thrift.resumeJobUpdate(JOB_KEY.newBuilder(), SESSION));
+  }
+
+  @Test
+  public void testResumeJobUpdateFailsAuth() throws Exception {
+    expectAuth(ROLE, false);
+
+    control.replay();
+
+    assertResponse(AUTH_FAILED, thrift.resumeJobUpdate(JOB_KEY.newBuilder(), SESSION));
+  }
+
+  @Test
+  public void testResumeJobUpdateFailsInController() throws Exception {
+    expectAuth(ROLE, true);
+    jobUpdateController.resume(JOB_KEY);
+    expectLastCall().andThrow(new UpdateStateException("failed"));
+
+    control.replay();
+
+    assertResponse(INVALID_REQUEST, thrift.resumeJobUpdate(JOB_KEY.newBuilder(), SESSION));
+  }
+
+  @Test
+  public void testAbortJobUpdate() throws Exception {
+    expectAuth(ROLE, true);
+    jobUpdateController.abort(JOB_KEY);
+
+    control.replay();
+
+    assertResponse(OK, thrift.abortJobUpdate(JOB_KEY.newBuilder(), SESSION));
+  }
+
+  @Test
+  public void testAbortJobUpdateFailsAuth() throws Exception {
+    expectAuth(ROLE, false);
+
+    control.replay();
+
+    assertResponse(AUTH_FAILED, thrift.abortJobUpdate(JOB_KEY.newBuilder(), SESSION));
+  }
+
+  @Test
+  public void testAbortJobUpdateFailsInController() throws Exception {
+    expectAuth(ROLE, true);
+    jobUpdateController.abort(JOB_KEY);
+    expectLastCall().andThrow(new UpdateStateException("failed"));
+
+    control.replay();
+
+    assertResponse(INVALID_REQUEST, thrift.abortJobUpdate(JOB_KEY.newBuilder(), SESSION));
   }
 
   private static JobConfiguration makeJob() {
@@ -1974,4 +2108,42 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
     return new Constraint(DEDICATED_ATTRIBUTE,
         TaskConstraint.value(new ValueConstraint(false, values)));
   }
+
+  private static JobUpdateRequest buildJobUpdateRequest(TaskConfig config) {
+    return new JobUpdateRequest()
+        .setInstanceCount(6)
+        .setJobKey(JOB_KEY.newBuilder())
+        .setSettings(new JobUpdateSettings())
+        .setTaskConfig(config);
+  }
+
+  private static JobUpdateRequest buildJobUpdateRequest(IJobUpdate update) {
+    return new JobUpdateRequest()
+        .setInstanceCount(update.getConfiguration().getInstanceCount())
+        .setJobKey(update.getSummary().getJobKey().newBuilder())
+        .setSettings(update.getConfiguration().getSettings().newBuilder())
+        .setTaskConfig(update.getConfiguration().getNewTaskConfig().newBuilder());
+  }
+
+  private static IJobUpdate buildJobUpdate(
+      int instanceCount,
+      ITaskConfig newConfig,
+      ImmutableMap<ITaskConfig, ImmutableSet<Range>> oldConfigMap) {
+
+    ImmutableSet.Builder<InstanceTaskConfig> builder = ImmutableSet.builder();
+    for (Map.Entry<ITaskConfig, ImmutableSet<Range>> entry : oldConfigMap.entrySet()) {
+      builder.add(new InstanceTaskConfig(entry.getKey().newBuilder(), entry.getValue()));
+    }
+
+    return IJobUpdate.build(new JobUpdate()
+        .setSummary(new JobUpdateSummary()
+            .setJobKey(JOB_KEY.newBuilder())
+            .setUpdateId(UPDATE_ID)
+            .setUser(ROLE_IDENTITY.getUser()))
+        .setConfiguration(new JobUpdateConfiguration()
+            .setSettings(new JobUpdateSettings())
+            .setInstanceCount(instanceCount)
+            .setNewTaskConfig(newConfig.newBuilder())
+            .setOldTaskConfigs(builder.build())));
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b667ead4/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java b/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
index 43265fd..055c177 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
@@ -35,11 +35,11 @@ import org.apache.aurora.gen.SessionKey;
 import org.apache.aurora.scheduler.cron.CronJobManager;
 import org.apache.aurora.scheduler.cron.CronPredictor;
 import org.apache.aurora.scheduler.quota.QuotaManager;
-import org.apache.aurora.scheduler.state.JobUpdater;
 import org.apache.aurora.scheduler.state.LockManager;
 import org.apache.aurora.scheduler.state.MaintenanceController;
 import org.apache.aurora.scheduler.state.SchedulerCore;
 import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.state.UUIDGenerator;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
 import org.apache.aurora.scheduler.storage.backup.Recovery;
@@ -48,6 +48,7 @@ import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
 import org.apache.aurora.scheduler.storage.entities.IServerInfo;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
 import org.apache.aurora.scheduler.thrift.auth.ThriftAuthModule;
+import org.apache.aurora.scheduler.updater.JobUpdateController;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -155,7 +156,8 @@ public class ThriftIT extends EasyMockTest {
             bindMock(LockManager.class);
             bindMock(ShutdownRegistry.class);
             bindMock(StateManager.class);
-            bindMock(JobUpdater.class);
+            bindMock(UUIDGenerator.class);
+            bindMock(JobUpdateController.class);
             storageTestUtil = new StorageTestUtil(ThriftIT.this);
             bind(Storage.class).toInstance(storageTestUtil.storage);
             bind(NonVolatileStorage.class).toInstance(storageTestUtil.storage);


Mime
View raw message