aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wfar...@apache.org
Subject [3/3] git commit: Refactor StateManagerImpl and TaskStateMachine for less code and better readability.
Date Mon, 13 Jan 2014 20:33:43 GMT
Refactor StateManagerImpl and TaskStateMachine for less code and better readability.


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/70adc19a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/70adc19a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/70adc19a

Branch: refs/heads/wfarner/state_machine_refactor
Commit: 70adc19ad99be08de93fd2b501b7f8bab83b0953
Parents: 801dcfb
Author: Bill Farner <bill@twitter.com>
Authored: Thu Jan 9 16:07:56 2014 -0800
Committer: Bill Farner <wfarner@apache.org>
Committed: Mon Jan 13 12:26:17 2014 -0800

----------------------------------------------------------------------
 .../aurora/scheduler/state/SideEffect.java      |  94 +++
 .../scheduler/state/SideEffectStorage.java      | 169 ------
 .../scheduler/state/StateManagerImpl.java       | 500 +++++++--------
 .../scheduler/state/TaskStateMachine.java       | 546 +++++++----------
 .../aurora/scheduler/state/WorkCommand.java     |  33 -
 .../aurora/scheduler/storage/Storage.java       |   4 +
 .../aurora/scheduler/storage/TaskStore.java     |   2 +
 .../scheduler/state/StateManagerImplTest.java   |  95 ++-
 .../scheduler/state/TaskStateMachineTest.java   | 601 ++++++++++++++-----
 9 files changed, 1116 insertions(+), 928 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/70adc19a/src/main/java/org/apache/aurora/scheduler/state/SideEffect.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/SideEffect.java b/src/main/java/org/apache/aurora/scheduler/state/SideEffect.java
