aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wfar...@apache.org
Subject [29/51] [partial] Rename twitter* and com.twitter to apache and org.apache directories to preserve all file history before the refactor.
Date Tue, 31 Dec 2013 21:20:22 GMT
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
new file mode 100644
index 0000000..74f06aa
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
@@ -0,0 +1,715 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.storage.log;
+
+import java.io.IOException;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import java.util.Date;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.google.inject.BindingAnnotation;
+
+import com.twitter.aurora.codec.ThriftBinaryCodec.CodingException;
+import com.twitter.aurora.gen.HostAttributes;
+import com.twitter.aurora.gen.MaintenanceMode;
+import com.twitter.aurora.gen.storage.LogEntry;
+import com.twitter.aurora.gen.storage.Op;
+import com.twitter.aurora.gen.storage.RemoveJob;
+import com.twitter.aurora.gen.storage.RemoveLock;
+import com.twitter.aurora.gen.storage.RemoveQuota;
+import com.twitter.aurora.gen.storage.RemoveTasks;
+import com.twitter.aurora.gen.storage.RewriteTask;
+import com.twitter.aurora.gen.storage.SaveAcceptedJob;
+import com.twitter.aurora.gen.storage.SaveFrameworkId;
+import com.twitter.aurora.gen.storage.SaveHostAttributes;
+import com.twitter.aurora.gen.storage.SaveLock;
+import com.twitter.aurora.gen.storage.SaveQuota;
+import com.twitter.aurora.gen.storage.SaveTasks;
+import com.twitter.aurora.gen.storage.Snapshot;
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.base.SchedulerException;
+import com.twitter.aurora.scheduler.base.Tasks;
+import com.twitter.aurora.scheduler.log.Log.Stream.InvalidPositionException;
+import com.twitter.aurora.scheduler.log.Log.Stream.StreamAccessException;
+import com.twitter.aurora.scheduler.storage.AttributeStore;
+import com.twitter.aurora.scheduler.storage.DistributedSnapshotStore;
+import com.twitter.aurora.scheduler.storage.ForwardingStore;
+import com.twitter.aurora.scheduler.storage.JobStore;
+import com.twitter.aurora.scheduler.storage.LockStore;
+import com.twitter.aurora.scheduler.storage.QuotaStore;
+import com.twitter.aurora.scheduler.storage.SchedulerStore;
+import com.twitter.aurora.scheduler.storage.SnapshotStore;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.Storage.NonVolatileStorage;
+import com.twitter.aurora.scheduler.storage.TaskStore;
+import com.twitter.aurora.scheduler.storage.entities.IJobConfiguration;
+import com.twitter.aurora.scheduler.storage.entities.IJobKey;
+import com.twitter.aurora.scheduler.storage.entities.ILock;
+import com.twitter.aurora.scheduler.storage.entities.ILockKey;
+import com.twitter.aurora.scheduler.storage.entities.IQuota;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
+import com.twitter.aurora.scheduler.storage.log.LogManager.StreamManager;
+import com.twitter.aurora.scheduler.storage.log.LogManager.StreamManager.StreamTransaction;
+import com.twitter.common.application.ShutdownRegistry;
+import com.twitter.common.base.Closure;
+import com.twitter.common.inject.TimedInterceptor.Timed;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.util.concurrent.ExecutorServiceShutdown;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A storage implementation that ensures committed transactions are written to a log.
+ *
+ * <p>In the classic write-ahead log usage we'd perform mutations as follows:
+ * <ol>
+ *   <li>write op to log</li>
+ *   <li>perform op locally</li>
+ *   <li>*checkpoint</li>
+ * </ol>
+ *
+ * <p>Writing the operation to the log provides us with a fast persistence mechanism to ensure we
+ * have a record of our mutation in case we should need to recover state later after a crash or on
+ * a new host (assuming the log is distributed).  We then apply the mutation to a local (in-memory)
+ * data structure for serving fast read requests and then optionally write down the position of the
+ * log entry we wrote in the first step to stable storage to allow for quicker recovery after a
+ * crash. Instead of reading the whole log, we can read all entries past the checkpoint.  This
+ * design implies that all mutations must be idempotent and free from constraint and thus
+ * replayable over newer operations when recovering from an old checkpoint.
+ *
+ * <p>The important detail in our case is the possibility of writing an op to the log, and then
+ * failing to commit locally since we use a local database instead of an in-memory data structure.
+ * If we die after such a failure, then another instance can read and apply the logged op
+ * erroneously.
+ *
+ * <p>This implementation leverages a local transaction to handle this:
+ * <ol>
+ *   <li>start local transaction</li>
+ *   <li>perform op locally (uncommitted!)</li>
+ *   <li>write op to log</li>
+ *   <li>commit local transaction</li>
+ *   <li>*checkpoint</li>
+ * </ol>
+ *
+ * <p>If the op fails to apply to local storage we will never write the op to the log and if the op
+ * fails to apply to the log, it'll throw and abort the local storage transaction as well.
+ */
+public class LogStorage extends ForwardingStore
+    implements NonVolatileStorage, DistributedSnapshotStore {
+
+  /**
+   * A service that can schedule an action to be executed periodically.
+   */
+  @VisibleForTesting
+  interface SchedulingService {
+
+    /**
+     * Schedules an action to execute periodically.
+     *
+     * @param interval The time period to wait until running the {@code action} again.
+     * @param action The action to execute periodically.
+     */
+    void doEvery(Amount<Long, Time> interval, Runnable action);
+  }
+
+  /**
+   * A {@link SchedulingService} backed by a single-threaded scheduled executor whose shutdown is
+   * registered with the supplied shutdown registry.
+   */
+  private static class ScheduledExecutorSchedulingService implements SchedulingService {
+    private final ScheduledExecutorService scheduledExecutor;
+
+    ScheduledExecutorSchedulingService(ShutdownRegistry shutdownRegistry,
+        Amount<Long, Time> shutdownGracePeriod) {
+      scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
+      // Hand executor shutdown to the registry, using the configured grace period.
+      ExecutorServiceShutdown executorShutdown =
+          new ExecutorServiceShutdown(scheduledExecutor, shutdownGracePeriod);
+      shutdownRegistry.addAction(executorShutdown);
+    }
+
+    @Override
+    public void doEvery(Amount<Long, Time> interval, Runnable action) {
+      checkNotNull(interval);
+      checkNotNull(action);
+
+      // The interval serves as both the initial delay and the delay between runs.
+      long period = interval.getValue();
+      scheduledExecutor.scheduleWithFixedDelay(
+          action, period, period, interval.getUnit().getTimeUnit());
+    }
+  }
+
+  private static final Logger LOG = Logger.getLogger(LogStorage.class.getName());
+
+  private final LogManager logManager;
+  private final SchedulingService schedulingService;
+  private final SnapshotStore<Snapshot> snapshotStore;
+  private final Amount<Long, Time> snapshotInterval;
+
+  private StreamManager streamManager;
+
+  private boolean recovered = false;
+  private StreamTransaction transaction = null;
+
+  private final MutableStoreProvider logStoreProvider = new MutableStoreProvider() {
+    @Override public SchedulerStore.Mutable getSchedulerStore() {
+      return LogStorage.this;
+    }
+
+    @Override public JobStore.Mutable getJobStore() {
+      return LogStorage.this;
+    }
+
+    @Override public TaskStore getTaskStore() {
+      return LogStorage.this;
+    }
+
+    @Override public TaskStore.Mutable getUnsafeTaskStore() {
+      return LogStorage.this;
+    }
+
+    @Override public LockStore.Mutable getLockStore() {
+      return LogStorage.this;
+    }
+
+    @Override public QuotaStore.Mutable getQuotaStore() {
+      return LogStorage.this;
+    }
+
+    @Override public AttributeStore.Mutable getAttributeStore() {
+      return LogStorage.this;
+    }
+  };
+
+  /**
+   * Identifies the grace period to give in-process snapshots and checkpoints to complete during
+   * shutdown.
+   */
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target({ ElementType.PARAMETER, ElementType.METHOD })
+  @BindingAnnotation
+  public @interface ShutdownGracePeriod { }
+
+  /**
+   * Identifies the interval between snapshots of local storage truncating the log.
+   */
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target({ ElementType.PARAMETER, ElementType.METHOD })
+  @BindingAnnotation
+  public @interface SnapshotInterval { }
+
+  /**
+   * Identifies a local storage layer that is written to only after first ensuring the write
+   * operation is persisted in the log.
+   */
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target({ ElementType.PARAMETER, ElementType.METHOD })
+  @BindingAnnotation
+  public @interface WriteBehind { }
+
+  @Inject
+  LogStorage(LogManager logManager,
+             ShutdownRegistry shutdownRegistry,
+             @ShutdownGracePeriod Amount<Long, Time> shutdownGracePeriod,
+             SnapshotStore<Snapshot> snapshotStore,
+             @SnapshotInterval Amount<Long, Time> snapshotInterval,
+             @WriteBehind Storage storage,
+             @WriteBehind SchedulerStore.Mutable schedulerStore,
+             @WriteBehind JobStore.Mutable jobStore,
+             @WriteBehind TaskStore.Mutable taskStore,
+             @WriteBehind LockStore.Mutable lockStore,
+             @WriteBehind QuotaStore.Mutable quotaStore,
+             @WriteBehind AttributeStore.Mutable attributeStore) {
+
+    this(logManager,
+        new ScheduledExecutorSchedulingService(shutdownRegistry, shutdownGracePeriod),
+        snapshotStore,
+        snapshotInterval,
+        storage,
+        schedulerStore,
+        jobStore,
+        taskStore,
+        lockStore,
+        quotaStore,
+        attributeStore);
+  }
+
+  @VisibleForTesting
+  LogStorage(LogManager logManager,
+             SchedulingService schedulingService,
+             SnapshotStore<Snapshot> snapshotStore,
+             Amount<Long, Time> snapshotInterval,
+             Storage storage,
+             SchedulerStore.Mutable schedulerStore,
+             JobStore.Mutable jobStore,
+             TaskStore.Mutable taskStore,
+             LockStore.Mutable lockStore,
+             QuotaStore.Mutable quotaStore,
+             AttributeStore.Mutable attributeStore) {
+
+    super(storage, schedulerStore, jobStore, taskStore, lockStore, quotaStore, attributeStore);
+    this.logManager = checkNotNull(logManager);
+    this.schedulingService = checkNotNull(schedulingService);
+    this.snapshotStore = checkNotNull(snapshotStore);
+    this.snapshotInterval = checkNotNull(snapshotInterval);
+  }
+
+  /**
+   * Opens the log through the {@link LogManager} and keeps the resulting stream manager, which
+   * is later used for recovery, transactions and snapshots.
+   *
+   * @throws IllegalStateException If opening the log fails with an {@link IOException}.
+   */
+  @Override
+  public synchronized void prepare() {
+    // Open the log to make a log replica available to the scheduler group.
+    try {
+      streamManager = logManager.open();
+    } catch (IOException e) {
+      throw new IllegalStateException("Failed to open the log, cannot continue", e);
+    }
+
+    // TODO(John Sirois): start incremental recovery here from the log and do a final recovery
+    // catchup in start after shutting down the incremental syncer.
+  }
+
+  /**
+   * Replays the log into the underlying storage, runs {@code initializationLogic}, and then
+   * schedules periodic snapshots.
+   *
+   * <p>Recovery runs while {@code recovered} is still {@code false}, so {@link #write} forwards
+   * the replayed mutations to the underlying storage without recording them in the log again.
+   *
+   * @param initializationLogic Work to run once recovery has completed; because it runs after
+   *     {@code recovered} is set, its mutations are recorded in a log stream transaction.
+   */
+  @Override
+  public synchronized void start(final MutateWork.NoResult.Quiet initializationLogic) {
+    write(new MutateWork.NoResult.Quiet() {
+      @Override protected void execute(MutableStoreProvider unused) {
+        // Must have the underlying storage started so we can query it for the last checkpoint.
+        // We replay these entries in the forwarded storage system's transactions but not ours - we
+        // do not want to re-record these ops to the log.
+        recover();
+        recovered = true;
+
+        // Now that we're recovered we should let any mutations done in initializationLogic append
+        // to the log, so run it in one of our transactions.
+        write(initializationLogic);
+      }
+    });
+
+    // Snapshots are only scheduled after recovery and initialization have finished.
+    scheduleSnapshots();
+  }
+
+  @Override
+  public void stop() {
+    // No-op.
+  }
+
+  /**
+   * Reads every entry from the beginning of the log stream and replays each one in turn.
+   *
+   * @throws RecoveryFailedException If reading the stream fails with a coding, invalid position
+   *     or stream access error; the original exception is kept as the cause.
+   */
+  @Timed("scheduler_log_recover")
+  void recover() throws RecoveryFailedException {
+    try {
+      streamManager.readFromBeginning(new Closure<LogEntry>() {
+        @Override public void execute(LogEntry logEntry) {
+          replay(logEntry);
+        }
+      });
+    } catch (CodingException | InvalidPositionException | StreamAccessException e) {
+      throw new RecoveryFailedException(e);
+    }
+  }
+
+  private static final class RecoveryFailedException extends SchedulerException {
+    private RecoveryFailedException(Throwable cause) {
+      super(cause);
+    }
+  }
+
+  /**
+   * Applies a single log entry to the stores.
+   *
+   * <p>Snapshots are handed to the snapshot store, transactions are replayed op by op, and
+   * no-op entries are ignored.
+   *
+   * @param logEntry The entry to apply.
+   * @throws IllegalArgumentException If the entry is a deflated entry or a frame, neither of
+   *     which is handled by this method.
+   * @throws IllegalStateException If the entry type is not recognized.
+   */
+  void replay(final LogEntry logEntry) {
+    switch (logEntry.getSetField()) {
+      case SNAPSHOT:
+        Snapshot snapshot = logEntry.getSnapshot();
+        LOG.info("Applying snapshot taken on " + new Date(snapshot.getTimestamp()));
+        snapshotStore.applySnapshot(snapshot);
+        break;
+
+      case TRANSACTION:
+        // Ops are replayed in the order in which they appear in the transaction.
+        for (Op op : logEntry.getTransaction().getOps()) {
+          replayOp(op);
+        }
+        break;
+
+      case NOOP:
+        // Nothing to do here
+        break;
+
+      case DEFLATED_ENTRY:
+        throw new IllegalArgumentException("Deflated entries are not handled at this layer.");
+
+      case FRAME:
+        throw new IllegalArgumentException("Framed entries are not handled at this layer.");
+
+      default:
+        throw new IllegalStateException("Unknown log entry type: " + logEntry);
+    }
+  }
+
+  /**
+   * Applies a single transaction op by dispatching to the matching store mutation on this
+   * storage.
+   *
+   * <p>Each case unpacks the op's payload through its generated getters and converts it to the
+   * entity type the store method expects.
+   *
+   * @param op The op to apply.
+   * @throws IllegalStateException If the op type is not recognized.
+   */
+  private void replayOp(Op op) {
+    switch (op.getSetField()) {
+      case SAVE_FRAMEWORK_ID:
+        saveFrameworkId(op.getSaveFrameworkId().getId());
+        break;
+
+      case SAVE_ACCEPTED_JOB:
+        SaveAcceptedJob acceptedJob = op.getSaveAcceptedJob();
+        saveAcceptedJob(
+            acceptedJob.getManagerId(),
+            IJobConfiguration.build(acceptedJob.getJobConfig()));
+        break;
+
+      case REMOVE_JOB:
+        removeJob(IJobKey.build(op.getRemoveJob().getJobKey()));
+        break;
+
+      case SAVE_TASKS:
+        saveTasks(IScheduledTask.setFromBuilders(op.getSaveTasks().getTasks()));
+        break;
+
+      case REWRITE_TASK:
+        RewriteTask rewriteTask = op.getRewriteTask();
+        unsafeModifyInPlace(rewriteTask.getTaskId(), ITaskConfig.build(rewriteTask.getTask()));
+        break;
+
+      case REMOVE_TASKS:
+        deleteTasks(op.getRemoveTasks().getTaskIds());
+        break;
+
+      case SAVE_QUOTA:
+        SaveQuota saveQuota = op.getSaveQuota();
+        saveQuota(saveQuota.getRole(), IQuota.build(saveQuota.getQuota()));
+        break;
+
+      case REMOVE_QUOTA:
+        removeQuota(op.getRemoveQuota().getRole());
+        break;
+
+      case SAVE_HOST_ATTRIBUTES:
+        // Read the payload through the generated getter, like every other case here, rather
+        // than reaching into the struct's public field.
+        saveHostAttributes(op.getSaveHostAttributes().getHostAttributes());
+        break;
+
+      case SAVE_LOCK:
+        saveLock(ILock.build(op.getSaveLock().getLock()));
+        break;
+
+      case REMOVE_LOCK:
+        removeLock(ILockKey.build(op.getRemoveLock().getLockKey()));
+        break;
+
+      default:
+        throw new IllegalStateException("Unknown transaction op: " + op);
+    }
+  }
+
+  private void scheduleSnapshots() {
+    if (snapshotInterval.getValue() > 0) {
+      schedulingService.doEvery(snapshotInterval, new Runnable() {
+        @Override public void run() {
+          try {
+            snapshot();
+          } catch (StorageException e) {
+            if (e.getCause() != null) {
+              LOG.log(Level.WARNING, e.getMessage(), e.getCause());
+            } else {
+              LOG.log(Level.WARNING, "StorageException when attempting to snapshot.", e);
+            }
+          }
+        }
+      });
+    }
+  }
+
+  /**
+   * Forces a snapshot of the storage state.
+   *
+   * <p>The snapshot is created and persisted inside a write on the underlying storage.
+   *
+   * @throws CodingException If there is a problem encoding the snapshot.
+   * @throws InvalidPositionException If the log stream cursor is invalid.
+   * @throws StreamAccessException If there is a problem writing the snapshot to the log stream.
+   */
+  @Timed("scheduler_log_snapshot")
+  void doSnapshot() throws CodingException, InvalidPositionException, StreamAccessException {
+    // Calls super.write rather than this class's write, so no log stream transaction is started
+    // for the snapshot.
+    super.write(new MutateWork.NoResult<CodingException>() {
+      @Override protected void execute(MutableStoreProvider unused)
+          throws CodingException, InvalidPositionException, StreamAccessException {
+
+        persist(snapshotStore.createSnapshot());
+      }
+    });
+  }
+
+  @Timed("scheduler_log_snapshot_persist")
+  @Override
+  public void persist(Snapshot snapshot)
+      throws CodingException, InvalidPositionException, StreamAccessException {
+
+    streamManager.snapshot(snapshot);
+  }
+
+  /**
+   * Runs {@code work} against the underlying storage and, once recovery has completed, records
+   * its mutations in a log stream transaction.
+   *
+   * <p>A call made while a transaction is already open (a nested call from within {@code work})
+   * joins that transaction; only the outermost call starts and commits it.
+   *
+   * @param work The mutation to perform.
+   * @return The value returned by {@code work}.
+   * @throws StorageException If committing the transaction to the log fails with a
+   *     {@link StreamAccessException}.
+   * @throws IllegalStateException If the transaction's operations cannot be encoded.
+   * @throws E If {@code work} throws it.
+   */
+  @Override
+  public synchronized <T, E extends Exception> T write(final MutateWork<T, E> work)
+      throws StorageException, E {
+
+    // We don't want to use the log when recovering from it, we just want to update the underlying
+    // store - so pass mutations straight through to the underlying storage.
+    if (!recovered) {
+      return super.write(work);
+    }
+
+    // The log stream transaction has already been set up so we just need to delegate with our
+    // store provider so any mutations performed by work get logged.
+    if (transaction != null) {
+      return super.write(new MutateWork<T, E>() {
+        @Override public T apply(MutableStoreProvider unused) throws E {
+          return work.apply(logStoreProvider);
+        }
+      });
+    }
+
+    // Outermost call: open a new transaction, which is committed from inside the underlying
+    // storage's write after work has been applied.
+    transaction = streamManager.startTransaction();
+    try {
+      return super.write(new MutateWork<T, E>() {
+        @Override public T apply(MutableStoreProvider unused) throws E {
+          T result = work.apply(logStoreProvider);
+          try {
+            transaction.commit();
+          } catch (CodingException e) {
+            throw new IllegalStateException(
+                "Problem encoding transaction operations to the log stream", e);
+          } catch (StreamAccessException e) {
+            throw new StorageException(
+                "There was a problem committing the transaction to the log.", e);
+          }
+          return result;
+        }
+      });
+    } finally {
+      // Always clear the transaction, on success or failure, so the next outermost write starts
+      // a fresh one.
+      transaction = null;
+    }
+  }
+
+  @Timed("scheduler_log_save_framework_id")
+  @Override
+  public void saveFrameworkId(final String frameworkId) {
+    write(new MutateWork.NoResult.Quiet() {
+      @Override protected void execute(MutableStoreProvider unused) {
+        log(Op.saveFrameworkId(new SaveFrameworkId(frameworkId)));
+        LogStorage.super.saveFrameworkId(frameworkId);
+      }
+    });
+  }
+
+  @Timed("scheduler_log_job_save")
+  @Override
+  public void saveAcceptedJob(final String managerId, final IJobConfiguration jobConfig) {
+    write(new MutateWork.NoResult.Quiet() {
+      @Override protected void execute(MutableStoreProvider unused) {
+        log(Op.saveAcceptedJob(new SaveAcceptedJob(managerId, jobConfig.newBuilder())));
+        LogStorage.super.saveAcceptedJob(managerId, jobConfig);
+      }
+    });
+  }
+
+  @Timed("scheduler_log_job_remove")
+  @Override
+  public void removeJob(final IJobKey jobKey) {
+    checkNotNull(jobKey);
+
+    write(new MutateWork.NoResult.Quiet() {
+      @Override protected void execute(MutableStoreProvider unused) {
+        log(Op.removeJob(new RemoveJob().setJobKey(jobKey.newBuilder())));
+        LogStorage.super.removeJob(jobKey);
+      }
+    });
+  }
+
+  @Timed("scheduler_log_tasks_save")
+  @Override
+  public void saveTasks(final Set<IScheduledTask> newTasks) throws IllegalStateException {
+    write(new MutateWork.NoResult.Quiet() {
+      @Override protected void execute(MutableStoreProvider unused) {
+        log(Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(newTasks))));
+        LogStorage.super.saveTasks(newTasks);
+      }
+    });
+  }
+
+  @Override
+  public void deleteAllTasks() {
+    write(new MutateWork.NoResult.Quiet() {
+      @Override protected void execute(MutableStoreProvider storeProvider) {
+        Query.Builder query = Query.unscoped();
+        Set<String> ids = FluentIterable.from(storeProvider.getTaskStore().fetchTasks(query))
+            .transform(Tasks.SCHEDULED_TO_ID)
+            .toSet();
+        deleteTasks(ids);
+      }
+    });
+  }
+
+  @Timed("scheduler_log_tasks_remove")
+  @Override
+  public void deleteTasks(final Set<String> taskIds) {
+    write(new MutateWork.NoResult.Quiet() {
+      @Override protected void execute(MutableStoreProvider unused) {
+        log(Op.removeTasks(new RemoveTasks(taskIds)));
+        LogStorage.super.deleteTasks(taskIds);
+      }
+    });
+  }
+
+  @Timed("scheduler_log_tasks_mutate")
+  @Override
+  public ImmutableSet<IScheduledTask> mutateTasks(
+      final Query.Builder query,
+      final Function<IScheduledTask, IScheduledTask> mutator) {
+
+    return write(new MutateWork.Quiet<ImmutableSet<IScheduledTask>>() {
+      @Override public ImmutableSet<IScheduledTask> apply(MutableStoreProvider unused) {
+        ImmutableSet<IScheduledTask> mutated = LogStorage.super.mutateTasks(query, mutator);
+
+        Map<String, IScheduledTask> tasksById = Tasks.mapById(mutated);
+        if (LOG.isLoggable(Level.FINE)) {
+          LOG.fine("Storing updated tasks to log: "
+              + Maps.transformValues(tasksById, Tasks.GET_STATUS));
+        }
+
+        // TODO(William Farner): Avoid writing an op when mutated is empty.
+        log(Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(mutated))));
+        return mutated;
+      }
+    });
+  }
+
+  @Timed("scheduler_log_unsafe_modify_in_place")
+  @Override
+  public boolean unsafeModifyInPlace(final String taskId, final ITaskConfig taskConfiguration) {
+    return write(new MutateWork.Quiet<Boolean>() {
+      @Override public Boolean apply(MutableStoreProvider storeProvider) {
+        boolean mutated = LogStorage.super.unsafeModifyInPlace(taskId, taskConfiguration);
+        if (mutated) {
+          log(Op.rewriteTask(new RewriteTask(taskId, taskConfiguration.newBuilder())));
+        }
+        return mutated;
+      }
+    });
+  }
+
+  @Timed("scheduler_log_quota_remove")
+  @Override
+  public void removeQuota(final String role) {
+    write(new MutateWork.NoResult.Quiet() {
+      @Override protected void execute(MutableStoreProvider unused) {
+        log(Op.removeQuota(new RemoveQuota(role)));
+        LogStorage.super.removeQuota(role);
+      }
+    });
+  }
+
+  @Timed("scheduler_log_quota_save")
+  @Override
+  public void saveQuota(final String role, final IQuota quota) {
+    write(new MutateWork.NoResult.Quiet() {
+      @Override protected void execute(MutableStoreProvider unused) {
+        log(Op.saveQuota(new SaveQuota(role, quota.newBuilder())));
+        LogStorage.super.saveQuota(role, quota);
+      }
+    });
+  }
+
+  @Timed("scheduler_save_host_attribute")
+  @Override
+  public void saveHostAttributes(final HostAttributes attrs) {
+    write(new MutateWork.NoResult.Quiet() {
+      @Override protected void execute(MutableStoreProvider unused) {
+        // Pass the updated attributes upstream, and then check if the stored value changes.
+        // We do this since different parts of the system write partial HostAttributes objects
+        // and they are merged together internally.
+        // TODO(William Farner): Split out a separate method
+        //                       saveAttributes(String host, Iterable<Attributes>) to simplify this.
+        Optional<HostAttributes> saved = LogStorage.super.getHostAttributes(attrs.getHost());
+        LogStorage.super.saveHostAttributes(attrs);
+        Optional<HostAttributes> updated = LogStorage.super.getHostAttributes(attrs.getHost());
+        if (!saved.equals(updated)) {
+          log(Op.saveHostAttributes(new SaveHostAttributes(updated.get())));
+        }
+      }
+    });
+  }
+
+  @Timed("scheduler_lock_save")
+  @Override
+  public void saveLock(final ILock lock) {
+    write(new MutateWork.NoResult.Quiet() {
+      @Override protected void execute(MutableStoreProvider unused) {
+        log(Op.saveLock(new SaveLock(lock.newBuilder())));
+        LogStorage.super.saveLock(lock);
+      }
+    });
+  }
+
+  @Timed("scheduler_lock_remove")
+  @Override
+  public void removeLock(final ILockKey lockKey) {
+    write(new MutateWork.NoResult.Quiet() {
+      @Override protected void execute(MutableStoreProvider unused) {
+        log(Op.removeLock(new RemoveLock(lockKey.newBuilder())));
+        LogStorage.super.removeLock(lockKey);
+      }
+    });
+  }
+
+  /**
+   * Sets the maintenance mode on a host's stored attributes and records the change in the log.
+   *
+   * @param host The host whose mode should be changed.
+   * @param mode The mode to record for the host.
+   * @return {@code true} if attributes were found for the host and saved with the new mode,
+   *     {@code false} if the host has no stored attributes and nothing was changed.
+   */
+  @Override
+  public boolean setMaintenanceMode(final String host, final MaintenanceMode mode) {
+    return write(new MutateWork.Quiet<Boolean>() {
+      @Override public Boolean apply(MutableStoreProvider unused) {
+        Optional<HostAttributes> saved = LogStorage.super.getHostAttributes(host);
+        if (!saved.isPresent()) {
+          // Unknown host: nothing to log or save.
+          return false;
+        }
+
+        HostAttributes attributes = saved.get().setMode(mode);
+        log(Op.saveHostAttributes(new SaveHostAttributes(attributes)));
+        LogStorage.super.saveHostAttributes(attributes);
+        return true;
+      }
+    });
+  }
+
+  /**
+   * Takes a snapshot and writes it to the log, then invokes the superclass's snapshot.
+   *
+   * @throws StorageException If the snapshot could not be encoded, the log stream position was
+   *     invalid, or the log stream could not be accessed; the original exception is the cause.
+   */
+  @Override
+  public void snapshot() throws StorageException {
+    try {
+      doSnapshot();
+    } catch (CodingException e) {
+      throw new StorageException("Failed to encode a snapshot", e);
+    } catch (InvalidPositionException e) {
+      throw new StorageException("Saved snapshot but failed to truncate entries preceding it", e);
+    } catch (StreamAccessException e) {
+      throw new StorageException("Failed to create a snapshot", e);
+    }
+    // Only reached when the log snapshot succeeded.
+    super.snapshot();
+  }
+
+  /**
+   * Adds {@code op} to the current log stream transaction. Does nothing until recovery has
+   * completed, so ops replayed from the log are not recorded again.
+   *
+   * @param op The op to record.
+   */
+  private void log(Op op) {
+    if (recovered) {
+      transaction.add(op);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
new file mode 100644
index 0000000..92568c8
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.storage.log;
+
+import java.lang.annotation.Annotation;
+
+import javax.inject.Singleton;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.inject.AbstractModule;
+import com.google.inject.Key;
+import com.google.inject.TypeLiteral;
+
+import com.twitter.aurora.scheduler.log.Log;
+import com.twitter.aurora.scheduler.storage.CallOrderEnforcingStorage;
+import com.twitter.aurora.scheduler.storage.DistributedSnapshotStore;
+import com.twitter.aurora.scheduler.storage.log.LogManager.MaxEntrySize;
+import com.twitter.aurora.scheduler.storage.log.LogManager.SnapshotSetting;
+import com.twitter.aurora.scheduler.storage.log.LogStorage.ShutdownGracePeriod;
+import com.twitter.aurora.scheduler.storage.log.LogStorage.SnapshotInterval;
+import com.twitter.common.application.ShutdownRegistry;
+import com.twitter.common.args.Arg;
+import com.twitter.common.args.CmdLine;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Data;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.util.Clock;
+
+/**
+ * Bindings for scheduler distributed log based storage.
+ * <p/>
+ * Requires bindings for:
+ * <ul>
+ *   <li>{@link Clock}</li>
+ *   <li>{@link ShutdownRegistry}</li>
+ *   <li>The concrete {@link Log} implementation.</li>
+ * </ul>
+ * <p/>
+ */
+public class LogStorageModule extends AbstractModule {
+
+  @CmdLine(name = "dlog_shutdown_grace_period",
+           help = "Specifies the maximum time to wait for scheduled checkpoint and snapshot "
+                  + "actions to complete before forcibly shutting down.")
+  private static final Arg<Amount<Long, Time>> SHUTDOWN_GRACE_PERIOD =
+      Arg.create(Amount.of(2L, Time.SECONDS));
+
+  @CmdLine(name = "dlog_snapshot_interval",
+           help = "Specifies the frequency at which snapshots of local storage are taken and "
+                  + "written to the log.")
+  private static final Arg<Amount<Long, Time>> SNAPSHOT_INTERVAL =
+      Arg.create(Amount.of(1L, Time.HOURS));
+
+  @CmdLine(name = "dlog_max_entry_size",
+           help = "Specifies the maximum entry size to append to the log. Larger entries will be "
+                  + "split across entry Frames.")
+  @VisibleForTesting
+  public static final Arg<Amount<Integer, Data>> MAX_LOG_ENTRY_SIZE =
+      Arg.create(Amount.of(512, Data.KB));
+
+  @CmdLine(name = "deflate_snapshots", help = "Whether snapshots should be deflate-compressed.")
+  private static final Arg<Boolean> DEFLATE_SNAPSHOTS = Arg.create(true);
+
+  /**
+   * Binds the command line settings above and wires up {@link LogManager} and
+   * {@link LogStorage} as singletons.
+   */
+  @Override
+  protected void configure() {
+    // Declare the bindings this module depends on but does not provide itself.
+    requireBinding(Log.class);
+    requireBinding(Clock.class);
+    requireBinding(ShutdownRegistry.class);
+
+    // Expose the time-valued flags under the annotations LogStorage's constructor asks for.
+    bindInterval(ShutdownGracePeriod.class, SHUTDOWN_GRACE_PERIOD);
+    bindInterval(SnapshotInterval.class, SNAPSHOT_INTERVAL);
+
+    bind(new TypeLiteral<Amount<Integer, Data>>() { }).annotatedWith(MaxEntrySize.class)
+        .toInstance(MAX_LOG_ENTRY_SIZE.get());
+    bind(LogManager.class).in(Singleton.class);
+    bind(Boolean.class).annotatedWith(SnapshotSetting.class).toInstance(DEFLATE_SNAPSHOTS.get());
+
+    // LogStorage is bound as a singleton and is also the DistributedSnapshotStore binding.
+    bind(LogStorage.class).in(Singleton.class);
+    install(CallOrderEnforcingStorage.wrappingModule(LogStorage.class));
+    bind(DistributedSnapshotStore.class).to(LogStorage.class);
+  }
+
+  /**
+   * Binds the current value of a time-valued argument to {@code Amount<Long, Time>} annotated
+   * with {@code key}.
+   *
+   * @param key The binding annotation to attach to the bound amount.
+   * @param value The argument whose value is read at configuration time and bound as an
+   *     instance.
+   */
+  private void bindInterval(Class<? extends Annotation> key, Arg<Amount<Long, Time>> value) {
+    bind(Key.get(new TypeLiteral<Amount<Long, Time>>() { }, key)).toInstance(value.get());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
new file mode 100644
index 0000000..df6b899
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
@@ -0,0 +1,225 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.storage.log;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Properties;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.collect.ImmutableSet;
+
+import com.twitter.aurora.gen.HostAttributes;
+import com.twitter.aurora.gen.Lock;
+import com.twitter.aurora.gen.storage.QuotaConfiguration;
+import com.twitter.aurora.gen.storage.SchedulerMetadata;
+import com.twitter.aurora.gen.storage.Snapshot;
+import com.twitter.aurora.gen.storage.StoredJob;
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.storage.SnapshotStore;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import com.twitter.aurora.scheduler.storage.Storage.MutateWork;
+import com.twitter.aurora.scheduler.storage.Storage.StoreProvider;
+import com.twitter.aurora.scheduler.storage.Storage.Volatile;
+import com.twitter.aurora.scheduler.storage.Storage.Work;
+import com.twitter.aurora.scheduler.storage.entities.IJobConfiguration;
+import com.twitter.aurora.scheduler.storage.entities.ILock;
+import com.twitter.aurora.scheduler.storage.entities.IQuota;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+import com.twitter.common.inject.TimedInterceptor.Timed;
+import com.twitter.common.util.BuildInfo;
+import com.twitter.common.util.Clock;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import static com.twitter.aurora.gen.apiConstants.CURRENT_API_VERSION;
+
+/**
+ * A {@link SnapshotStore} for the thrift {@link Snapshot} struct.  Each portion of the snapshot is
+ * handled by a {@code SnapshotField}, which copies data between the backing stores and the
+ * matching field of the struct.
+ */
+public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
+
+  private static final Logger LOG = Logger.getLogger(SnapshotStoreImpl.class.getName());
+
+  /** Saves and restores host attributes. */
+  private static final SnapshotField ATTRIBUTE_FIELD = new SnapshotField() {
+    @Override
+    public void saveToSnapshot(StoreProvider storeProvider, Snapshot snapshot) {
+      snapshot.setHostAttributes(storeProvider.getAttributeStore().getHostAttributes());
+    }
+
+    @Override
+    public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
+      // Always start from an empty store, even when the snapshot carries no attributes.
+      store.getAttributeStore().deleteHostAttributes();
+      if (!snapshot.isSetHostAttributes()) {
+        return;
+      }
+
+      for (HostAttributes hostAttributes : snapshot.getHostAttributes()) {
+        store.getAttributeStore().saveHostAttributes(hostAttributes);
+      }
+    }
+  };
+
+  /** Saves and restores scheduled tasks. */
+  private static final SnapshotField TASK_FIELD = new SnapshotField() {
+    @Override
+    public void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
+      snapshot.setTasks(
+          IScheduledTask.toBuildersSet(store.getTaskStore().fetchTasks(Query.unscoped())));
+    }
+
+    @Override
+    public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
+      store.getUnsafeTaskStore().deleteAllTasks();
+      if (!snapshot.isSetTasks()) {
+        return;
+      }
+
+      store.getUnsafeTaskStore().saveTasks(IScheduledTask.setFromBuilders(snapshot.getTasks()));
+    }
+  };
+
+  /** Saves and restores accepted jobs, keyed by their job manager. */
+  private static final SnapshotField JOB_FIELD = new SnapshotField() {
+    @Override
+    public void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
+      ImmutableSet.Builder<StoredJob> storedJobs = ImmutableSet.builder();
+      for (String managerId : store.getJobStore().fetchManagerIds()) {
+        for (IJobConfiguration jobConfig : store.getJobStore().fetchJobs(managerId)) {
+          storedJobs.add(new StoredJob(managerId, jobConfig.newBuilder()));
+        }
+      }
+      snapshot.setJobs(storedJobs.build());
+    }
+
+    @Override
+    public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
+      store.getJobStore().deleteJobs();
+      if (!snapshot.isSetJobs()) {
+        return;
+      }
+
+      for (StoredJob storedJob : snapshot.getJobs()) {
+        store.getJobStore().saveAcceptedJob(
+            storedJob.getJobManagerId(),
+            IJobConfiguration.build(storedJob.getJobConfiguration()));
+      }
+    }
+  };
+
+  /** Saves the framework ID together with build information; restores only the framework ID. */
+  private static final SnapshotField SCHEDULER_METADATA_FIELD = new SnapshotField() {
+    @Override
+    public void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
+      Properties buildProperties = new BuildInfo().getProperties();
+
+      SchedulerMetadata metadata = new SchedulerMetadata()
+          .setFrameworkId(store.getSchedulerStore().fetchFrameworkId())
+          .setRevision(buildProperties.getProperty(BuildInfo.Key.GIT_REVISION.value))
+          .setTag(buildProperties.getProperty(BuildInfo.Key.GIT_TAG.value))
+          .setTimestamp(buildProperties.getProperty(BuildInfo.Key.TIMESTAMP.value))
+          .setUser(buildProperties.getProperty(BuildInfo.Key.USER.value))
+          .setMachine(buildProperties.getProperty(BuildInfo.Key.MACHINE.value))
+          .setVersion(CURRENT_API_VERSION);
+      snapshot.setSchedulerMetadata(metadata);
+    }
+
+    @Override
+    public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
+      if (!snapshot.isSetSchedulerMetadata()) {
+        return;
+      }
+
+      // The framework ID is a single value, so it is overwritten rather than deleted first.
+      store.getSchedulerStore()
+          .saveFrameworkId(snapshot.getSchedulerMetadata().getFrameworkId());
+    }
+  };
+
+  /** Saves and restores per-role quotas. */
+  private static final SnapshotField QUOTA_FIELD = new SnapshotField() {
+    @Override
+    public void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
+      ImmutableSet.Builder<QuotaConfiguration> configurations = ImmutableSet.builder();
+      for (Map.Entry<String, IQuota> roleQuota
+          : store.getQuotaStore().fetchQuotas().entrySet()) {
+
+        configurations.add(
+            new QuotaConfiguration(roleQuota.getKey(), roleQuota.getValue().newBuilder()));
+      }
+
+      snapshot.setQuotaConfigurations(configurations.build());
+    }
+
+    @Override
+    public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
+      store.getQuotaStore().deleteQuotas();
+      if (!snapshot.isSetQuotaConfigurations()) {
+        return;
+      }
+
+      for (QuotaConfiguration configuration : snapshot.getQuotaConfigurations()) {
+        store.getQuotaStore()
+            .saveQuota(configuration.getRole(), IQuota.build(configuration.getQuota()));
+      }
+    }
+  };
+
+  /** Saves and restores locks. */
+  private static final SnapshotField LOCK_FIELD = new SnapshotField() {
+    @Override
+    public void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
+      snapshot.setLocks(ILock.toBuildersSet(store.getLockStore().fetchLocks()));
+    }
+
+    @Override
+    public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
+      store.getLockStore().deleteLocks();
+      if (!snapshot.isSetLocks()) {
+        return;
+      }
+
+      for (Lock storedLock : snapshot.getLocks()) {
+        store.getLockStore().saveLock(ILock.build(storedLock));
+      }
+    }
+  };
+
+  /** All snapshot fields, in the order in which they are saved and restored. */
+  private static final Iterable<SnapshotField> SNAPSHOT_FIELDS = Arrays.asList(
+      ATTRIBUTE_FIELD,
+      TASK_FIELD,
+      JOB_FIELD,
+      SCHEDULER_METADATA_FIELD,
+      QUOTA_FIELD,
+      LOCK_FIELD);
+
+  private final Clock clock;
+  private final Storage storage;
+
+  /**
+   * Creates a snapshot store backed by the given storage.
+   *
+   * @param clock Clock used to timestamp created snapshots.
+   * @param storage Storage whose stores are read from and written to.
+   */
+  @Inject
+  public SnapshotStoreImpl(Clock clock, @Volatile Storage storage) {
+    this.clock = checkNotNull(clock);
+    this.storage = checkNotNull(storage);
+  }
+
+  /**
+   * Builds a snapshot by letting every snapshot field save its data within one consistent read.
+   *
+   * @return The populated snapshot, stamped with the time at which the operation began.
+   */
+  @Timed("snapshot_create")
+  @Override
+  public Snapshot createSnapshot() {
+    return storage.consistentRead(new Work.Quiet<Snapshot>() {
+      @Override
+      public Snapshot apply(StoreProvider storeProvider) {
+        Snapshot result = new Snapshot();
+
+        // Read the clock before any field runs so the timestamp marks the start of the
+        // operation, but only set it afterwards so that no field can overwrite it.
+        long startedAtMs = clock.nowMillis();
+        for (SnapshotField snapshotField : SNAPSHOT_FIELDS) {
+          snapshotField.saveToSnapshot(storeProvider, result);
+        }
+        result.setTimestamp(startedAtMs);
+        return result;
+      }
+    });
+  }
+
+  /**
+   * Restores every snapshot field from the given snapshot within a single write operation.
+   *
+   * @param snapshot Snapshot to restore from, must not be {@code null}.
+   */
+  @Timed("snapshot_apply")
+  @Override
+  public void applySnapshot(final Snapshot snapshot) {
+    checkNotNull(snapshot);
+
+    storage.write(new MutateWork.NoResult.Quiet() {
+      @Override
+      protected void execute(MutableStoreProvider storeProvider) {
+        LOG.info("Restoring snapshot.");
+
+        for (SnapshotField snapshotField : SNAPSHOT_FIELDS) {
+          snapshotField.restoreFromSnapshot(storeProvider, snapshot);
+        }
+      }
+    });
+  }
+
+  /**
+   * A portion of the snapshot that knows how to copy itself to and from the backing stores.
+   */
+  private interface SnapshotField {
+    void saveToSnapshot(StoreProvider storeProvider, Snapshot snapshot);
+
+    void restoreFromSnapshot(MutableStoreProvider storeProvider, Snapshot snapshot);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/storage/log/testing/LogOpMatcher.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/testing/LogOpMatcher.java b/src/main/java/org/apache/aurora/scheduler/storage/log/testing/LogOpMatcher.java
new file mode 100644
index 0000000..a4c0126
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/testing/LogOpMatcher.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.storage.log.testing;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import org.easymock.EasyMock;
+import org.easymock.IArgumentMatcher;
+import org.easymock.IExpectationSetters;
+
+import com.twitter.aurora.codec.ThriftBinaryCodec;
+import com.twitter.aurora.codec.ThriftBinaryCodec.CodingException;
+import com.twitter.aurora.gen.storage.LogEntry;
+import com.twitter.aurora.gen.storage.Op;
+import com.twitter.aurora.gen.storage.Snapshot;
+import com.twitter.aurora.gen.storage.Transaction;
+import com.twitter.aurora.gen.storage.storageConstants;
+import com.twitter.aurora.scheduler.log.Log.Position;
+import com.twitter.aurora.scheduler.log.Log.Stream;
+
+import static org.easymock.EasyMock.expect;
+
+/**
+ * An EasyMock argument matcher that detects same-value {@link LogEntry} objects in a more human
+ * readable way than byte array comparison.
+ */
+public class LogOpMatcher implements IArgumentMatcher {
+  private final LogEntry expected;
+
+  /**
+   * Creates a matcher for the given entry.
+   *
+   * @param expected Entry that a serialized argument must decode to in order to match.
+   */
+  public LogOpMatcher(LogEntry expected) {
+    this.expected = expected;
+  }
+
+  /**
+   * Decodes the argument as a {@link LogEntry} and compares it with the expected entry.
+   *
+   * @param argument Actual argument passed to the mocked call.
+   * @return {@code true} only if {@code argument} is a byte array that decodes to an entry equal
+   *     to the expected one; {@code false} for {@code null}, for any other type and for bytes
+   *     that cannot be decoded.
+   */
+  @Override
+  public boolean matches(Object argument) {
+    // A matcher should report a mismatch rather than blow up on an unexpected argument, so
+    // null and non-byte[] values are rejected before the cast.
+    if (!(argument instanceof byte[])) {
+      return false;
+    }
+
+    try {
+      return expected.equals(ThriftBinaryCodec.decodeNonNull(LogEntry.class, (byte[]) argument));
+    } catch (CodingException e) {
+      // Undecodable bytes cannot be the expected entry.
+      return false;
+    }
+  }
+
+  @Override
+  public void appendTo(StringBuffer buffer) {
+    buffer.append(expected);
+  }
+
+  /**
+   * Creates a stream matcher that will set expectations on the provided {@code stream}.
+   *
+   * @param stream Mocked stream.
+   * @return A stream matcher to set expectations against {@code stream}.
+   */
+  public static StreamMatcher matcherFor(Stream stream) {
+    return new StreamMatcher(stream);
+  }
+
+  /**
+   * Sets append expectations on a mocked stream in terms of log entry values.
+   */
+  public static final class StreamMatcher {
+    private final Stream stream;
+
+    private StreamMatcher(Stream stream) {
+      this.stream = Preconditions.checkNotNull(stream);
+    }
+
+    /**
+     * Sets an expectation for a stream transaction containing the provided {@code ops}.
+     *
+     * @param ops Operations to expect in the transaction.
+     * @return An expectation setter.
+     */
+    public IExpectationSetters<Position> expectTransaction(Op...ops) {
+      LogEntry entry = LogEntry.transaction(
+          new Transaction(ImmutableList.copyOf(ops), storageConstants.CURRENT_SCHEMA_VERSION));
+      return expect(stream.append(sameEntry(entry)));
+    }
+
+    /**
+     * Sets an expectation for a snapshot.
+     *
+     * @param snapshot Expected snapshot.
+     * @return An expectation setter.
+     */
+    public IExpectationSetters<Position> expectSnapshot(Snapshot snapshot) {
+      LogEntry entry = LogEntry.snapshot(snapshot);
+      return expect(stream.append(sameEntry(entry)));
+    }
+  }
+
+  /**
+   * Creates a matcher that supports value matching between a serialized {@link LogEntry} byte array
+   * and a log entry object.
+   *
+   * @param entry Entry to match against.
+   * @return {@code null}, return value included for easymock-style embedding.
+   */
+  private static byte[] sameEntry(LogEntry entry) {
+    EasyMock.reportMatcher(new LogOpMatcher(entry));
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/storage/mem/Interner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/Interner.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/Interner.java
new file mode 100644
index 0000000..bf05caa
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/Interner.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.storage.mem;
+
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * An interning pool that can be used to retrieve the canonical instances of objects, while
+ * maintaining a reference count to the canonical instances.
+ * <p>
+ * All methods synchronize on the pool instance.
+ *
+ * @param <T> The interned object type.
+ * @param <A> The type used for maintaining associations.
+ */
+class Interner<T, A> {
+
+  private final Map<T, InternEntry<A, T>> pool = Maps.newHashMap();
+
+  /**
+   * Retrieves the canonical instance of {@code t} and maintains {@code association} with the
+   * interned value.  If {@code t} was not previously interned, the provided instance is stored.
+   *
+   * @param t The object to intern, or get the previously-interned value for.
+   * @param association A value to associate with {@code t}.
+   * @return The interned value, which may be reference-equivalent to {@code t}.
+   */
+  synchronized T addAssociation(T t, A association) {
+    InternEntry<A, T> entry = pool.get(t);
+    if (entry == null) {
+      entry = new InternEntry<>(t, association);
+      pool.put(t, entry);
+    } else {
+      entry.associations.add(association);
+    }
+    return entry.interned;
+  }
+
+  /**
+   * Removes an association with an interned value, effectively decrementing the reference count.
+   * The value is dropped from the pool once its last association is removed.  Does nothing if
+   * {@code t} is not interned.
+   *
+   * @param t The interned value that {@code association} was associated with.
+   * @param association The association to remove.
+   */
+  synchronized void removeAssociation(T t, A association) {
+    InternEntry<A, T> entry = pool.get(t);
+    if (entry != null) {
+      entry.associations.remove(association);
+      if (entry.associations.isEmpty()) {
+        pool.remove(t);
+      }
+    }
+  }
+
+  /**
+   * Removes all interned values and associations.
+   */
+  synchronized void clear() {
+    pool.clear();
+  }
+
+  /**
+   * Checks whether a value equal to {@code t} is currently held in the pool.
+   *
+   * @param t The value to look up.
+   * @return {@code true} if the pool contains {@code t}.
+   */
+  @VisibleForTesting
+  synchronized boolean isInterned(T t) {
+    return pool.containsKey(t);
+  }
+
+  /**
+   * Gets a snapshot of the associations currently held for {@code t}.
+   *
+   * @param t The value to look up.
+   * @return An immutable copy of the associations, empty if {@code t} is not interned.
+   */
+  @VisibleForTesting
+  synchronized Set<A> getAssociations(T t) {
+    InternEntry<A, T> entry = pool.get(t);
+    if (entry == null) {
+      // A value that was never interned, or was already released, has no associations.
+      return ImmutableSet.<A>of();
+    }
+    return ImmutableSet.copyOf(entry.associations);
+  }
+
+  /**
+   * Pairs a canonical instance with the set of associations that currently reference it.
+   */
+  private static class InternEntry<A, T> {
+    private final T interned;
+    private final Set<A> associations = Sets.newHashSet();
+
+    InternEntry(T interned, A initialAssociation) {
+      this.interned = interned;
+      associations.add(initialAssociation);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/storage/mem/MemAttributeStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemAttributeStore.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemAttributeStore.java
new file mode 100644
index 0000000..6c383c7
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemAttributeStore.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.storage.mem;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+
+import com.twitter.aurora.gen.Attribute;
+import com.twitter.aurora.gen.HostAttributes;
+import com.twitter.aurora.gen.MaintenanceMode;
+import com.twitter.aurora.scheduler.storage.AttributeStore.Mutable;
+
+/**
+ * An in-memory attribute store, keyed by host name.
+ */
+class MemAttributeStore implements Mutable {
+  private final ConcurrentMap<String, HostAttributes> hostAttributes = Maps.newConcurrentMap();
+
+  /**
+   * Removes the attributes of all hosts.
+   */
+  @Override
+  public void deleteHostAttributes() {
+    hostAttributes.clear();
+  }
+
+  /**
+   * Saves the attributes of a host.  The first object saved for a host becomes the stored record;
+   * later saves update that record in place.  The stored maintenance mode is only assigned while
+   * it is unset (falling back to {@link MaintenanceMode#NONE}), whereas the attribute set is
+   * always replaced (with an empty set when {@code attributes} carries none).
+   *
+   * @param attributes Attributes to save, identified by their host.
+   */
+  @Override
+  public void saveHostAttributes(HostAttributes attributes) {
+    // Use the result of putIfAbsent rather than a second lookup: a separate get() could observe
+    // a concurrently cleared map and yield null.
+    HostAttributes previous = hostAttributes.putIfAbsent(attributes.getHost(), attributes);
+    HostAttributes stored = (previous == null) ? attributes : previous;
+
+    if (!stored.isSetMode()) {
+      stored.setMode(attributes.isSetMode() ? attributes.getMode() : MaintenanceMode.NONE);
+    }
+    stored.setAttributes(attributes.isSetAttributes()
+        ? attributes.getAttributes() : ImmutableSet.<Attribute>of());
+  }
+
+  /**
+   * Sets the maintenance mode of a host that already has stored attributes.
+   *
+   * @param host Host to update.
+   * @param mode Mode to assign.
+   * @return {@code true} if the host was known and updated, {@code false} otherwise.
+   */
+  @Override
+  public boolean setMaintenanceMode(String host, MaintenanceMode mode) {
+    HostAttributes stored = hostAttributes.get(host);
+    if (stored != null) {
+      stored.setMode(mode);
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * Fetches the attributes stored for a host.
+   *
+   * @param host Host to look up.
+   * @return The stored attributes, or absent if the host is unknown.
+   */
+  @Override
+  public Optional<HostAttributes> getHostAttributes(String host) {
+    return Optional.fromNullable(hostAttributes.get(host));
+  }
+
+  /**
+   * Fetches the attributes of all hosts.
+   *
+   * @return An immutable copy of the stored attribute records.
+   */
+  @Override
+  public Set<HostAttributes> getHostAttributes() {
+    return ImmutableSet.copyOf(hostAttributes.values());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/storage/mem/MemJobStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemJobStore.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemJobStore.java
new file mode 100644
index 0000000..8268e8d
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemJobStore.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.storage.mem;
+
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Optional;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+
+import com.twitter.aurora.scheduler.base.JobKeys;
+import com.twitter.aurora.scheduler.storage.JobStore;
+import com.twitter.aurora.scheduler.storage.entities.IJobConfiguration;
+import com.twitter.aurora.scheduler.storage.entities.IJobKey;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * An in-memory job store that groups job configurations by the ID of their job manager.
+ */
+class MemJobStore implements JobStore.Mutable {
+
+  // Manager records are created on demand the first time a job is saved for a manager ID.
+  private final LoadingCache<String, Manager> managers = CacheBuilder.newBuilder()
+      .build(new CacheLoader<String, Manager>() {
+        @Override
+        public Manager load(String managerId) {
+          return new Manager();
+        }
+      });
+
+  /**
+   * Saves a job under a manager, replacing any job stored under the same key for that manager.
+   *
+   * @param managerId ID of the owning job manager.
+   * @param jobConfig Job to save; its key must be valid.
+   */
+  @Override
+  public void saveAcceptedJob(String managerId, IJobConfiguration jobConfig) {
+    checkNotNull(managerId);
+    checkNotNull(jobConfig);
+
+    IJobKey validatedKey = JobKeys.assertValid(jobConfig.getKey());
+    Manager owner = managers.getUnchecked(managerId);
+    owner.jobs.put(validatedKey, jobConfig);
+  }
+
+  /**
+   * Removes the job with the given key from every manager.
+   *
+   * @param jobKey Key of the job to remove.
+   */
+  @Override
+  public void removeJob(IJobKey jobKey) {
+    checkNotNull(jobKey);
+
+    for (Manager each : managers.asMap().values()) {
+      each.jobs.remove(jobKey);
+    }
+  }
+
+  /**
+   * Removes all managers along with their jobs.
+   */
+  @Override
+  public void deleteJobs() {
+    managers.invalidateAll();
+  }
+
+  /**
+   * Fetches the jobs stored for a manager.
+   *
+   * @param managerId ID of the job manager.
+   * @return An immutable copy of the manager's jobs, empty if the manager is unknown.
+   */
+  @Override
+  public Iterable<IJobConfiguration> fetchJobs(String managerId) {
+    checkNotNull(managerId);
+
+    @Nullable Manager owner = managers.getIfPresent(managerId);
+    if (owner == null) {
+      return ImmutableSet.of();
+    }
+
+    // NOTE(review): the writers in this class do not lock on this map, so this monitor only
+    // serializes concurrent callers of this method - confirm whether it is still needed.
+    synchronized (owner.jobs) {
+      return ImmutableSet.copyOf(owner.jobs.values());
+    }
+  }
+
+  /**
+   * Fetches a single job of a manager.
+   *
+   * @param managerId ID of the job manager.
+   * @param jobKey Key of the job.
+   * @return The job, or absent if either the manager or the job is unknown.
+   */
+  @Override
+  public Optional<IJobConfiguration> fetchJob(String managerId, IJobKey jobKey) {
+    checkNotNull(managerId);
+    checkNotNull(jobKey);
+
+    @Nullable Manager owner = managers.getIfPresent(managerId);
+    if (owner == null) {
+      return Optional.absent();
+    }
+    return Optional.fromNullable(owner.jobs.get(jobKey));
+  }
+
+  /**
+   * Fetches the IDs of all managers that currently have a record in this store.
+   *
+   * @return An immutable copy of the manager IDs.
+   */
+  @Override
+  public Set<String> fetchManagerIds() {
+    return ImmutableSet.copyOf(managers.asMap().keySet());
+  }
+
+  /**
+   * Holder for the jobs of a single job manager.
+   */
+  private static class Manager {
+    private final Map<IJobKey, IJobConfiguration> jobs = Maps.newConcurrentMap();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/storage/mem/MemLockStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemLockStore.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemLockStore.java
new file mode 100644
index 0000000..1ced973
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemLockStore.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.storage.mem;
+
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+
+import com.twitter.aurora.scheduler.storage.LockStore;
+import com.twitter.aurora.scheduler.storage.entities.ILock;
+import com.twitter.aurora.scheduler.storage.entities.ILockKey;
+
+/**
+ * An in-memory lock store.
+ */
+class MemLockStore implements LockStore.Mutable {
+
+  private final Map<ILockKey, ILock> locks = Maps.newConcurrentMap();
+
+  @Override
+  public void saveLock(ILock lock) {
+    locks.put(lock.getKey(), lock);
+  }
+
+  @Override
+  public void removeLock(ILockKey lockKey) {
+    locks.remove(lockKey);
+  }
+
+  @Override
+  public void deleteLocks() {
+    locks.clear();
+  }
+
+  @Override
+  public Set<ILock> fetchLocks() {
+    return ImmutableSet.copyOf(locks.values());
+  }
+
+  @Override
+  public Optional<ILock> fetchLock(ILockKey lockKey) {
+    return Optional.fromNullable(locks.get(lockKey));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/storage/mem/MemQuotaStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemQuotaStore.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemQuotaStore.java
new file mode 100644
index 0000000..855b39c
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemQuotaStore.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.storage.mem;
+
+import java.util.Map;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+
+import com.twitter.aurora.scheduler.storage.QuotaStore;
+import com.twitter.aurora.scheduler.storage.entities.IQuota;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * An in-memory quota store.
+ */
+class MemQuotaStore implements QuotaStore.Mutable {
+
+  private final Map<String, IQuota> quotas = Maps.newConcurrentMap();
+
+  @Override
+  public void deleteQuotas() {
+    quotas.clear();
+  }
+
+  @Override
+  public void removeQuota(String role) {
+    checkNotNull(role);
+
+    quotas.remove(role);
+  }
+
+  @Override
+  public void saveQuota(String role, IQuota quota) {
+    checkNotNull(role);
+    checkNotNull(quota);
+
+    quotas.put(role, quota);
+  }
+
+  @Override
+  public Optional<IQuota> fetchQuota(String role) {
+    checkNotNull(role);
+    return Optional.fromNullable(quotas.get(role));
+  }
+
+  @Override
+  public Map<String, IQuota> fetchQuotas() {
+    return ImmutableMap.copyOf(quotas);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/storage/mem/MemSchedulerStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemSchedulerStore.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemSchedulerStore.java
new file mode 100644
index 0000000..19bf4ec
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemSchedulerStore.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.storage.mem;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.annotation.Nullable;
+
+import com.google.common.util.concurrent.Atomics;
+
+import com.twitter.aurora.scheduler.storage.SchedulerStore;
+
+/**
+ * An in-memory scheduler store.
+ */
+class MemSchedulerStore implements SchedulerStore.Mutable {
+  private final AtomicReference<String> frameworkId = Atomics.newReference();
+
+  @Override
+  public void saveFrameworkId(String newFrameworkId) {
+    frameworkId.set(newFrameworkId);
+  }
+
+  @Nullable
+  @Override
+  public String fetchFrameworkId() {
+    return frameworkId.get();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java
new file mode 100644
index 0000000..438c023
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java
@@ -0,0 +1,156 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.storage.mem;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import com.twitter.aurora.scheduler.storage.AttributeStore;
+import com.twitter.aurora.scheduler.storage.JobStore;
+import com.twitter.aurora.scheduler.storage.LockStore;
+import com.twitter.aurora.scheduler.storage.QuotaStore;
+import com.twitter.aurora.scheduler.storage.ReadWriteLockManager;
+import com.twitter.aurora.scheduler.storage.SchedulerStore;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.TaskStore;
+import com.twitter.common.inject.TimedInterceptor.Timed;
+import com.twitter.common.stats.Stats;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A storage implementation comprised of individual in-memory store implementations.
+ * <p>
+ * This storage has a global read-write lock, which is used when invoking
+ * {@link #consistentRead(Work)} and {@link #write(MutateWork)}.  However, no locks are used at this
+ * level for {@link #weaklyConsistentRead(Work)}. It is the responsibility of the
+ * individual stores to ensure that read operations are thread-safe (optimally supporting
+ * concurrency).  Store implementations may assume that all methods invoked on {@code Mutable}
+ * store interfaces are protected by the global write lock, and thus invoked serially.
+ */
+public class MemStorage implements Storage {
+  private final AtomicLong readLockWaitNanos = Stats.exportLong("read_lock_wait_nanos");
+  private final AtomicLong writeLockWaitNanos = Stats.exportLong("write_lock_wait_nanos");
+
+  private final MutableStoreProvider storeProvider;
+  private final ReadWriteLockManager lockManager = new ReadWriteLockManager();
+
+  /**
+   * Creates a storage whose store provider hands out the supplied store implementations.
+   *
+   * @param schedulerStore Store returned by the provider's {@code getSchedulerStore()}.
+   * @param jobStore Store returned by the provider's {@code getJobStore()}.
+   * @param taskStore Store returned by both {@code getTaskStore()} and
+   *     {@code getUnsafeTaskStore()}.
+   * @param lockStore Store returned by the provider's {@code getLockStore()}.
+   * @param quotaStore Store returned by the provider's {@code getQuotaStore()}.
+   * @param attributeStore Store returned by the provider's {@code getAttributeStore()}.
+   */
+  @Inject
+  MemStorage(
+      final SchedulerStore.Mutable schedulerStore,
+      final JobStore.Mutable jobStore,
+      final TaskStore.Mutable taskStore,
+      final LockStore.Mutable lockStore,
+      final QuotaStore.Mutable quotaStore,
+      final AttributeStore.Mutable attributeStore) {
+
+    // A single provider instance is shared by all read and write operations.
+    storeProvider = new MutableStoreProvider() {
+      @Override public SchedulerStore.Mutable getSchedulerStore() {
+        return schedulerStore;
+      }
+
+      @Override public JobStore.Mutable getJobStore() {
+        return jobStore;
+      }
+
+      // Same instance as getUnsafeTaskStore(), exposed through the narrower TaskStore type.
+      @Override public TaskStore getTaskStore() {
+        return taskStore;
+      }
+
+      @Override public TaskStore.Mutable getUnsafeTaskStore() {
+        return taskStore;
+      }
+
+      @Override public LockStore.Mutable getLockStore() {
+        return lockStore;
+      }
+
+      @Override public QuotaStore.Mutable getQuotaStore() {
+        return quotaStore;
+      }
+
+      @Override public AttributeStore.Mutable getAttributeStore() {
+        return attributeStore;
+      }
+    };
+  }
+
+  /**
+   * Creates a new empty in-memory storage for use in testing.
+   * <p>
+   * Every call constructs a fresh instance of each in-memory store, so storages returned by
+   * separate calls do not share store instances.
+   *
+   * @return A storage backed by newly constructed in-memory stores.
+   */
+  @VisibleForTesting
+  public static MemStorage newEmptyStorage() {
+    return new MemStorage(
+        new MemSchedulerStore(),
+        new MemJobStore(),
+        new MemTaskStore(),
+        new MemLockStore(),
+        new MemQuotaStore(),
+        new MemAttributeStore());
+  }
+
+  /**
+   * Runs {@code work} against the stores while holding the global read lock.
+   * <p>
+   * The time spent waiting for the lock is added to the {@code read_lock_wait_nanos} stat, but
+   * only when the lock manager reports this acquisition as a top-level operation.
+   *
+   * @param work Read operation to execute.
+   * @return The value produced by {@code work}.
+   * @throws StorageException Propagated from {@code work}, if thrown.
+   * @throws E Propagated from {@code work}, if thrown.
+   */
+  @Timed("mem_storage_consistent_read_operation")
+  @Override
+  public <T, E extends Exception> T consistentRead(Work<T, E> work) throws StorageException, E {
+    checkNotNull(work);
+
+    long waitStartNanos = System.nanoTime();
+    boolean isTopLevel = lockManager.readLock();
+    if (isTopLevel) {
+      long waitedNanos = System.nanoTime() - waitStartNanos;
+      readLockWaitNanos.addAndGet(waitedNanos);
+    }
+
+    // The lock is held from here on; always release it, even if the work throws.
+    try {
+      return work.apply(storeProvider);
+    } finally {
+      lockManager.readUnlock();
+    }
+  }
+
+  /**
+   * Runs {@code work} against the stores without acquiring the global lock.
+   *
+   * @param work Read operation to execute; must not be {@code null}.
+   * @return The value produced by {@code work}.
+   * @throws StorageException Propagated from {@code work}, if thrown.
+   * @throws E Propagated from {@code work}, if thrown.
+   */
+  @Timed("mem_storage_weakly_consistent_read_operation")
+  @Override
+  public <T, E extends Exception> T weaklyConsistentRead(Work<T, E> work)
+      throws StorageException, E {
+
+    // Fail fast on a null argument, matching consistentRead() and write().
+    checkNotNull(work);
+
+    return work.apply(storeProvider);
+  }
+
+  /**
+   * Runs {@code work} against the mutable stores while holding the global write lock.
+   * <p>
+   * The time spent waiting for the lock is added to the {@code write_lock_wait_nanos} stat, but
+   * only when the lock manager reports this acquisition as a top-level operation.
+   *
+   * @param work Mutating operation to execute.
+   * @return The value produced by {@code work}.
+   * @throws StorageException Propagated from {@code work}, if thrown.
+   * @throws E Propagated from {@code work}, if thrown.
+   */
+  @Timed("mem_storage_write_operation")
+  @Override
+  public <T, E extends Exception> T write(MutateWork<T, E> work)
+      throws StorageException, E {
+
+    checkNotNull(work);
+
+    long waitStartNanos = System.nanoTime();
+    boolean isTopLevel = lockManager.writeLock();
+    if (isTopLevel) {
+      long waitedNanos = System.nanoTime() - waitStartNanos;
+      writeLockWaitNanos.addAndGet(waitedNanos);
+    }
+
+    // The lock is held from here on; always release it, even if the work throws.
+    try {
+      return work.apply(storeProvider);
+    } finally {
+      lockManager.writeUnlock();
+    }
+  }
+
+  /**
+   * Does nothing; this implementation takes no action when asked to snapshot.
+   */
+  @Override
+  public void snapshot() {
+    // No-op.
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorageModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorageModule.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorageModule.java
new file mode 100644
index 0000000..a99aa12
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorageModule.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.storage.mem;
+
+import javax.inject.Singleton;
+
+import com.google.inject.Key;
+import com.google.inject.PrivateModule;
+
+import com.twitter.aurora.scheduler.storage.AttributeStore;
+import com.twitter.aurora.scheduler.storage.JobStore;
+import com.twitter.aurora.scheduler.storage.LockStore;
+import com.twitter.aurora.scheduler.storage.QuotaStore;
+import com.twitter.aurora.scheduler.storage.SchedulerStore;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.Storage.Volatile;
+import com.twitter.aurora.scheduler.storage.TaskStore;
+import com.twitter.common.inject.Bindings.KeyFactory;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Guice module that wires up the in-memory storage system.
+ * <p>
+ * Exposes bindings for storage components:
+ * <ul>
+ *   <li>{@link com.twitter.aurora.scheduler.storage.Storage}, keyed both with a key created by
+ *       the provided {@code keyFactory} and with the {@link Volatile} annotation</li>
+ *   <li>Keyed with keys created by the provided {@code keyFactory}:
+ *     <ul>
+ *       <li>{@link com.twitter.aurora.scheduler.storage.SchedulerStore.Mutable}</li>
+ *       <li>{@link com.twitter.aurora.scheduler.storage.JobStore.Mutable}</li>
+ *       <li>{@link com.twitter.aurora.scheduler.storage.TaskStore.Mutable}</li>
+ *       <li>{@link com.twitter.aurora.scheduler.storage.LockStore.Mutable}</li>
+ *       <li>{@link com.twitter.aurora.scheduler.storage.QuotaStore.Mutable}</li>
+ *       <li>{@link com.twitter.aurora.scheduler.storage.AttributeStore.Mutable}</li>
+ *     </ul>
+ *   </li>
+ * </ul>
+ */
+public final class MemStorageModule extends PrivateModule {
+
+  private final KeyFactory keyFactory;
+
+  /**
+   * Creates a module that exposes its bindings under keys made by {@code keyFactory}.
+   *
+   * @param keyFactory Factory for the keys under which bindings are exposed; must not be null.
+   */
+  public MemStorageModule(KeyFactory keyFactory) {
+    this.keyFactory = checkNotNull(keyFactory);
+  }
+
+  /**
+   * Binds a store interface to a singleton implementation, privately under its plain type and
+   * publicly under the key the {@code keyFactory} creates for that type.
+   */
+  private <T> void bindStore(Class<T> storeType, Class<? extends T> storeImpl) {
+    bind(storeType).to(storeImpl);
+    bind(storeImpl).in(Singleton.class);
+
+    Key<T> exposedKey = keyFactory.create(storeType);
+    bind(exposedKey).to(storeImpl);
+    expose(exposedKey);
+  }
+
+  /**
+   * Binds the given key to {@link MemStorage} and exposes it.
+   */
+  private void bindStorage(Key<Storage> exposedKey) {
+    bind(exposedKey).to(MemStorage.class);
+    expose(exposedKey);
+  }
+
+  @Override
+  protected void configure() {
+    // The storage itself is reachable under two keys, both backed by one singleton.
+    bindStorage(keyFactory.create(Storage.class));
+    bindStorage(Key.get(Storage.class, Volatile.class));
+    bind(MemStorage.class).in(Singleton.class);
+
+    bindStore(SchedulerStore.Mutable.class, MemSchedulerStore.class);
+    bindStore(JobStore.Mutable.class, MemJobStore.class);
+    bindStore(TaskStore.Mutable.class, MemTaskStore.class);
+    bindStore(LockStore.Mutable.class, MemLockStore.class);
+    bindStore(QuotaStore.Mutable.class, MemQuotaStore.class);
+    bindStore(AttributeStore.Mutable.class, MemAttributeStore.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java
new file mode 100644
index 0000000..d02511f
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java
@@ -0,0 +1,304 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.storage.mem;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+
+import org.apache.commons.lang.StringUtils;
+
+import com.twitter.aurora.gen.ScheduledTask;
+import com.twitter.aurora.gen.TaskConfig;
+import com.twitter.aurora.gen.TaskQuery;
+import com.twitter.aurora.scheduler.base.JobKeys;
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.base.Tasks;
+import com.twitter.aurora.scheduler.storage.TaskStore;
+import com.twitter.aurora.scheduler.storage.entities.IJobKey;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
+import com.twitter.common.args.Arg;
+import com.twitter.common.args.CmdLine;
+import com.twitter.common.base.MorePreconditions;
+import com.twitter.common.inject.TimedInterceptor.Timed;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.Stats;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * An in-memory task store.
+ */
+class MemTaskStore implements TaskStore.Mutable {
+
+  private static final Logger LOG = Logger.getLogger(MemTaskStore.class.getName());
+
+  @CmdLine(name = "slow_query_log_threshold",
+      help = "Log all queries that take at least this long to execute.")
+  private static final Arg<Amount<Long, Time>> SLOW_QUERY_LOG_THRESHOLD =
+      Arg.create(Amount.of(25L, Time.MILLISECONDS));
+
+  private final long slowQueryThresholdNanos = SLOW_QUERY_LOG_THRESHOLD.get().as(Time.NANOSECONDS);
+
+  private final Map<String, Task> tasks = Maps.newConcurrentMap();
+  private final Multimap<IJobKey, String> tasksByJobKey =
+      Multimaps.synchronizedSetMultimap(HashMultimap.<IJobKey, String>create());
+
+  // An interner is used here to collapse equivalent TaskConfig instances into canonical instances.
+  // Ideally this would fall out of the object hierarchy (TaskConfig being associated with the job
+  // rather than the task), but we intuit this detail here for performance reasons.
+  private final Interner<TaskConfig, String> configInterner = new Interner<TaskConfig, String>();
+
+  private final AtomicLong taskQueriesById = Stats.exportLong("task_queries_by_id");
+  private final AtomicLong taskQueriesByJob = Stats.exportLong("task_queries_by_job");
+  private final AtomicLong taskQueriesAll = Stats.exportLong("task_queries_all");
+
+  /**
+   * Fetches all tasks matching a query and logs how long the lookup took.
+   * <p>
+   * Lookups that take at least the slow query threshold are logged at INFO, all others at FINE.
+   *
+   * @param query Builder for the query to evaluate; must not be null.
+   * @return The matching tasks.
+   */
+  @Timed("mem_storage_fetch_tasks")
+  @Override
+  public ImmutableSet<IScheduledTask> fetchTasks(Query.Builder query) {
+    checkNotNull(query);
+
+    long startNanos = System.nanoTime();
+    ImmutableSet<IScheduledTask> matching = matches(query.get()).toSet();
+    long elapsedNanos = System.nanoTime() - startNanos;
+
+    boolean slow = elapsedNanos >= slowQueryThresholdNanos;
+    Level logLevel = slow ? Level.INFO : Level.FINE;
+    if (LOG.isLoggable(logLevel)) {
+      Long elapsedMs = Amount.of(elapsedNanos, Time.NANOSECONDS).as(Time.MILLISECONDS);
+      LOG.log(logLevel, "Query took " + elapsedMs + " ms: " + query.get());
+    }
+
+    return matching;
+  }
+
+  // Wraps a scheduled task in the internal Task type, passing this store's config interner to
+  // the Task constructor.
+  private final Function<IScheduledTask, Task> toTask =
+      new Function<IScheduledTask, Task>() {
+        @Override public Task apply(IScheduledTask task) {
+          return new Task(task, configInterner);
+        }
+      };
+
+  /**
+   * Stores the given tasks, replacing any stored tasks that have the same IDs, and records
+   * their IDs in the job key index.
+   *
+   * @param newTasks Tasks to store; must not be null and must not contain two tasks with the
+   *     same ID.
+   * @throws IllegalStateException If two of the supplied tasks share an ID.
+   */
+  @Timed("mem_storage_save_tasks")
+  @Override
+  public void saveTasks(Set<IScheduledTask> newTasks) {
+    checkNotNull(newTasks);
+    Preconditions.checkState(Tasks.ids(newTasks).size() == newTasks.size(),
+        "Proposed new tasks would create task ID collision.");
+
+    // Materialize once: Iterables.transform() returns a lazy view, so indexing it twice would
+    // construct every Task (and touch the interner) twice.
+    Iterable<Task> canonicalized = ImmutableList.copyOf(Iterables.transform(newTasks, toTask));
+    tasks.putAll(Maps.uniqueIndex(canonicalized, TO_ID));
+    tasksByJobKey.putAll(taskIdsByJobKey(canonicalized));
+  }
+
+  /**
+   * Builds a multimap from job key to task ID for the given tasks.
+   * <p>
+   * Tasks are grouped by the key that {@code Tasks.SCHEDULED_TO_JOB_KEY} produces for the
+   * wrapped scheduled task, and each grouped task is then mapped to its ID via {@code TO_ID}.
+   *
+   * @param toIndex Tasks to index.
+   * @return Task IDs keyed by job key.
+   */
+  private Multimap<IJobKey, String> taskIdsByJobKey(Iterable<Task> toIndex) {
+    return Multimaps.transformValues(
+        Multimaps.index(toIndex, Functions.compose(Tasks.SCHEDULED_TO_JOB_KEY, TO_SCHEDULED)),
+        TO_ID);
+  }
+
+  /**
+   * Removes every stored task and clears both the job key index and the config interner.
+   */
+  @Timed("mem_storage_delete_all_tasks")
+  @Override
+  public void deleteAllTasks() {
+    tasks.clear();
+    tasksByJobKey.clear();
+    configInterner.clear();
+  }
+
+  /**
+   * Deletes the tasks with the given IDs, along with their job key index entries and config
+   * interner associations.  IDs that are not stored are ignored.
+   *
+   * @param taskIds IDs of the tasks to delete; must not be null.
+   */
+  @Timed("mem_storage_delete_tasks")
+  @Override
+  public void deleteTasks(Set<String> taskIds) {
+    checkNotNull(taskIds);
+
+    for (String taskId : taskIds) {
+      Task deleted = tasks.remove(taskId);
+      if (deleted == null) {
+        // Unknown ID: nothing to clean up.
+        continue;
+      }
+
+      IScheduledTask scheduled = deleted.task;
+      tasksByJobKey.remove(Tasks.SCHEDULED_TO_JOB_KEY.apply(scheduled), taskId);
+      configInterner.removeAssociation(
+          scheduled.getAssignedTask().getTask().newBuilder(),
+          taskId);
+    }
+  }
+
+  /**
+   * Applies {@code mutator} to every task matching {@code query} and stores each result that
+   * differs from its original.
+   *
+   * @param query Builder for the query selecting the tasks to mutate; must not be null.
+   * @param mutator Function producing the replacement for a task; must not be null.
+   * @return The replacement tasks that were stored; unchanged tasks are not included.
+   * @throws IllegalStateException If the mutator changes a task's ID.
+   */
+  @Timed("mem_storage_mutate_tasks")
+  @Override
+  public ImmutableSet<IScheduledTask> mutateTasks(
+      Query.Builder query,
+      Function<IScheduledTask, IScheduledTask> mutator) {
+
+    checkNotNull(query);
+    checkNotNull(mutator);
+
+    ImmutableSet.Builder<IScheduledTask> changed = ImmutableSet.builder();
+    for (IScheduledTask before : matches(query.get())) {
+      IScheduledTask after = mutator.apply(before);
+      if (before.equals(after)) {
+        // The mutator left this task as it was; nothing to store.
+        continue;
+      }
+
+      String idBefore = Tasks.id(before);
+      String idAfter = Tasks.id(after);
+      Preconditions.checkState(idBefore.equals(idAfter), "A task's ID may not be mutated.");
+      tasks.put(idAfter, toTask.apply(after));
+      changed.add(after);
+    }
+
+    return changed.build();
+  }
+
+  /**
+   * Replaces the configuration of a stored task, leaving the rest of the task untouched.
+   *
+   * @param taskId ID of the task to modify; must not be blank.
+   * @param taskConfiguration Configuration to install on the task; must not be null.
+   * @return {@code true} if the task was found and rewritten, {@code false} if no task with
+   *     that ID is stored.
+   */
+  @Timed("mem_storage_unsafe_modify_in_place")
+  @Override
+  public boolean unsafeModifyInPlace(String taskId, ITaskConfig taskConfiguration) {
+    MorePreconditions.checkNotBlank(taskId);
+    checkNotNull(taskConfiguration);
+
+    Task existing = tasks.get(taskId);
+    if (existing == null) {
+      return false;
+    }
+
+    // NOTE(review): the Task constructor only touches the interner association for the new
+    // config; confirm whether the association for the replaced config needs removing here.
+    ScheduledTask rewritten = existing.task.newBuilder();
+    rewritten.getAssignedTask().setTask(taskConfiguration.newBuilder());
+    tasks.put(taskId, toTask.apply(IScheduledTask.build(rewritten)));
+    return true;
+  }
+
+  /**
+   * Creates a predicate that accepts the tasks satisfying every constraint set on a query.
+   * <p>
+   * A constraint that is unset (null, blank/empty, or an empty collection, depending on the
+   * field) does not restrict the result.
+   *
+   * @param query Query whose constraints are evaluated against each task.
+   * @return A predicate that is {@code true} for tasks matching the query.
+   */
+  private static Predicate<IScheduledTask> queryFilter(final TaskQuery query) {
+    return new Predicate<IScheduledTask>() {
+      @Override public boolean apply(IScheduledTask task) {
+        ITaskConfig config = task.getAssignedTask().getTask();
+
+        // Owner: a blank role or user in the query leaves that field unconstrained.
+        if (query.getOwner() != null) {
+          String role = query.getOwner().getRole();
+          if (!StringUtils.isBlank(role) && !role.equals(config.getOwner().getRole())) {
+            return false;
+          }
+          String user = query.getOwner().getUser();
+          if (!StringUtils.isBlank(user) && !user.equals(config.getOwner().getUser())) {
+            return false;
+          }
+        }
+
+        if (query.getEnvironment() != null
+            && !query.getEnvironment().equals(config.getEnvironment())) {
+          return false;
+        }
+
+        if (query.getJobName() != null && !query.getJobName().equals(config.getJobName())) {
+          return false;
+        }
+
+        if (query.getTaskIds() != null && !query.getTaskIds().contains(Tasks.id(task))) {
+          return false;
+        }
+
+        if (query.getStatusesSize() > 0 && !query.getStatuses().contains(task.getStatus())) {
+          return false;
+        }
+
+        if (!StringUtils.isEmpty(query.getSlaveHost())
+            && !query.getSlaveHost().equals(task.getAssignedTask().getSlaveHost())) {
+          return false;
+        }
+
+        if (query.getInstanceIdsSize() > 0
+            && !query.getInstanceIds().contains(task.getAssignedTask().getInstanceId())) {
+          return false;
+        }
+
+        // No constraint rejected the task.
+        return true;
+      }
+    };
+  }
+
+  /**
+   * Looks up stored tasks by ID, skipping IDs for which no task is stored.
+   *
+   * @param taskIds IDs to look up.
+   * @return The stored tasks that were found, in the iteration order of {@code taskIds}.
+   */
+  private Iterable<Task> fromIdIndex(Iterable<String> taskIds) {
+    ImmutableList.Builder<Task> found = ImmutableList.builder();
+    for (String taskId : taskIds) {
+      Task stored = tasks.get(taskId);
+      if (stored == null) {
+        continue;
+      }
+      found.add(stored);
+    }
+    return found.build();
+  }
+
+  /**
+   * Selects the stored tasks that satisfy a query.
+   * <p>
+   * Candidates are looked up by task ID when the query sets task IDs, otherwise through the job
+   * key index when the query identifies a single job key, otherwise from all stored tasks.  The
+   * full query filter is then applied to the candidates.
+   *
+   * @param query Query to evaluate.
+   * @return The matching tasks, filtered lazily as the result is iterated.
+   */
+  private FluentIterable<IScheduledTask> matches(TaskQuery query) {
+    // Apply the query against the working set.
+    Iterable<Task> from;
+    Optional<IJobKey> jobKey = JobKeys.from(Query.arbitrary(query));
+    if (query.isSetTaskIds()) {
+      taskQueriesById.incrementAndGet();
+      from = fromIdIndex(query.getTaskIds());
+    } else if (jobKey.isPresent()) {
+      taskQueriesByJob.incrementAndGet();
+      // Multimap.get() never returns null, so no null check is needed.  The collection it
+      // returns is a live view of a synchronized multimap and must only be iterated while
+      // holding the multimap's lock, so snapshot the IDs under the lock.
+      Collection<String> taskIds;
+      synchronized (tasksByJobKey) {
+        taskIds = ImmutableList.copyOf(tasksByJobKey.get(jobKey.get()));
+      }
+      from = fromIdIndex(taskIds);
+    } else {
+      taskQueriesAll.incrementAndGet();
+      from = tasks.values();
+    }
+
+    return FluentIterable.from(from).transform(TO_SCHEDULED).filter(queryFilter(query));
+  }
+
+  // Unwraps the IScheduledTask held by an internal Task.
+  private static final Function<Task, IScheduledTask> TO_SCHEDULED =
+      new Function<Task, IScheduledTask>() {
+        @Override public IScheduledTask apply(Task task) {
+          return task.task;
+        }
+      };
+
+  // Extracts the ID of an internal Task by unwrapping it and applying Tasks.SCHEDULED_TO_ID.
+  private static final Function<Task, String> TO_ID =
+      Functions.compose(Tasks.SCHEDULED_TO_ID, TO_SCHEDULED);
+
+  /**
+   * Holder for a stored task whose {@link TaskConfig} has been replaced with the instance
+   * returned by the interner.
+   */
+  private static class Task {
+    private final IScheduledTask task;
+
+    /**
+     * Registers the task's config with the interner under the task's ID and stores a copy of
+     * the task that references the config instance the interner returned.
+     */
+    Task(IScheduledTask task, Interner<TaskConfig, String> interner) {
+      String taskId = Tasks.id(task);
+
+      // Drop any association already recorded for this config/ID pair before adding it again.
+      interner.removeAssociation(task.getAssignedTask().getTask().newBuilder(), taskId);
+      TaskConfig interned =
+          interner.addAssociation(task.getAssignedTask().getTask().newBuilder(), taskId);
+
+      ScheduledTask rebuilt = task.newBuilder();
+      rebuilt.getAssignedTask().setTask(interned);
+      this.task = IScheduledTask.build(rebuilt);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/storage/mem/Util.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/Util.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/Util.java
new file mode 100644
index 0000000..cf1b057
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/Util.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.storage.mem;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Function;
+
+import org.apache.thrift.TBase;
+
+/**
+ * Shared helpers for the in-memory store implementations.
+ */
+final class Util {
+
+  private Util() {
+    // Not instantiable.
+  }
+
+  /**
+   * Builds a function that deep-copies thrift structs of one type by calling their
+   * {@code deepCopy()} method.  The function maps {@code null} to {@code null}.
+   *
+   * @param <T> Type of thrift struct the function copies.
+   * @return A copier for the requested type of thrift structs.
+   */
+  static <T extends TBase<T, ?>> Function<T, T> deepCopier() {
+    return new Function<T, T>() {
+      @Override public T apply(@Nullable T input) {
+        // The static return type of deepCopy() is not T here, hence the unchecked cast.
+        @SuppressWarnings("unchecked")
+        T copy = (input == null) ? null : (T) input.deepCopy();
+        return copy;
+      }
+    };
+  }
+}


Mime
View raw message