aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wfar...@apache.org
Subject [31/51] [partial] Rename twitter* and com.twitter to apache and org.apache directories to preserve all file history before the refactor.
Date Tue, 31 Dec 2013 21:20:24 GMT
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java b/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
new file mode 100644
index 0000000..f32fd14
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
@@ -0,0 +1,622 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.state;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.annotation.Nullable;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+import com.google.common.base.Optional;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.base.Throwables;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+
+import com.twitter.aurora.gen.ScheduleStatus;
+import com.twitter.aurora.gen.ScheduledTask;
+import com.twitter.aurora.gen.TaskEvent;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+import com.twitter.common.base.Closure;
+import com.twitter.common.base.Closures;
+import com.twitter.common.base.Command;
+import com.twitter.common.base.MorePreconditions;
+import com.twitter.common.stats.Stats;
+import com.twitter.common.util.Clock;
+import com.twitter.common.util.StateMachine;
+import com.twitter.common.util.StateMachine.Rule;
+import com.twitter.common.util.StateMachine.Transition;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * State machine for a task.
+ * <p>
+ * This enforces the lifecycle of a task, and triggers the actions that should be taken in response
+ * to different state transitions.  These responses are externally communicated by populating a
+ * provided work queue.
+ * <p>
+ * The state-changing methods ({@code updateState}) and {@link #getState()} synchronize on this
+ * instance.
+ * <p>
+ * TODO(William Farner): Introduce an interface to allow state machines to be dealt with
+ *     abstractly from the consumption side.
+ */
+class TaskStateMachine {
+  private static final Logger LOG = Logger.getLogger(TaskStateMachine.class.getName());
+
+  private static final AtomicLong ILLEGAL_TRANSITIONS =
+      Stats.exportLong("scheduler_illegal_task_state_transitions");
+
+  // Re-declarations of statuses as wrapped state objects.
+  private static final State ASSIGNED = State.create(ScheduleStatus.ASSIGNED);
+  private static final State FAILED = State.create(ScheduleStatus.FAILED);
+  private static final State FINISHED = State.create(ScheduleStatus.FINISHED);
+  private static final State INIT = State.create(ScheduleStatus.INIT);
+  private static final State KILLED = State.create(ScheduleStatus.KILLED);
+  private static final State KILLING = State.create(ScheduleStatus.KILLING);
+  private static final State LOST = State.create(ScheduleStatus.LOST);
+  private static final State PENDING = State.create(ScheduleStatus.PENDING);
+  private static final State PREEMPTING = State.create(ScheduleStatus.PREEMPTING);
+  private static final State RESTARTING = State.create(ScheduleStatus.RESTARTING);
+  private static final State RUNNING = State.create(ScheduleStatus.RUNNING);
+  private static final State STARTING = State.create(ScheduleStatus.STARTING);
+  private static final State UNKNOWN = State.create(ScheduleStatus.UNKNOWN);
+
+  /**
+   * Lazily resolves (and memoizes) this host's name, which is stamped onto every task event
+   * recorded by {@link #updateState(ScheduleStatus, Function, Optional)}.
+   */
+  @VisibleForTesting
+  static final Supplier<String> LOCAL_HOST_SUPPLIER = Suppliers.memoize(
+      new Supplier<String>() {
+        @Override public String get() {
+          try {
+            return InetAddress.getLocalHost().getHostName();
+          } catch (UnknownHostException e) {
+            // Log the cause too; otherwise the reason for the lookup failure is lost.
+            LOG.log(Level.SEVERE, "Failed to get self hostname.", e);
+            throw Throwables.propagate(e);
+          }
+        }
+      });
+
+  private final String taskId;
+  private final WorkSink workSink;
+  private final StateMachine<State> stateMachine;
+  // Assigned by the onAnyTransition callback, which runs during a transition started from the
+  // synchronized updateState(), but read by the unsynchronized getPreviousState(). volatile
+  // guarantees readers observe the most recent assignment.
+  private volatile ScheduleStatus previousState = null;
+  private final Clock clock;
+
+  /**
+   * Composes a schedule status and a state change argument.  Only the ScheduleStatuses in two
+   * States must be equal for them to be considered equal.
+   */
+  private static class State {
+    private final ScheduleStatus state;
+    private final Function<IScheduledTask, IScheduledTask> mutation;
+
+    State(ScheduleStatus state, Function<IScheduledTask, IScheduledTask> mutation) {
+      this.state = state;
+      this.mutation = mutation;
+    }
+
+    static State create(ScheduleStatus status) {
+      return create(status, Functions.<IScheduledTask>identity());
+    }
+
+    static State create(
+        ScheduleStatus status,
+        Function<IScheduledTask, IScheduledTask> mutation) {
+
+      return new State(status, mutation);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (!(o instanceof State)) {
+        return false;
+      }
+
+      if (o == this) {
+        return true;
+      }
+
+      State other = (State) o;
+      return state == other.state;
+    }
+
+    @Override
+    public int hashCode() {
+      return new HashCodeBuilder()
+          .append(state)
+          .toHashCode();
+    }
+
+    @Override
+    public String toString() {
+      return state.toString();
+    }
+
+    private ScheduleStatus getState() {
+      return state;
+    }
+
+    private Function<IScheduledTask, IScheduledTask> getMutation() {
+      return mutation;
+    }
+  }
+
+  /**
+   * A write-only work acceptor.
+   */
+  public interface WorkSink {
+    /**
+     * Appends external work that must be performed for a state machine transition to be fully
+     * complete.
+     *
+     * @param work Description of the work to be performed.
+     * @param stateMachine The state machine that the work is associated with.
+     * @param mutation Mutate operation to perform along with the state transition.
+     */
+    void addWork(
+        WorkCommand work,
+        TaskStateMachine stateMachine,
+        Function<IScheduledTask, IScheduledTask> mutation);
+  }
+
+  /**
+   * Creates a new task state machine.
+   *
+   * @param taskId ID of the task managed by this state machine.
+   * @param task Read-only task that this state machine manages.
+   * @param workSink Work sink to receive transition response actions.
+   * @param clock Clock to use for reading the current time.
+   * @param initialState The state to begin the state machine at.  All legal transitions will be
+   *     added, but this allows the state machine to 'skip' states, for instance when a task is
+   *     loaded from a persistent store.
+   */
+  public TaskStateMachine(
+      final String taskId,
+      final IScheduledTask task,
+      final WorkSink workSink,
+      final Clock clock,
+      final ScheduleStatus initialState) {
+
+    this.taskId = MorePreconditions.checkNotBlank(taskId);
+    this.workSink = checkNotNull(workSink);
+    this.clock = checkNotNull(clock);
+    checkNotNull(initialState);
+
+    @SuppressWarnings("unchecked")
+    Closure<Transition<State>> manageTerminatedTasks = Closures.combine(
+        /* Kill a task that we believe to be terminated when an attempt is made to revive. */
+        Closures.filter(Transition.to(ASSIGNED, STARTING, RUNNING),
+            addWorkClosure(WorkCommand.KILL)),
+        /* Remove a terminated task that is remotely removed. */
+        Closures.filter(Transition.to(UNKNOWN), addWorkClosure(WorkCommand.DELETE)));
+
+    final Closure<Transition<State>> manageRestartingTask = new Closure<Transition<State>>() {
+      @SuppressWarnings("fallthrough")
+      @Override public void execute(Transition<State> transition) {
+        switch (transition.getTo().getState()) {
+          case ASSIGNED:
+          case STARTING:
+          case RUNNING:
+            addWork(WorkCommand.KILL);
+            break;
+
+          case LOST:
+            addWork(WorkCommand.KILL);
+            // fall through
+
+          case FINISHED:
+          case FAILED:
+          case KILLED:
+            addWork(WorkCommand.RESCHEDULE, transition.getTo().getMutation());
+            break;
+
+          case UNKNOWN:
+            updateState(ScheduleStatus.LOST);
+            break;
+
+          default:
+            // No-op.
+        }
+      }
+    };
+
+    // To be called on a task transitioning into the FINISHED state.
+    final Command rescheduleIfService = new Command() {
+      @Override public void execute() {
+        if (task.getAssignedTask().getTask().isIsService()) {
+          addWork(WorkCommand.RESCHEDULE);
+        }
+      }
+    };
+
+    // To be called on a task transitioning into the FAILED state.
+    final Command incrementFailuresMaybeReschedule = new Command() {
+      @Override public void execute() {
+        addWork(WorkCommand.INCREMENT_FAILURES);
+
+        // Max failures is ignored for service task.
+        boolean isService = task.getAssignedTask().getTask().isIsService();
+
+        // Max failures is ignored when set to -1.
+        int maxFailures = task.getAssignedTask().getTask().getMaxTaskFailures();
+        if (isService || (maxFailures == -1) || (task.getFailureCount() < (maxFailures - 1))) {
+          addWork(WorkCommand.RESCHEDULE);
+        } else {
+          LOG.info("Task " + getTaskId() + " reached failure limit, not rescheduling");
+        }
+      }
+    };
+
+    stateMachine = StateMachine.<State>builder(taskId)
+        .logTransitions()
+        .initialState(State.create(initialState))
+        .addState(
+            Rule.from(INIT)
+                .to(PENDING, UNKNOWN))
+        .addState(
+            Rule.from(PENDING)
+                .to(ASSIGNED, KILLING)
+                .withCallback(
+                    new Closure<Transition<State>>() {
+                      @Override public void execute(Transition<State> transition) {
+                        switch (transition.getTo().getState()) {
+                          case KILLING:
+                            addWork(WorkCommand.DELETE);
+                            break;
+
+                          default:
+                            // No-op.
+                        }
+                      }
+                    }
+                ))
+        .addState(
+            Rule.from(ASSIGNED)
+                .to(STARTING, RUNNING, FINISHED, FAILED, RESTARTING, KILLED,
+                    KILLING, LOST, PREEMPTING)
+                .withCallback(
+                    new Closure<Transition<State>>() {
+                      @SuppressWarnings("fallthrough")
+                      @Override public void execute(Transition<State> transition) {
+                        switch (transition.getTo().getState()) {
+                          case FINISHED:
+                            rescheduleIfService.execute();
+                            break;
+
+                          case PREEMPTING:
+                            addWork(WorkCommand.KILL);
+                            break;
+
+                          case FAILED:
+                            incrementFailuresMaybeReschedule.execute();
+                            break;
+
+                          case RESTARTING:
+                            addWork(WorkCommand.KILL);
+                            break;
+
+                          case KILLED:
+                            addWork(WorkCommand.RESCHEDULE);
+                            break;
+
+                          case LOST:
+                            addWork(WorkCommand.RESCHEDULE);
+                            // fall through
+                          case KILLING:
+                            addWork(WorkCommand.KILL);
+                            break;
+
+                          case UNKNOWN:
+                            break;
+
+                          default:
+                            // No-op.
+                        }
+                      }
+                    }
+                ))
+        .addState(
+            Rule.from(STARTING)
+                .to(RUNNING, FINISHED, FAILED, RESTARTING, KILLING, KILLED, LOST, PREEMPTING)
+                .withCallback(
+                    new Closure<Transition<State>>() {
+                      @SuppressWarnings("fallthrough")
+                      @Override public void execute(Transition<State> transition) {
+                        switch (transition.getTo().getState()) {
+                          case FINISHED:
+                            rescheduleIfService.execute();
+                            break;
+
+                          case RESTARTING:
+                            addWork(WorkCommand.KILL);
+                            break;
+
+                          case PREEMPTING:
+                            addWork(WorkCommand.KILL);
+                            break;
+
+                          case FAILED:
+                            incrementFailuresMaybeReschedule.execute();
+                            break;
+
+                          case KILLED:
+                            addWork(WorkCommand.RESCHEDULE);
+                            break;
+
+                          case KILLING:
+                            addWork(WorkCommand.KILL);
+                            break;
+
+                          case LOST:
+                            addWork(WorkCommand.RESCHEDULE);
+                            break;
+
+                          case UNKNOWN:
+                            // The slave previously acknowledged that it had the task, and now
+                            // stopped reporting it.
+                            updateState(ScheduleStatus.LOST);
+                            break;
+
+                          default:
+                            // No-op.
+                        }
+                      }
+                    }
+                ))
+        .addState(
+            Rule.from(RUNNING)
+                .to(FINISHED, RESTARTING, FAILED, KILLING, KILLED, LOST, PREEMPTING)
+                .withCallback(
+                    new Closure<Transition<State>>() {
+                      @SuppressWarnings("fallthrough")
+                      @Override public void execute(Transition<State> transition) {
+                        switch (transition.getTo().getState()) {
+                          case FINISHED:
+                            rescheduleIfService.execute();
+                            break;
+
+                          case PREEMPTING:
+                            addWork(WorkCommand.KILL);
+                            break;
+
+                          case RESTARTING:
+                            addWork(WorkCommand.KILL);
+                            break;
+
+                          case FAILED:
+                            incrementFailuresMaybeReschedule.execute();
+                            break;
+
+                          case KILLED:
+                            addWork(WorkCommand.RESCHEDULE);
+                            break;
+
+                          case KILLING:
+                            addWork(WorkCommand.KILL);
+                            break;
+
+                          case LOST:
+                            addWork(WorkCommand.RESCHEDULE);
+                            break;
+
+                          case UNKNOWN:
+                            updateState(ScheduleStatus.LOST);
+                            break;
+
+                          default:
+                            // No-op.
+                        }
+                      }
+                    }
+                ))
+        .addState(
+            Rule.from(FINISHED)
+                .to(UNKNOWN)
+                .withCallback(manageTerminatedTasks))
+        .addState(
+            Rule.from(PREEMPTING)
+                .to(FINISHED, FAILED, KILLING, KILLED, LOST)
+                .withCallback(manageRestartingTask))
+        .addState(
+            Rule.from(RESTARTING)
+                .to(FINISHED, FAILED, KILLING, KILLED, LOST)
+                .withCallback(manageRestartingTask))
+        .addState(
+            Rule.from(FAILED)
+                .to(UNKNOWN)
+                .withCallback(manageTerminatedTasks))
+        .addState(
+            Rule.from(KILLED)
+                .to(UNKNOWN)
+                .withCallback(manageTerminatedTasks))
+        .addState(
+            Rule.from(KILLING)
+                .to(FINISHED, FAILED, KILLED, LOST, UNKNOWN)
+                .withCallback(manageTerminatedTasks))
+        .addState(
+            Rule.from(LOST)
+                .to(UNKNOWN)
+                .withCallback(manageTerminatedTasks))
+        .addState(
+            Rule.from(UNKNOWN)
+                .noTransitions()
+                .withCallback(manageTerminatedTasks))
+        // Since we want this action to be performed last in the transition sequence, the callback
+        // must be the last chained transition callback.
+        .onAnyTransition(
+            new Closure<Transition<State>>() {
+              @Override public void execute(final Transition<State> transition) {
+                ScheduleStatus from = transition.getFrom().getState();
+                ScheduleStatus to = transition.getTo().getState();
+
+                if (transition.isValidStateChange() && (to != ScheduleStatus.UNKNOWN)
+                    // Prevent an update when killing a pending task, since the task is deleted
+                    // prior to the update.
+                    && !((from == ScheduleStatus.PENDING) && (to == ScheduleStatus.KILLING))) {
+                  addWork(WorkCommand.UPDATE_STATE, transition.getTo().getMutation());
+                } else if (!transition.isAllowed()) {
+                  LOG.log(Level.SEVERE, "Illegal state transition attempted: " + transition);
+                  ILLEGAL_TRANSITIONS.incrementAndGet();
+                }
+
+                if (transition.isValidStateChange()) {
+                  previousState = from;
+                }
+              }
+            }
+        )
+        // TODO(wfarner): Consider alternatives to allow exceptions to surface.  This would allow
+        // the state machine to surface illegal state transitions and propagate better information
+        // to the caller.  As it stands, the caller must implement logic that really belongs in
+        // the state machine.  For example, preventing RESTARTING->UPDATING transitions
+        // (or for that matter, almost any user-initiated state transition) is awkward.
+        .throwOnBadTransition(false)
+        .build();
+  }
+
+  // Adapts addWork(work) into a transition callback that ignores the transition itself.
+  private Closure<Transition<State>> addWorkClosure(final WorkCommand work) {
+    return new Closure<Transition<State>>() {
+      @Override public void execute(Transition<State> item) {
+        addWork(work);
+      }
+    };
+  }
+
+  // Enqueues work with a no-op (identity) mutation.
+  private void addWork(WorkCommand work) {
+    addWork(work, Functions.<IScheduledTask>identity());
+  }
+
+  // Logs and forwards the work command, with this state machine and the mutation, to the sink.
+  private void addWork(WorkCommand work, Function<IScheduledTask, IScheduledTask> mutation) {
+    LOG.info("Adding work command " + work + " for " + this);
+    workSink.addWork(work, TaskStateMachine.this, mutation);
+  }
+
+  /**
+   * Same as {@link #updateState(ScheduleStatus, Function)}, but uses a noop mutation.
+   *
+   * @param status Status to apply to the task.
+   * @return {@code true} if the state change was allowed, {@code false} otherwise.
+   */
+  public synchronized boolean updateState(ScheduleStatus status) {
+    return updateState(status, Functions.<IScheduledTask>identity());
+  }
+
+  /**
+   * Same as {@link #updateState(ScheduleStatus, Function, Optional)}, but uses a noop mutation.
+   *
+   * @param status Status to apply to the task.
+   * @param auditMessage The (optional) audit message to associate with the transition.
+   * @return {@code true} if the state change was allowed, {@code false} otherwise.
+   */
+  public synchronized boolean updateState(ScheduleStatus status, Optional<String> auditMessage) {
+    return updateState(status, Functions.<IScheduledTask>identity(), auditMessage);
+  }
+
+  /**
+   * Same as {@link #updateState(ScheduleStatus, Function, Optional)}, but omits the audit message.
+   *
+   * @param status Status to apply to the task.
+   * @param mutation Mutate operation to perform while updating the task.
+   * @return {@code true} if the state change was allowed, {@code false} otherwise.
+   */
+  public synchronized boolean updateState(
+      ScheduleStatus status,
+      Function<IScheduledTask, IScheduledTask> mutation) {
+
+    return updateState(status, mutation, Optional.<String>absent());
+  }
+
+  /**
+   * Attempt to transition the state machine to the provided state.
+   * At the time this method returns, any work commands required to satisfy the state transition
+   * will be appended to the work queue.
+   *
+   * @param status Status to apply to the task.
+   * @param mutation Mutate operation to perform while updating the task.  It is applied after a
+   *     task event recording {@code status} has been appended.
+   * @param auditMessage The audit message to associate with the transition.
+   * @return {@code true} if the state change was allowed, {@code false} otherwise (including
+   *     when {@code status} equals the current state).
+   */
+  public synchronized boolean updateState(
+      final ScheduleStatus status,
+      Function<IScheduledTask, IScheduledTask> mutation,
+      final Optional<String> auditMessage) {
+
+    checkNotNull(status);
+    checkNotNull(mutation);
+    checkNotNull(auditMessage);
+
+    // Don't bother applying noop state changes.  If we end up modifying task state without a
+    // state transition (e.g. storing resource consumption of a running task), we need to find
+    // a different way to suppress noop transitions.
+    if (stateMachine.getState().getState() != status) {
+      Function<IScheduledTask, IScheduledTask> operation = Functions.compose(mutation,
+          new Function<IScheduledTask, IScheduledTask>() {
+            @Override public IScheduledTask apply(IScheduledTask task) {
+              ScheduledTask builder = task.newBuilder();
+              builder.addToTaskEvents(new TaskEvent()
+                  .setTimestamp(clock.nowMillis())
+                  .setStatus(status)
+                  .setMessage(auditMessage.orNull())
+                  .setScheduler(LOCAL_HOST_SUPPLIER.get()));
+              return IScheduledTask.build(builder);
+            }
+          });
+      return stateMachine.transition(State.create(status, operation));
+    }
+
+    return false;
+  }
+
+  /**
+   * Fetch the current state from the state machine.
+   *
+   * @return The current state.
+   */
+  public synchronized ScheduleStatus getState() {
+    return stateMachine.getState().getState();
+  }
+
+  /**
+   * Gets the ID for the task that this state machine manages.
+   *
+   * @return The state machine's task ID.
+   */
+  public String getTaskId() {
+    return taskId;
+  }
+
+  /**
+   * Gets the previous state of this state machine.
+   *
+   * @return The state machine's previous state, or {@code null} if the state machine has not
+   *     transitioned since being created.
+   */
+  @Nullable
+  ScheduleStatus getPreviousState() {
+    return previousState;
+  }
+
+  @Override
+  public String toString() {
+    return getTaskId();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/state/UUIDGenerator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/UUIDGenerator.java b/src/main/java/org/apache/aurora/scheduler/state/UUIDGenerator.java
new file mode 100644
index 0000000..d8de19c
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/state/UUIDGenerator.java
@@ -0,0 +1,17 @@
+package com.twitter.aurora.scheduler.state;
+
+import java.util.UUID;
+
+/**
+ * Wraps {@link java.util.UUID#randomUUID()} to facilitate unit testing.
+ */
+interface UUIDGenerator {
+  UUID createNew();
+
+  class UUIDGeneratorImpl implements UUIDGenerator {
+    @Override
+    public UUID createNew() {
+      return UUID.randomUUID();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/state/WorkCommand.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/WorkCommand.java b/src/main/java/org/apache/aurora/scheduler/state/WorkCommand.java
new file mode 100644
index 0000000..6c5637d
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/state/WorkCommand.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.state;
+
+/**
+ * The kinds of external work a task state machine can request in response to a transition.
+ * Declaration order is significant only insofar as callers may rely on ordinals.
+ */
+enum WorkCommand {
+  /** Ask whatever is running this task to kill it. */
+  KILL,
+  /** Spawn a new state machine holding a copy of this task. */
+  RESCHEDULE,
+  /** Persist the task's schedule status so the store matches the state machine. */
+  UPDATE_STATE,
+  /** Remove this task from the persistent store. */
+  DELETE,
+  /** Bump this task's failure counter. */
+  INCREMENT_FAILURES;
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java b/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
new file mode 100644
index 0000000..a52c5c5
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
@@ -0,0 +1,137 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.stats;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.inject.AbstractModule;
+import com.google.inject.BindingAnnotation;
+
+import org.apache.mesos.Protos.Offer;
+
+import com.twitter.aurora.gen.Quota;
+import com.twitter.aurora.scheduler.async.OfferQueue;
+import com.twitter.aurora.scheduler.configuration.Resources;
+import com.twitter.aurora.scheduler.stats.SlotSizeCounter.ResourceSlotProvider;
+import com.twitter.aurora.scheduler.storage.entities.IQuota;
+import com.twitter.common.application.modules.LifecycleModule;
+import com.twitter.common.args.Arg;
+import com.twitter.common.args.CmdLine;
+import com.twitter.common.base.Command;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Data;
+import com.twitter.common.quantity.Time;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Guice module that wires up periodic, background export of cluster-wide resource allocation and
+ * consumption statistics.
+ */
+public class AsyncStatsModule extends AbstractModule {
+
+  @CmdLine(name = "async_task_stat_update_interval",
+      help = "Interval on which to try to update resource consumption stats.")
+  private static final Arg<Amount<Long, Time>> TASK_STAT_INTERVAL =
+      Arg.create(Amount.of(1L, Time.HOURS));
+
+  @CmdLine(name = "async_slot_stat_update_interval",
+      help = "Interval on which to try to update open slot stats.")
+  private static final Arg<Amount<Long, Time>> SLOT_STAT_INTERVAL =
+      Arg.create(Amount.of(1L, Time.MINUTES));
+
+  /** Qualifies the executor dedicated to running stat updates. */
+  @BindingAnnotation
+  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+  private @interface StatExecutor { }
+
+  @Override
+  protected void configure() {
+    // One daemon thread runs every stat job; being a daemon it does not keep the JVM alive.
+    ThreadFactoryBuilder threadFactory = new ThreadFactoryBuilder()
+        .setNameFormat("AsyncStat-%d")
+        .setDaemon(true);
+    final ScheduledExecutorService executor =
+        Executors.newSingleThreadScheduledExecutor(threadFactory.build());
+
+    bind(TaskStatCalculator.class).in(Singleton.class);
+    bind(CachedCounters.class).in(Singleton.class);
+    bind(ResourceSlotProvider.class).to(OfferAdapter.class);
+    bind(SlotSizeCounter.class).in(Singleton.class);
+
+    bind(ScheduledExecutorService.class).annotatedWith(StatExecutor.class).toInstance(executor);
+    LifecycleModule.bindStartupAction(binder(), StatUpdater.class);
+  }
+
+  /**
+   * Startup action that schedules the task and slot stat jobs at their configured fixed rates.
+   */
+  static class StatUpdater implements Command {
+    private final ScheduledExecutorService executor;
+    private final TaskStatCalculator taskStats;
+    private final SlotSizeCounter slotCounter;
+
+    @Inject
+    StatUpdater(
+        @StatExecutor ScheduledExecutorService executor,
+        TaskStatCalculator taskStats,
+        SlotSizeCounter slotCounter) {
+
+      this.executor = checkNotNull(executor);
+      this.taskStats = checkNotNull(taskStats);
+      this.slotCounter = checkNotNull(slotCounter);
+    }
+
+    @Override
+    public void execute() {
+      scheduleEvery(taskStats, TASK_STAT_INTERVAL.get());
+      scheduleEvery(slotCounter, SLOT_STAT_INTERVAL.get());
+    }
+
+    // Runs the job first after one full interval, then repeatedly once per interval.
+    private void scheduleEvery(Runnable job, Amount<Long, Time> interval) {
+      long periodSeconds = interval.as(Time.SECONDS);
+      executor.scheduleAtFixedRate(job, periodSeconds, periodSeconds, TimeUnit.SECONDS);
+    }
+  }
+
+  /**
+   * Presents each offer currently held by the {@link OfferQueue} as a quota-shaped slot.
+   */
+  static class OfferAdapter implements ResourceSlotProvider {
+    private static final Function<Offer, IQuota> TO_QUOTA = new Function<Offer, IQuota>() {
+      @Override public IQuota apply(Offer offer) {
+        Resources offered = Resources.from(offer);
+        Quota quota = new Quota()
+            .setNumCpus(offered.getNumCpus())
+            .setRamMb(offered.getRam().as(Data.MB))
+            .setDiskMb(offered.getDisk().as(Data.MB));
+        return IQuota.build(quota);
+      }
+    };
+
+    private final OfferQueue offerQueue;
+
+    @Inject
+    OfferAdapter(OfferQueue offerQueue) {
+      this.offerQueue = checkNotNull(offerQueue);
+    }
+
+    @Override
+    public Iterable<IQuota> get() {
+      Iterable<Offer> available = offerQueue.getOffers();
+      return FluentIterable.from(available).transform(TO_QUOTA);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/stats/CachedCounters.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/stats/CachedCounters.java b/src/main/java/org/apache/aurora/scheduler/stats/CachedCounters.java
new file mode 100644
index 0000000..81d6811
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/stats/CachedCounters.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.stats;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.inject.Inject;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
+import com.twitter.common.stats.StatsProvider;
+
+/**
+ * A cache of stats, allowing counters to be fetched and reused based on their names.
+ */
+class CachedCounters {
+  private final LoadingCache<String, AtomicLong> cache;
+
+  @Inject
+  CachedCounters(final StatsProvider stats) {
+    cache = CacheBuilder.newBuilder().build(
+        new CacheLoader<String, AtomicLong>() {
+          @Override public AtomicLong load(String key) {
+            return stats.makeCounter(key);
+          }
+        }
+    );
+  }
+
+  AtomicLong get(String name) {
+    return cache.getUnchecked(name);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/stats/ResourceCounter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/stats/ResourceCounter.java b/src/main/java/org/apache/aurora/scheduler/stats/ResourceCounter.java
new file mode 100644
index 0000000..7b96e86
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/stats/ResourceCounter.java
@@ -0,0 +1,207 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.stats;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import javax.inject.Inject;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.Iterables;
+
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.base.Tasks;
+import com.twitter.aurora.scheduler.configuration.ConfigurationManager;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.Storage.StorageException;
+import com.twitter.aurora.scheduler.storage.Storage.StoreProvider;
+import com.twitter.aurora.scheduler.storage.Storage.Work;
+import com.twitter.aurora.scheduler.storage.entities.IQuota;
+import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
+
+/**
+ * Computes aggregate metrics about resource allocation and consumption in the scheduler.
+ */
+public class ResourceCounter {
+  private final Storage storage;
+
+  @Inject
+  ResourceCounter(Storage storage) {
+    this.storage = Preconditions.checkNotNull(storage);
+  }
+
+  /**
+   * Fetches the configurations of all tasks matching {@code query}, using
+   * {@code Storage.Util.consistentFetchTasks}.
+   *
+   * @param query Query selecting the tasks to fetch.
+   * @return Configurations of the matching tasks.
+   * @throws StorageException if there was a problem fetching tasks from storage.
+   */
+  private Iterable<ITaskConfig> getTasks(Query.Builder query) throws StorageException {
+    return Iterables.transform(
+        Storage.Util.consistentFetchTasks(storage, query),
+        Tasks.SCHEDULED_TO_INFO);
+  }
+
+  /**
+   * Computes totals over all active tasks for each of the {@link MetricType}s.
+   *
+   * @return aggregates for each global metric type, in {@link MetricType} declaration order.
+   * @throws StorageException if there was a problem fetching tasks from storage.
+   */
+  public List<GlobalMetric> computeConsumptionTotals() throws StorageException {
+    List<GlobalMetric> counts = Arrays.asList(
+        new GlobalMetric(MetricType.TOTAL_CONSUMED),
+        new GlobalMetric(MetricType.DEDICATED_CONSUMED),
+        new GlobalMetric(MetricType.QUOTA_CONSUMED),
+        new GlobalMetric(MetricType.FREE_POOL_CONSUMED));
+
+    for (ITaskConfig task : getTasks(Query.unscoped().active())) {
+      // Each metric applies its own filter, so one task may count toward several totals.
+      for (GlobalMetric count : counts) {
+        count.accumulate(task);
+      }
+    }
+    return counts;
+  }
+
+  /**
+   * Computes the sum of all stored quota allocations, using a weakly consistent read.
+   *
+   * @return Total allocated quota.
+   * @throws StorageException if there was a problem fetching quotas from storage.
+   */
+  public Metric computeQuotaAllocationTotals() throws StorageException {
+    return storage.weaklyConsistentRead(new Work.Quiet<Metric>() {
+      @Override public Metric apply(StoreProvider storeProvider) {
+        Metric allocation = new Metric();
+        for (IQuota quota : storeProvider.getQuotaStore().fetchQuotas().values()) {
+          allocation.accumulate(quota);
+        }
+        return allocation;
+      }
+    });
+  }
+
+  /**
+   * Computes arbitrary resource aggregates based on a query, a filter, and a grouping function.
+   * The returned map is a view backed by the cache used to build the aggregates.
+   *
+   * @param query Query to select tasks for aggregation.
+   * @param filter Filter to apply on query result tasks.
+   * @param keyFunction Function to define aggregation groupings.
+   * @param <K> Key type.
+   * @return A map from the keys to their aggregates based on the tasks fetched.
+   * @throws StorageException if there was a problem fetching tasks from storage.
+   */
+  public <K> Map<K, Metric> computeAggregates(
+      Query.Builder query,
+      Predicate<ITaskConfig> filter,
+      Function<ITaskConfig, K> keyFunction) throws StorageException {
+
+    // A fresh Metric is created the first time each key is seen.
+    LoadingCache<K, Metric> metrics = CacheBuilder.newBuilder()
+        .build(new CacheLoader<K, Metric>() {
+          @Override public Metric load(K key) {
+            return new Metric();
+          }
+        });
+    for (ITaskConfig task : Iterables.filter(getTasks(query), filter)) {
+      metrics.getUnchecked(keyFunction.apply(task)).accumulate(task);
+    }
+    return metrics.asMap();
+  }
+
+  /**
+   * Categories of global resource consumption, each defined by a filter over task configs.
+   */
+  public enum MetricType {
+    // Every task.
+    TOTAL_CONSUMED(Predicates.<ITaskConfig>alwaysTrue()),
+    // Tasks for which ConfigurationManager.isDedicated() holds.
+    DEDICATED_CONSUMED(new Predicate<ITaskConfig>() {
+      @Override public boolean apply(ITaskConfig task) {
+        return ConfigurationManager.isDedicated(task);
+      }
+    }),
+    // Production tasks.
+    QUOTA_CONSUMED(new Predicate<ITaskConfig>() {
+      @Override public boolean apply(ITaskConfig task) {
+        return task.isProduction();
+      }
+    }),
+    // Tasks that are neither dedicated nor production.
+    FREE_POOL_CONSUMED(new Predicate<ITaskConfig>() {
+      @Override public boolean apply(ITaskConfig task) {
+        return !ConfigurationManager.isDedicated(task) && !task.isProduction();
+      }
+    });
+
+    public final Predicate<ITaskConfig> filter;
+
+    MetricType(Predicate<ITaskConfig> filter) {
+      this.filter = filter;
+    }
+  }
+
+  /**
+   * A {@link Metric} that only accumulates tasks accepted by its {@link MetricType}'s filter.
+   */
+  public static class GlobalMetric extends Metric {
+    public final MetricType type;
+
+    public GlobalMetric(MetricType type) {
+      this.type = type;
+    }
+
+    @Override
+    protected void accumulate(ITaskConfig task) {
+      if (type.filter.apply(task)) {
+        super.accumulate(task);
+      }
+    }
+  }
+
+  /**
+   * Running totals of CPU, RAM and disk.
+   */
+  public static class Metric {
+    // CPU counts are fractional (quotas are built from values such as 1.0), so they are summed
+    // as a double. With a long field, every compound '+=' would silently narrow and truncate,
+    // so, for example, a series of 0.5-CPU tasks would sum to zero.
+    private double cpu = 0;
+    private long ramMb = 0;
+    private long diskMb = 0;
+
+    /**
+     * Creates a metric with all totals at zero.
+     */
+    public Metric() {
+    }
+
+    /**
+     * Creates a metric with the same totals as {@code copy}.
+     *
+     * @param copy Metric to copy.
+     */
+    public Metric(Metric copy) {
+      this.cpu = copy.cpu;
+      this.ramMb = copy.ramMb;
+      this.diskMb = copy.diskMb;
+    }
+
+    /**
+     * Adds the CPU, RAM and disk of {@code task} to the totals.
+     *
+     * @param task Task whose resources are added.
+     */
+    protected void accumulate(ITaskConfig task) {
+      cpu += task.getNumCpus();
+      ramMb += task.getRamMb();
+      diskMb += task.getDiskMb();
+    }
+
+    /**
+     * Adds the CPU, RAM and disk of {@code quota} to the totals.
+     *
+     * @param quota Quota whose resources are added.
+     */
+    protected void accumulate(IQuota quota) {
+      cpu += quota.getNumCpus();
+      ramMb += quota.getRamMb();
+      diskMb += quota.getDiskMb();
+    }
+
+    /**
+     * @return Total CPU, truncated to a whole number only after all values were summed.
+     */
+    public long getCpu() {
+      return (long) cpu;
+    }
+
+    /**
+     * @return Total RAM in whole GB (MB total divided by 1024, truncated).
+     */
+    public long getRamGb() {
+      return ramMb / 1024;
+    }
+
+    /**
+     * @return Total disk in whole GB (MB total divided by 1024, truncated).
+     */
+    public long getDiskGb() {
+      return diskMb / 1024;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java b/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java
new file mode 100644
index 0000000..6c0bf48
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.stats;
+
+import java.util.Map;
+
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableMap;
+
+import com.twitter.aurora.gen.Quota;
+import com.twitter.aurora.scheduler.quota.Quotas;
+import com.twitter.aurora.scheduler.storage.entities.IQuota;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * When run, exports for each named slot size the number of slots of that size that fit into the
+ * resources reported by a {@link ResourceSlotProvider}.
+ */
+class SlotSizeCounter implements Runnable {
+  /**
+   * Supplies the resource bundles that slots are counted against.
+   */
+  interface ResourceSlotProvider {
+    Iterable<IQuota> get();
+  }
+
+  // Default slot sizes; Quota arguments are presumably (cpus, ram MB, disk MB) - verify against
+  // the thrift field order.
+  private static final Map<String, IQuota> SLOT_SIZES = ImmutableMap.of(
+      "small", IQuota.build(new Quota(1.0, 1024, 4096)),
+      "medium", IQuota.build(new Quota(4.0, 8192, 16384)),
+      "large", IQuota.build(new Quota(8.0, 16384, 32768)),
+      "xlarge", IQuota.build(new Quota(16.0, 32768, 65536)));
+
+  private final Map<String, IQuota> slotSizes;
+  private final ResourceSlotProvider resourceSlotProvider;
+  private final CachedCounters cachedCounters;
+
+  @Inject
+  SlotSizeCounter(ResourceSlotProvider resourceSlotProvider, CachedCounters cachedCounters) {
+    this(SLOT_SIZES, resourceSlotProvider, cachedCounters);
+  }
+
+  @VisibleForTesting
+  SlotSizeCounter(
+      final Map<String, IQuota> slotSizes,
+      ResourceSlotProvider resourceSlotProvider,
+      CachedCounters cachedCounters) {
+
+    this.slotSizes = checkNotNull(slotSizes);
+    this.resourceSlotProvider = checkNotNull(resourceSlotProvider);
+    this.cachedCounters = checkNotNull(cachedCounters);
+  }
+
+  /**
+   * @param slotName Name of a slot size.
+   * @return Name of the stat exporting the empty-slot count for {@code slotName}.
+   */
+  @VisibleForTesting
+  static String getStatName(String slotName) {
+    return "empty_slots_" + slotName;
+  }
+
+  /**
+   * Sums, across all {@code slots}, how many times {@code slotSize} divides into each one.
+   */
+  private int countSlots(Iterable<IQuota> slots, final IQuota slotSize) {
+    Function<IQuota, Integer> slotsPerResource = new Function<IQuota, Integer>() {
+      @Override public Integer apply(IQuota machineSlack) {
+        return Quotas.divide(machineSlack, slotSize);
+      }
+    };
+
+    int total = 0;
+    for (Integer fitting : FluentIterable.from(slots).transform(slotsPerResource)) {
+      total += fitting;
+    }
+    return total;
+  }
+
+  @Override
+  public void run() {
+    Iterable<IQuota> available = resourceSlotProvider.get();
+    for (Map.Entry<String, IQuota> size : slotSizes.entrySet()) {
+      String statName = getStatName(size.getKey());
+      cachedCounters.get(statName).set(countSlots(available, size.getValue()));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/stats/TaskStatCalculator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/stats/TaskStatCalculator.java b/src/main/java/org/apache/aurora/scheduler/stats/TaskStatCalculator.java
new file mode 100644
index 0000000..db34125
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/stats/TaskStatCalculator.java
@@ -0,0 +1,46 @@
+package com.twitter.aurora.scheduler.stats;
+
+import java.util.Locale;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.twitter.aurora.scheduler.stats.AsyncStatsModule.StatUpdater;
+import com.twitter.aurora.scheduler.stats.ResourceCounter.GlobalMetric;
+import com.twitter.aurora.scheduler.stats.ResourceCounter.Metric;
+import com.twitter.aurora.scheduler.storage.Storage.StorageException;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Calculates and exports aggregate stats about resources consumed by active tasks, along with the
+ * total quota allocated across all roles.
+ */
+class TaskStatCalculator implements Runnable {
+  // Logged under this class's own name so log output and log-level configuration apply here.
+  private static final Logger LOG = Logger.getLogger(TaskStatCalculator.class.getName());
+
+  private final CachedCounters counters;
+  private final ResourceCounter resourceCounter;
+
+  @Inject
+  TaskStatCalculator(ResourceCounter resourceCounter, CachedCounters counters) {
+    this.resourceCounter = checkNotNull(resourceCounter);
+    this.counters = checkNotNull(counters);
+  }
+
+  /**
+   * Publishes the CPU, RAM (GB) and disk (GB) totals of {@code metric} as counters named
+   * {@code prefix_cpu}, {@code prefix_ram_gb} and {@code prefix_disk_gb}.
+   */
+  private void update(String prefix, Metric metric) {
+    counters.get(prefix + "_cpu").set(metric.getCpu());
+    counters.get(prefix + "_ram_gb").set(metric.getRamGb());
+    counters.get(prefix + "_disk_gb").set(metric.getDiskGb());
+  }
+
+  @Override
+  public void run() {
+    try {
+      for (GlobalMetric metric : resourceCounter.computeConsumptionTotals()) {
+        // Locale.ROOT keeps stat names independent of the JVM default locale; in a Turkish
+        // locale, for example, an upper-case 'I' would otherwise lower-case to a dotless i.
+        update("resources_" + metric.type.name().toLowerCase(Locale.ROOT), metric);
+      }
+      update("resources_allocated_quota", resourceCounter.computeQuotaAllocationTotals());
+    } catch (StorageException e) {
+      // Best-effort: stats are simply skipped for this run when storage is unavailable.
+      LOG.fine("Unable to fetch metrics, storage is likely not ready.");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/storage/AttributeStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/AttributeStore.java b/src/main/java/org/apache/aurora/scheduler/storage/AttributeStore.java
new file mode 100644
index 0000000..35c666c
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/AttributeStore.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.storage;
+
+import java.util.Set;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+
+import com.twitter.aurora.gen.Attribute;
+import com.twitter.aurora.gen.HostAttributes;
+import com.twitter.aurora.gen.MaintenanceMode;
+import com.twitter.aurora.scheduler.storage.Storage.StoreProvider;
+
+/**
+ * Storage for per-host attribute records ({@link HostAttributes}).
+ */
+public interface AttributeStore {
+  /**
+   * Looks up the attribute record for a single host.
+   *
+   * @param host host name.
+   * @return attributes associated with {@code host}, or absent if the host is unknown.
+   */
+  Optional<HostAttributes> getHostAttributes(String host);
+
+  /**
+   * Lists every host attribute record held by the store.
+   *
+   * @return All host attributes.
+   */
+  Set<HostAttributes> getHostAttributes();
+
+  /**
+   * Write access to host attributes. Attributes are treated as mostly ephemeral, so any
+   * inconsistency in them is considered extremely low risk.
+   */
+  interface Mutable extends AttributeStore {
+
+    /**
+     * Removes every attribute record from the store.
+     */
+    void deleteHostAttributes();
+
+    /**
+     * Stores an attribute record for a host.
+     *
+     * @param hostAttributes The attribute record to save.
+     */
+    void saveHostAttributes(HostAttributes hostAttributes);
+
+    /**
+     * Changes the maintenance mode of a host. Unknown hosts are left untouched.
+     *
+     * @param host Host to adjust.
+     * @param mode Mode to place the host in.
+     * @return {@code true} if the host was known and its mode was updated,
+     *         {@code false} if the host is not recognized.
+     */
+    boolean setMaintenanceMode(String host, MaintenanceMode mode);
+  }
+
+  /**
+   * Static helpers for reading from an attribute store.
+   */
+  final class Util {
+    private Util() {
+      // Not instantiable.
+    }
+
+    /**
+     * Returns the attributes recorded for {@code host}.
+     *
+     * @param store Store to fetch host attributes from.
+     * @param host Host to fetch attributes about.
+     * @return Attributes associated with {@code host}, or an empty iterable if the host is
+     *         unknown.
+     */
+    public static Iterable<Attribute> attributesOrNone(StoreProvider store, String host) {
+      Optional<HostAttributes> found = store.getAttributeStore().getHostAttributes(host);
+      if (!found.isPresent()) {
+        return ImmutableList.<Attribute>of();
+      }
+      return found.get().getAttributes();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java
new file mode 100644
index 0000000..c71a114
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java
@@ -0,0 +1,144 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.storage;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+
+import com.google.inject.BindingAnnotation;
+import com.google.inject.Module;
+import com.google.inject.PrivateModule;
+
+import com.twitter.aurora.scheduler.events.PubsubEvent.Interceptors.Event;
+import com.twitter.aurora.scheduler.events.PubsubEvent.Interceptors.SendNotification;
+import com.twitter.aurora.scheduler.storage.Storage.MutateWork.NoResult.Quiet;
+import com.twitter.aurora.scheduler.storage.Storage.NonVolatileStorage;
+import com.twitter.common.util.StateMachine;
+
+/**
+ * Decorates a {@link NonVolatileStorage} so that its methods are only invoked in lifecycle order:
+ * {@code prepare()}, then {@code start()}, then reads, writes and snapshots. Out-of-order calls
+ * to anything other than {@code stop()} are rejected with a {@link StorageException}.
+ */
+public class CallOrderEnforcingStorage implements NonVolatileStorage {
+
+  /**
+   * Binding annotation identifying the underlying storage whose call order is enforced.
+   */
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target({ ElementType.PARAMETER, ElementType.METHOD })
+  @BindingAnnotation
+  private @interface EnforceOrderOn { }
+
+  /**
+   * Lifecycle phases of the wrapped storage.
+   */
+  private enum State {
+    CONSTRUCTED,
+    PREPARED,
+    READY,
+    STOPPED
+  }
+
+  private final NonVolatileStorage wrapped;
+
+  // Only the forward chain CONSTRUCTED -> PREPARED -> READY -> STOPPED is declared as legal.
+  private final StateMachine<State> stateMachine = StateMachine.<State>builder("storage")
+      .logTransitions()
+      .initialState(State.CONSTRUCTED)
+      .addState(State.CONSTRUCTED, State.PREPARED)
+      .addState(State.PREPARED, State.READY)
+      .addState(State.READY, State.STOPPED)
+      .build();
+
+  @Inject
+  CallOrderEnforcingStorage(@EnforceOrderOn NonVolatileStorage wrapped) {
+    this.wrapped = wrapped;
+  }
+
+  /**
+   * Fails unless the lifecycle is currently in {@code required}.
+   *
+   * @param required State the storage must be in.
+   * @throws StorageException if the storage is in any other state.
+   */
+  private void checkInState(State required) throws StorageException {
+    State current = stateMachine.getState();
+    if (current == required) {
+      return;
+    }
+    throw new StorageException("Storage is not " + required);
+  }
+
+  @Override
+  public void prepare() throws StorageException {
+    checkInState(State.CONSTRUCTED);
+    wrapped.prepare();
+    // Advance only after the wrapped storage prepared successfully.
+    stateMachine.transition(State.PREPARED);
+  }
+
+  @SendNotification(after = Event.StorageStarted)
+  @Override
+  public void start(Quiet initializationLogic) throws StorageException {
+    checkInState(State.PREPARED);
+    wrapped.start(initializationLogic);
+    // Advance only after the wrapped storage started successfully.
+    stateMachine.transition(State.READY);
+  }
+
+  @Override
+  public void stop() {
+    // No state check here; the state machine only declares READY -> STOPPED.
+    wrapped.stop();
+    stateMachine.transition(State.STOPPED);
+  }
+
+  @Override
+  public <T, E extends Exception> T consistentRead(Work<T, E> work) throws StorageException, E {
+    checkInState(State.READY);
+    return wrapped.consistentRead(work);
+  }
+
+  @Override
+  public <T, E extends Exception> T weaklyConsistentRead(Work<T, E> work)
+      throws StorageException, E {
+
+    checkInState(State.READY);
+    return wrapped.weaklyConsistentRead(work);
+  }
+
+  @Override
+  public <T, E extends Exception> T write(MutateWork<T, E> work)
+      throws StorageException, E {
+
+    checkInState(State.READY);
+    return wrapped.write(work);
+  }
+
+  @Override
+  public void snapshot() throws StorageException {
+    checkInState(State.READY);
+    wrapped.snapshot();
+  }
+
+  /**
+   * Creates a private binding module that wraps {@code storageClass} in a singleton
+   * {@link CallOrderEnforcingStorage} and exposes that wrapper as both {@link Storage} and
+   * {@link NonVolatileStorage}.
+   *
+   * @param storageClass Non-volatile storage implementation class.
+   * @return Binding module.
+   */
+  public static Module wrappingModule(final Class<? extends NonVolatileStorage> storageClass) {
+    return new PrivateModule() {
+      @Override protected void configure() {
+        // Public-facing bindings resolve to the order-enforcing wrapper.
+        bind(Storage.class).to(CallOrderEnforcingStorage.class);
+        bind(NonVolatileStorage.class).to(CallOrderEnforcingStorage.class);
+        bind(CallOrderEnforcingStorage.class).in(Singleton.class);
+
+        // The wrapper's own dependency resolves to the real implementation.
+        bind(NonVolatileStorage.class).annotatedWith(EnforceOrderOn.class).to(storageClass);
+
+        expose(Storage.class);
+        expose(NonVolatileStorage.class);
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/storage/DistributedSnapshotStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/DistributedSnapshotStore.java b/src/main/java/org/apache/aurora/scheduler/storage/DistributedSnapshotStore.java
new file mode 100644
index 0000000..cba6303
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/DistributedSnapshotStore.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.storage;
+
+import com.twitter.aurora.codec.ThriftBinaryCodec.CodingException;
+import com.twitter.aurora.gen.storage.Snapshot;
+
+/**
+ * Persists snapshots to a distributed storage system, where they become globally visible.
+ */
+public interface DistributedSnapshotStore {
+  /**
+   * Persists {@code snapshot} to the distributed storage system.
+   *
+   * <p>TODO(William Farner): Some failures are currently hidden because they surface as
+   * RuntimeExceptions. Convert them to checked exceptions and declare a different exception
+   * type here.
+   *
+   * @param snapshot Snapshot to persist.
+   * @throws CodingException If the snapshot could not be serialized.
+   */
+  void persist(Snapshot snapshot) throws CodingException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java b/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java
new file mode 100644
index 0000000..1a6a849
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java
@@ -0,0 +1,261 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.storage;
+
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+
+import com.twitter.aurora.gen.HostAttributes;
+import com.twitter.aurora.gen.MaintenanceMode;
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.storage.entities.IJobConfiguration;
+import com.twitter.aurora.scheduler.storage.entities.IJobKey;
+import com.twitter.aurora.scheduler.storage.entities.ILock;
+import com.twitter.aurora.scheduler.storage.entities.ILockKey;
+import com.twitter.aurora.scheduler.storage.entities.IQuota;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A store that forwards all its operations to underlying storage systems.  Useful for decorating
+ * an existing storage system: subclasses override only the operations they need to intercept.
+ */
+public class ForwardingStore implements
+    Storage,
+    SchedulerStore.Mutable,
+    JobStore.Mutable,
+    TaskStore.Mutable,
+    LockStore.Mutable,
+    QuotaStore.Mutable,
+    AttributeStore.Mutable {
+
+  private final Storage storage;
+  private final SchedulerStore.Mutable schedulerStore;
+  private final JobStore.Mutable jobStore;
+  private final TaskStore.Mutable taskStore;
+  private final LockStore.Mutable lockStore;
+  private final QuotaStore.Mutable quotaStore;
+  private final AttributeStore.Mutable attributeStore;
+
+  /**
+   * Creates a new forwarding store that delegates each operation to the provided stores.
+   *
+   * @param storage Delegate for storage-level reads, writes and snapshots.
+   * @param schedulerStore Delegate for scheduler (framework ID) operations.
+   * @param jobStore Delegate for job operations.
+   * @param taskStore Delegate for task operations.
+   * @param lockStore Delegate for lock operations.
+   * @param quotaStore Delegate for quota operations.
+   * @param attributeStore Delegate for host attribute operations.
+   * @throws NullPointerException if any delegate is {@code null}.
+   */
+  public ForwardingStore(
+      Storage storage,
+      SchedulerStore.Mutable schedulerStore,
+      JobStore.Mutable jobStore,
+      TaskStore.Mutable taskStore,
+      LockStore.Mutable lockStore,
+      QuotaStore.Mutable quotaStore,
+      AttributeStore.Mutable attributeStore) {
+
+    this.storage = checkNotNull(storage);
+    this.schedulerStore = checkNotNull(schedulerStore);
+    this.jobStore = checkNotNull(jobStore);
+    this.taskStore = checkNotNull(taskStore);
+    this.lockStore = checkNotNull(lockStore);
+    this.quotaStore = checkNotNull(quotaStore);
+    this.attributeStore = checkNotNull(attributeStore);
+  }
+
+  // ---- Storage ----
+
+  @Override
+  public <T, E extends Exception> T consistentRead(Work<T, E> work) throws StorageException, E {
+    return storage.consistentRead(work);
+  }
+
+  @Override
+  public <T, E extends Exception> T weaklyConsistentRead(Work<T, E> work)
+      throws StorageException, E {
+    return storage.weaklyConsistentRead(work);
+  }
+
+  @Override
+  public <T, E extends Exception> T write(MutateWork<T, E> work)
+      throws StorageException, E {
+
+    return storage.write(work);
+  }
+
+  @Override
+  public void snapshot() throws StorageException {
+    storage.snapshot();
+  }
+
+  // ---- SchedulerStore ----
+
+  @Override
+  public void saveFrameworkId(String frameworkId) {
+    schedulerStore.saveFrameworkId(frameworkId);
+  }
+
+  @Override
+  @Nullable
+  public String fetchFrameworkId() {
+    return schedulerStore.fetchFrameworkId();
+  }
+
+  // ---- JobStore ----
+
+  @Override
+  public Iterable<IJobConfiguration> fetchJobs(String managerId) {
+    return jobStore.fetchJobs(managerId);
+  }
+
+  @Override
+  public Optional<IJobConfiguration> fetchJob(String managerId, IJobKey jobKey) {
+    return jobStore.fetchJob(managerId, jobKey);
+  }
+
+  @Override
+  public void saveAcceptedJob(String managerId, IJobConfiguration jobConfig) {
+    jobStore.saveAcceptedJob(managerId, jobConfig);
+  }
+
+  @Override
+  public void removeJob(IJobKey jobKey) {
+    jobStore.removeJob(jobKey);
+  }
+
+  @Override
+  public void deleteJobs() {
+    jobStore.deleteJobs();
+  }
+
+  @Override
+  public Set<String> fetchManagerIds() {
+    return jobStore.fetchManagerIds();
+  }
+
+  // ---- TaskStore ----
+
+  @Override
+  public void saveTasks(Set<IScheduledTask> tasks) throws IllegalStateException {
+    taskStore.saveTasks(tasks);
+  }
+
+  @Override
+  public void deleteAllTasks() {
+    taskStore.deleteAllTasks();
+  }
+
+  @Override
+  public void deleteTasks(Set<String> taskIds) {
+    taskStore.deleteTasks(taskIds);
+  }
+
+  @Override
+  public ImmutableSet<IScheduledTask> mutateTasks(
+      Query.Builder query,
+      Function<IScheduledTask, IScheduledTask> mutator) {
+
+    return taskStore.mutateTasks(query, mutator);
+  }
+
+  @Override
+  public boolean unsafeModifyInPlace(String taskId, ITaskConfig taskConfiguration) {
+    return taskStore.unsafeModifyInPlace(taskId, taskConfiguration);
+  }
+
+  @Override
+  public ImmutableSet<IScheduledTask> fetchTasks(Query.Builder querySupplier) {
+    return taskStore.fetchTasks(querySupplier);
+  }
+
+  // ---- LockStore ----
+
+  @Override
+  public Set<ILock> fetchLocks() {
+    return lockStore.fetchLocks();
+  }
+
+  @Override
+  public Optional<ILock> fetchLock(ILockKey lockKey) {
+    return lockStore.fetchLock(lockKey);
+  }
+
+  @Override
+  public void saveLock(ILock lock) {
+    lockStore.saveLock(lock);
+  }
+
+  @Override
+  public void removeLock(ILockKey lockKey) {
+    lockStore.removeLock(lockKey);
+  }
+
+  @Override
+  public void deleteLocks() {
+    lockStore.deleteLocks();
+  }
+
+  // ---- QuotaStore ----
+
+  @Override
+  public Map<String, IQuota> fetchQuotas() {
+    return quotaStore.fetchQuotas();
+  }
+
+  @Override
+  public void removeQuota(String role) {
+    quotaStore.removeQuota(role);
+  }
+
+  @Override
+  public void deleteQuotas() {
+    quotaStore.deleteQuotas();
+  }
+
+  @Override
+  public void saveQuota(String role, IQuota quota) {
+    quotaStore.saveQuota(role, quota);
+  }
+
+  @Override
+  public Optional<IQuota> fetchQuota(String role) {
+    return quotaStore.fetchQuota(role);
+  }
+
+  // ---- AttributeStore ----
+
+  @Override
+  public void saveHostAttributes(HostAttributes hostAttribute) {
+    attributeStore.saveHostAttributes(hostAttribute);
+  }
+
+  @Override
+  public Optional<HostAttributes> getHostAttributes(String host) {
+    return attributeStore.getHostAttributes(host);
+  }
+
+  @Override
+  public Set<HostAttributes> getHostAttributes() {
+    return attributeStore.getHostAttributes();
+  }
+
+  @Override
+  public void deleteHostAttributes() {
+    attributeStore.deleteHostAttributes();
+  }
+
+  @Override
+  public boolean setMaintenanceMode(String host, MaintenanceMode mode) {
+    return attributeStore.setMaintenanceMode(host, mode);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/storage/JobStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/JobStore.java b/src/main/java/org/apache/aurora/scheduler/storage/JobStore.java
new file mode 100644
index 0000000..7092e91
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/JobStore.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.storage;
+
+import java.util.Set;
+
+import com.google.common.base.Optional;
+
+import com.twitter.aurora.scheduler.storage.entities.IJobConfiguration;
+import com.twitter.aurora.scheduler.storage.entities.IJobKey;
+
+/**
+ * Stores job configuration data.
+ */
+public interface JobStore {
+
+  /**
+   * Fetches all {@code JobConfiguration}s for jobs owned by the manager identified by
+   * {@code managerId}; if there are none then an empty {@code Iterable} is returned.
+   *
+   * @param managerId The unique identifier of the manager to find registered jobs for.
+   * @return the job configurations owned by the specified job manager.
+   */
+  Iterable<IJobConfiguration> fetchJobs(String managerId);
+
+  /**
+   * Fetches the {@code JobConfiguration} for the specified {@code jobKey} if it exists.
+   *
+   * @param managerId The unique identifier of the manager that accepted the job.
+   * @param jobKey The jobKey identifying the job to be fetched.
+   * @return the job configuration for the given {@code jobKey} or absent if none is found.
+   */
+  Optional<IJobConfiguration> fetchJob(String managerId, IJobKey jobKey);
+
+  /**
+   * Fetches all the unique manager ids that are present in the job store.
+   *
+   * @return The IDs of all stored job managers.
+   */
+  Set<String> fetchManagerIds();
+
+  /**
+   * Extension of {@link JobStore} that also permits saving and removing job configurations.
+   */
+  public interface Mutable extends JobStore {
+    /**
+     * Saves the job configuration for a job that has been accepted by the scheduler. Acts as an
+     * update if the managerId already exists.
+     * TODO(William Farner): Consider accepting SanitizedConfiguration here to require that
+     * validation always happens for things entering storage.
+     *
+     * @param managerId The unique id of the manager that accepted the job.
+     * @param jobConfig The configuration of the accepted job.
+     */
+    void saveAcceptedJob(String managerId, IJobConfiguration jobConfig);
+
+    /**
+     * Removes the job configuration for the job identified by {@code jobKey}.
+     * If there is no stored configuration for the identified job, this method returns silently.
+     *
+     * @param jobKey the key identifying the job to delete.
+     */
+    void removeJob(IJobKey jobKey);
+
+    /**
+     * Deletes all jobs.
+     */
+    void deleteJobs();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/storage/LockStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/LockStore.java b/src/main/java/org/apache/aurora/scheduler/storage/LockStore.java
new file mode 100644
index 0000000..13d7317
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/LockStore.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.storage;
+
+import java.util.Set;
+
+import com.google.common.base.Optional;
+
+import com.twitter.aurora.scheduler.storage.entities.ILock;
+import com.twitter.aurora.scheduler.storage.entities.ILockKey;
+
+/**
+ * Storage for locks. Read access is declared here; saving, removing and deleting locks is
+ * declared on {@link Mutable}.
+ */
+public interface LockStore {
+  /**
+   * Looks up a single lock by its key.
+   *
+   * @param lockKey Key identifying the lock to look up.
+   * @return The lock stored under {@code lockKey}, or absent if there is none.
+   */
+  Optional<ILock> fetchLock(ILockKey lockKey);
+
+  /**
+   * Returns every lock currently held in the store.
+   *
+   * @return All stored locks.
+   */
+  Set<ILock> fetchLocks();
+
+  /**
+   * Extension of {@link LockStore} that also permits modifying the stored locks.
+   */
+  public interface Mutable extends LockStore {
+    /**
+     * Removes every lock from the store.
+     */
+    void deleteLocks();
+
+    /**
+     * Removes the lock stored under {@code lockKey} from the store.
+     *
+     * @param lockKey Key of the lock to remove.
+     */
+    void removeLock(ILockKey lockKey);
+
+    /**
+     * Persists {@code lock}, overwriting any existing lock that has the same LockKey.
+     *
+     * @param lock The lock to save.
+     */
+    void saveLock(ILock lock);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/storage/QuotaStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/QuotaStore.java b/src/main/java/org/apache/aurora/scheduler/storage/QuotaStore.java
new file mode 100644
index 0000000..ba8116b
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/QuotaStore.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.storage;
+
+import java.util.Map;
+
+import com.google.common.base.Optional;
+
+import com.twitter.aurora.scheduler.storage.entities.IQuota;
+
+/**
+ * Storage for per-role quota records.
+ */
+public interface QuotaStore {
+  /**
+   * Returns every allocated quota record.
+   *
+   * @return All allocated quotas.
+   */
+  Map<String, IQuota> fetchQuotas();
+
+  /**
+   * Looks up the quota record for a single role.
+   *
+   * @param role Role whose quota should be returned.
+   * @return The quota associated with {@code role}, or absent if there is none.
+   */
+  Optional<IQuota> fetchQuota(String role);
+
+  /**
+   * Extension of {@link QuotaStore} that also permits modifying quota records.
+   */
+  public interface Mutable extends QuotaStore {
+
+    /**
+     * Creates or replaces the quota record for {@code role}.
+     *
+     * @param role Role whose quota record is created or updated.
+     * @param quota Quota to save.
+     */
+    void saveQuota(String role, IQuota quota);
+
+    /**
+     * Removes the quota record for a single role.
+     *
+     * @param role Role whose quota record should be removed.
+     */
+    void removeQuota(String role);
+
+    /**
+     * Removes every quota record.
+     */
+    void deleteQuotas();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/storage/ReadWriteLockManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/ReadWriteLockManager.java b/src/main/java/org/apache/aurora/scheduler/storage/ReadWriteLockManager.java
new file mode 100644
index 0000000..58d7a68
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/ReadWriteLockManager.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.storage;
+
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A lock manager that wraps a {@link ReadWriteLock} and detects ill-fated attempts to upgrade
+ * a read-locked thread to a write-locked thread, which would otherwise deadlock.
+ */
+public class ReadWriteLockManager {
+  // Held as the concrete type so writeLock() can inspect the calling thread's hold counts.
+  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+  enum LockMode {
+    NONE,
+    READ,
+    WRITE
+  }
+
+  /**
+   * Per-thread record of the mode of the outermost lock acquisition and how many times that mode
+   * has been re-entered. Acquisitions in any other mode (e.g. a read lock taken while the write
+   * lock is held) are not counted.
+   */
+  private static class LockState {
+    private LockMode initialLockMode = LockMode.NONE;
+    private int lockCount = 0;
+
+    /**
+     * Records that the thread acquired a lock in {@code mode}.
+     *
+     * @param mode The mode that was just acquired.
+     * @return {@code true} if this acquisition opened a new outermost scope, i.e. no mode was
+     *         being tracked before it.
+     */
+    private boolean lockAcquired(LockMode mode) {
+      boolean stateChanged = false;
+      if (initialLockMode == LockMode.NONE) {
+        initialLockMode = mode;
+        stateChanged = true;
+      }
+      if (initialLockMode == mode) {
+        lockCount++;
+      }
+      return stateChanged;
+    }
+
+    /**
+     * Records that the thread released a lock in {@code mode}; once every acquisition in the
+     * outermost mode has been released the state returns to {@link LockMode#NONE}.
+     *
+     * @param mode The mode that was just released.
+     */
+    private void lockReleased(LockMode mode) {
+      if (initialLockMode == mode) {
+        lockCount--;
+        if (lockCount == 0) {
+          initialLockMode = LockMode.NONE;
+        }
+      }
+    }
+  }
+
+  private final ThreadLocal<LockState> lockState = new ThreadLocal<LockState>() {
+    @Override protected LockState initialValue() {
+      return new LockState();
+    }
+  };
+
+  /**
+   * Blocks until this thread has acquired a read lock.
+   *
+   * @return {@code true} if the lock was newly-acquired, or {@code false} if this call is nested
+   *         inside a read- or write-locked section this thread opened and has yet to release.
+   */
+  public boolean readLock() {
+    lock.readLock().lock();
+    return lockState.get().lockAcquired(LockMode.READ);
+  }
+
+  /**
+   * Releases this thread's read lock.
+   */
+  public void readUnlock() {
+    lock.readLock().unlock();
+    lockState.get().lockReleased(LockMode.READ);
+  }
+
+  /**
+   * Blocks until this thread has acquired a write lock.
+   *
+   * @return {@code true} if the lock was newly-acquired, or {@code false} if this thread previously
+   *         secured the write lock and has yet to release it.
+   * @throws IllegalStateException if this thread holds a read lock without also holding the write
+   *         lock, since a read lock cannot be upgraded to a write lock without deadlocking.
+   */
+  public boolean writeLock() {
+    // Ask the lock itself rather than relying only on LockState: LockState tracks just the
+    // outermost mode, so it misses a read lock that was taken while writing and is still held
+    // after the write lock was released. Acquiring the write lock in that situation would hang.
+    Preconditions.checkState(
+        lock.getReadHoldCount() == 0 || lock.isWriteLockedByCurrentThread(),
+        "A read operation may not be upgraded to a write operation.");
+
+    lock.writeLock().lock();
+    return lockState.get().lockAcquired(LockMode.WRITE);
+  }
+
+  /**
+   * Releases this thread's write lock.
+   */
+  public void writeUnlock() {
+    lock.writeLock().unlock();
+    lockState.get().lockReleased(LockMode.WRITE);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/storage/SchedulerStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/SchedulerStore.java b/src/main/java/org/apache/aurora/scheduler/storage/SchedulerStore.java
new file mode 100644
index 0000000..504b90b
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/SchedulerStore.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.storage;
+
+import javax.annotation.Nullable;
+
+/**
+ * Storage for data that is specific to the scheduler itself, such as its framework id.
+ */
+public interface SchedulerStore {
+
+  /**
+   * Returns the most recently saved framework id.
+   *
+   * @return The last saved framework id; may be {@code null} if none has been saved.
+   */
+  @Nullable
+  String fetchFrameworkId();
+
+  /**
+   * Extension of {@link SchedulerStore} that also permits saving the framework id.
+   */
+  public interface Mutable extends SchedulerStore {
+    /**
+     * Saves {@code frameworkId}, replacing any previously saved id.
+     *
+     * @param frameworkId The framework id to save.
+     */
+    void saveFrameworkId(String frameworkId);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/storage/SnapshotStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/SnapshotStore.java b/src/main/java/org/apache/aurora/scheduler/storage/SnapshotStore.java
new file mode 100644
index 0000000..c775207
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/SnapshotStore.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.storage;
+
+/**
+ * Storage mechanism that is able to create complete snapshots of the local storage system state
+ * and apply these to restore local storage from a snapshotted baseline.
+ *
+ * @param <T> The type of the snapshot blob produced and consumed by this store.
+ */
+public interface SnapshotStore<T> {
+
+  /**
+   * Creates a consistent snapshot of the local storage system.
+   *
+   * @return A blob that can be used to recover local storage via {@link #applySnapshot(Object)}.
+   */
+  T createSnapshot();
+
+  /**
+   * Applies a snapshot blob to the local storage system, wiping out all existing data and
+   * resetting with the contents of the snapshot.
+   *
+   * @param snapshot A snapshot blob created by {@link #createSnapshot()}.
+   */
+  void applySnapshot(T snapshot);
+}


Mime
View raw message