new file mode 100644
index 0000000..5759691
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/state/SideEffect.java
@@ -0,0 +1,94 @@
+/**
+ * Copyright 2013 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.state;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+
+import org.apache.aurora.gen.ScheduleStatus;
+
+/**
+ * Descriptions of the different types of external work commands that task state machines may
+ * trigger.
+ */
+class SideEffect {
+  private final Action action;
+  private final Optional<ScheduleStatus> nextState;
+
+  SideEffect(Action action, Optional<ScheduleStatus> nextState) {
+    this.action = action;
+    if (action == Action.STATE_CHANGE) {
+      Preconditions.checkArgument(
+          nextState.isPresent(),
+          "A next state must be provided for a state change action.");
+    }
+    this.nextState = nextState;
+  }
+
+  public Action getAction() {
+    return action;
+  }
+
+  public Optional<ScheduleStatus> getNextState() {
+    return nextState;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof SideEffect)) {
+      return false;
+    }
+
+    SideEffect other = (SideEffect) o;
+    return Objects.equal(action, other.action)
+        && Objects.equal(nextState, other.nextState);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(action, nextState);
+  }
+
+  @Override
+  public String toString() {
+    if (nextState.isPresent()) {
+      return action.toString() + " " + nextState.get();
+    } else {
+      return action.toString();
+    }
+  }
+
+  enum Action {
+    // Send an instruction for the runner of this task to kill the task.
+    KILL,
+
+    // Create a new state machine with a copy of this task.
+    RESCHEDULE,
+
+    // Update the task's state (schedule status) in the persistent store to match the state machine.
+    SAVE_STATE,
+
+    // Delete this task from the persistent store.
+    DELETE,
+
+    // Increment the failure count for this task.
+    INCREMENT_FAILURES,
+
+    // Perform an additional state change on the task.
+    STATE_CHANGE
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/70adc19a/src/main/java/org/apache/aurora/scheduler/state/SideEffectStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/SideEffectStorage.java b/src/main/java/org/apache/aurora/scheduler/state/SideEffectStorage.java
deleted file mode 100644
index 2bdd459..0000000
--- a/src/main/java/org/apache/aurora/scheduler/state/SideEffectStorage.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/**
- * Copyright 2013 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.state;
-
-import java.util.Queue;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-import org.apache.aurora.scheduler.events.EventSink;
-import org.apache.aurora.scheduler.events.PubsubEvent;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork;
-import org.apache.aurora.scheduler.storage.Storage.StorageException;
-import org.apache.aurora.scheduler.storage.Storage.Work;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Wrapper around the persistent storage and mutable state.
- */
-class SideEffectStorage {
-
-  private final Queue<PubsubEvent> events = Lists.newLinkedList();
-  @VisibleForTesting
-  Queue<PubsubEvent> getEvents() {
-    return events;
-  }
-
-  private AtomicBoolean inOperation = new AtomicBoolean(false);
-
-  private final Storage storage;
-  private final OperationFinalizer operationFinalizer;
-  private final EventSink eventSink;
-
-  interface OperationFinalizer {
-    /**
-     * Performs any work necessary to complete the operation.
-     * This is executed in the context of a write operation, immediately after the work
-     * executes normally.
-     * NOTE: At present, this is executed for every nesting level of operations, rather than
-     * at the completion of the top-level operation.
-     * See comment in {@link #SideEffectStorage#executeSideEffectsAfter(SideEffectWork)}
-     * for more detail.
-     *
-     * @param work Work to finalize.
-     * @param storeProvider Mutable store reference.
-     */
-    void finalize(SideEffectWork<?, ?> work, MutableStoreProvider storeProvider);
-  }
-
-  SideEffectStorage(
-      Storage storage,
-      OperationFinalizer operationFinalizer,
-      EventSink eventSink) {
-
-    this.storage = checkNotNull(storage);
-    this.operationFinalizer = checkNotNull(operationFinalizer);
-    this.eventSink = checkNotNull(eventSink);
-  }
-
-  /**
-   * Perform a unit of work in a mutating operation.  This supports nesting/reentrancy.
-   *
-   * @param work Work to perform.
-   * @param <T> Work return type
-   * @param <E> Work exception type.
-   * @return The work return value.
-   * @throws E The work exception.
-   */
-  <T, E extends Exception> T write(SideEffectWork<T, E> work) throws E {
-    return storage.write(executeSideEffectsAfter(work));
-  }
-
-  <T, E extends Exception> T consistentRead(Work<T, E> work) throws StorageException, E {
-    return storage.consistentRead(work);
-  }
-
-  /**
-   * Work that has side effects external to the storage system.
-   * Work may add side effect and pubsub events, which will be executed/sent upon normal
-   * completion of the operation.
-   *
-   * @param <T> Work return type.
-   * @param <E> Work exception type.
-   */
-  abstract class SideEffectWork<T, E extends Exception> implements MutateWork<T, E> {
-    protected final void addTaskEvent(PubsubEvent notice) {
-      Preconditions.checkState(inOperation.get());
-      events.add(Preconditions.checkNotNull(notice));
-    }
-  }
-
-  /**
-   * Work with side effects which does not throw checked exceptions.
-   *
-   * @param <T>   Work return type.
-   */
-  abstract class QuietSideEffectWork<T> extends SideEffectWork<T, RuntimeException> {
-  }
-
-  /**
-   * Work with side effects that does not have a return value.
-   *
-   * @param <E> Work exception type.
-   */
-  abstract class NoResultSideEffectWork<E extends Exception> extends SideEffectWork<Void, E> {
-
-    @Override public final Void apply(MutableStoreProvider storeProvider) throws E {
-      execute(storeProvider);
-      return null;
-    }
-
-    abstract void execute(MutableStoreProvider storeProvider) throws E;
-  }
-
-  /**
-   * Work with side effects which does not throw checked exceptions or have a return
-   * value.
-   */
-  abstract class NoResultQuietSideEffectWork extends NoResultSideEffectWork<RuntimeException> {
-  }
-
-  private <T, E extends Exception> MutateWork<T, E> executeSideEffectsAfter(
-      final SideEffectWork<T, E> work) {
-
-    return new MutateWork<T, E>() {
-      @Override public T apply(MutableStoreProvider storeProvider) throws E {
-        boolean topLevelOperation = inOperation.compareAndSet(false, true);
-
-        try {
-          T result = work.apply(storeProvider);
-
-          // TODO(William Farner): Maintaining this since it matches prior behavior, but this
-          // seems wrong.  Double-check whether this is necessary, or if only the top-level
-          // operation should be executing the finalizer.  Update doc on OperationFinalizer
-          // once this is assessed.
-          operationFinalizer.finalize(work, storeProvider);
-          if (topLevelOperation) {
-            while (!events.isEmpty()) {
-              eventSink.post(events.remove());
-            }
-          }
-          return result;
-        } finally {
-          if (topLevelOperation) {
-            inOperation.set(false);
-          }
-        }
-      }
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/70adc19a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
index 6078eee..819d921 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
@@ -15,42 +15,48 @@
  */
 package org.apache.aurora.scheduler.state;
 
-import java.util.Comparator;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import javax.annotation.Nullable;
 import javax.inject.Inject;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.Atomics;
-import com.twitter.common.stats.Stats;
 import com.twitter.common.util.Clock;
 
 import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskEvent;
 import org.apache.aurora.scheduler.Driver;
 import org.apache.aurora.scheduler.TaskIdGenerator;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent;
-import org.apache.aurora.scheduler.state.SideEffectStorage.SideEffectWork;
+import org.apache.aurora.scheduler.state.SideEffect.Action;
+import org.apache.aurora.scheduler.state.TaskStateMachine.TransitionResult;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork;
 import org.apache.aurora.scheduler.storage.TaskStore;
 import org.apache.aurora.scheduler.storage.TaskStore.Mutable.TaskMutation;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
@@ -62,11 +68,9 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.collect.Iterables.transform;
 import static com.twitter.common.base.MorePreconditions.checkNotBlank;
 
+import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
 import static org.apache.aurora.gen.ScheduleStatus.INIT;
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static org.apache.aurora.gen.ScheduleStatus.UNKNOWN;
-import static org.apache.aurora.scheduler.state.SideEffectStorage.OperationFinalizer;
-
 
 /**
  * Manager of all persistence-related operations for the scheduler.  Acts as a controller for
@@ -78,38 +82,11 @@ import static org.apache.aurora.scheduler.state.SideEffectStorage.OperationFinal
 public class StateManagerImpl implements StateManager {
   private static final Logger LOG = Logger.getLogger(StateManagerImpl.class.getName());
 
-  private final SideEffectStorage storage;
-  @VisibleForTesting
-  SideEffectStorage getStorage() {
-    return storage;
-  }
-
+  private final Storage storage;
+  private final Clock clock;
+  private final Driver driver;
   private final TaskIdGenerator taskIdGenerator;
-
-  // Work queue to receive state machine side effect work.
-  // Items are sorted to place DELETE entries last.  This is to ensure that within an operation,
-  // a delete is always processed after a state transition.
-  private final Queue<WorkEntry> workQueue = new PriorityQueue<>(10,
-      new Comparator<WorkEntry>() {
-        @Override public int compare(WorkEntry a, WorkEntry b) {
-          if ((a.command == WorkCommand.DELETE) != (b.command == WorkCommand.DELETE)) {
-            return (a.command == WorkCommand.DELETE) ? 1 : -1;
-          } else {
-            return 0;
-          }
-        }
-      });
-
-  // Adapt the work queue into a sink.
-  private final TaskStateMachine.WorkSink workSink = new TaskStateMachine.WorkSink() {
-      @Override public void addWork(
-          WorkCommand work,
-          TaskStateMachine stateMachine,
-          Function<IScheduledTask, IScheduledTask> mutation) {
-
-        workQueue.add(new WorkEntry(work, stateMachine, mutation));
-      }
-    };
+  private final EventSink eventSink;
 
   private final Function<Map.Entry<Integer, ITaskConfig>, IScheduledTask> taskCreator =
       new Function<Map.Entry<Integer, ITaskConfig>, IScheduledTask>() {
@@ -125,28 +102,6 @@ public class StateManagerImpl implements StateManager {
         }
       };
 
-  private final Driver driver;
-  private final Clock clock;
-
-  /**
-   * An item of work on the work queue.
-   */
-  private static class WorkEntry {
-    private final WorkCommand command;
-    private final TaskStateMachine stateMachine;
-    private final Function<IScheduledTask, IScheduledTask> mutation;
-
-    WorkEntry(
-        WorkCommand command,
-        TaskStateMachine stateMachine,
-        Function<IScheduledTask, IScheduledTask> mutation) {
-
-      this.command = command;
-      this.stateMachine = stateMachine;
-      this.mutation = mutation;
-    }
-  }
-
   @Inject
   StateManagerImpl(
       final Storage storage,
@@ -155,20 +110,11 @@ public class StateManagerImpl implements StateManager {
       TaskIdGenerator taskIdGenerator,
       EventSink eventSink) {
 
-    checkNotNull(storage);
+    this.storage = checkNotNull(storage);
     this.clock = checkNotNull(clock);
-
-    OperationFinalizer finalizer = new OperationFinalizer() {
-      @Override public void finalize(SideEffectWork<?, ?> work, MutableStoreProvider store) {
-        processWorkQueueInWriteOperation(work, store);
-      }
-    };
-
-    this.storage = new SideEffectStorage(storage, finalizer, eventSink);
     this.driver = checkNotNull(driver);
     this.taskIdGenerator = checkNotNull(taskIdGenerator);
-
-    Stats.exportSize("work_queue_depth", workQueue);
+    this.eventSink = checkNotNull(eventSink);
   }
 
   @Override
@@ -179,12 +125,16 @@ public class StateManagerImpl implements StateManager {
     final Set<IScheduledTask> scheduledTasks =
         ImmutableSet.copyOf(transform(tasks.entrySet(), taskCreator));
 
-    storage.write(storage.new NoResultQuietSideEffectWork() {
+    storage.write(new MutateWork.NoResult.Quiet() {
       @Override protected void execute(MutableStoreProvider storeProvider) {
         storeProvider.getUnsafeTaskStore().saveTasks(scheduledTasks);
 
         for (IScheduledTask task : scheduledTasks) {
-          createStateMachine(task).updateState(PENDING);
+          updateTaskAndExternalState(
+              Tasks.id(task),
+              Optional.of(task),
+              PENDING,
+              Optional.<String>absent());
         }
       }
     });
@@ -197,54 +147,55 @@ public class StateManagerImpl implements StateManager {
       final ScheduleStatus newState,
       final Optional<String> auditMessage) {
 
-    return changeState(taskId, casState, new Function<TaskStateMachine, Boolean>() {
-      @Override
-      public Boolean apply(TaskStateMachine stateMachine) {
-        return stateMachine.updateState(newState, auditMessage);
-      }
-    });
+    return updateTaskAndExternalState(casState, taskId, newState, auditMessage);
   }
 
   @Override
   public IAssignedTask assignTask(
-      String taskId,
-      String slaveHost,
-      SlaveID slaveId,
-      Set<Integer> assignedPorts) {
+      final String taskId,
+      final String slaveHost,
+      final SlaveID slaveId,
+      final Set<Integer> assignedPorts) {
 
     checkNotBlank(taskId);
     checkNotBlank(slaveHost);
+    checkNotNull(slaveId);
     checkNotNull(assignedPorts);
 
-    TaskAssignMutation mutation = assignHost(slaveHost, slaveId, assignedPorts);
-    changeState(taskId, Optional.<ScheduleStatus>absent(), mutation);
-
-    return mutation.getAssignedTask();
-  }
-
-  private boolean changeState(
-      final String taskId,
-      final Optional<ScheduleStatus> casState,
-      final Function<TaskStateMachine, Boolean> stateChange) {
-
-    return storage.write(storage.new QuietSideEffectWork<Boolean>() {
-      @Override public Boolean apply(MutableStoreProvider storeProvider) {
-        IScheduledTask task = Iterables.getOnlyElement(
-            storeProvider.getTaskStore().fetchTasks(Query.taskScoped(taskId)),
-            null);
-        if (casState.isPresent() && (task != null) && (task.getStatus() != casState.get())) {
-          return false;
-        }
+    return storage.write(new MutateWork.Quiet<IAssignedTask>() {
+      @Override public IAssignedTask apply(MutableStoreProvider storeProvider) {
+        boolean success = updateTaskAndExternalState(
+            Optional.<ScheduleStatus>absent(),
+            taskId,
+            ASSIGNED,
+            Optional.<String>absent());
+
+        Preconditions.checkState(
+            success,
+            "Attempt to assign task " + taskId + " to " + slaveHost + " failed");
+        Query.Builder query = Query.taskScoped(taskId);
+        storeProvider.getUnsafeTaskStore().mutateTasks(query,
+            new Function<IScheduledTask, IScheduledTask>() {
+              @Override
+              public IScheduledTask apply(IScheduledTask task) {
+                ScheduledTask builder = task.newBuilder();
+                AssignedTask assigned = builder.getAssignedTask();
+                assigned.setAssignedPorts(
+                    getNameMappedPorts(assigned.getTask().getRequestedPorts(), assignedPorts));
+                assigned.setSlaveHost(slaveHost)
+                    .setSlaveId(slaveId.getValue());
+                return IScheduledTask.build(builder);
+              }
+            });
 
-        return stateChange.apply(getStateMachine(taskId, task));
+        return Iterables.getOnlyElement(
+            Iterables.transform(
+                storeProvider.getTaskStore().fetchTasks(query),
+                Tasks.SCHEDULED_TO_ASSIGNED));
       }
     });
   }
 
-  private interface TaskAssignMutation extends Function<TaskStateMachine, Boolean> {
-    IAssignedTask getAssignedTask();
-  }
-
   private static Map<String, Integer> getNameMappedPorts(
       Set<String> portNames,
       Set<Integer> allocatedPorts) {
@@ -270,157 +221,218 @@ public class StateManagerImpl implements StateManager {
     return ports;
   }
 
-  private TaskAssignMutation assignHost(
-      final String slaveHost,
-      final SlaveID slaveId,
-      final Set<Integer> assignedPorts) {
+  @VisibleForTesting
+  static final Supplier<String> LOCAL_HOST_SUPPLIER = Suppliers.memoize(
+      new Supplier<String>() {
+        @Override public String get() {
+          try {
+            return InetAddress.getLocalHost().getHostName();
+          } catch (UnknownHostException e) {
+            LOG.log(Level.SEVERE, "Failed to get self hostname.");
+            throw Throwables.propagate(e);
+          }
+        }
+      });
 
-    final TaskMutation mutation = new TaskMutation() {
-      @Override public IScheduledTask apply(IScheduledTask task) {
-        ScheduledTask builder = task.newBuilder();
-        AssignedTask assigned = builder.getAssignedTask();
-        assigned.setAssignedPorts(
-            getNameMappedPorts(assigned.getTask().getRequestedPorts(), assignedPorts));
-        assigned.setSlaveHost(slaveHost)
-            .setSlaveId(slaveId.getValue());
-        return IScheduledTask.build(builder);
-      }
-    };
+  private boolean updateTaskAndExternalState(
+      final Optional<ScheduleStatus> casState,
+      final String taskId,
+      final ScheduleStatus targetState,
+      final Optional<String> transitionMessage) {
 
-    return new TaskAssignMutation() {
-      private AtomicReference<IAssignedTask> assignedTask = Atomics.newReference();
-      @Override public IAssignedTask getAssignedTask() {
-        return assignedTask.get();
-      }
+    return storage.write(new MutateWork.Quiet<Boolean>() {
+      @Override public Boolean apply(MutableStoreProvider storeProvider) {
+        Optional<IScheduledTask> task = Optional.fromNullable(Iterables.getOnlyElement(
+            storeProvider.getTaskStore().fetchTasks(Query.taskScoped(taskId)),
+            null));
 
-      @Override public Boolean apply(final TaskStateMachine stateMachine) {
-        TaskMutation wrapper = new TaskMutation() {
-          @Override public IScheduledTask apply(IScheduledTask task) {
-            IScheduledTask mutated = mutation.apply(task);
-            Preconditions.checkState(
-                assignedTask.compareAndSet(null, mutated.getAssignedTask()),
-                "More than one result was found for an identity query.");
-            return mutated;
-          }
-        };
-        return stateMachine.updateState(ScheduleStatus.ASSIGNED, wrapper);
+        // CAS operation fails if the task does not exist, or the states don't match.
+        if (casState.isPresent()
+            && (!task.isPresent() || (casState.get() != task.get().getStatus()))) {
+
+          return false;
+        }
+
+        return updateTaskAndExternalState(taskId, task, targetState, transitionMessage);
       }
-    };
+    });
   }
 
-  private void processWorkQueueInWriteOperation(
-      SideEffectWork<?, ?> sideEffectWork,
-      MutableStoreProvider storeProvider) {
-
-    for (final WorkEntry work : Iterables.consumingIterable(workQueue)) {
-      final TaskStateMachine stateMachine = work.stateMachine;
-
-      if (work.command == WorkCommand.KILL) {
-        driver.killTask(stateMachine.getTaskId());
-      } else {
-        TaskStore.Mutable taskStore = storeProvider.getUnsafeTaskStore();
-        String taskId = stateMachine.getTaskId();
-        Query.Builder idQuery = Query.taskScoped(taskId);
-
-        switch (work.command) {
-          case RESCHEDULE:
-            ScheduledTask builder =
-                Iterables.getOnlyElement(taskStore.fetchTasks(idQuery)).newBuilder();
-            builder.getAssignedTask().unsetSlaveId();
-            builder.getAssignedTask().unsetSlaveHost();
-            builder.getAssignedTask().unsetAssignedPorts();
-            builder.unsetTaskEvents();
-            builder.setAncestorId(taskId);
-            String newTaskId = taskIdGenerator.generate(
-                ITaskConfig.build(builder.getAssignedTask().getTask()),
-                builder.getAssignedTask().getInstanceId());
-            builder.getAssignedTask().setTaskId(newTaskId);
-
-            LOG.info("Task being rescheduled: " + taskId);
-
-            IScheduledTask task = IScheduledTask.build(builder);
-            taskStore.saveTasks(ImmutableSet.of(task));
-
-            createStateMachine(task).updateState(PENDING, Optional.of("Rescheduled"));
-            ITaskConfig taskInfo = task.getAssignedTask().getTask();
-            sideEffectWork.addTaskEvent(
-                new PubsubEvent.TaskRescheduled(
-                    taskInfo.getOwner().getRole(),
-                    taskInfo.getJobName(),
-                    task.getAssignedTask().getInstanceId()));
-            break;
-
-          case UPDATE_STATE:
-            taskStore.mutateTasks(idQuery, new TaskMutation() {
-              @Override public IScheduledTask apply(IScheduledTask task) {
-                return work.mutation.apply(
-                    IScheduledTask.build(task.newBuilder().setStatus(stateMachine.getState())));
-              }
-            });
-            sideEffectWork.addTaskEvent(
-                PubsubEvent.TaskStateChange.transition(
-                    Iterables.getOnlyElement(taskStore.fetchTasks(idQuery)),
-                    stateMachine.getPreviousState()));
-            break;
-
-          case DELETE:
-            deleteTasks(ImmutableSet.of(taskId));
-            break;
-
-          case INCREMENT_FAILURES:
-            taskStore.mutateTasks(idQuery, new TaskMutation() {
-              @Override public IScheduledTask apply(IScheduledTask task) {
-                return IScheduledTask.build(
-                    task.newBuilder().setFailureCount(task.getFailureCount() + 1));
-              }
-            });
-            break;
+  private static final Function<SideEffect, SideEffect.Action> GET_ACTION =
+      new Function<SideEffect, Action>() {
+        @Override public Action apply(SideEffect sideEffect) {
+          return sideEffect.getAction();
+        }
+      };
+
+  private static final List<Action> ACTIONS_IN_ORDER = ImmutableList.of(
+      Action.INCREMENT_FAILURES,
+      Action.SAVE_STATE,
+      Action.STATE_CHANGE,
+      Action.RESCHEDULE,
+      Action.KILL,
+      Action.DELETE);
+  static {
+    // Sanity check to ensure no actions are missing.
+    Preconditions.checkState(
+        ImmutableSet.copyOf(ACTIONS_IN_ORDER).equals(ImmutableSet.copyOf(Action.values())),
+        "Not all actions are included in ordering.");
+  }
+
+  // Actions are deliberately ordered to prevent things like deleting a task before rescheduling it
+  // (thus losing the object to copy), or rescheduling a task before incrementing the failure count
+  // (thus not carrying forward the failure increment).
+  private static final Ordering<SideEffect> ACTION_ORDER =
+      Ordering.explicit(ACTIONS_IN_ORDER).onResultOf(GET_ACTION);
 
-          default:
-            LOG.severe("Unrecognized work command type " + work.command);
+  private boolean updateTaskAndExternalState(
+      final String taskId,
+      // Note: This argument is deliberately non-final, and should not be made final.
+      // This is because using the captured value within the storage operation below is
+      // highly risky, since it doesn't necessarily represent the value in storage.
+      // As a result, it would be easy to accidentally clobber mutations.
+      Optional<IScheduledTask> task,
+      final ScheduleStatus targetState,
+      final Optional<String> transitionMessage) {
+
+    if (task.isPresent()) {
+      Preconditions.checkArgument(taskId.equals(task.get().getAssignedTask().getTaskId()));
+    }
+
+    final List<PubsubEvent> events = Lists.newArrayList();
+
+    final TaskStateMachine stateMachine = task.isPresent()
+        ? new TaskStateMachine(task.get())
+        : new TaskStateMachine(taskId);
+
+    boolean success = storage.write(new MutateWork.Quiet<Boolean>() {
+      @Override public Boolean apply(MutableStoreProvider storeProvider) {
+        TransitionResult result = stateMachine.updateState(targetState);
+        Query.Builder query = Query.taskScoped(taskId);
+
+        for (SideEffect sideEffect : ACTION_ORDER.sortedCopy(result.getSideEffects())) {
+          Optional<IScheduledTask> upToDateTask = Optional.fromNullable(
+              Iterables.getOnlyElement(storeProvider.getTaskStore().fetchTasks(query), null));
+
+          switch (sideEffect.getAction()) {
+            case KILL:
+              driver.killTask(taskId);
+              break;
+
+            case RESCHEDULE:
+              Preconditions.checkState(
+                  upToDateTask.isPresent(),
+                  "Operation expected task " + taskId + " to be present.");
+              LOG.info("Task being rescheduled: " + taskId);
+
+              ScheduledTask builder = upToDateTask.get().newBuilder();
+              builder.setStatus(INIT);
+              builder.getAssignedTask().unsetSlaveId();
+              builder.getAssignedTask().unsetSlaveHost();
+              builder.getAssignedTask().unsetAssignedPorts();
+              builder.unsetTaskEvents();
+              builder.setAncestorId(taskId);
+              String newTaskId = taskIdGenerator.generate(
+                  ITaskConfig.build(builder.getAssignedTask().getTask()),
+                  builder.getAssignedTask().getInstanceId());
+              builder.getAssignedTask().setTaskId(newTaskId);
+
+              IScheduledTask newTask = IScheduledTask.build(builder);
+              storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(newTask));
+              updateTaskAndExternalState(
+                  newTaskId,
+                  Optional.of(newTask),
+                  PENDING, Optional.of("Rescheduled"));
+
+              ITaskConfig taskInfo = newTask.getAssignedTask().getTask();
+              events.add(
+                  new PubsubEvent.TaskRescheduled(
+                      taskInfo.getOwner().getRole(),
+                      taskInfo.getJobName(),
+                      newTask.getAssignedTask().getInstanceId()));
+              break;
+
+            case SAVE_STATE:
+              Preconditions.checkState(
+                  upToDateTask.isPresent(),
+                  "Operation expected task " + taskId + " to be present.");
+
+              storeProvider.getUnsafeTaskStore().mutateTasks(query, new TaskMutation() {
+                @Override public IScheduledTask apply(IScheduledTask task) {
+                  ScheduledTask mutableTask = task.newBuilder();
+                  mutableTask.setStatus(stateMachine.getState());
+                  mutableTask.addToTaskEvents(new TaskEvent()
+                      .setTimestamp(clock.nowMillis())
+                      .setStatus(targetState)
+                      .setMessage(transitionMessage.orNull())
+                      .setScheduler(LOCAL_HOST_SUPPLIER.get()));
+                  return IScheduledTask.build(mutableTask);
+                }
+              });
+              events.add(
+                  PubsubEvent.TaskStateChange.transition(
+                      Iterables.getOnlyElement(storeProvider.getTaskStore().fetchTasks(query)),
+                      stateMachine.getPreviousState()));
+              break;
+
+            case DELETE:
+              Preconditions.checkState(
+                  upToDateTask.isPresent(),
+                  "Operation expected task " + taskId + " to be present.");
+
+              events.add(deleteTasks(storeProvider, ImmutableSet.of(taskId)));
+              break;
+
+            case INCREMENT_FAILURES:
+              storeProvider.getUnsafeTaskStore().mutateTasks(query, new TaskMutation() {
+                @Override public IScheduledTask apply(IScheduledTask task) {
+                  return IScheduledTask.build(
+                      task.newBuilder().setFailureCount(task.getFailureCount() + 1));
+                }
+              });
+              break;
+
+            case STATE_CHANGE:
+              updateTaskAndExternalState(
+                  Optional.<ScheduleStatus>absent(),
+                  taskId,
+                  sideEffect.getNextState().get(),
+                  Optional.<String>absent());
+              break;
+
+            default:
+              throw new IllegalStateException("Unrecognized side-effect " + sideEffect.getAction());
+          }
         }
+
+        return result.isSuccess();
       }
+    });
+
+    // Note: Delaying events until after the write operation is somewhat futile, since the state
+    // may not actually be written to a durable store (e.g. if this is a nested transaction).
+    // Ideally, Storage would add a facility to attach side-effects that are performed after the
+    // outer-most transaction completes (meaning state has been durably persisted).
+    for (PubsubEvent event : events) {
+      eventSink.post(event);
     }
+
+    return success;
   }
 
   @Override
   public void deleteTasks(final Set<String> taskIds) {
-    storage.write(storage.new NoResultQuietSideEffectWork() {
+    storage.write(new MutateWork.NoResult.Quiet() {
       @Override protected void execute(final MutableStoreProvider storeProvider) {
-        TaskStore.Mutable taskStore = storeProvider.getUnsafeTaskStore();
-        Iterable<IScheduledTask> tasks = taskStore.fetchTasks(Query.taskScoped(taskIds));
-        addTaskEvent(new PubsubEvent.TasksDeleted(ImmutableSet.copyOf(tasks)));
-        taskStore.deleteTasks(taskIds);
+        eventSink.post(deleteTasks(storeProvider, taskIds));
       }
     });
   }
 
-  private TaskStateMachine getStateMachine(String taskId, @Nullable IScheduledTask task) {
-    if (task != null) {
-      return createStateMachine(task, task.getStatus());
-    }
-
-    // The task is unknown, not present in storage.
-    TaskStateMachine stateMachine = new TaskStateMachine(
-        taskId,
-        null,
-        workSink,
-        clock,
-        INIT);
-    stateMachine.updateState(UNKNOWN);
-    return stateMachine;
-  }
-
-  private TaskStateMachine createStateMachine(IScheduledTask task) {
-    return createStateMachine(task, INIT);
-  }
-
-  private TaskStateMachine createStateMachine(IScheduledTask task, ScheduleStatus initialState) {
-    return new TaskStateMachine(
-        Tasks.id(task),
-        task,
-        workSink,
-        clock,
-        initialState);
+  private static PubsubEvent deleteTasks(MutableStoreProvider storeProvider, Set<String> taskIds) {
+    TaskStore.Mutable taskStore = storeProvider.getUnsafeTaskStore();
+    Iterable<IScheduledTask> tasks = taskStore.fetchTasks(Query.taskScoped(taskIds));
+    taskStore.deleteTasks(taskIds);
+    return new PubsubEvent.TasksDeleted(ImmutableSet.copyOf(tasks));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/70adc19a/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java b/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
index d0f88e5..cd0899c 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
@@ -15,39 +15,54 @@
  */
 package org.apache.aurora.scheduler.state;
 
-import java.net.InetAddress;
-import java.net.UnknownHostException;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import javax.annotation.Nullable;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Functions;
+import com.google.common.base.Objects;
 import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
-import com.google.common.base.Throwables;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
 import com.twitter.common.base.Closure;
 import com.twitter.common.base.Closures;
 import com.twitter.common.base.Command;
 import com.twitter.common.base.MorePreconditions;
 import com.twitter.common.stats.Stats;
-import com.twitter.common.util.Clock;
 import com.twitter.common.util.StateMachine;
 import com.twitter.common.util.StateMachine.Rule;
 import com.twitter.common.util.StateMachine.Transition;
 
 import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.gen.TaskEvent;
+import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.commons.lang.builder.HashCodeBuilder;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
+import static org.apache.aurora.gen.ScheduleStatus.FAILED;
+import static org.apache.aurora.gen.ScheduleStatus.FINISHED;
+import static org.apache.aurora.gen.ScheduleStatus.INIT;
+import static org.apache.aurora.gen.ScheduleStatus.KILLED;
+import static org.apache.aurora.gen.ScheduleStatus.KILLING;
+import static org.apache.aurora.gen.ScheduleStatus.LOST;
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.gen.ScheduleStatus.PREEMPTING;
+import static org.apache.aurora.gen.ScheduleStatus.RESTARTING;
+import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.gen.ScheduleStatus.STARTING;
+import static org.apache.aurora.gen.ScheduleStatus.UNKNOWN;
+import static org.apache.aurora.scheduler.state.SideEffect.Action;
+import static org.apache.aurora.scheduler.state.SideEffect.Action.DELETE;
+import static org.apache.aurora.scheduler.state.SideEffect.Action.INCREMENT_FAILURES;
+import static org.apache.aurora.scheduler.state.SideEffect.Action.KILL;
+import static org.apache.aurora.scheduler.state.SideEffect.Action.RESCHEDULE;
+import static org.apache.aurora.scheduler.state.SideEffect.Action.SAVE_STATE;
+import static org.apache.aurora.scheduler.state.SideEffect.Action.STATE_CHANGE;
+
 /**
  * State machine for a task.
  * <p>
@@ -55,8 +70,9 @@ import static com.google.common.base.Preconditions.checkNotNull;
  * to different state transitions.  These responses are externally communicated by populating a
  * provided work queue.
  * <p>
- * TODO(William Farner): Introduce an interface to allow state machines to be dealt with
- *     abstractly from the consumption side.
+ * TODO(wfarner): Augment this class to force the one-time-use nature.  This is probably best done
+ * by hiding the constructor and exposing only a static function to transition a task and get the
+ * resulting actions.
  */
 class TaskStateMachine {
   private static final Logger LOG = Logger.getLogger(TaskStateMachine.class.getName());
@@ -64,183 +80,94 @@ class TaskStateMachine {
   private static final AtomicLong ILLEGAL_TRANSITIONS =
       Stats.exportLong("scheduler_illegal_task_state_transitions");
 
-  // Re-declarations of statuses as wrapped state objects.
-  private static final State ASSIGNED = State.create(ScheduleStatus.ASSIGNED);
-  private static final State FAILED = State.create(ScheduleStatus.FAILED);
-  private static final State FINISHED = State.create(ScheduleStatus.FINISHED);
-  private static final State INIT = State.create(ScheduleStatus.INIT);
-  private static final State KILLED = State.create(ScheduleStatus.KILLED);
-  private static final State KILLING = State.create(ScheduleStatus.KILLING);
-  private static final State LOST = State.create(ScheduleStatus.LOST);
-  private static final State PENDING = State.create(ScheduleStatus.PENDING);
-  private static final State PREEMPTING = State.create(ScheduleStatus.PREEMPTING);
-  private static final State RESTARTING = State.create(ScheduleStatus.RESTARTING);
-  private static final State RUNNING = State.create(ScheduleStatus.RUNNING);
-  private static final State STARTING = State.create(ScheduleStatus.STARTING);
-  private static final State UNKNOWN = State.create(ScheduleStatus.UNKNOWN);
-
-  @VisibleForTesting
-  static final Supplier<String> LOCAL_HOST_SUPPLIER = Suppliers.memoize(
-      new Supplier<String>() {
-        @Override public String get() {
-          try {
-            return InetAddress.getLocalHost().getHostName();
-          } catch (UnknownHostException e) {
-            LOG.log(Level.SEVERE, "Failed to get self hostname.");
-            throw Throwables.propagate(e);
-          }
-        }
-      });
-
-  private final String taskId;
-  private final WorkSink workSink;
-  private final StateMachine<State> stateMachine;
+  private final StateMachine<ScheduleStatus> stateMachine;
   private ScheduleStatus previousState = null;
-  private final Clock clock;
-
-  /**
-   * Composes a schedule status and a state change argument.  Only the ScheduleStatuses in two
-   * States must be equal for them to be considered equal.
-   */
-  private static class State {
-    private final ScheduleStatus state;
-    private final Function<IScheduledTask, IScheduledTask> mutation;
-
-    State(ScheduleStatus state, Function<IScheduledTask, IScheduledTask> mutation) {
-      this.state = state;
-      this.mutation = mutation;
-    }
-
-    static State create(ScheduleStatus status) {
-      return create(status, Functions.<IScheduledTask>identity());
-    }
-
-    static State create(
-        ScheduleStatus status,
-        Function<IScheduledTask, IScheduledTask> mutation) {
-
-      return new State(status, mutation);
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (!(o instanceof State)) {
-        return false;
-      }
-
-      if (o == this) {
-        return true;
-      }
-
-      State other = (State) o;
-      return state == other.state;
-    }
-
-    @Override
-    public int hashCode() {
-      return new HashCodeBuilder()
-          .append(state)
-          .toHashCode();
-    }
 
-    @Override
-    public String toString() {
-      return state.toString();
-    }
-
-    private ScheduleStatus getState() {
-      return state;
-    }
-
-    private Function<IScheduledTask, IScheduledTask> getMutation() {
-      return mutation;
-    }
-  }
+  private final Set<SideEffect> sideEffects = Sets.newHashSet();
 
   /**
-   * A write-only work acceptor.
+   * Creates a new task state machine representing a non-existent task.  This allows for consistent
+   * state-reconciliation actions when the external system disagrees with the scheduler.
+   *
+   * @param name Name of the state machine, for logging.
    */
-  public interface WorkSink {
-    /**
-     * Appends external work that must be performed for a state machine transition to be fully
-     * complete.
-     *
-     * @param work Description of the work to be performed.
-     * @param stateMachine The state machine that the work is associated with.
-     * @param mutation Mutate operation to perform along with the state transition.
-     */
-    void addWork(
-        WorkCommand work,
-        TaskStateMachine stateMachine,
-        Function<IScheduledTask, IScheduledTask> mutation);
+  public TaskStateMachine(String name) {
+    this(name, Optional.<IScheduledTask>absent());
   }
 
   /**
-   * Creates a new task state machine.
-   *
-   * @param taskId ID of the task managed by this state machine.
+   * Creates a new task state machine representing an existent task.  The state machine will be
+   * named with the task's ID.
+   *
    * @param task Read-only task that this state machine manages.
-   * @param workSink Work sink to receive transition response actions
-   * @param clock Clock to use for reading the current time.
-   * @param initialState The state to begin the state machine at.  All legal transitions will be
-   *     added, but this allows the state machine to 'skip' states, for instance when a task is
-   *     loaded from a persistent store.
    */
-  public TaskStateMachine(
-      final String taskId,
-      final IScheduledTask task,
-      final WorkSink workSink,
-      final Clock clock,
-      final ScheduleStatus initialState) {
-
-    this.taskId = MorePreconditions.checkNotBlank(taskId);
-    this.workSink = checkNotNull(workSink);
-    this.clock = checkNotNull(clock);
-    checkNotNull(initialState);
-
-    @SuppressWarnings("unchecked")
-    Closure<Transition<State>> manageTerminatedTasks = Closures.combine(
-        /* Kill a task that we believe to be terminated when an attempt is made to revive. */
-        Closures.filter(Transition.to(ASSIGNED, STARTING, RUNNING),
-            addWorkClosure(WorkCommand.KILL)),
-        /* Remove a terminated task that is remotely removed. */
-        Closures.filter(Transition.to(UNKNOWN), addWorkClosure(WorkCommand.DELETE)));
-
-    final Closure<Transition<State>> manageRestartingTask = new Closure<Transition<State>>() {
-      @SuppressWarnings("fallthrough")
-      @Override public void execute(Transition<State> transition) {
-        switch (transition.getTo().getState()) {
-          case ASSIGNED:
-          case STARTING:
-          case RUNNING:
-            addWork(WorkCommand.KILL);
-            break;
-
-          case LOST:
-            addWork(WorkCommand.KILL);
-            // fall through
-
-          case FINISHED:
-          case FAILED:
-          case KILLED:
-            addWork(WorkCommand.RESCHEDULE, transition.getTo().getMutation());
-            break;
-
-          case UNKNOWN:
-            updateState(ScheduleStatus.LOST);
-            break;
-
-          default:
-            // No-op.
-        }
-      }
-    };
+  public TaskStateMachine(IScheduledTask task) {
+    this(Tasks.id(task), Optional.of(task));
+  }
+
+  private TaskStateMachine(final String name, final Optional<IScheduledTask> task) {
+    MorePreconditions.checkNotBlank(name);
+    checkNotNull(task);
+
+    final ScheduleStatus initialState = task.transform(Tasks.GET_STATUS).or(UNKNOWN);
+    Preconditions.checkState((initialState == UNKNOWN) == !task.isPresent());
+
+    Closure<Transition<ScheduleStatus>> manageTerminatedTasks = Closures.combine(
+        ImmutableList.<Closure<Transition<ScheduleStatus>>>builder()
+            // Kill a task that we believe to be terminated when an attempt is made to revive.
+            .add(Closures.filter(Transition.to(ASSIGNED, STARTING, RUNNING), addWorkClosure(KILL)))
+            // Remove a terminated task that is remotely removed.
+            .add(Closures.filter(Transition.to(UNKNOWN), addWorkClosure(DELETE)))
+            .build());
+
+    final Closure<Transition<ScheduleStatus>> manageRestartingTask =
+        new Closure<Transition<ScheduleStatus>>() {
+          @Override public void execute(Transition<ScheduleStatus> transition) {
+            switch (transition.getTo()) {
+              case ASSIGNED:
+                addFollowup(KILL);
+                break;
+
+              case STARTING:
+                addFollowup(KILL);
+                break;
+
+              case RUNNING:
+                addFollowup(KILL);
+                break;
+
+              case LOST:
+                addFollowup(KILL);
+                addFollowup(RESCHEDULE);
+                break;
+
+              case FINISHED:
+                addFollowup(RESCHEDULE);
+                break;
+
+              case FAILED:
+                addFollowup(RESCHEDULE);
+                break;
+
+              case KILLED:
+                addFollowup(RESCHEDULE);
+                break;
+
+              case UNKNOWN:
+                addFollowupTransition(LOST);
+                break;
+
+              default:
+                // No-op.
+            }
+          }
+        };
 
     // To be called on a task transitioning into the FINISHED state.
     final Command rescheduleIfService = new Command() {
       @Override public void execute() {
-        if (task.getAssignedTask().getTask().isIsService()) {
-          addWork(WorkCommand.RESCHEDULE);
+        if (task.get().getAssignedTask().getTask().isIsService()) {
+          addFollowup(RESCHEDULE);
         }
       }
     };
@@ -248,24 +175,26 @@ class TaskStateMachine {
     // To be called on a task transitioning into the FAILED state.
     final Command incrementFailuresMaybeReschedule = new Command() {
       @Override public void execute() {
-        addWork(WorkCommand.INCREMENT_FAILURES);
+        addFollowup(INCREMENT_FAILURES);
 
         // Max failures is ignored for service task.
-        boolean isService = task.getAssignedTask().getTask().isIsService();
+        boolean isService = task.get().getAssignedTask().getTask().isIsService();
 
         // Max failures is ignored when set to -1.
-        int maxFailures = task.getAssignedTask().getTask().getMaxTaskFailures();
-        if (isService || (maxFailures == -1) || (task.getFailureCount() < (maxFailures - 1))) {
-          addWork(WorkCommand.RESCHEDULE);
+        int maxFailures = task.get().getAssignedTask().getTask().getMaxTaskFailures();
+        boolean belowMaxFailures =
+            (maxFailures == -1) || (task.get().getFailureCount() < (maxFailures - 1));
+        if (isService || belowMaxFailures) {
+          addFollowup(RESCHEDULE);
         } else {
-          LOG.info("Task " + getTaskId() + " reached failure limit, not rescheduling");
+          LOG.info("Task " + name + " reached failure limit, not rescheduling");
         }
       }
     };
 
-    stateMachine = StateMachine.<State>builder(taskId)
+    stateMachine = StateMachine.<ScheduleStatus>builder(name)
         .logTransitions()
-        .initialState(State.create(initialState))
+        .initialState(initialState)
         .addState(
             Rule.from(INIT)
                 .to(PENDING, UNKNOWN))
@@ -273,11 +202,11 @@ class TaskStateMachine {
             Rule.from(PENDING)
                 .to(ASSIGNED, KILLING)
                 .withCallback(
-                    new Closure<Transition<State>>() {
-                      @Override public void execute(Transition<State> transition) {
-                        switch (transition.getTo().getState()) {
+                    new Closure<Transition<ScheduleStatus>>() {
+                      @Override public void execute(Transition<ScheduleStatus> transition) {
+                        switch (transition.getTo()) {
                           case KILLING:
-                            addWork(WorkCommand.DELETE);
+                            addFollowup(DELETE);
                             break;
 
                           default:
@@ -291,16 +220,15 @@ class TaskStateMachine {
                 .to(STARTING, RUNNING, FINISHED, FAILED, RESTARTING, KILLED,
                     KILLING, LOST, PREEMPTING)
                 .withCallback(
-                    new Closure<Transition<State>>() {
-                      @SuppressWarnings("fallthrough")
-                      @Override public void execute(Transition<State> transition) {
-                        switch (transition.getTo().getState()) {
+                    new Closure<Transition<ScheduleStatus>>() {
+                      @Override public void execute(Transition<ScheduleStatus> transition) {
+                        switch (transition.getTo()) {
                           case FINISHED:
                             rescheduleIfService.execute();
                             break;
 
                           case PREEMPTING:
-                            addWork(WorkCommand.KILL);
+                            addFollowup(KILL);
                             break;
 
                           case FAILED:
@@ -308,25 +236,24 @@ class TaskStateMachine {
                             break;
 
                           case RESTARTING:
-                            addWork(WorkCommand.KILL);
+                            addFollowup(KILL);
                             break;
 
                           case KILLED:
-                            addWork(WorkCommand.RESCHEDULE);
+                            addFollowup(RESCHEDULE);
                             break;
 
                           case LOST:
-                            addWork(WorkCommand.RESCHEDULE);
-                            // fall through
-                          case KILLING:
-                            addWork(WorkCommand.KILL);
+                            addFollowup(RESCHEDULE);
+                            addFollowup(KILL);
                             break;
 
-                          case UNKNOWN:
+                          case KILLING:
+                            addFollowup(KILL);
                             break;
 
-                           default:
-                             // No-op.
+                          default:
+                            // No-op.
                         }
                       }
                     }
@@ -335,20 +262,19 @@ class TaskStateMachine {
             Rule.from(STARTING)
                 .to(RUNNING, FINISHED, FAILED, RESTARTING, KILLING, KILLED, LOST, PREEMPTING)
                 .withCallback(
-                    new Closure<Transition<State>>() {
-                      @SuppressWarnings("fallthrough")
-                      @Override public void execute(Transition<State> transition) {
-                        switch (transition.getTo().getState()) {
+                    new Closure<Transition<ScheduleStatus>>() {
+                      @Override public void execute(Transition<ScheduleStatus> transition) {
+                        switch (transition.getTo()) {
                           case FINISHED:
                             rescheduleIfService.execute();
                             break;
 
                           case RESTARTING:
-                            addWork(WorkCommand.KILL);
+                            addFollowup(KILL);
                             break;
 
                           case PREEMPTING:
-                            addWork(WorkCommand.KILL);
+                            addFollowup(KILL);
                             break;
 
                           case FAILED:
@@ -356,25 +282,25 @@ class TaskStateMachine {
                             break;
 
                           case KILLED:
-                            addWork(WorkCommand.RESCHEDULE);
+                            addFollowup(RESCHEDULE);
                             break;
 
                           case KILLING:
-                            addWork(WorkCommand.KILL);
+                            addFollowup(KILL);
                             break;
 
                           case LOST:
-                            addWork(WorkCommand.RESCHEDULE);
+                            addFollowup(RESCHEDULE);
                             break;
 
                           case UNKNOWN:
                             // The slave previously acknowledged that it had the task, and now
                             // stopped reporting it.
-                            updateState(ScheduleStatus.LOST);
+                            addFollowupTransition(LOST);
                             break;
 
-                           default:
-                             // No-op.
+                          default:
+                            // No-op.
                         }
                       }
                     }
@@ -383,20 +309,19 @@ class TaskStateMachine {
             Rule.from(RUNNING)
                 .to(FINISHED, RESTARTING, FAILED, KILLING, KILLED, LOST, PREEMPTING)
                 .withCallback(
-                    new Closure<Transition<State>>() {
-                      @SuppressWarnings("fallthrough")
-                      @Override public void execute(Transition<State> transition) {
-                        switch (transition.getTo().getState()) {
+                    new Closure<Transition<ScheduleStatus>>() {
+                      @Override public void execute(Transition<ScheduleStatus> transition) {
+                        switch (transition.getTo()) {
                           case FINISHED:
                             rescheduleIfService.execute();
                             break;
 
                           case PREEMPTING:
-                            addWork(WorkCommand.KILL);
+                            addFollowup(KILL);
                             break;
 
                           case RESTARTING:
-                            addWork(WorkCommand.KILL);
+                            addFollowup(KILL);
                             break;
 
                           case FAILED:
@@ -404,19 +329,19 @@ class TaskStateMachine {
                             break;
 
                           case KILLED:
-                            addWork(WorkCommand.RESCHEDULE);
+                            addFollowup(RESCHEDULE);
                             break;
 
                           case KILLING:
-                            addWork(WorkCommand.KILL);
+                            addFollowup(KILL);
                             break;
 
                           case LOST:
-                            addWork(WorkCommand.RESCHEDULE);
+                            addFollowup(RESCHEDULE);
                             break;
 
                           case UNKNOWN:
-                            updateState(ScheduleStatus.LOST);
+                            addFollowupTransition(LOST);
                             break;
 
                            default:
@@ -460,23 +385,25 @@ class TaskStateMachine {
         // Since we want this action to be performed last in the transition sequence, the callback
         // must be the last chained transition callback.
         .onAnyTransition(
-            new Closure<Transition<State>>() {
-              @Override public void execute(final Transition<State> transition) {
-                ScheduleStatus from = transition.getFrom().getState();
-                ScheduleStatus to = transition.getTo().getState();
-
-                if (transition.isValidStateChange() && (to != ScheduleStatus.UNKNOWN)
-                    // Prevent an update when killing a pending task, since the task is deleted
-                    // prior to the update.
-                    && !((from == ScheduleStatus.PENDING) && (to == ScheduleStatus.KILLING))) {
-                  addWork(WorkCommand.UPDATE_STATE, transition.getTo().getMutation());
-                } else if (!transition.isAllowed()) {
-                  LOG.log(Level.SEVERE, "Illegal state transition attempted: " + transition);
-                  ILLEGAL_TRANSITIONS.incrementAndGet();
-                }
-
+            new Closure<Transition<ScheduleStatus>>() {
+              @Override public void execute(final Transition<ScheduleStatus> transition) {
                 if (transition.isValidStateChange()) {
+                  ScheduleStatus from = transition.getFrom();
+                  ScheduleStatus to = transition.getTo();
+
+                  // TODO(wfarner): Clean up this hack.  This is here to suppress unnecessary work
+                  // (save followed by delete), but it shows a wart with this catch-all behavior.
+                  // Strongly consider pushing the SAVE_STATE behavior to each transition handler.
+                  boolean pendingDeleteHack = !((from == PENDING) && (to == KILLING));
+
+                  // Don't bother saving state of a task that is being removed.
+                  if ((to != UNKNOWN) && pendingDeleteHack) {
+                    addFollowup(SAVE_STATE);
+                  }
                   previousState = from;
+                } else {
+                  LOG.severe("Illegal state transition attempted: " + transition);
+                  ILLEGAL_TRANSITIONS.incrementAndGet();
                 }
               }
             }
@@ -490,56 +417,67 @@ class TaskStateMachine {
         .build();
   }
 
-  private Closure<Transition<State>> addWorkClosure(final WorkCommand work) {
-    return new Closure<Transition<State>>() {
-      @Override public void execute(Transition<State> item) {
-        addWork(work);
-      }
-    };
+  private void addFollowup(Action action) {
+    addFollowup(new SideEffect(action, Optional.<ScheduleStatus>absent()));
   }
 
-  private void addWork(WorkCommand work) {
-    addWork(work, Functions.<IScheduledTask>identity());
+  private void addFollowupTransition(ScheduleStatus status) {
+    addFollowup(new SideEffect(STATE_CHANGE, Optional.of(status)));
   }
 
-  private void addWork(WorkCommand work, Function<IScheduledTask, IScheduledTask> mutation) {
-    LOG.info("Adding work command " + work + " for " + this);
-    workSink.addWork(work, TaskStateMachine.this, mutation);
+  private void addFollowup(SideEffect action) {
+    LOG.info("Adding work command " + action + " for " + this);
+    sideEffects.add(action);
   }
 
-  /**
-   * Same as {@link #updateState(ScheduleStatus, Function)}, but uses a noop mutation.
-   *
-   * @param status Status to apply to the task.
-   * @return {@code true} if the state change was allowed, {@code false} otherwise.
-   */
-  public synchronized boolean updateState(ScheduleStatus status) {
-    return updateState(status, Functions.<IScheduledTask>identity());
+  private Closure<Transition<ScheduleStatus>> addWorkClosure(final Action action) {
+    return new Closure<Transition<ScheduleStatus>>() {
+      @Override public void execute(Transition<ScheduleStatus> item) {
+        addFollowup(action);
+      }
+    };
   }
 
-  /**
-   * Same as {@link #updateState(ScheduleStatus, Function, Optional)}, but uses a noop mutation.
-   *
-   * @param status Status to apply to the task.
-   * @param auditMessage The (optional) audit message to associate with the transition.
-   * @return {@code true} if the state change was allowed, {@code false} otherwise.
-   */
-  public synchronized boolean updateState(ScheduleStatus status, Optional<String> auditMessage) {
-    return updateState(status, Functions.<IScheduledTask>identity(), auditMessage);
-  }
+  public static class TransitionResult {
+    private final boolean success;
+    private final Set<SideEffect> sideEffects;
 
-  /**
-   * Same as {@link #updateState(ScheduleStatus, Function, Optional)}, but omits the audit message.
-   *
-   * @param status Status to apply to the task.
-   * @param mutation Mutate operation to perform while updating the task.
-   * @return {@code true} if the state change was allowed, {@code false} otherwise.
-   */
-  public synchronized boolean updateState(
-      ScheduleStatus status,
-      Function<IScheduledTask, IScheduledTask> mutation) {
+    public TransitionResult(boolean success, Set<SideEffect> sideEffects) {
+      this.success = success;
+      this.sideEffects = Preconditions.checkNotNull(sideEffects);
+    }
+
+    public boolean isSuccess() {
+      return success;
+    }
+
+    public Set<SideEffect> getSideEffects() {
+      return sideEffects;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (!(o instanceof TransitionResult)) {
+        return false;
+      }
+
+      TransitionResult other = (TransitionResult) o;
+      return (success == other.success)
+          && Objects.equal(sideEffects, other.sideEffects);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(success, sideEffects);
+    }
 
-    return updateState(status, mutation, Optional.<String>absent());
+    @Override
+    public String toString() {
+      return Objects.toStringHelper(this)
+          .add("success", success)
+          .add("sideEffects", sideEffects)
+          .toString();
+    }
   }
 
   /**
@@ -548,59 +486,35 @@ class TaskStateMachine {
    * will be appended to the work queue.
    *
    * @param status Status to apply to the task.
-   * @param auditMessage The audit message to associate with the transition.
-   * @param mutation Mutate operation to perform while updating the task.
    * @return {@code true} if the state change was allowed, {@code false} otherwise.
    */
-  public synchronized boolean updateState(
-      final ScheduleStatus status,
-      Function<IScheduledTask, IScheduledTask> mutation,
-      final Optional<String> auditMessage) {
-
+  public synchronized TransitionResult updateState(final ScheduleStatus status) {
     checkNotNull(status);
-    checkNotNull(mutation);
-    checkNotNull(auditMessage);
+    Preconditions.checkState(sideEffects.isEmpty());
 
     /**
      * Don't bother applying noop state changes.  If we end up modifying task state without a
      * state transition (e.g. storing resource consumption of a running task), we need to find
      * a different way to suppress noop transitions.
      */
-    if (stateMachine.getState().getState() != status) {
-      Function<IScheduledTask, IScheduledTask> operation = Functions.compose(mutation,
-          new Function<IScheduledTask, IScheduledTask>() {
-            @Override public IScheduledTask apply(IScheduledTask task) {
-              ScheduledTask builder = task.newBuilder();
-              builder.addToTaskEvents(new TaskEvent()
-                  .setTimestamp(clock.nowMillis())
-                  .setStatus(status)
-                  .setMessage(auditMessage.orNull())
-                  .setScheduler(LOCAL_HOST_SUPPLIER.get()));
-              return IScheduledTask.build(builder);
-            }
-          });
-      return stateMachine.transition(State.create(status, operation));
+    if (stateMachine.getState() == status) {
+      return new TransitionResult(false, ImmutableSet.<SideEffect>of());
     }
 
-    return false;
+    boolean success = stateMachine.transition(status);
+    Set<SideEffect> transitionEffects = ImmutableSet.copyOf(sideEffects);
+    sideEffects.clear();
+    return new TransitionResult(success, transitionEffects);
   }
 
   /**
    * Fetch the current state from the state machine.
+   * TODO(wfarner): Consider removing; the caller should know this.
    *
    * @return The current state.
    */
   public synchronized ScheduleStatus getState() {
-    return stateMachine.getState().getState();
-  }
-
-  /**
-   * Gets the ID for the task that this state machine manages.
-   *
-   * @return The state machine's task ID.
-   */
-  public String getTaskId() {
-    return taskId;
+    return stateMachine.getState();
   }
 
   /**
@@ -616,6 +530,6 @@ class TaskStateMachine {
 
   @Override
   public String toString() {
-    return getTaskId();
+    return stateMachine.getName();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/70adc19a/src/main/java/org/apache/aurora/scheduler/state/WorkCommand.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/WorkCommand.java b/src/main/java/org/apache/aurora/scheduler/state/WorkCommand.java
deleted file mode 100644
index aff74d5..0000000
--- a/src/main/java/org/apache/aurora/scheduler/state/WorkCommand.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Copyright 2013 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.state;
-
-/**
- * Descriptions of the different types of external work commands that task state machines may
- * trigger.
- */
-enum WorkCommand {
-  // Send an instruction for the runner of this task to kill the task.
-  KILL,
-  // Create a new state machine with a copy of this task.
-  RESCHEDULE,
-  // Update the task's state (schedule status) in the persistent store to match the state machine.
-  UPDATE_STATE,
-  // Delete this task from the persistent store.
-  DELETE,
-  // Increment the failure count for this task.
-  INCREMENT_FAILURES
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/70adc19a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
index 79f5605..26468ce 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
@@ -200,6 +200,10 @@ public interface Storage {
 
   /**
    * Executes the unit of mutating {@code work}.
+   * TODO(wfarner): Add a mechanism by which mutating work can add side-effect operations to be
+   * performed after completion of the outer-most transaction.  As it stands, it's somewhat
+   * futile to try to achieve this within a transaction, since the local code does not know
+   * if the current transaction is nested.
    *
    * @param work The unit of work to execute.
    * @param <T> The type of result this unit of work produces.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/70adc19a/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java b/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
index 7fe297a..3d0ff2d 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
@@ -71,6 +71,8 @@ public interface TaskStore {
     /**
      * Offers temporary mutable access to tasks.  If a task ID is not found, it will be silently
      * skipped, and no corresponding task will be returned.
+     * TODO(wfarner): Consider a non-batch variant of this, since that's a more common use case,
+     * and it prevents the caller from worrying about a bad query having broad impact.
      *
      * @param query Query to match tasks against.
      * @param mutator The mutate operation.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/70adc19a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
index b93e47f..af20e82 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
@@ -23,6 +23,7 @@ import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.PeekingIterator;
 import com.twitter.common.testing.easymock.EasyMockTest;
@@ -40,6 +41,7 @@ import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.events.PubsubEvent.TaskRescheduled;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
 import org.apache.aurora.scheduler.storage.Storage;
@@ -50,14 +52,14 @@ import org.apache.mesos.Protos.SlaveID;
 import org.easymock.EasyMock;
 import org.easymock.IAnswer;
 import org.easymock.IArgumentMatcher;
-import org.junit.After;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
+import static org.apache.aurora.gen.ScheduleStatus.FAILED;
 import static org.apache.aurora.gen.ScheduleStatus.INIT;
 import static org.apache.aurora.gen.ScheduleStatus.KILLING;
+import static org.apache.aurora.gen.ScheduleStatus.LOST;
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
 import static org.apache.aurora.gen.ScheduleStatus.UNKNOWN;
@@ -65,6 +67,7 @@ import static org.apache.aurora.gen.apiConstants.DEFAULT_ENVIRONMENT;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class StateManagerImplTest extends EasyMockTest {
@@ -90,11 +93,6 @@ public class StateManagerImplTest extends EasyMockTest {
     stateManager = new StateManagerImpl(storage, clock, driver, taskIdGenerator, eventSink);
   }
 
-  @After
-  public void validateCompletion() {
-    assertTrue(stateManager.getStorage().getEvents().isEmpty());
-  }
-
   private static class StateChangeMatcher implements IArgumentMatcher {
     private final String taskId;
     private final ScheduleStatus from;
@@ -171,7 +169,7 @@ public class StateManagerImplTest extends EasyMockTest {
         .setStatus(PENDING)
         .setTaskEvents(ImmutableList.of(new TaskEvent()
             .setTimestamp(clock.nowMillis())
-            .setScheduler(TaskStateMachine.LOCAL_HOST_SUPPLIER.get())
+            .setScheduler(StateManagerImpl.LOCAL_HOST_SUPPLIER.get())
             .setStatus(PENDING)))
         .setAssignedTask(new AssignedTask()
             .setInstanceId(3)
@@ -265,6 +263,87 @@ public class StateManagerImplTest extends EasyMockTest {
     changeState(unknownTask, RUNNING);
   }
 
+  @Test
+  public void testIncrementFailureCount() {
+    ITaskConfig task = ITaskConfig.build(makeTask(JIM, MY_JOB).newBuilder().setIsService(true));
+    String taskId = "a";
+    expect(taskIdGenerator.generate(task, 0)).andReturn(taskId);
+    expectStateTransitions(taskId, INIT, PENDING, ASSIGNED, RUNNING, FAILED);
+
+    String taskId2 = "a2";
+    expect(taskIdGenerator.generate(task, 0)).andReturn(taskId2);
+    expectStateTransitions(taskId2, INIT, PENDING);
+    eventSink.post(new TaskRescheduled(task.getOwner().getRole(), task.getJobName(), 0));
+
+    control.replay();
+
+    insertTask(task, 0);
+
+    assignTask(taskId, HOST_A);
+    changeState(taskId, RUNNING);
+    changeState(taskId, FAILED);
+    IScheduledTask rescheduledTask = Iterables.getOnlyElement(
+        Storage.Util.consistentFetchTasks(storage, Query.taskScoped(taskId2)));
+    assertEquals(1, rescheduledTask.getFailureCount());
+  }
+
+  @Test
+  public void testDoubleTransition() {
+    // Tests that a transition induced by another transition (STATE_CHANGE action) is performed.
+
+    ITaskConfig task = makeTask(JIM, MY_JOB);
+    String taskId = "a";
+    expect(taskIdGenerator.generate(task, 0)).andReturn(taskId);
+    expectStateTransitions(taskId, INIT, PENDING, ASSIGNED, RUNNING, LOST);
+
+    String taskId2 = "a2";
+    expect(taskIdGenerator.generate(task, 0)).andReturn(taskId2);
+    expectStateTransitions(taskId2, INIT, PENDING);
+    eventSink.post(new TaskRescheduled(task.getOwner().getRole(), task.getJobName(), 0));
+
+    control.replay();
+
+    insertTask(task, 0);
+
+    assignTask(taskId, HOST_A);
+    changeState(taskId, RUNNING);
+    changeState(taskId, UNKNOWN);
+  }
+
+  @Test
+  public void testCasTaskPresent() {
+    ITaskConfig task = makeTask(JIM, MY_JOB);
+    String taskId = "a";
+    expect(taskIdGenerator.generate(task, 0)).andReturn(taskId);
+    expectStateTransitions(taskId, INIT, PENDING, ASSIGNED, FAILED);
+
+    control.replay();
+
+    insertTask(task, 0);
+    assignTask(taskId, HOST_A);
+    assertFalse(stateManager.changeState(
+        taskId,
+        Optional.of(PENDING),
+        RUNNING,
+        Optional.<String>absent()));
+    assertTrue(stateManager.changeState(
+        taskId,
+        Optional.of(ASSIGNED),
+        FAILED,
+        Optional.<String>absent()));
+  }
+
+  @Test
+  public void testCasTaskNotFound() {
+    control.replay();
+
+    assertFalse(stateManager.changeState(
+        "a",
+        Optional.of(PENDING),
+        ASSIGNED,
+        Optional.<String>absent()));
+  }
+
   private void expectStateTransitions(
       String taskId,
       ScheduleStatus initial,


Mime
View raw message