aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wfar...@apache.org
Subject [2/2] incubator-aurora git commit: Require StateManager callers to open their own transactions.
Date Fri, 14 Nov 2014 02:35:06 GMT
Require StateManager callers to open their own transactions.

Reviewed at https://reviews.apache.org/r/26899/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/e9f135dc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/e9f135dc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/e9f135dc

Branch: refs/heads/master
Commit: e9f135dc0d9b78d5480c4b37373ce75b29a3d889
Parents: 316f291
Author: Bill Farner <wfarner@apache.org>
Authored: Thu Nov 13 18:32:11 2014 -0800
Committer: Bill Farner <wfarner@apache.org>
Committed: Thu Nov 13 18:32:11 2014 -0800

----------------------------------------------------------------------
 .../aurora/scheduler/UserTaskLauncher.java      |  26 +-
 .../aurora/scheduler/async/Preemptor.java       |  28 +-
 .../scheduler/async/TaskHistoryPruner.java      |   9 +-
 .../aurora/scheduler/async/TaskScheduler.java   |  17 +-
 .../aurora/scheduler/async/TaskThrottler.java   |  20 +-
 .../aurora/scheduler/async/TaskTimeout.java     |  85 ++--
 .../scheduler/cron/quartz/AuroraCronJob.java    |  36 +-
 .../scheduler/state/MaintenanceController.java  |   1 +
 .../aurora/scheduler/state/StateManager.java    |  14 +-
 .../scheduler/state/StateManagerImpl.java       | 402 +++++++++----------
 .../aurora/scheduler/state/TaskAssigner.java    |   9 +-
 .../thrift/SchedulerThriftInterface.java        | 103 +++--
 .../scheduler/updater/InstanceAction.java       |   3 +
 .../updater/InstanceActionHandler.java          |  18 +-
 .../updater/JobUpdateControllerImpl.java        |  68 ++--
 .../aurora/scheduler/UserTaskLauncherTest.java  |   9 +-
 .../scheduler/async/PreemptorImplTest.java      |   1 +
 .../scheduler/async/TaskHistoryPrunerTest.java  |   6 +-
 .../scheduler/async/TaskSchedulerImplTest.java  |  12 +-
 .../scheduler/async/TaskSchedulerTest.java      |  46 ++-
 .../scheduler/async/TaskThrottlerTest.java      |  12 +-
 .../aurora/scheduler/async/TaskTimeoutTest.java |  16 +-
 .../cron/quartz/AuroraCronJobTest.java          |  24 +-
 .../state/MaintenanceControllerImplTest.java    |   1 +
 .../scheduler/state/StateManagerImplTest.java   |  76 +++-
 .../scheduler/state/TaskAssignerImplTest.java   |  12 +-
 .../thrift/SchedulerThriftInterfaceTest.java    |  22 +-
 .../aurora/scheduler/updater/AddTaskTest.java   |   9 +-
 .../aurora/scheduler/updater/JobUpdaterIT.java  |  49 ++-
 29 files changed, 695 insertions(+), 439 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9f135dc/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java b/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
index e1b7d05..80f1d83 100644
--- a/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
+++ b/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
@@ -27,6 +27,7 @@ import org.apache.aurora.scheduler.async.OfferQueue;
 import org.apache.aurora.scheduler.base.Conversions;
 import org.apache.aurora.scheduler.base.SchedulerException;
 import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.mesos.Protos.OfferID;
 import org.apache.mesos.Protos.TaskStatus;
 
@@ -45,11 +46,13 @@ class UserTaskLauncher implements TaskLauncher {
   @VisibleForTesting
   static final String MEMORY_LIMIT_DISPLAY = "Task used more memory than requested.";
 
+  private final Storage storage;
   private final OfferQueue offerQueue;
   private final StateManager stateManager;
 
   @Inject
-  UserTaskLauncher(OfferQueue offerQueue, StateManager stateManager) {
+  UserTaskLauncher(Storage storage, OfferQueue offerQueue, StateManager stateManager) {
+    this.storage = requireNonNull(storage);
     this.offerQueue = requireNonNull(offerQueue);
     this.stateManager = requireNonNull(stateManager);
   }
@@ -63,14 +66,14 @@ class UserTaskLauncher implements TaskLauncher {
   }
 
   @Override
-  public synchronized boolean statusUpdate(TaskStatus status) {
+  public synchronized boolean statusUpdate(final TaskStatus status) {
     @Nullable String message = null;
     if (status.hasMessage()) {
       message = status.getMessage();
     }
 
     try {
-      ScheduleStatus translatedState = Conversions.convertProtoState(status.getState());
+      final ScheduleStatus translatedState = Conversions.convertProtoState(status.getState());
       // TODO(William Farner): Remove this hack once Mesos API change is done.
       //                       Tracked by: https://issues.apache.org/jira/browse/MESOS-343
       if (translatedState == ScheduleStatus.FAILED
@@ -79,11 +82,18 @@ class UserTaskLauncher implements TaskLauncher {
         message = MEMORY_LIMIT_DISPLAY;
       }
 
-      stateManager.changeState(
-          status.getTaskId().getValue(),
-          Optional.<ScheduleStatus>absent(),
-          translatedState,
-          Optional.fromNullable(message));
+      final String auditMessage = message;
+      storage.write(new Storage.MutateWork.NoResult.Quiet() {
+        @Override
+        protected void execute(Storage.MutableStoreProvider storeProvider) {
+          stateManager.changeState(
+              storeProvider,
+              status.getTaskId().getValue(),
+              Optional.<ScheduleStatus>absent(),
+              translatedState,
+              Optional.fromNullable(auditMessage));
+        }
+      });
     } catch (SchedulerException e) {
       LOG.log(Level.WARNING, "Failed to update status for: " + status, e);
       throw e;

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9f135dc/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java b/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
index 1d337f6..ff26c49 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
@@ -65,6 +65,8 @@ import static java.util.Objects.requireNonNull;
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 import static org.apache.aurora.gen.ScheduleStatus.PREEMPTING;
 import static org.apache.aurora.scheduler.base.Tasks.SCHEDULED_TO_ASSIGNED;
+import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import static org.apache.aurora.scheduler.storage.Storage.MutateWork;
 import static org.apache.aurora.scheduler.storage.Storage.StoreProvider;
 import static org.apache.aurora.scheduler.storage.Storage.Work;
 
@@ -351,7 +353,7 @@ public interface Preemptor {
         return Optional.absent();
       }
 
-      IAssignedTask pendingTask = Iterables.getOnlyElement(pendingTasks);
+      final IAssignedTask pendingTask = Iterables.getOnlyElement(pendingTasks);
 
       Multimap<String, IAssignedTask> slavesToActiveTasks = getSlavesToActiveTasks();
 
@@ -371,21 +373,27 @@ public interface Preemptor {
           .build();
 
       for (String slaveID : allSlaves) {
-        Optional<Set<IAssignedTask>> toPreemptTasks = getTasksToPreempt(
+        final Optional<Set<IAssignedTask>> toPreemptTasks = getTasksToPreempt(
             slavesToActiveTasks.get(slaveID),
             slavesToOffers.get(slaveID),
             pendingTask,
             attributeAggregate);
 
         if (toPreemptTasks.isPresent()) {
-          for (IAssignedTask toPreempt : toPreemptTasks.get()) {
-            stateManager.changeState(
-                toPreempt.getTaskId(),
-                Optional.<ScheduleStatus>absent(),
-                PREEMPTING,
-                Optional.of("Preempting in favor of " + pendingTask.getTaskId()));
-            tasksPreempted.incrementAndGet();
-          }
+          storage.write(new MutateWork.NoResult.Quiet() {
+            @Override
+            protected void execute(MutableStoreProvider storeProvider) {
+              for (IAssignedTask toPreempt : toPreemptTasks.get()) {
+                stateManager.changeState(
+                    storeProvider,
+                    toPreempt.getTaskId(),
+                    Optional.<ScheduleStatus>absent(),
+                    PREEMPTING,
+                    Optional.of("Preempting in favor of " + pendingTask.getTaskId()));
+                tasksPreempted.incrementAndGet();
+              }
+            }
+          });
           return Optional.of(slaveID);
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9f135dc/src/main/java/org/apache/aurora/scheduler/async/TaskHistoryPruner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskHistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/async/TaskHistoryPruner.java
index 345cd89..58d074b 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskHistoryPruner.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskHistoryPruner.java
@@ -121,9 +121,14 @@ public class TaskHistoryPruner implements EventSubscriber {
     }
   }
 
-  private void deleteTasks(Set<String> taskIds) {
+  private void deleteTasks(final Set<String> taskIds) {
     LOG.info("Pruning inactive tasks " + taskIds);
-    stateManager.deleteTasks(taskIds);
+    storage.write(new Storage.MutateWork.NoResult.Quiet() {
+      @Override
+      protected void execute(Storage.MutableStoreProvider storeProvider) {
+        stateManager.deleteTasks(storeProvider, taskIds);
+      }
+    });
   }
 
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9f135dc/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
index e2ba8b8..6bfa3ac 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
@@ -128,10 +128,14 @@ public interface TaskScheduler extends EventSubscriber {
     }
 
     private Function<HostOffer, Optional<TaskInfo>> getAssignerFunction(
+        final MutableStoreProvider storeProvider,
         final AttributeAggregate attributeAggregate,
         final String taskId,
         final IScheduledTask task) {
 
+      // TODO(wfarner): Turn this into Predicate<Offer>, and in the caller, find the first match
+      // and perform the assignment at the very end.  This will allow us to use optimistic locking
+      // at the top of the stack and avoid holding the write lock for too long.
       return new Function<HostOffer, Optional<TaskInfo>>() {
         @Override
         public Optional<TaskInfo> apply(HostOffer offer) {
@@ -140,14 +144,14 @@ public interface TaskScheduler extends EventSubscriber {
           if (reservedTaskId.isPresent()) {
             if (taskId.equals(reservedTaskId.get())) {
               // Slave is reserved to satisfy this task.
-              return assigner.maybeAssign(offer, task, attributeAggregate);
+              return assigner.maybeAssign(storeProvider, offer, task, attributeAggregate);
             } else {
               // Slave is reserved for another task.
               return Optional.absent();
             }
           } else {
             // Slave is not reserved.
-            return assigner.maybeAssign(offer, task, attributeAggregate);
+            return assigner.maybeAssign(storeProvider, offer, task, attributeAggregate);
           }
         }
       };
@@ -194,7 +198,7 @@ public interface TaskScheduler extends EventSubscriber {
               AttributeAggregate aggregate =
                   getJobState(store, Tasks.SCHEDULED_TO_JOB_KEY.apply(task));
               try {
-                if (!offerQueue.launchFirst(getAssignerFunction(aggregate, taskId, task))) {
+                if (!offerQueue.launchFirst(getAssignerFunction(store, aggregate, taskId, task))) {
                   // Task could not be scheduled.
                   maybePreemptFor(taskId, aggregate);
                   attemptsNoMatch.incrementAndGet();
@@ -209,7 +213,12 @@ public interface TaskScheduler extends EventSubscriber {
                 // It is in the LOST state and a new task will move to PENDING to replace it.
                 // Should the state change fail due to storage issues, that's okay.  The task will
                 // time out in the ASSIGNED state and be moved to LOST.
-                stateManager.changeState(taskId, Optional.of(PENDING), LOST, LAUNCH_FAILED_MSG);
+                stateManager.changeState(
+                    store,
+                    taskId,
+                    Optional.of(PENDING),
+                    LOST,
+                    LAUNCH_FAILED_MSG);
               }
             }
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9f135dc/src/main/java/org/apache/aurora/scheduler/async/TaskThrottler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskThrottler.java b/src/main/java/org/apache/aurora/scheduler/async/TaskThrottler.java
index ca6129c..f0dea48 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskThrottler.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskThrottler.java
@@ -27,6 +27,7 @@ import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.storage.Storage;
 
 import static java.util.Objects.requireNonNull;
 
@@ -44,6 +45,7 @@ class TaskThrottler implements EventSubscriber {
   private final RescheduleCalculator rescheduleCalculator;
   private final Clock clock;
   private final ScheduledExecutorService executor;
+  private final Storage storage;
   private final StateManager stateManager;
 
   private final SlidingStats throttleStats = new SlidingStats("task_throttle", "ms");
@@ -53,11 +55,13 @@ class TaskThrottler implements EventSubscriber {
       RescheduleCalculator rescheduleCalculator,
       Clock clock,
       ScheduledExecutorService executor,
+      Storage storage,
       StateManager stateManager) {
 
     this.rescheduleCalculator = requireNonNull(rescheduleCalculator);
     this.clock = requireNonNull(clock);
     this.executor = requireNonNull(executor);
+    this.storage = requireNonNull(storage);
     this.stateManager = requireNonNull(stateManager);
   }
 
@@ -72,11 +76,17 @@ class TaskThrottler implements EventSubscriber {
           new Runnable() {
             @Override
             public void run() {
-              stateManager.changeState(
-                  stateChange.getTaskId(),
-                  Optional.of(THROTTLED),
-                  PENDING,
-                  Optional.<String>absent());
+              storage.write(new Storage.MutateWork.NoResult.Quiet() {
+                @Override
+                protected void execute(Storage.MutableStoreProvider storeProvider) {
+                  stateManager.changeState(
+                      storeProvider,
+                      stateChange.getTaskId(),
+                      Optional.of(THROTTLED),
+                      PENDING,
+                      Optional.<String>absent());
+                }
+              });
             }
           },
           delayMs,

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9f135dc/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java b/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
index 8217c51..90e6149 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
@@ -33,9 +33,12 @@ import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.storage.Storage;
 
 import static java.util.Objects.requireNonNull;
 
+import static org.apache.aurora.scheduler.storage.Storage.MutateWork;
+
 /**
  * Observes task transitions and identifies tasks that are 'stuck' in a transient state.  Stuck
  * tasks will be transitioned to the LOST state.
@@ -61,6 +64,7 @@ class TaskTimeout extends AbstractIdleService implements EventSubscriber {
       ScheduleStatus.DRAINING);
 
   private final ScheduledExecutorService executor;
+  private final Storage storage;
   private final StateManager stateManager;
   private final Amount<Long, Time> timeout;
   private final AtomicLong timedOutTasks;
@@ -68,11 +72,13 @@ class TaskTimeout extends AbstractIdleService implements EventSubscriber {
   @Inject
   TaskTimeout(
       ScheduledExecutorService executor,
+      Storage storage,
       StateManager stateManager,
       Amount<Long, Time> timeout,
       StatsProvider statsProvider) {
 
     this.executor = requireNonNull(executor);
+    this.storage = requireNonNull(storage);
     this.stateManager = requireNonNull(stateManager);
     this.timeout = requireNonNull(timeout);
     this.timedOutTasks = statsProvider.makeCounter(TIMED_OUT_TASKS_COUNTER);
@@ -93,41 +99,56 @@ class TaskTimeout extends AbstractIdleService implements EventSubscriber {
     // Nothing to do for shutting down.
   }
 
+  private class TimedOutTaskHandler implements Runnable {
+    private final String taskId;
+    private final ScheduleStatus newState;
+
+    TimedOutTaskHandler(String taskId, ScheduleStatus newState) {
+      this.taskId = taskId;
+      this.newState = newState;
+    }
+
+    @Override
+    public void run() {
+      if (isRunning()) {
+        // This query acts as a CAS by including the state that we expect the task to be in
+        // if the timeout is still valid.  Ideally, the future would have already been
+        // canceled, but in the event of a state transition race, including transientState
+        // prevents an unintended task timeout.
+        // Note: This requires LOST transitions trigger Driver.killTask.
+        boolean movedToLost = storage.write(new MutateWork.Quiet<Boolean>() {
+          @Override
+          public Boolean apply(Storage.MutableStoreProvider storeProvider) {
+            return stateManager.changeState(
+                storeProvider,
+                taskId,
+                Optional.of(newState),
+                ScheduleStatus.LOST,
+                TIMEOUT_MESSAGE);
+          }
+        });
+
+        if (movedToLost) {
+          LOG.info("Timeout reached for task " + taskId + ":" + taskId);
+          timedOutTasks.incrementAndGet();
+        }
+      } else {
+        // Our service is not yet started.  We don't want to lose track of the task, so
+        // we will try again later.
+        LOG.fine("Retrying timeout of task " + taskId + " in " + NOT_STARTED_RETRY);
+        executor.schedule(
+            this,
+            NOT_STARTED_RETRY.getValue(),
+            NOT_STARTED_RETRY.getUnit().getTimeUnit());
+      }
+    }
+  }
+
   @Subscribe
   public void recordStateChange(TaskStateChange change) {
-    final String taskId = change.getTaskId();
-    final ScheduleStatus newState = change.getNewState();
-    if (isTransient(newState)) {
+    if (isTransient(change.getNewState())) {
       executor.schedule(
-          new Runnable() {
-            @Override
-            public void run() {
-              if (isRunning()) {
-                // This query acts as a CAS by including the state that we expect the task to be in
-                // if the timeout is still valid.  Ideally, the future would have already been
-                // canceled, but in the event of a state transition race, including transientState
-                // prevents an unintended task timeout.
-                // Note: This requires LOST transitions trigger Driver.killTask.
-                if (stateManager.changeState(
-                    taskId,
-                    Optional.of(newState),
-                    ScheduleStatus.LOST,
-                    TIMEOUT_MESSAGE)) {
-
-                  LOG.info("Timeout reached for task " + taskId + ":" + taskId);
-                  timedOutTasks.incrementAndGet();
-                }
-              } else {
-                // Our service is not yet started.  We don't want to lose track of the task, so
-                // we will try again later.
-                LOG.fine("Retrying timeout of task " + taskId + " in " + NOT_STARTED_RETRY);
-                executor.schedule(
-                    this,
-                    NOT_STARTED_RETRY.getValue(),
-                    NOT_STARTED_RETRY.getUnit().getTimeUnit());
-              }
-            }
-          },
+          new TimedOutTaskHandler(change.getTaskId(), change.getNewState()),
           timeout.getValue(),
           timeout.getUnit().getTimeUnit());
     }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9f135dc/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java b/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java
index 9388657..84e37e4 100644
--- a/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java
+++ b/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java
@@ -152,7 +152,7 @@ class AuroraCronJob implements Job {
             ITaskConfig task = cronJob.getSanitizedConfig().getJobConfig().getTaskConfig();
             Set<Integer> instanceIds = cronJob.getSanitizedConfig().getInstanceIds();
             if (activeTasks.isEmpty()) {
-              stateManager.insertPendingTasks(task, instanceIds);
+              stateManager.insertPendingTasks(storeProvider, task, instanceIds);
 
               return Optional.absent();
             }
@@ -182,13 +182,20 @@ class AuroraCronJob implements Job {
       return;
     }
 
-    for (String taskId : deferredLaunch.get().activeTaskIds) {
-      stateManager.changeState(
-          taskId,
-          Optional.<ScheduleStatus>absent(),
-          KILLING,
-          KILL_AUDIT_MESSAGE);
-    }
+    storage.write(new Storage.MutateWork.NoResult.Quiet() {
+      @Override
+      protected void execute(Storage.MutableStoreProvider storeProvider) {
+        for (String taskId : deferredLaunch.get().activeTaskIds) {
+          stateManager.changeState(
+              storeProvider,
+              taskId,
+              Optional.<ScheduleStatus>absent(),
+              KILLING,
+              KILL_AUDIT_MESSAGE);
+        }
+      }
+    });
+
     LOG.info(String.format("Waiting for job to terminate before launching cron job %s.", path));
 
     final Query.Builder query = Query.taskScoped(deferredLaunch.get().activeTaskIds).active();
@@ -200,9 +207,16 @@ class AuroraCronJob implements Job {
         public Boolean get() {
           if (Storage.Util.consistentFetchTasks(storage, query).isEmpty()) {
             LOG.info("Initiating delayed launch of cron " + path);
-            stateManager.insertPendingTasks(
-                deferredLaunch.get().task,
-                deferredLaunch.get().instanceIds);
+            storage.write(new Storage.MutateWork.NoResult.Quiet() {
+              @Override
+              protected void execute(Storage.MutableStoreProvider storeProvider) {
+                stateManager.insertPendingTasks(
+                    storeProvider,
+                    deferredLaunch.get().task,
+                    deferredLaunch.get().instanceIds);
+              }
+            });
+
             return true;
           } else {
             LOG.info("Not yet safe to run cron " + path);

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9f135dc/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
index 86440eb..a835eaa 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
@@ -127,6 +127,7 @@ public interface MaintenanceController {
           LOG.info(String.format("Draining tasks: %s on host: %s", activeTasks, host));
           for (String taskId : activeTasks) {
             stateManager.changeState(
+                store,
                 taskId,
                 Optional.<ScheduleStatus>absent(),
                 ScheduleStatus.DRAINING,

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9f135dc/src/main/java/org/apache/aurora/scheduler/state/StateManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateManager.java b/src/main/java/org/apache/aurora/scheduler/state/StateManager.java
index 3a2fd27..50ff4ec 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateManager.java
@@ -22,6 +22,8 @@ import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.mesos.Protos.SlaveID;
 
+import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+
 /**
  * A manager for the state of tasks.  Most modifications to tasks should be made here, especially
  * those that alter the {@link ScheduleStatus} of tasks.
@@ -47,6 +49,7 @@ public interface StateManager {
    *
    */
   boolean changeState(
+      MutableStoreProvider storeProvider,
       String taskId,
       Optional<ScheduleStatus> casState,
       ScheduleStatus newState,
@@ -56,6 +59,7 @@ public interface StateManager {
    * Assigns a task to a specific slave.
    * This will modify the task record to reflect the host assignment and return the updated record.
    *
+   * @param storeProvider Storage provider.
    * @param taskId ID of the task to mutate.
    * @param slaveHost Host name that the task is being assigned to.
    * @param slaveId ID of the slave that the task is being assigned to.
@@ -63,6 +67,7 @@ public interface StateManager {
    * @return The updated task record, or {@code null} if the task was not found.
    */
   IAssignedTask assignTask(
+      MutableStoreProvider storeProvider,
       String taskId,
       String slaveHost,
       SlaveID slaveId,
@@ -72,17 +77,22 @@ public interface StateManager {
    * Inserts pending instances using {@code task} as their configuration. Tasks will immediately
    * move into PENDING and will be eligible for scheduling.
    *
+   * @param storeProvider Storage provider.
    * @param task Task template.
    * @param instanceIds Instance IDs to assign to new PENDING tasks.
    */
-  void insertPendingTasks(ITaskConfig task, Set<Integer> instanceIds);
+  void insertPendingTasks(
+      MutableStoreProvider storeProvider,
+      ITaskConfig task,
+      Set<Integer> instanceIds);
 
   /**
    * Attempts to delete tasks from the task store.
    * If the task is not currently in a state that is considered safe for deletion,
    * side-effect actions will be performed to reconcile the state conflict.
    *
+   * @param storeProvider Storage provider.
    * @param taskIds IDs of tasks to delete.
    */
-  void deleteTasks(final Set<String> taskIds);
+  void deleteTasks(MutableStoreProvider storeProvider, Set<String> taskIds);
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9f135dc/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
index 6663555..bd6a05f 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
@@ -53,9 +53,7 @@ import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent;
 import org.apache.aurora.scheduler.mesos.Driver;
 import org.apache.aurora.scheduler.state.SideEffect.Action;
-import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork;
 import org.apache.aurora.scheduler.storage.TaskStore;
 import org.apache.aurora.scheduler.storage.TaskStore.Mutable.TaskMutation;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
@@ -79,7 +77,6 @@ import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
 public class StateManagerImpl implements StateManager {
   private static final Logger LOG = Logger.getLogger(StateManagerImpl.class.getName());
 
-  private final Storage storage;
   private final Clock clock;
   private final Driver driver;
   private final TaskIdGenerator taskIdGenerator;
@@ -88,14 +85,12 @@ public class StateManagerImpl implements StateManager {
 
   @Inject
   StateManagerImpl(
-      final Storage storage,
       final Clock clock,
       Driver driver,
       TaskIdGenerator taskIdGenerator,
       EventSink eventSink,
       RescheduleCalculator rescheduleCalculator) {
 
-    this.storage = requireNonNull(storage);
     this.clock = requireNonNull(clock);
     this.driver = requireNonNull(driver);
     this.taskIdGenerator = requireNonNull(taskIdGenerator);
@@ -114,12 +109,17 @@ public class StateManagerImpl implements StateManager {
   }
 
   @Override
-  public void insertPendingTasks(final ITaskConfig task, final Set<Integer> instanceIds) {
+  public void insertPendingTasks(
+      MutableStoreProvider storeProvider,
+      final ITaskConfig task,
+      Set<Integer> instanceIds) {
+
+    requireNonNull(storeProvider);
     requireNonNull(task);
     checkNotBlank(instanceIds);
 
     // Done outside the write transaction to minimize the work done inside a transaction.
-    final Set<IScheduledTask> scheduledTasks = FluentIterable.from(instanceIds)
+    Set<IScheduledTask> scheduledTasks = FluentIterable.from(instanceIds)
         .transform(new Function<Integer, IScheduledTask>() {
           @Override
           public IScheduledTask apply(Integer instanceId) {
@@ -127,45 +127,48 @@ public class StateManagerImpl implements StateManager {
           }
         }).toSet();
 
-    storage.write(new MutateWork.NoResult.Quiet() {
-      @Override
-      protected void execute(MutableStoreProvider storeProvider) {
-          ImmutableSet<IScheduledTask> existingTasks = storeProvider.getTaskStore().fetchTasks(
-            Query.jobScoped(task.getJob()).active());
+    ImmutableSet<IScheduledTask> existingTasks = storeProvider.getTaskStore().fetchTasks(
+        Query.jobScoped(task.getJob()).active());
 
-        Set<Integer> existingInstanceIds =
-            FluentIterable.from(existingTasks).transform(Tasks.SCHEDULED_TO_INSTANCE_ID).toSet();
+    Set<Integer> existingInstanceIds =
+        FluentIterable.from(existingTasks).transform(Tasks.SCHEDULED_TO_INSTANCE_ID).toSet();
 
-        if (!Sets.intersection(existingInstanceIds, instanceIds).isEmpty()) {
-          throw new IllegalArgumentException("Instance ID collision detected.");
-        }
+    if (!Sets.intersection(existingInstanceIds, instanceIds).isEmpty()) {
+      throw new IllegalArgumentException("Instance ID collision detected.");
+    }
 
-        storeProvider.getUnsafeTaskStore().saveTasks(scheduledTasks);
+    storeProvider.getUnsafeTaskStore().saveTasks(scheduledTasks);
 
-        for (IScheduledTask task : scheduledTasks) {
-          updateTaskAndExternalState(
-              Tasks.id(task),
-              Optional.of(task),
-              Optional.of(PENDING),
-              Optional.<String>absent());
-        }
-      }
-    });
+    for (IScheduledTask scheduledTask : scheduledTasks) {
+      updateTaskAndExternalState(
+          storeProvider.getUnsafeTaskStore(),
+          Tasks.id(scheduledTask),
+          Optional.of(scheduledTask),
+          Optional.of(PENDING),
+          Optional.<String>absent());
+    }
   }
 
   @Override
   public boolean changeState(
+      MutableStoreProvider storeProvider,
       String taskId,
       Optional<ScheduleStatus> casState,
       final ScheduleStatus newState,
       final Optional<String> auditMessage) {
 
-    return updateTaskAndExternalState(casState, taskId, newState, auditMessage);
+    return updateTaskAndExternalState(
+        storeProvider.getUnsafeTaskStore(),
+        casState,
+        taskId,
+        newState,
+        auditMessage);
   }
 
   @Override
   public IAssignedTask assignTask(
-      final String taskId,
+      MutableStoreProvider storeProvider,
+      String taskId,
       final String slaveHost,
       final SlaveID slaveId,
       final Set<Integer> assignedPorts) {
@@ -175,39 +178,35 @@ public class StateManagerImpl implements StateManager {
     requireNonNull(slaveId);
     requireNonNull(assignedPorts);
 
-    return storage.write(new MutateWork.Quiet<IAssignedTask>() {
-      @Override
-      public IAssignedTask apply(MutableStoreProvider storeProvider) {
-        boolean success = updateTaskAndExternalState(
-            Optional.<ScheduleStatus>absent(),
-            taskId,
-            ASSIGNED,
-            Optional.<String>absent());
-
-        Preconditions.checkState(
-            success,
-            "Attempt to assign task " + taskId + " to " + slaveHost + " failed");
-        Query.Builder query = Query.taskScoped(taskId);
-        storeProvider.getUnsafeTaskStore().mutateTasks(query,
-            new Function<IScheduledTask, IScheduledTask>() {
-              @Override
-              public IScheduledTask apply(IScheduledTask task) {
-                ScheduledTask builder = task.newBuilder();
-                AssignedTask assigned = builder.getAssignedTask();
-                assigned.setAssignedPorts(
-                    getNameMappedPorts(assigned.getTask().getRequestedPorts(), assignedPorts));
-                assigned.setSlaveHost(slaveHost)
-                    .setSlaveId(slaveId.getValue());
-                return IScheduledTask.build(builder);
-              }
-            });
-
-        return Iterables.getOnlyElement(
-            Iterables.transform(
-                storeProvider.getTaskStore().fetchTasks(query),
-                Tasks.SCHEDULED_TO_ASSIGNED));
-      }
-    });
+    boolean success = updateTaskAndExternalState(
+        storeProvider.getUnsafeTaskStore(),
+        Optional.<ScheduleStatus>absent(),
+        taskId,
+        ASSIGNED,
+        Optional.<String>absent());
+
+    Preconditions.checkState(
+        success,
+        "Attempt to assign task " + taskId + " to " + slaveHost + " failed");
+    Query.Builder query = Query.taskScoped(taskId);
+    storeProvider.getUnsafeTaskStore().mutateTasks(query,
+        new Function<IScheduledTask, IScheduledTask>() {
+          @Override
+          public IScheduledTask apply(IScheduledTask task) {
+            ScheduledTask builder = task.newBuilder();
+            AssignedTask assigned = builder.getAssignedTask();
+            assigned.setAssignedPorts(
+                getNameMappedPorts(assigned.getTask().getRequestedPorts(), assignedPorts));
+            assigned.setSlaveHost(slaveHost)
+                .setSlaveId(slaveId.getValue());
+            return IScheduledTask.build(builder);
+          }
+        });
+
+    return Iterables.getOnlyElement(
+        Iterables.transform(
+            storeProvider.getTaskStore().fetchTasks(query),
+            Tasks.SCHEDULED_TO_ASSIGNED));
   }
 
   private static Map<String, Integer> getNameMappedPorts(
@@ -250,32 +249,29 @@ public class StateManagerImpl implements StateManager {
       });
 
   private boolean updateTaskAndExternalState(
-      final Optional<ScheduleStatus> casState,
-      final String taskId,
-      final ScheduleStatus targetState,
-      final Optional<String> transitionMessage) {
+      TaskStore.Mutable taskStore,
+      Optional<ScheduleStatus> casState,
+      String taskId,
+      ScheduleStatus targetState,
+      Optional<String> transitionMessage) {
 
-    return storage.write(new MutateWork.Quiet<Boolean>() {
-      @Override
-      public Boolean apply(MutableStoreProvider storeProvider) {
-        Optional<IScheduledTask> task = Optional.fromNullable(Iterables.getOnlyElement(
-            storeProvider.getTaskStore().fetchTasks(Query.taskScoped(taskId)),
-            null));
+    Optional<IScheduledTask> task = Optional.fromNullable(Iterables.getOnlyElement(
+        taskStore.fetchTasks(Query.taskScoped(taskId)),
+        null));
 
-        // CAS operation fails if the task does not exist, or the states don't match.
-        if (casState.isPresent()
-            && (!task.isPresent() || casState.get() != task.get().getStatus())) {
+    // CAS operation fails if the task does not exist, or the states don't match.
+    if (casState.isPresent()
+        && (!task.isPresent() || casState.get() != task.get().getStatus())) {
 
-          return false;
-        }
+      return false;
+    }
 
-        return updateTaskAndExternalState(
-            taskId,
-            task,
-            Optional.of(targetState),
-            transitionMessage);
-      }
-    });
+    return updateTaskAndExternalState(
+        taskStore,
+        taskId,
+        task,
+        Optional.of(targetState),
+        transitionMessage);
   }
 
   private static final Function<SideEffect, Action> GET_ACTION =
@@ -308,7 +304,8 @@ public class StateManagerImpl implements StateManager {
       Ordering.explicit(ACTIONS_IN_ORDER).onResultOf(GET_ACTION);
 
   private boolean updateTaskAndExternalState(
-      final String taskId,
+      TaskStore.Mutable taskStore,
+      String taskId,
       // Note: This argument is deliberately non-final, and should not be made final.
       // This is because using the captured value within the storage operation below is
       // highly-risky, since it doesn't necessarily represent the value in storage.
@@ -327,111 +324,106 @@ public class StateManagerImpl implements StateManager {
         ? new TaskStateMachine(task.get())
         : new TaskStateMachine(taskId);
 
-    boolean success = storage.write(new MutateWork.Quiet<Boolean>() {
-      @Override
-      public Boolean apply(MutableStoreProvider storeProvider) {
-        TransitionResult result = stateMachine.updateState(targetState);
-        Query.Builder query = Query.taskScoped(taskId);
-
-        for (SideEffect sideEffect : ACTION_ORDER.sortedCopy(result.getSideEffects())) {
-          Optional<IScheduledTask> upToDateTask = Optional.fromNullable(
-              Iterables.getOnlyElement(storeProvider.getTaskStore().fetchTasks(query), null));
-
-          switch (sideEffect.getAction()) {
-            case INCREMENT_FAILURES:
-              storeProvider.getUnsafeTaskStore().mutateTasks(query, new TaskMutation() {
-                @Override
-                public IScheduledTask apply(IScheduledTask task) {
-                  return IScheduledTask.build(
-                      task.newBuilder().setFailureCount(task.getFailureCount() + 1));
-                }
-              });
-              break;
-
-            case SAVE_STATE:
-              Preconditions.checkState(
-                  upToDateTask.isPresent(),
-                  "Operation expected task " + taskId + " to be present.");
-
-              storeProvider.getUnsafeTaskStore().mutateTasks(query, new TaskMutation() {
-                @Override
-                public IScheduledTask apply(IScheduledTask task) {
-                  ScheduledTask mutableTask = task.newBuilder();
-                  mutableTask.setStatus(targetState.get());
-                  mutableTask.addToTaskEvents(new TaskEvent()
-                      .setTimestamp(clock.nowMillis())
-                      .setStatus(targetState.get())
-                      .setMessage(transitionMessage.orNull())
-                      .setScheduler(LOCAL_HOST_SUPPLIER.get()));
-                  return IScheduledTask.build(mutableTask);
-                }
-              });
-              events.add(
-                  PubsubEvent.TaskStateChange.transition(
-                      Iterables.getOnlyElement(storeProvider.getTaskStore().fetchTasks(query)),
-                      stateMachine.getPreviousState()));
-              break;
-
-            case STATE_CHANGE:
-              updateTaskAndExternalState(
-                  Optional.<ScheduleStatus>absent(),
-                  taskId,
-                  sideEffect.getNextState().get(),
-                  Optional.<String>absent());
-              break;
-
-            case RESCHEDULE:
-              Preconditions.checkState(
-                  upToDateTask.isPresent(),
-                  "Operation expected task " + taskId + " to be present.");
-              LOG.info("Task being rescheduled: " + taskId);
-
-              ScheduleStatus newState;
-              String auditMessage;
-              long flapPenaltyMs = rescheduleCalculator.getFlappingPenaltyMs(upToDateTask.get());
-              if (flapPenaltyMs > 0) {
-                newState = THROTTLED;
-                auditMessage =
-                    String.format("Rescheduled, penalized for %s ms for flapping", flapPenaltyMs);
-              } else {
-                newState = PENDING;
-                auditMessage = "Rescheduled";
-              }
-
-              IScheduledTask newTask = IScheduledTask.build(createTask(
-                  upToDateTask.get().getAssignedTask().getInstanceId(),
-                  upToDateTask.get().getAssignedTask().getTask())
-                  .newBuilder()
-                  .setFailureCount(upToDateTask.get().getFailureCount())
-                  .setAncestorId(taskId));
-              storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(newTask));
-              updateTaskAndExternalState(
-                  Tasks.id(newTask),
-                  Optional.of(newTask),
-                  Optional.of(newState),
-                  Optional.of(auditMessage));
-              break;
-
-            case KILL:
-              driver.killTask(taskId);
-              break;
-
-            case DELETE:
-              Preconditions.checkState(
-                  upToDateTask.isPresent(),
-                  "Operation expected task " + taskId + " to be present.");
-
-              events.add(deleteTasks(storeProvider, ImmutableSet.of(taskId)));
-              break;
-
-            default:
-              throw new IllegalStateException("Unrecognized side-effect " + sideEffect.getAction());
+    TransitionResult result = stateMachine.updateState(targetState);
+    Query.Builder query = Query.taskScoped(taskId);
+
+    for (SideEffect sideEffect : ACTION_ORDER.sortedCopy(result.getSideEffects())) {
+      Optional<IScheduledTask> upToDateTask = Optional.fromNullable(
+          Iterables.getOnlyElement(taskStore.fetchTasks(query), null));
+
+      switch (sideEffect.getAction()) {
+        case INCREMENT_FAILURES:
+          taskStore.mutateTasks(query, new TaskMutation() {
+            @Override
+            public IScheduledTask apply(IScheduledTask task) {
+              return IScheduledTask.build(
+                  task.newBuilder().setFailureCount(task.getFailureCount() + 1));
+            }
+          });
+          break;
+
+        case SAVE_STATE:
+          Preconditions.checkState(
+              upToDateTask.isPresent(),
+              "Operation expected task " + taskId + " to be present.");
+
+          taskStore.mutateTasks(query, new TaskMutation() {
+            @Override
+            public IScheduledTask apply(IScheduledTask task) {
+              ScheduledTask mutableTask = task.newBuilder();
+              mutableTask.setStatus(targetState.get());
+              mutableTask.addToTaskEvents(new TaskEvent()
+                  .setTimestamp(clock.nowMillis())
+                  .setStatus(targetState.get())
+                  .setMessage(transitionMessage.orNull())
+                  .setScheduler(LOCAL_HOST_SUPPLIER.get()));
+              return IScheduledTask.build(mutableTask);
+            }
+          });
+          events.add(
+              PubsubEvent.TaskStateChange.transition(
+                  Iterables.getOnlyElement(taskStore.fetchTasks(query)),
+                  stateMachine.getPreviousState()));
+          break;
+
+        case STATE_CHANGE:
+          updateTaskAndExternalState(
+              taskStore,
+              Optional.<ScheduleStatus>absent(),
+              taskId,
+              sideEffect.getNextState().get(),
+              Optional.<String>absent());
+          break;
+
+        case RESCHEDULE:
+          Preconditions.checkState(
+              upToDateTask.isPresent(),
+              "Operation expected task " + taskId + " to be present.");
+          LOG.info("Task being rescheduled: " + taskId);
+
+          ScheduleStatus newState;
+          String auditMessage;
+          long flapPenaltyMs = rescheduleCalculator.getFlappingPenaltyMs(upToDateTask.get());
+          if (flapPenaltyMs > 0) {
+            newState = THROTTLED;
+            auditMessage =
+                String.format("Rescheduled, penalized for %s ms for flapping", flapPenaltyMs);
+          } else {
+            newState = PENDING;
+            auditMessage = "Rescheduled";
           }
-        }
 
-        return result.isSuccess();
+          IScheduledTask newTask = IScheduledTask.build(createTask(
+              upToDateTask.get().getAssignedTask().getInstanceId(),
+              upToDateTask.get().getAssignedTask().getTask())
+              .newBuilder()
+              .setFailureCount(upToDateTask.get().getFailureCount())
+              .setAncestorId(taskId));
+          taskStore.saveTasks(ImmutableSet.of(newTask));
+          updateTaskAndExternalState(
+              taskStore,
+              Tasks.id(newTask),
+              Optional.of(newTask),
+              Optional.of(newState),
+              Optional.of(auditMessage));
+          break;
+
+        case KILL:
+          driver.killTask(taskId);
+          break;
+
+        case DELETE:
+          Preconditions.checkState(
+              upToDateTask.isPresent(),
+              "Operation expected task " + taskId + " to be present.");
+
+          events.add(deleteTasks(taskStore, ImmutableSet.of(taskId)));
+          break;
+
+        default:
+          throw new IllegalStateException("Unrecognized side-effect " + sideEffect.getAction());
       }
-    });
+    }
 
     // Note (AURORA-138): Delaying events until after the write operation is somewhat futile, since
     // the state may actually not be written to durable store
@@ -442,32 +434,26 @@ public class StateManagerImpl implements StateManager {
       eventSink.post(event);
     }
 
-    return success;
+    return result.isSuccess();
   }
 
   @Override
-  public void deleteTasks(final Set<String> taskIds) {
-    storage.write(new MutateWork.NoResult.Quiet() {
-      @Override
-      protected void execute(final MutableStoreProvider storeProvider) {
-
-        Map<String, IScheduledTask> tasks = Maps.uniqueIndex(
-            storeProvider.getTaskStore().fetchTasks(Query.taskScoped(taskIds)),
-            Tasks.SCHEDULED_TO_ID);
-
-        for (Map.Entry<String, IScheduledTask> entry : tasks.entrySet()) {
-          updateTaskAndExternalState(
-              entry.getKey(),
-              Optional.of(entry.getValue()),
-              Optional.<ScheduleStatus>absent(),
-              Optional.<String>absent());
-        }
-      }
-    });
+  public void deleteTasks(MutableStoreProvider storeProvider, final Set<String> taskIds) {
+    Map<String, IScheduledTask> tasks = Maps.uniqueIndex(
+        storeProvider.getTaskStore().fetchTasks(Query.taskScoped(taskIds)),
+        Tasks.SCHEDULED_TO_ID);
+
+    for (Map.Entry<String, IScheduledTask> entry : tasks.entrySet()) {
+      updateTaskAndExternalState(
+          storeProvider.getUnsafeTaskStore(),
+          entry.getKey(),
+          Optional.of(entry.getValue()),
+          Optional.<ScheduleStatus>absent(),
+          Optional.<String>absent());
+    }
   }
 
-  private static PubsubEvent deleteTasks(MutableStoreProvider storeProvider, Set<String> taskIds) {
-    TaskStore.Mutable taskStore = storeProvider.getUnsafeTaskStore();
+  private static PubsubEvent deleteTasks(TaskStore.Mutable taskStore, Set<String> taskIds) {
     Iterable<IScheduledTask> tasks = taskStore.fetchTasks(Query.taskScoped(taskIds));
     taskStore.deleteTasks(taskIds);
     return new PubsubEvent.TasksDeleted(ImmutableSet.copyOf(tasks));

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9f135dc/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
index 77db411..4abc7ba 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
@@ -34,6 +34,7 @@ import org.apache.mesos.Protos.TaskInfo;
 
 import static java.util.Objects.requireNonNull;
 
+import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import static org.apache.mesos.Protos.Offer;
 
 /**
@@ -45,12 +46,14 @@ public interface TaskAssigner {
    * Tries to match a task against an offer.  If a match is found, the assigner should
    * make the appropriate changes to the task and provide a non-empty result.
    *
+   * @param storeProvider Storage provider.
    * @param offer The resource offer.
    * @param task The task to match against and optionally assign.
    * @param attributeAggregate Attribute information for tasks in the job containing {@code task}.
    * @return Instructions for launching the task if matching and assignment were successful.
    */
   Optional<TaskInfo> maybeAssign(
+      MutableStoreProvider storeProvider,
       HostOffer offer,
       IScheduledTask task,
       AttributeAggregate attributeAggregate);
@@ -73,11 +76,12 @@ public interface TaskAssigner {
       this.taskFactory = requireNonNull(taskFactory);
     }
 
-    private TaskInfo assign(Offer offer, IScheduledTask task) {
+    private TaskInfo assign(MutableStoreProvider storeProvider, Offer offer, IScheduledTask task) {
       String host = offer.getHostname();
       Set<Integer> selectedPorts =
           Resources.getPorts(offer, task.getAssignedTask().getTask().getRequestedPorts().size());
       IAssignedTask assigned = stateManager.assignTask(
+          storeProvider,
           Tasks.id(task),
           host,
           offer.getSlaveId(),
@@ -89,6 +93,7 @@ public interface TaskAssigner {
 
     @Override
     public Optional<TaskInfo> maybeAssign(
+        MutableStoreProvider storeProvider,
         HostOffer offer,
         IScheduledTask task,
         AttributeAggregate attributeAggregate) {
@@ -100,7 +105,7 @@ public interface TaskAssigner {
           Tasks.id(task),
           attributeAggregate);
       if (vetoes.isEmpty()) {
-        return Optional.of(assign(offer.getOffer(), task));
+        return Optional.of(assign(storeProvider, offer.getOffer(), task));
       } else {
         LOG.fine("Slave " + offer.getOffer().getHostname() + " vetoed task " + Tasks.id(task)
             + ": " + vetoes);

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9f135dc/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
index f081bf3..f0b4975 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
@@ -320,7 +320,10 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
             cronJobManager.createJob(SanitizedCronJob.from(sanitized));
           } else {
             LOG.info("Launching " + count + " tasks.");
-            stateManager.insertPendingTasks(template, sanitized.getInstanceIds());
+            stateManager.insertPendingTasks(
+                storeProvider,
+                template,
+                sanitized.getInstanceIds());
           }
           return okEmptyResponse();
         } catch (LockException e) {
@@ -763,12 +766,14 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
 
         final Set<IScheduledTask> tasks = storeProvider.getTaskStore().fetchTasks(query);
 
-        Optional<SessionContext> context = isAdmin(session);
-        if (context.isPresent()) {
+        Optional<SessionContext> maybeAdminContext = isAdmin(session);
+        final SessionContext context;
+        if (maybeAdminContext.isPresent()) {
           LOG.info("Granting kill query to admin user: " + query);
+          context = maybeAdminContext.get();
         } else {
           try {
-            context = Optional.of(validateSessionKeyForTasks(session, query, tasks));
+            context = validateSessionKeyForTasks(session, query, tasks);
           } catch (AuthFailedException e) {
             return errorResponse(AUTH_FAILED, e);
           }
@@ -784,7 +789,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
 
         LOG.info("Killing tasks matching " + query);
 
-        boolean tasksKilled = false;
+        final boolean cronJobKilled;
         if (isSingleJobScoped) {
           // If this looks like a query for all tasks in a job, instruct the cron
           // scheduler to delete it.
@@ -792,19 +797,30 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
           IJobKey jobKey = Iterables.getOnlyElement(JobKeys.from(query).get());
           LOG.warning("Deprecated behavior: descheduling job " + jobKey
               + " with cron via killTasks. (See AURORA-454)");
-          tasksKilled = cronJobManager.deleteJob(jobKey);
+          cronJobKilled = cronJobManager.deleteJob(jobKey);
+        } else {
+          cronJobKilled = false;
         }
 
-        for (String taskId : Tasks.ids(tasks)) {
-          tasksKilled |= stateManager.changeState(
-              taskId,
-              Optional.<ScheduleStatus>absent(),
-              ScheduleStatus.KILLING,
-              killedByMessage(context.get().getIdentity()));
-        }
+        final boolean tasksKilled = storage.write(new MutateWork.Quiet<Boolean>() {
+          @Override
+          public Boolean apply(MutableStoreProvider storeProvider) {
+            boolean match = false;
+            for (String taskId : Tasks.ids(tasks)) {
+              match |= stateManager.changeState(
+                  storeProvider,
+                  taskId,
+                  Optional.<ScheduleStatus>absent(),
+                  ScheduleStatus.KILLING,
+                  killedByMessage(context.getIdentity()));
+            }
+            return match;
+          }
+        });
 
-        return tasksKilled
-            ? okEmptyResponse() : addMessage(emptyResponse(), OK, NO_TASKS_TO_KILL_MESSAGE);
+        return cronJobKilled || tasksKilled
+            ? okEmptyResponse()
+            : addMessage(emptyResponse(), OK, NO_TASKS_TO_KILL_MESSAGE);
       }
     });
   }
@@ -839,19 +855,25 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
         }
 
         Query.Builder query = Query.instanceScoped(jobKey, shardIds).active();
-        Set<IScheduledTask> matchingTasks = storeProvider.getTaskStore().fetchTasks(query);
+        final Set<IScheduledTask> matchingTasks = storeProvider.getTaskStore().fetchTasks(query);
         if (matchingTasks.size() != shardIds.size()) {
           return invalidResponse("Not all requested shards are active.");
         }
 
         LOG.info("Restarting shards matching " + query);
-        for (String taskId : Tasks.ids(matchingTasks)) {
-          stateManager.changeState(
-              taskId,
-              Optional.<ScheduleStatus>absent(),
-              ScheduleStatus.RESTARTING,
-              restartedByMessage(context.getIdentity()));
-        }
+        storage.write(new MutateWork.NoResult.Quiet() {
+          @Override
+          protected void execute(MutableStoreProvider storeProvider) {
+            for (String taskId : Tasks.ids(matchingTasks)) {
+              stateManager.changeState(
+                  storeProvider,
+                  taskId,
+                  Optional.<ScheduleStatus>absent(),
+                  ScheduleStatus.RESTARTING,
+                  restartedByMessage(context.getIdentity()));
+            }
+          }
+        });
         return okEmptyResponse();
       }
     });
@@ -919,12 +941,16 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
   }
 
   @Override
-  public Response forceTaskState(String taskId, ScheduleStatus status, SessionKey session) {
+  public Response forceTaskState(
+      final String taskId,
+      final ScheduleStatus status,
+      SessionKey session) {
+
     checkNotBlank(taskId);
     requireNonNull(status);
     requireNonNull(session);
 
-    SessionContext context;
+    final SessionContext context;
     try {
       // TODO(Sathya): Remove this after AOP-style session validation passes in a SessionContext.
       context = sessionValidator.checkAuthorized(session, Capability.ROOT, AuditCheck.REQUIRED);
@@ -932,11 +958,17 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
       return errorResponse(AUTH_FAILED, e);
     }
 
-    stateManager.changeState(
-        taskId,
-        Optional.<ScheduleStatus>absent(),
-        status,
-        transitionMessage(context.getIdentity()));
+    storage.write(new MutateWork.NoResult.Quiet() {
+      @Override
+      protected void execute(MutableStoreProvider storeProvider) {
+        stateManager.changeState(
+            storeProvider,
+            taskId,
+            Optional.<ScheduleStatus>absent(),
+            status,
+            transitionMessage(context.getIdentity()));
+      }
+    });
 
     return okEmptyResponse();
   }
@@ -1204,7 +1236,16 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
               currentTasks.size() + config.getInstanceIdsSize(),
               quotaManager.checkInstanceAddition(task, config.getInstanceIdsSize()));
 
-          stateManager.insertPendingTasks(task, ImmutableSet.copyOf(config.getInstanceIds()));
+          storage.write(new NoResult.Quiet() {
+            @Override
+            protected void execute(MutableStoreProvider storeProvider) {
+              stateManager.insertPendingTasks(
+                  storeProvider,
+                  task,
+                  ImmutableSet.copyOf(config.getInstanceIds()));
+            }
+          });
+
           return okEmptyResponse();
         } catch (LockException e) {
           return errorResponse(LOCK_ERROR, e);

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9f135dc/src/main/java/org/apache/aurora/scheduler/updater/InstanceAction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/InstanceAction.java b/src/main/java/org/apache/aurora/scheduler/updater/InstanceAction.java
index 3774c85..b553f97 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/InstanceAction.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/InstanceAction.java
@@ -21,6 +21,9 @@ import static org.apache.aurora.scheduler.updater.InstanceActionHandler.WatchRun
 
 enum InstanceAction {
   KILL_TASK(Optional.<InstanceActionHandler>of(new KillTask())),
+  // TODO(wfarner): Build this action into the scheduler state machine instead.  Rather than
+  // killing a task and re-recreating it, choose the updated or rolled-back task when we are
+  // deciding to reschedule the task.
   ADD_TASK(Optional.<InstanceActionHandler>of(new AddTask())),
   WATCH_TASK(Optional.<InstanceActionHandler>of(new WatchRunningTask())),
   AWAIT_STATE_CHANGE(Optional.<InstanceActionHandler>absent());

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9f135dc/src/main/java/org/apache/aurora/scheduler/updater/InstanceActionHandler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/InstanceActionHandler.java b/src/main/java/org/apache/aurora/scheduler/updater/InstanceActionHandler.java
index f4363aa..3b9919d 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/InstanceActionHandler.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/InstanceActionHandler.java
@@ -27,7 +27,6 @@ import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.state.StateManager;
-import org.apache.aurora.scheduler.storage.TaskStore;
 import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
 import org.apache.aurora.scheduler.storage.entities.IInstanceTaskConfig;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions;
@@ -35,13 +34,14 @@ import org.apache.aurora.scheduler.storage.entities.IRange;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 
 import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_FORWARD;
+import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 
 interface InstanceActionHandler {
 
   Amount<Long, Time> getReevaluationDelay(
       IInstanceKey instance,
       IJobUpdateInstructions instructions,
-      TaskStore taskStore,
+      MutableStoreProvider storeProvider,
       StateManager stateManager,
       JobUpdateStatus status);
 
@@ -73,7 +73,7 @@ interface InstanceActionHandler {
     public Amount<Long, Time> getReevaluationDelay(
         IInstanceKey instance,
         IJobUpdateInstructions instructions,
-        TaskStore taskStore,
+        MutableStoreProvider storeProvider,
         StateManager stateManager,
         JobUpdateStatus status) {
 
@@ -82,7 +82,10 @@ interface InstanceActionHandler {
           instructions,
           status == ROLLING_FORWARD,
           instance.getInstanceId());
-      stateManager.insertPendingTasks(replacement, ImmutableSet.of(instance.getInstanceId()));
+      stateManager.insertPendingTasks(
+          storeProvider,
+          replacement,
+          ImmutableSet.of(instance.getInstanceId()));
       return  Amount.of(
           (long) instructions.getSettings().getMaxWaitToInstanceRunningMs(),
           Time.MILLISECONDS);
@@ -94,14 +97,15 @@ interface InstanceActionHandler {
     public Amount<Long, Time> getReevaluationDelay(
         IInstanceKey instance,
         IJobUpdateInstructions instructions,
-        TaskStore taskStore,
+        MutableStoreProvider storeProvider,
         StateManager stateManager,
         JobUpdateStatus status) {
 
       String taskId = Tasks.id(Iterables.getOnlyElement(
-          taskStore.fetchTasks(Query.instanceScoped(instance).active())));
+          storeProvider.getTaskStore().fetchTasks(Query.instanceScoped(instance).active())));
       LOG.info("Killing " + instance + " while " + status);
       stateManager.changeState(
+          storeProvider,
           taskId,
           Optional.<ScheduleStatus>absent(),
           ScheduleStatus.KILLING,
@@ -117,7 +121,7 @@ interface InstanceActionHandler {
     public Amount<Long, Time> getReevaluationDelay(
         IInstanceKey instance,
         IJobUpdateInstructions instructions,
-        TaskStore taskStore,
+        MutableStoreProvider storeProvider,
         StateManager stateManager,
         JobUpdateStatus status) {
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9f135dc/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
index f918d15..a992938 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
@@ -72,6 +72,7 @@ import static org.apache.aurora.gen.JobUpdateStatus.ROLLED_BACK;
 import static org.apache.aurora.gen.JobUpdateStatus.ROLLED_FORWARD;
 import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_BACK;
 import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_FORWARD;
+import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import static org.apache.aurora.scheduler.storage.Storage.MutateWork;
 import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.ACTIVE_QUERY;
 import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.MonitorAction;
@@ -131,7 +132,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
 
     storage.write(new MutateWork.NoResult<UpdateStateException>() {
       @Override
-      protected void execute(Storage.MutableStoreProvider storeProvider)
+      protected void execute(MutableStoreProvider storeProvider)
           throws UpdateStateException {
 
         IJobUpdateSummary summary = update.getSummary();
@@ -159,8 +160,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
             Optional.of(requireNonNull(lock.getToken())));
 
         recordAndChangeJobUpdateStatus(
-            storeProvider.getJobUpdateStore(),
-            storeProvider.getTaskStore(),
+            storeProvider,
             summary.getUpdateId(),
             job,
             ROLLING_FORWARD,
@@ -197,14 +197,13 @@ class JobUpdateControllerImpl implements JobUpdateController {
   public void systemResume() {
     storage.write(new MutateWork.NoResult.Quiet() {
       @Override
-      protected void execute(Storage.MutableStoreProvider storeProvider) {
+      protected void execute(MutableStoreProvider storeProvider) {
         for (IJobUpdateSummary summary
             : storeProvider.getJobUpdateStore().fetchJobUpdateSummaries(ACTIVE_QUERY)) {
 
           LOG.info("Automatically resuming update " + JobKeys.canonicalString(summary.getJobKey()));
           changeJobUpdateStatus(
-              storeProvider.getJobUpdateStore(),
-              storeProvider.getTaskStore(),
+              storeProvider,
               summary.getUpdateId(),
               summary.getJobKey(),
               summary.getState().getStatus(),
@@ -234,15 +233,14 @@ class JobUpdateControllerImpl implements JobUpdateController {
   private void instanceChanged(final IInstanceKey instance, final Optional<IScheduledTask> state) {
     storage.write(new MutateWork.NoResult.Quiet() {
       @Override
-      protected void execute(Storage.MutableStoreProvider storeProvider) {
+      protected void execute(MutableStoreProvider storeProvider) {
         IJobKey job = instance.getJobKey();
         UpdateFactory.Update update = updates.get(job);
         if (update != null) {
           if (update.getUpdater().containsInstance(instance.getInstanceId())) {
             LOG.info("Forwarding task change for " + InstanceKeys.toString(instance));
             evaluateUpdater(
-                storeProvider.getJobUpdateStore(),
-                storeProvider.getTaskStore(),
+                storeProvider,
                 update,
                 getOnlyMatch(storeProvider.getJobUpdateStore(), queryByJob(job)),
                 ImmutableMap.of(instance.getInstanceId(), state));
@@ -291,7 +289,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
 
     storage.write(new MutateWork.NoResult<UpdateStateException>() {
       @Override
-      protected void execute(Storage.MutableStoreProvider storeProvider)
+      protected void execute(MutableStoreProvider storeProvider)
           throws UpdateStateException {
 
         IJobUpdateSummary update = Iterables.getOnlyElement(
@@ -302,19 +300,13 @@ class JobUpdateControllerImpl implements JobUpdateController {
 
         JobUpdateStatus status = update.getState().getStatus();
         JobUpdateStatus newStatus = requireNonNull(stateChange.apply(status));
-        changeUpdateStatus(
-            storeProvider.getJobUpdateStore(),
-            storeProvider.getTaskStore(),
-            update,
-            newStatus,
-            user);
+        changeUpdateStatus(storeProvider, update, newStatus, user);
       }
     });
   }
 
   private void changeUpdateStatus(
-      JobUpdateStore.Mutable updateStore,
-      TaskStore taskStore,
+      MutableStoreProvider storeProvider,
       IJobUpdateSummary updateSummary,
       JobUpdateStatus newStatus,
       Optional<String> user) {
@@ -325,8 +317,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
 
     assertTransitionAllowed(updateSummary.getState().getStatus(), newStatus);
     recordAndChangeJobUpdateStatus(
-        updateStore,
-        taskStore,
+        storeProvider,
         updateSummary.getUpdateId(),
         updateSummary.getJobKey(),
         newStatus,
@@ -334,14 +325,13 @@ class JobUpdateControllerImpl implements JobUpdateController {
   }
 
   private void recordAndChangeJobUpdateStatus(
-      JobUpdateStore.Mutable updateStore,
-      TaskStore taskStore,
+      MutableStoreProvider storeProvider,
       String updateId,
       IJobKey job,
       JobUpdateStatus status,
       Optional<String> user) {
 
-    changeJobUpdateStatus(updateStore, taskStore, updateId, job, status, user, true);
+    changeJobUpdateStatus(storeProvider, updateId, job, status, user, true);
   }
 
   private static final Set<JobUpdateStatus> UNLOCK_STATES = ImmutableSet.of(
@@ -353,8 +343,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
   );
 
   private void changeJobUpdateStatus(
-      JobUpdateStore.Mutable updateStore,
-      TaskStore taskStore,
+      MutableStoreProvider storeProvider,
       String updateId,
       IJobKey job,
       JobUpdateStatus newStatus,
@@ -364,6 +353,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
     JobUpdateStatus status;
     boolean record;
 
+    JobUpdateStore.Mutable updateStore = storeProvider.getJobUpdateStore();
     Optional<String> updateLock = updateStore.getLockToken(updateId);
     if (updateLock.isPresent()) {
       status = newStatus;
@@ -407,13 +397,12 @@ class JobUpdateControllerImpl implements JobUpdateController {
         update = updateFactory.newUpdate(jobUpdate.getInstructions(), action == ROLL_FORWARD);
       } catch (RuntimeException e) {
         LOG.log(Level.WARNING, "Uncaught exception: " + e, e);
-        changeJobUpdateStatus(updateStore, taskStore, updateId, job, ERROR, user, true);
+        changeJobUpdateStatus(storeProvider, updateId, job, ERROR, user, true);
         return;
       }
       updates.put(job, update);
       evaluateUpdater(
-          updateStore,
-          taskStore,
+          storeProvider,
           update,
           jobUpdate.getSummary(),
           ImmutableMap.<Integer, Optional<IScheduledTask>>of());
@@ -433,8 +422,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
       ImmutableSet.of(InstanceUpdateStatus.WORKING, InstanceUpdateStatus.SUCCEEDED);
 
   private void evaluateUpdater(
-      JobUpdateStore.Mutable updateStore,
-      final TaskStore taskStore,
+      final MutableStoreProvider storeProvider,
       final UpdateFactory.Update update,
       IJobUpdateSummary summary,
       Map<Integer, Optional<IScheduledTask>> changedInstance) {
@@ -442,10 +430,10 @@ class JobUpdateControllerImpl implements JobUpdateController {
     JobUpdateStatus updaterStatus = summary.getState().getStatus();
     final IJobKey job = summary.getJobKey();
 
+    final JobUpdateStore.Mutable updateStore = storeProvider.getJobUpdateStore();
     if (!updateStore.getLockToken(summary.getUpdateId()).isPresent()) {
       recordAndChangeJobUpdateStatus(
-          updateStore,
-          taskStore,
+          storeProvider,
           summary.getUpdateId(),
           job,
           ERROR,
@@ -457,7 +445,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
         new InstanceStateProvider<Integer, Optional<IScheduledTask>>() {
           @Override
           public Optional<IScheduledTask> getState(Integer instanceId) {
-            return getActiveInstance(taskStore, job, instanceId);
+            return getActiveInstance(storeProvider.getTaskStore(), job, instanceId);
           }
         };
 
@@ -505,16 +493,13 @@ class JobUpdateControllerImpl implements JobUpdateController {
       }
 
       if (status == SUCCEEDED) {
-        changeUpdateStatus(
-            updateStore,
-            taskStore,
+        changeUpdateStatus(storeProvider,
             summary,
             update.getSuccessStatus(),
             Optional.<String>absent());
       } else {
         changeUpdateStatus(
-            updateStore,
-            taskStore,
+            storeProvider,
             summary,
             update.getFailureStatus(),
             Optional.<String>absent());
@@ -531,7 +516,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
             Amount<Long, Time> reevaluateDelay = handler.get().getReevaluationDelay(
                 instance,
                 updateStore.fetchJobUpdateInstructions(summary.getUpdateId()).get(),
-                taskStore,
+                storeProvider,
                 stateManager,
                 updaterStatus);
             executor.schedule(
@@ -575,7 +560,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
       public void run() {
         storage.write(new MutateWork.NoResult.Quiet() {
           @Override
-          protected void execute(Storage.MutableStoreProvider storeProvider) {
+          protected void execute(MutableStoreProvider storeProvider) {
             IJobUpdateSummary summary =
                 getOnlyMatch(storeProvider.getJobUpdateStore(), queryByUpdateId(updateId));
             JobUpdateStatus status = summary.getState().getStatus();
@@ -583,8 +568,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
             if (JobUpdateStateMachine.isActive(status)) {
               UpdateFactory.Update update = updates.get(instance.getJobKey());
               evaluateUpdater(
-                  storeProvider.getJobUpdateStore(),
-                  storeProvider.getTaskStore(),
+                  storeProvider,
                   update,
                   summary,
                   ImmutableMap.of(

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9f135dc/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java b/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java
index 4673e80..7ba9464 100644
--- a/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java
@@ -29,6 +29,7 @@ import org.apache.aurora.scheduler.configuration.Resources;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.Storage.StorageException;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
 import org.apache.mesos.Protos.Attribute;
 import org.apache.mesos.Protos.FrameworkID;
 import org.apache.mesos.Protos.OfferID;
@@ -66,6 +67,7 @@ public class UserTaskLauncherTest extends EasyMockTest {
 
   private OfferQueue offerQueue;
   private StateManager stateManager;
+  private StorageTestUtil storageUtil;
 
   private TaskLauncher launcher;
 
@@ -73,7 +75,9 @@ public class UserTaskLauncherTest extends EasyMockTest {
   public void setUp() {
     offerQueue = createMock(OfferQueue.class);
     stateManager = createMock(StateManager.class);
-    launcher = new UserTaskLauncher(offerQueue, stateManager);
+    storageUtil = new StorageTestUtil(this);
+    storageUtil.expectOperations();
+    launcher = new UserTaskLauncher(storageUtil.storage, offerQueue, stateManager);
   }
 
   @Test
@@ -88,6 +92,7 @@ public class UserTaskLauncherTest extends EasyMockTest {
   @Test
   public void testForwardsStatusUpdates() throws Exception {
     expect(stateManager.changeState(
+        storageUtil.mutableStoreProvider,
         TASK_ID_A,
         Optional.<ScheduleStatus>absent(),
         RUNNING,
@@ -116,6 +121,7 @@ public class UserTaskLauncherTest extends EasyMockTest {
   @Test(expected = StorageException.class)
   public void testFailedStatusUpdate() throws Exception {
     expect(stateManager.changeState(
+        storageUtil.mutableStoreProvider,
         TASK_ID_A,
         Optional.<ScheduleStatus>absent(),
         RUNNING,
@@ -135,6 +141,7 @@ public class UserTaskLauncherTest extends EasyMockTest {
   @Test
   public void testMemoryLimitTranslationHack() throws Exception {
     expect(stateManager.changeState(
+        storageUtil.mutableStoreProvider,
         TASK_ID_A,
         Optional.<ScheduleStatus>absent(),
         FAILED,

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9f135dc/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
index 59bfbcb..8b0367e 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
@@ -587,6 +587,7 @@ public class PreemptorImplTest extends EasyMockTest {
 
   private void expectPreempted(ScheduledTask preempted) throws Exception {
     expect(stateManager.changeState(
+        eq(storageUtil.mutableStoreProvider),
         eq(Tasks.id(preempted)),
         eq(Optional.<ScheduleStatus>absent()),
         eq(ScheduleStatus.PREEMPTING),

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9f135dc/src/test/java/org/apache/aurora/scheduler/async/TaskHistoryPrunerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskHistoryPrunerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskHistoryPrunerTest.java
index 9682c89..6eaf3ce 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskHistoryPrunerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskHistoryPrunerTest.java
@@ -287,7 +287,7 @@ public class TaskHistoryPrunerTest extends EasyMockTest {
     eventDispatch.setName(getClass().getName() + "-EventDispatch");
     eventDispatch.start();
 
-    stateManager.deleteTasks(ImmutableSet.of(taskId));
+    stateManager.deleteTasks(storageUtil.mutableStoreProvider, ImmutableSet.of(taskId));
     expectLastCall().andAnswer(new IAnswer<Void>() {
       @Override
       public Void answer() {
@@ -306,7 +306,7 @@ public class TaskHistoryPrunerTest extends EasyMockTest {
   }
 
   private void expectDeleteTasks(String... tasks) {
-    stateManager.deleteTasks(ImmutableSet.copyOf(tasks));
+    stateManager.deleteTasks(storageUtil.mutableStoreProvider, ImmutableSet.copyOf(tasks));
   }
 
   private Capture<Runnable> expectDefaultDelayedPrune() {
@@ -342,7 +342,7 @@ public class TaskHistoryPrunerTest extends EasyMockTest {
         FluentIterable.from(tasksInJob).transform(Tasks.SCHEDULED_TO_JOB_KEY).toSet());
     storageUtil.expectTaskFetch(TaskHistoryPruner.jobHistoryQuery(jobKey), tasksInJob);
     if (pruned.length > 0) {
-      stateManager.deleteTasks(Tasks.ids(pruned));
+      stateManager.deleteTasks(storageUtil.mutableStoreProvider, Tasks.ids(pruned));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e9f135dc/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
index 0e8a98c..9bc6a75 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
@@ -56,12 +56,14 @@ import org.apache.aurora.scheduler.storage.mem.MemStorage;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
 import org.apache.mesos.Protos.TaskInfo;
 import org.easymock.Capture;
+import org.easymock.EasyMock;
 import org.junit.Before;
 import org.junit.Test;
 
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
 import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -132,7 +134,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
   }
 
   private void expectAssigned(IScheduledTask task) {
-    expect(assigner.maybeAssign(OFFER, task, emptyJob))
+    expect(assigner.maybeAssign(storageUtil.mutableStoreProvider, OFFER, task, emptyJob))
         .andReturn(Optional.of(TaskInfo.getDefaultInstance()));
   }
 
@@ -192,7 +194,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
 
     Capture<Function<HostOffer, Optional<TaskInfo>>> secondAssignment = expectLaunchAttempt(true);
 
-    expect(assigner.maybeAssign(OFFER, TASK_B, emptyJob))
+    expect(assigner.maybeAssign(storageUtil.mutableStoreProvider, OFFER, TASK_B, emptyJob))
         .andReturn(Optional.of(TaskInfo.getDefaultInstance()));
 
     control.replay();
@@ -302,7 +304,11 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     });
 
     Capture<Function<HostOffer, Optional<TaskInfo>>> assignment = expectLaunchAttempt(true);
-    expect(assigner.maybeAssign(OFFER, taskA, emptyJob))
+    expect(assigner.maybeAssign(
+        EasyMock.<MutableStoreProvider>anyObject(),
+        eq(OFFER),
+        eq(taskA),
+        eq(emptyJob)))
         .andReturn(Optional.of(TaskInfo.getDefaultInstance()));
 
     control.replay();


Mime
View raw message