aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wfar...@apache.org
Subject [2/2] aurora git commit: Add a storage recovery tool
Date Fri, 15 Dec 2017 20:51:11 GMT
Add a storage recovery tool

This tool was originally intended as a migration path between Persistence
backends.  As it turns out, the model also works well for recovering from a
backup.

I propose we drop our current recovery mechanism to use this tool.  The existing
recovery-via-scheduler-rpc is slightly non-sensical, as it assumes a healthy
scheduler.  When an operator decides it is necessary to recover from a backup,
we should assume the scheduler state may be broken.  Furthermore, starting an
empty scheduler to bootstrap can have undesirable effects such as advertising
false state to clients and establishing a new empty framework with the master.

Testing Done:
end-to-end tests pass (and exercise recovery tool)

Reviewed at https://reviews.apache.org/r/64625/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/2e1ca428
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/2e1ca428
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/2e1ca428

Branch: refs/heads/master
Commit: 2e1ca42887bc8ea1e8c6cddebe9d1cf29268c714
Parents: 6fd765b
Author: Bill Farner <wfarner@apache.org>
Authored: Fri Dec 15 12:07:37 2017 -0800
Committer: Bill Farner <wfarner@apache.org>
Committed: Fri Dec 15 12:07:37 2017 -0800

----------------------------------------------------------------------
 build.gradle                                    |  13 +
 config/checkstyle/suppressions.xml              |   2 +
 .../aurora/benchmark/SnapshotBenchmarks.java    |  10 +-
 .../aurora/scheduler/app/SchedulerMain.java     |  12 +-
 .../aurora/scheduler/config/CliOptions.java     |   6 +-
 .../discovery/ServiceDiscoveryBindings.java     |   2 +-
 .../scheduler/storage/backup/BackupReader.java  |  56 ++++
 .../scheduler/storage/backup/Recovery.java      |  35 +-
 .../storage/backup/TemporaryStorage.java        |   4 +-
 .../durability/DurableStorageModule.java        |  35 ++
 .../scheduler/storage/durability/Recovery.java  | 119 +++++++
 .../storage/durability/RecoveryTool.java        | 196 +++++++++++
 .../storage/log/LogPersistenceModule.java       |  78 +++++
 .../scheduler/storage/log/LogStorageModule.java | 110 ------
 .../scheduler/storage/log/SnapshotModule.java   |  54 +++
 .../storage/log/SnapshotStoreImpl.java          | 332 -------------------
 .../scheduler/storage/log/SnapshotterImpl.java  | 332 +++++++++++++++++++
 .../aurora/scheduler/app/SchedulerIT.java       |  12 +-
 .../scheduler/config/CommandLineTest.java       |   4 +-
 .../storage/durability/RecoveryTest.java        | 110 ++++++
 .../storage/log/LogPersistenceTest.java         |   6 +-
 .../storage/log/NonVolatileStorageTest.java     |  12 +-
 .../storage/log/SnapshotServiceTest.java        |   7 +-
 .../storage/log/SnapshotStoreImplIT.java        | 189 -----------
 .../storage/log/SnapshotterImplIT.java          | 189 +++++++++++
 .../sh/org/apache/aurora/e2e/test_end_to_end.sh |  37 +++
 26 files changed, 1284 insertions(+), 678 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 4674513..64af7ae 100644
--- a/build.gradle
+++ b/build.gradle
@@ -623,3 +623,16 @@ startScripts {
     unixScript.text = unixScript.text.replace('CLASSPATH=', "CLASSPATH=${environmentClasspathPrefix}:")
   }
 }
+
+// Include a script to run the recovery tool.
+task moreStartScripts(type: CreateStartScripts) {
+  mainClassName = 'org.apache.aurora.scheduler.storage.durability.RecoveryTool'
+  applicationName = 'recovery-tool'
+  outputDir = new File(project.buildDir, 'scripts')
+  classpath = jar.outputs.files + project.configurations.runtime
+}
+
+applicationDistribution.into('bin') {
+  from(moreStartScripts)
+  fileMode = 0755
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/config/checkstyle/suppressions.xml
----------------------------------------------------------------------
diff --git a/config/checkstyle/suppressions.xml b/config/checkstyle/suppressions.xml
index c4081b9..03f57c8 100644
--- a/config/checkstyle/suppressions.xml
+++ b/config/checkstyle/suppressions.xml
@@ -21,6 +21,8 @@ limitations under the License.
   <!-- Allow use of System.exit() in main. -->
   <suppress files="org/apache/aurora/scheduler/config/CommandLine.java"
             checks="RegexpSinglelineJava"/>
+  <suppress files="org/apache/aurora/scheduler/storage/durability/RecoveryTool.java"
+            checks="RegexpSinglelineJava"/>
   <suppress files="org/apache/aurora/scheduler/storage/db/migration/.*" checks="TypeName" />
   <suppress files="org/apache/aurora/scheduler/storage/db/testmigration/.*" checks="TypeName" />
 </suppressions>

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/jmh/java/org/apache/aurora/benchmark/SnapshotBenchmarks.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/SnapshotBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/SnapshotBenchmarks.java
index 4f99f80..e3ed3f2 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/SnapshotBenchmarks.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/SnapshotBenchmarks.java
@@ -27,7 +27,7 @@ import org.apache.aurora.common.stats.StatsProvider;
 import org.apache.aurora.common.util.Clock;
 import org.apache.aurora.gen.storage.Snapshot;
 import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl;
+import org.apache.aurora.scheduler.storage.log.SnapshotterImpl;
 import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
 import org.apache.thrift.TException;
 import org.openjdk.jmh.annotations.Benchmark;
@@ -56,7 +56,7 @@ public class SnapshotBenchmarks {
   @Threads(1)
   @State(Scope.Thread)
   public static class RestoreSnapshotWithUpdatesBenchmark {
-    private SnapshotStoreImpl snapshotStore;
+    private SnapshotterImpl snapshotStore;
     private Snapshot snapshot;
     private Storage storage;
 
@@ -80,21 +80,21 @@ public class SnapshotBenchmarks {
       return System.currentTimeMillis() % 5 == 0;
     }
 
-    private SnapshotStoreImpl getSnapshotStore() {
+    private SnapshotterImpl getSnapshotStore() {
       Injector injector = Guice.createInjector(
           new AbstractModule() {
             @Override
             protected void configure() {
               bind(Clock.class).toInstance(Clock.SYSTEM_CLOCK);
               bind(StatsProvider.class).toInstance(new FakeStatsProvider());
-              bind(SnapshotStoreImpl.class).in(Singleton.class);
+              bind(SnapshotterImpl.class).in(Singleton.class);
             }
           },
           new MemStorageModule());
 
       storage = injector.getInstance(Key.get(Storage.class, Storage.Volatile.class));
       storage.prepare();
-      return injector.getInstance(SnapshotStoreImpl.class);
+      return injector.getInstance(SnapshotterImpl.class);
     }
 
     private Snapshot createSnapshot(int updates, int events, int instanceEvents) {

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
index 2bf7e7b..3ce9bc2 100644
--- a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
+++ b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
@@ -59,9 +59,11 @@ import org.apache.aurora.scheduler.mesos.LibMesosLoadingModule;
 import org.apache.aurora.scheduler.stats.StatsModule;
 import org.apache.aurora.scheduler.storage.Storage.Volatile;
 import org.apache.aurora.scheduler.storage.backup.BackupModule;
+import org.apache.aurora.scheduler.storage.durability.DurableStorageModule;
 import org.apache.aurora.scheduler.storage.entities.IServerInfo;
-import org.apache.aurora.scheduler.storage.log.LogStorageModule;
-import org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl;
+import org.apache.aurora.scheduler.storage.log.LogPersistenceModule;
+import org.apache.aurora.scheduler.storage.log.SnapshotModule;
+import org.apache.aurora.scheduler.storage.log.SnapshotterImpl;
 import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -210,7 +212,7 @@ public class SchedulerMain {
         new ServiceDiscoveryModule(
             FlaggedZooKeeperConfig.create(options.zk),
             options.main.serversetPath),
-        new BackupModule(options.backup, SnapshotStoreImpl.class),
+        new BackupModule(options.backup, SnapshotterImpl.class),
         new ExecutorModule(options.executor),
         new AbstractModule() {
           @Override
@@ -249,8 +251,10 @@ public class SchedulerMain {
         .add(
             new CommandLineDriverSettingsModule(options.driver, options.main.allowGpuResource),
             new LibMesosLoadingModule(options.main.driverImpl),
+            new DurableStorageModule(),
             new MesosLogStreamModule(options.mesosLog, FlaggedZooKeeperConfig.create(options.zk)),
-            new LogStorageModule(options.logStorage),
+            new LogPersistenceModule(options.logPersistence),
+            new SnapshotModule(options.snapshot),
             new TierModule(options.tiers),
             new WebhookModule(options.webhook)
         )

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/main/java/org/apache/aurora/scheduler/config/CliOptions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/config/CliOptions.java b/src/main/java/org/apache/aurora/scheduler/config/CliOptions.java
index b7f43e0..e4e5358 100644
--- a/src/main/java/org/apache/aurora/scheduler/config/CliOptions.java
+++ b/src/main/java/org/apache/aurora/scheduler/config/CliOptions.java
@@ -48,7 +48,8 @@ import org.apache.aurora.scheduler.state.StateModule;
 import org.apache.aurora.scheduler.stats.AsyncStatsModule;
 import org.apache.aurora.scheduler.stats.StatsModule;
 import org.apache.aurora.scheduler.storage.backup.BackupModule;
-import org.apache.aurora.scheduler.storage.log.LogStorageModule;
+import org.apache.aurora.scheduler.storage.log.LogPersistenceModule;
+import org.apache.aurora.scheduler.storage.log.SnapshotModule;
 import org.apache.aurora.scheduler.thrift.aop.AopModule;
 import org.apache.aurora.scheduler.updater.UpdaterModule;
 
@@ -64,7 +65,8 @@ public class CliOptions {
   public final FlaggedZooKeeperConfig.Options zk = new FlaggedZooKeeperConfig.Options();
   public final UpdaterModule.Options updater = new UpdaterModule.Options();
   public final StateModule.Options state = new StateModule.Options();
-  public final LogStorageModule.Options logStorage = new LogStorageModule.Options();
+  public final LogPersistenceModule.Options logPersistence = new LogPersistenceModule.Options();
+  public final SnapshotModule.Options snapshot = new SnapshotModule.Options();
   public final BackupModule.Options backup = new BackupModule.Options();
   public final AopModule.Options aop = new AopModule.Options();
   public final PruningModule.Options pruning = new PruningModule.Options();

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryBindings.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryBindings.java b/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryBindings.java
index b574c13..a57a77c 100644
--- a/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryBindings.java
+++ b/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryBindings.java
@@ -44,7 +44,7 @@ public final class ServiceDiscoveryBindings {
   /**
    * A binding key for the ZooKeeper cluster endpoints.
    */
-  static final Key<Iterable<InetSocketAddress>> ZOO_KEEPER_CLUSTER_KEY =
+  public static final Key<Iterable<InetSocketAddress>> ZOO_KEEPER_CLUSTER_KEY =
       Key.get(new TypeLiteral<Iterable<InetSocketAddress>>() { }, ZooKeeper.class);
 
   /**

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/main/java/org/apache/aurora/scheduler/storage/backup/BackupReader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/BackupReader.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/BackupReader.java
new file mode 100644
index 0000000..82d712c
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/BackupReader.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.backup;
+
+import java.io.File;
+import java.util.stream.Stream;
+
+import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.scheduler.storage.Snapshotter;
+import org.apache.aurora.scheduler.storage.durability.Persistence;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A persistence implementation to be used as a migration source.
+ */
+public class BackupReader implements Persistence {
+
+  private final File backupFile;
+  private final Snapshotter snapshotter;
+
+  public BackupReader(File backupFile, Snapshotter snapshotter) {
+    this.backupFile = requireNonNull(backupFile);
+    this.snapshotter = requireNonNull(snapshotter);
+  }
+
+  @Override
+  public Stream<Edit> recover() throws PersistenceException {
+    if (!backupFile.exists()) {
+      throw new PersistenceException("Backup " + backupFile + " does not exist.");
+    }
+
+    return snapshotter.asStream(Recovery.load(backupFile)).map(Edit::op);
+  }
+
+  @Override
+  public void prepare() {
+    // no-op
+  }
+
+  @Override
+  public void persist(Stream<Op> records) {
+    throw new UnsupportedOperationException("Backups are read-only");
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java
index 79899a0..3c2ea50 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java
@@ -136,22 +136,7 @@ public interface Recovery {
 
     @Override
     public void stage(String backupName) throws RecoveryException {
-      File backupFile = new File(backupDir, backupName);
-      if (!backupFile.exists()) {
-        throw new RecoveryException("Backup " + backupName + " does not exist.");
-      }
-
-      Snapshot snapshot = new Snapshot();
-      try {
-        TBinaryProtocol prot = new TBinaryProtocol(
-            new TIOStreamTransport(new BufferedInputStream(new FileInputStream(backupFile))));
-
-        snapshot.read(prot);
-      } catch (TException e) {
-        throw new RecoveryException("Failed to decode backup " + e, e);
-      } catch (IOException e) {
-        throw new RecoveryException("Failed to read backup " + e, e);
-      }
+      Snapshot snapshot = load(new File(backupDir, backupName));
       boolean applied =
           recovery.compareAndSet(null, new PendingRecovery(tempStorageFactory.apply(snapshot)));
       if (!applied) {
@@ -214,4 +199,22 @@ public interface Recovery {
       }
     }
   }
+
+  static Snapshot load(File backupFile) throws RecoveryException {
+    if (!backupFile.exists()) {
+      throw new RecoveryException("Backup " + backupFile + " does not exist.");
+    }
+
+    try {
+      Snapshot snapshot = new Snapshot();
+      TBinaryProtocol prot = new TBinaryProtocol(
+          new TIOStreamTransport(new BufferedInputStream(new FileInputStream(backupFile))));
+      snapshot.read(prot);
+      return snapshot;
+    } catch (TException e) {
+      throw new RecoveryException("Failed to decode backup " + e, e);
+    } catch (IOException e) {
+      throw new RecoveryException("Failed to read backup " + e, e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
index 0305d9d..5641738 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
@@ -31,7 +31,7 @@ import org.apache.aurora.scheduler.storage.durability.Loader;
 import org.apache.aurora.scheduler.storage.durability.Persistence.Edit;
 import org.apache.aurora.scheduler.storage.durability.ThriftBackfill;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl;
+import org.apache.aurora.scheduler.storage.log.SnapshotterImpl;
 import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
 
 import static java.util.Objects.requireNonNull;
@@ -84,7 +84,7 @@ interface TemporaryStorage {
       BuildInfo buildInfo = generateBuildInfo();
       FakeClock clock = new FakeClock();
       clock.setNowMillis(snapshot.getTimestamp());
-      Snapshotter snapshotter = new SnapshotStoreImpl(buildInfo, clock);
+      Snapshotter snapshotter = new SnapshotterImpl(buildInfo, clock);
 
       storage.write((NoResult.Quiet) stores -> {
         Loader.load(stores, thriftBackfill, snapshotter.asStream(snapshot).map(Edit::op));

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/main/java/org/apache/aurora/scheduler/storage/durability/DurableStorageModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/DurableStorageModule.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/DurableStorageModule.java
new file mode 100644
index 0000000..6bb134a
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/durability/DurableStorageModule.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import javax.inject.Singleton;
+
+import com.google.inject.PrivateModule;
+
+import org.apache.aurora.scheduler.storage.CallOrderEnforcingStorage;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
+
+/**
+ * Binding module for a durable storage layer.
+ */
+public class DurableStorageModule extends PrivateModule {
+  @Override
+  protected void configure() {
+    install(CallOrderEnforcingStorage.wrappingModule(DurableStorage.class));
+    bind(DurableStorage.class).in(Singleton.class);
+    expose(Storage.class);
+    expose(NonVolatileStorage.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/main/java/org/apache/aurora/scheduler/storage/durability/Recovery.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/Recovery.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/Recovery.java
new file mode 100644
index 0000000..819d70e
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/durability/Recovery.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
+
+import com.google.common.collect.Lists;
+
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.scheduler.storage.durability.Persistence.Edit;
+import org.apache.aurora.scheduler.storage.durability.Persistence.PersistenceException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class to clone a persistence.
+ */
+final class Recovery {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Recovery.class);
+
+  private Recovery() {
+    // utility class.
+  }
+
+  /**
+   * Copies all state from one persistence to another, batching into calls to
+   * {@link Persistence#persist(Stream)}.
+   *
+   * @param from Source.
+   * @param to Destination.
+   * @param batchSize Maximum number of entries to include in any given persist.
+   */
+  static void copy(Persistence from, Persistence to, int batchSize) {
+    requireEmpty(to);
+
+    long start = System.nanoTime();
+    AtomicLong count = new AtomicLong();
+    AtomicInteger batchNumber = new AtomicInteger();
+    List<Op> batch = Lists.newArrayListWithExpectedSize(batchSize);
+    Runnable saveBatch = () -> {
+      LOG.info("Saving batch " + batchNumber.incrementAndGet());
+      try {
+        to.persist(batch.stream());
+      } catch (PersistenceException e) {
+        throw new RuntimeException(e);
+      }
+      batch.clear();
+    };
+
+    AtomicBoolean dataBegin = new AtomicBoolean(false);
+    try {
+      from.recover()
+          .filter(edit -> {
+            if (edit.isDeleteAll()) {
+              // Suppress any storage reset instructions.
+              // Persistence implementations may recover with these, but do not support persisting
+              // them.  As a result, we require that the recovery source produces a reset
+              // instruction at the beginning of the stream, if at all.
+
+              if (dataBegin.get()) {
+                throw new IllegalStateException(
+                    "A storage reset instruction arrived after the beginning of data");
+              }
+              return false;
+            } else {
+              dataBegin.set(true);
+            }
+            return true;
+          })
+          .forEach(edit -> {
+            count.incrementAndGet();
+            batch.add(edit.getOp());
+            if (batch.size() == batchSize) {
+              saveBatch.run();
+              LOG.info("Fetching batch");
+            }
+          });
+    } catch (PersistenceException e) {
+      throw new RuntimeException(e);
+    }
+
+    if (!batch.isEmpty()) {
+      saveBatch.run();
+    }
+    long end = System.nanoTime();
+    LOG.info("Recovery finished");
+    LOG.info("Copied " + count.get() + " ops in "
+        + Amount.of(end - start, Time.NANOSECONDS).as(Time.MILLISECONDS) + " ms");
+  }
+
+  private static void requireEmpty(Persistence persistence) {
+    LOG.info("Ensuring recovery destination is empty");
+    try (Stream<Edit> edits = persistence.recover()) {
+      if (edits.findFirst().isPresent()) {
+        throw new IllegalStateException("Refusing to recover into non-empty persistence");
+      }
+    } catch (PersistenceException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/main/java/org/apache/aurora/scheduler/storage/durability/RecoveryTool.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/RecoveryTool.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/RecoveryTool.java
new file mode 100644
index 0000000..7cb4c52
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/durability/RecoveryTool.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+import com.beust.jcommander.IStringConverter;
+import com.beust.jcommander.IStringConverterFactory;
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+import org.apache.aurora.common.util.BuildInfo;
+import org.apache.aurora.common.util.Clock;
+import org.apache.aurora.scheduler.TierModule;
+import org.apache.aurora.scheduler.app.LifecycleModule;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
+import org.apache.aurora.scheduler.config.converters.DataAmountConverter;
+import org.apache.aurora.scheduler.config.converters.InetSocketAddressConverter;
+import org.apache.aurora.scheduler.config.converters.TimeAmountConverter;
+import org.apache.aurora.scheduler.config.types.DataAmount;
+import org.apache.aurora.scheduler.config.types.TimeAmount;
+import org.apache.aurora.scheduler.discovery.FlaggedZooKeeperConfig;
+import org.apache.aurora.scheduler.discovery.ServiceDiscoveryBindings;
+import org.apache.aurora.scheduler.log.mesos.MesosLogStreamModule;
+import org.apache.aurora.scheduler.storage.Snapshotter;
+import org.apache.aurora.scheduler.storage.backup.BackupReader;
+import org.apache.aurora.scheduler.storage.log.LogPersistenceModule;
+import org.apache.aurora.scheduler.storage.log.SnapshotterImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A utility to recover the contents of one persistence into another.
+ */
+public final class RecoveryTool {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RecoveryTool.class);
+
+  private RecoveryTool() {
+    // Main-only class.
+  }
+
+  interface RecoveryEndpoint {
+    Iterable<Object> getOptions();
+
+    Persistence create();
+  }
+
+  private static class Log implements RecoveryEndpoint {
+    private final FlaggedZooKeeperConfig.Options zkOptions = new FlaggedZooKeeperConfig.Options();
+    private final MesosLogStreamModule.Options logOptions = new MesosLogStreamModule.Options();
+    private final LogPersistenceModule.Options options = new LogPersistenceModule.Options();
+
+    @Override
+    public Iterable<Object> getOptions() {
+      return ImmutableList.of(logOptions, options, zkOptions);
+    }
+
+    @Override
+    public Persistence create() {
+      Injector injector = Guice.createInjector(
+          new TierModule(TaskTestUtil.TIER_CONFIG),
+          new MesosLogStreamModule(logOptions, FlaggedZooKeeperConfig.create(zkOptions)),
+          new LogPersistenceModule(options),
+          new LifecycleModule(),
+          new AbstractModule() {
+            @Override
+            protected void configure() {
+              bind(ServiceDiscoveryBindings.ZOO_KEEPER_CLUSTER_KEY)
+                  .toInstance(zkOptions.zkEndpoints);
+              bind(Snapshotter.class).to(SnapshotterImpl.class);
+              bind(Clock.class).toInstance(Clock.SYSTEM_CLOCK);
+              bind(BuildInfo.class).toInstance(new BuildInfo());
+            }
+          });
+      return injector.getInstance(Persistence.class);
+    }
+  }
+
+  private static class Backup implements RecoveryEndpoint {
+    @Parameters(separators = "=")
+    private static class Options {
+      @Parameter(names = "-backup", description = "Backup file to load")
+      File backup;
+    }
+
+    private final Options options = new Options();
+
+    @Override
+    public Iterable<Object> getOptions() {
+      return ImmutableList.of(options);
+    }
+
+    @Override
+    public Persistence create() {
+      return new BackupReader(
+          options.backup,
+          new SnapshotterImpl(new BuildInfo(), Clock.SYSTEM_CLOCK));
+    }
+  }
+
+  enum Endpoint {
+    LOG(new Log()),
+    BACKUP(new Backup());
+
+    private final RecoveryEndpoint impl;
+
+    Endpoint(RecoveryEndpoint impl) {
+      this.impl = impl;
+    }
+  }
+
+  @Parameters(separators = "=")
+  private static class Options {
+    @Parameter(names = "-from",
+        required = true,
+        description = "Persistence to read state from")
+    Endpoint from;
+
+    @Parameter(names = "-to",
+        required = true,
+        description = "Persistence to write recovered state into")
+    Endpoint to;
+
+    @Parameter(names = "-batch-size",
+        description = "Write in batches of this may ops.")
+    int batchSize = 50;
+
+    @Parameter(names = "--help", description = "Print usage", help = true)
+    boolean help;
+  }
+
+  private static JCommander configure(Options options, String... args) {
+    JCommander.Builder builder = JCommander.newBuilder().programName(RecoveryTool.class.getName());
+    builder.addConverterFactory(new IStringConverterFactory() {
+      private Map<Class<?>, Class<? extends IStringConverter<?>>> classConverters =
+          ImmutableMap.<Class<?>, Class<? extends IStringConverter<?>>>builder()
+              .put(DataAmount.class, DataAmountConverter.class)
+              .put(InetSocketAddress.class, InetSocketAddressConverter.class)
+              .put(TimeAmount.class, TimeAmountConverter.class)
+              .build();
+
+      @SuppressWarnings("unchecked")
+      @Override
+      public <T> Class<? extends IStringConverter<T>> getConverter(Class<T> forType) {
+        return (Class<IStringConverter<T>>) classConverters.get(forType);
+      }
+    });
+
+    builder.addObject(options);
+    for (Endpoint endpoint : Endpoint.values()) {
+      endpoint.impl.getOptions().forEach(builder::addObject);
+    }
+
+    JCommander parser = builder.build();
+    parser.parse(args);
+    return parser;
+  }
+
+  public static void main(String[] args) {
+    Options options = new Options();
+    JCommander parser = configure(options, args);
+    if (options.help) {
+      parser.usage();
+      System.exit(1);
+    }
+
+    LOG.info("Recovering from " + options.from + " to " + options.to);
+    Persistence from = options.from.impl.create();
+    Persistence to = options.to.impl.create();
+
+    from.prepare();
+    to.prepare();
+
+    Recovery.copy(from, to, options.batchSize);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistenceModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistenceModule.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistenceModule.java
new file mode 100644
index 0000000..ffe3cbf
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistenceModule.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.log;
+
+import javax.inject.Singleton;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+import com.google.inject.PrivateModule;
+import com.google.inject.TypeLiteral;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Data;
+import org.apache.aurora.scheduler.config.types.DataAmount;
+import org.apache.aurora.scheduler.storage.durability.Persistence;
+import org.apache.aurora.scheduler.storage.log.EntrySerializer.EntrySerializerImpl;
+import org.apache.aurora.scheduler.storage.log.LogManager.LogEntryHashFunction;
+import org.apache.aurora.scheduler.storage.log.LogManager.MaxEntrySize;
+import org.apache.aurora.scheduler.storage.log.SnapshotDeduplicator.SnapshotDeduplicatorImpl;
+
+/**
+ * Bindings for scheduler distributed log based persistence.
+ */
+public class LogPersistenceModule extends PrivateModule {
+
+  @Parameters(separators = "=")
+  public static class Options {
+    @Parameter(names = "-dlog_max_entry_size",
+        description =
+            "Specifies the maximum entry size to append to the log. Larger entries will be "
+                + "split across entry Frames.")
+    public DataAmount maxLogEntrySize = new DataAmount(512, Data.KB);
+  }
+
+  private final Options options;
+
+  public LogPersistenceModule(Options options) {
+    this.options = options;
+  }
+
+  @Override
+  protected void configure() {
+    bind(new TypeLiteral<Amount<Integer, Data>>() { }).annotatedWith(MaxEntrySize.class)
+        .toInstance(options.maxLogEntrySize);
+    bind(LogManager.class).in(Singleton.class);
+    bind(LogPersistence.class).in(Singleton.class);
+    bind(Persistence.class).to(LogPersistence.class);
+    expose(Persistence.class);
+    expose(LogPersistence.class);
+
+    bind(EntrySerializer.class).to(EntrySerializerImpl.class);
+    // TODO(ksweeney): We don't need a cryptographic checksum here - assess performance of MD5
+    // versus a faster error-detection checksum like CRC32 for large Snapshots.
+    @SuppressWarnings("deprecation")
+    HashFunction hashFunction = Hashing.md5();
+    bind(HashFunction.class).annotatedWith(LogEntryHashFunction.class).toInstance(hashFunction);
+
+    bind(SnapshotDeduplicator.class).to(SnapshotDeduplicatorImpl.class);
+
+    install(new FactoryModuleBuilder()
+        .implement(StreamManager.class, StreamManagerImpl.class)
+        .build(StreamManagerFactory.class));
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
deleted file mode 100644
index 671593c..0000000
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.log;
-
-import javax.inject.Singleton;
-
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.Parameters;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hashing;
-import com.google.inject.AbstractModule;
-import com.google.inject.PrivateModule;
-import com.google.inject.TypeLiteral;
-import com.google.inject.assistedinject.FactoryModuleBuilder;
-
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Data;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.scheduler.SchedulerServicesModule;
-import org.apache.aurora.scheduler.config.types.DataAmount;
-import org.apache.aurora.scheduler.config.types.TimeAmount;
-import org.apache.aurora.scheduler.storage.CallOrderEnforcingStorage;
-import org.apache.aurora.scheduler.storage.SnapshotStore;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
-import org.apache.aurora.scheduler.storage.durability.DurableStorage;
-import org.apache.aurora.scheduler.storage.durability.Persistence;
-import org.apache.aurora.scheduler.storage.log.EntrySerializer.EntrySerializerImpl;
-import org.apache.aurora.scheduler.storage.log.LogManager.LogEntryHashFunction;
-import org.apache.aurora.scheduler.storage.log.LogManager.MaxEntrySize;
-import org.apache.aurora.scheduler.storage.log.SnapshotDeduplicator.SnapshotDeduplicatorImpl;
-import org.apache.aurora.scheduler.storage.log.SnapshotService.Settings;
-
-/**
- * Bindings for scheduler distributed log based storage.
- */
-public class LogStorageModule extends AbstractModule {
-
-  @Parameters(separators = "=")
-  public static class Options {
-    @Parameter(names = "-dlog_snapshot_interval",
-        description = "Specifies the frequency at which snapshots of local storage are taken and "
-            + "written to the log.")
-    public TimeAmount snapshotInterval = new TimeAmount(1, Time.HOURS);
-
-    @Parameter(names = "-dlog_max_entry_size",
-        description =
-            "Specifies the maximum entry size to append to the log. Larger entries will be "
-                + "split across entry Frames.")
-    public DataAmount maxLogEntrySize = new DataAmount(512, Data.KB);
-  }
-
-  private final Options options;
-
-  public LogStorageModule(Options options) {
-    this.options = options;
-  }
-
-  @Override
-  protected void configure() {
-    install(new PrivateModule() {
-      @Override
-      protected void configure() {
-        bind(Settings.class).toInstance(new Settings(options.snapshotInterval));
-
-        bind(new TypeLiteral<Amount<Integer, Data>>() { }).annotatedWith(MaxEntrySize.class)
-            .toInstance(options.maxLogEntrySize);
-        bind(LogManager.class).in(Singleton.class);
-        bind(DurableStorage.class).in(Singleton.class);
-
-        install(CallOrderEnforcingStorage.wrappingModule(DurableStorage.class));
-        bind(LogPersistence.class).in(Singleton.class);
-        bind(Persistence.class).to(LogPersistence.class);
-        bind(SnapshotStore.class).to(SnapshotService.class);
-        bind(SnapshotService.class).in(Singleton.class);
-        expose(SnapshotService.class);
-        expose(Persistence.class);
-        expose(Storage.class);
-        expose(NonVolatileStorage.class);
-        expose(SnapshotStore.class);
-
-        bind(EntrySerializer.class).to(EntrySerializerImpl.class);
-        // TODO(ksweeney): We don't need a cryptographic checksum here - assess performance of MD5
-        // versus a faster error-detection checksum like CRC32 for large Snapshots.
-        @SuppressWarnings("deprecation")
-        HashFunction hashFunction = Hashing.md5();
-        bind(HashFunction.class).annotatedWith(LogEntryHashFunction.class).toInstance(hashFunction);
-
-        bind(SnapshotDeduplicator.class).to(SnapshotDeduplicatorImpl.class);
-
-        install(new FactoryModuleBuilder()
-            .implement(StreamManager.class, StreamManagerImpl.class)
-            .build(StreamManagerFactory.class));
-      }
-    });
-
-    SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()).to(SnapshotService.class);
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotModule.java b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotModule.java
new file mode 100644
index 0000000..8c0fc12
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotModule.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.log;
+
+import javax.inject.Singleton;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.inject.AbstractModule;
+
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.scheduler.SchedulerServicesModule;
+import org.apache.aurora.scheduler.config.types.TimeAmount;
+import org.apache.aurora.scheduler.storage.SnapshotStore;
+import org.apache.aurora.scheduler.storage.log.SnapshotService.Settings;
+
+/**
+ * Binding for a snapshot store and period snapshotting service.
+ */
+public class SnapshotModule extends AbstractModule {
+
+  @Parameters(separators = "=")
+  public static class Options {
+    @Parameter(names = "-dlog_snapshot_interval",
+        description = "Specifies the frequency at which snapshots of local storage are taken and "
+            + "written to the log.")
+    public TimeAmount snapshotInterval = new TimeAmount(1, Time.HOURS);
+  }
+
+  private final Options options;
+
+  public SnapshotModule(Options options) {
+    this.options = options;
+  }
+
+  @Override
+  protected void configure() {
+    bind(Settings.class).toInstance(new Settings(options.snapshotInterval));
+    bind(SnapshotStore.class).to(SnapshotService.class);
+    bind(SnapshotService.class).in(Singleton.class);
+    SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()).to(SnapshotService.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
deleted file mode 100644
index 50553f8..0000000
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
+++ /dev/null
@@ -1,332 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.log;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Streams;
-
-import org.apache.aurora.common.inject.TimedInterceptor.Timed;
-import org.apache.aurora.common.stats.SlidingStats;
-import org.apache.aurora.common.stats.SlidingStats.Timeable;
-import org.apache.aurora.common.util.BuildInfo;
-import org.apache.aurora.common.util.Clock;
-import org.apache.aurora.gen.storage.Op;
-import org.apache.aurora.gen.storage.QuotaConfiguration;
-import org.apache.aurora.gen.storage.SaveCronJob;
-import org.apache.aurora.gen.storage.SaveFrameworkId;
-import org.apache.aurora.gen.storage.SaveHostAttributes;
-import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent;
-import org.apache.aurora.gen.storage.SaveJobUpdate;
-import org.apache.aurora.gen.storage.SaveJobUpdateEvent;
-import org.apache.aurora.gen.storage.SaveQuota;
-import org.apache.aurora.gen.storage.SaveTasks;
-import org.apache.aurora.gen.storage.SchedulerMetadata;
-import org.apache.aurora.gen.storage.Snapshot;
-import org.apache.aurora.gen.storage.StoredCronJob;
-import org.apache.aurora.gen.storage.StoredJobUpdateDetails;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.storage.JobUpdateStore;
-import org.apache.aurora.scheduler.storage.Snapshotter;
-import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
-import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Snapshot store implementation that delegates to underlying snapshot stores by
- * extracting/applying fields in a snapshot thrift struct.
- */
-public class SnapshotStoreImpl implements Snapshotter {
-
-  @VisibleForTesting
-  static final String SNAPSHOT_SAVE = "snapshot_save_";
-  @VisibleForTesting
-  static final String SNAPSHOT_RESTORE = "snapshot_restore_";
-
-  private static final Logger LOG = LoggerFactory.getLogger(SnapshotStoreImpl.class);
-
-  private static final String HOST_ATTRIBUTES_FIELD = "hosts";
-  private static final String QUOTA_FIELD = "quota";
-  private static final String TASK_FIELD = "tasks";
-  private static final String CRON_FIELD = "crons";
-  private static final String JOB_UPDATE_FIELD = "job_updates";
-  private static final String SCHEDULER_METADATA_FIELD = "scheduler_metadata";
-
-  @VisibleForTesting
-  Set<String> snapshotFieldNames() {
-    return snapshotFields.stream()
-        .map(SnapshotField::getName)
-        .collect(Collectors.toSet());
-  }
-
-  private final List<SnapshotField> snapshotFields = ImmutableList.of(
-      new SnapshotField() {
-        @Override
-        String getName() {
-          return HOST_ATTRIBUTES_FIELD;
-        }
-
-        @Override
-        void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
-          snapshot.setHostAttributes(
-              IHostAttributes.toBuildersSet(store.getAttributeStore().getHostAttributes()));
-        }
-
-        @Override
-        Stream<Op> doStreamFrom(Snapshot snapshot) {
-          if (snapshot.getHostAttributesSize() > 0) {
-            return snapshot.getHostAttributes().stream()
-                .map(attributes -> Op.saveHostAttributes(
-                    new SaveHostAttributes().setHostAttributes(attributes)));
-          }
-          return Stream.empty();
-        }
-      },
-      new SnapshotField() {
-        @Override
-        String getName() {
-          return TASK_FIELD;
-        }
-
-        @Override
-        void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
-          snapshot.setTasks(
-              IScheduledTask.toBuildersSet(store.getTaskStore().fetchTasks(Query.unscoped())));
-        }
-
-        @Override
-        Stream<Op> doStreamFrom(Snapshot snapshot) {
-          if (snapshot.getTasksSize() > 0) {
-            return Stream.of(Op.saveTasks(new SaveTasks().setTasks(snapshot.getTasks())));
-          }
-          return Stream.empty();
-        }
-      },
-      new SnapshotField() {
-        @Override
-        String getName() {
-          return CRON_FIELD;
-        }
-
-        @Override
-        void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
-          ImmutableSet.Builder<StoredCronJob> jobs = ImmutableSet.builder();
-
-          for (IJobConfiguration config : store.getCronJobStore().fetchJobs()) {
-            jobs.add(new StoredCronJob(config.newBuilder()));
-          }
-          snapshot.setCronJobs(jobs.build());
-        }
-
-        @Override
-        Stream<Op> doStreamFrom(Snapshot snapshot) {
-          if (snapshot.getCronJobsSize() > 0) {
-            return snapshot.getCronJobs().stream()
-                .map(job -> Op.saveCronJob(
-                    new SaveCronJob().setJobConfig(job.getJobConfiguration())));
-          }
-          return Stream.empty();
-        }
-      },
-      new SnapshotField() {
-        @Override
-        String getName() {
-          return SCHEDULER_METADATA_FIELD;
-        }
-
-        @Override
-        void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
-          // SchedulerMetadata is updated outside of the static list of SnapshotFields
-        }
-
-        @Override
-        Stream<Op> doStreamFrom(Snapshot snapshot) {
-          if (snapshot.isSetSchedulerMetadata()
-              && snapshot.getSchedulerMetadata().isSetFrameworkId()) {
-            // No delete necessary here since this is a single value.
-
-            return Stream.of(Op.saveFrameworkId(
-                new SaveFrameworkId().setId(snapshot.getSchedulerMetadata().getFrameworkId())));
-          }
-          return Stream.empty();
-        }
-      },
-      new SnapshotField() {
-        @Override
-        String getName() {
-          return QUOTA_FIELD;
-        }
-
-        @Override
-        void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
-          ImmutableSet.Builder<QuotaConfiguration> quotas = ImmutableSet.builder();
-          for (Map.Entry<String, IResourceAggregate> entry
-              : store.getQuotaStore().fetchQuotas().entrySet()) {
-
-            quotas.add(new QuotaConfiguration(entry.getKey(), entry.getValue().newBuilder()));
-          }
-
-          snapshot.setQuotaConfigurations(quotas.build());
-        }
-
-        @Override
-        Stream<Op> doStreamFrom(Snapshot snapshot) {
-          if (snapshot.getQuotaConfigurationsSize() > 0) {
-            return snapshot.getQuotaConfigurations().stream()
-                .map(quota -> Op.saveQuota(new SaveQuota()
-                    .setRole(quota.getRole())
-                    .setQuota(quota.getQuota())));
-          }
-          return Stream.empty();
-        }
-      },
-      new SnapshotField() {
-        @Override
-        String getName() {
-          return JOB_UPDATE_FIELD;
-        }
-
-        @Override
-        void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
-          snapshot.setJobUpdateDetails(
-              store.getJobUpdateStore().fetchJobUpdates(JobUpdateStore.MATCH_ALL).stream()
-                  .map(u -> new StoredJobUpdateDetails().setDetails(u.newBuilder()))
-                  .collect(Collectors.toSet()));
-        }
-
-        @Override
-        Stream<Op> doStreamFrom(Snapshot snapshot) {
-          if (snapshot.getJobUpdateDetailsSize() > 0) {
-            return snapshot.getJobUpdateDetails().stream()
-                .flatMap(details -> {
-                  Stream<Op> parent = Stream.of(Op.saveJobUpdate(
-                      new SaveJobUpdate().setJobUpdate(details.getDetails().getUpdate())));
-                  Stream<Op> jobEvents;
-                  if (details.getDetails().getUpdateEventsSize() > 0) {
-                    jobEvents = details.getDetails().getUpdateEvents().stream()
-                        .map(event -> Op.saveJobUpdateEvent(
-                            new SaveJobUpdateEvent()
-                                .setKey(details.getDetails().getUpdate().getSummary().getKey())
-                                .setEvent(event)));
-                  } else {
-                    jobEvents = Stream.empty();
-                  }
-
-                  Stream<Op> instanceEvents;
-                  if (details.getDetails().getInstanceEventsSize() > 0) {
-                    instanceEvents = details.getDetails().getInstanceEvents().stream()
-                        .map(event -> Op.saveJobInstanceUpdateEvent(
-                            new SaveJobInstanceUpdateEvent()
-                                .setKey(details.getDetails().getUpdate().getSummary().getKey())
-                                .setEvent(event)));
-                  } else {
-                    instanceEvents = Stream.empty();
-                  }
-
-                  return Streams.concat(parent, jobEvents, instanceEvents);
-                });
-          }
-          return Stream.empty();
-        }
-      }
-  );
-
-  private final BuildInfo buildInfo;
-  private final Clock clock;
-
-  @Inject
-  public SnapshotStoreImpl(BuildInfo buildInfo, Clock clock) {
-    this.buildInfo = requireNonNull(buildInfo);
-    this.clock = requireNonNull(clock);
-  }
-
-  private Snapshot createSnapshot(StoreProvider storeProvider) {
-    Snapshot snapshot = new Snapshot();
-
-    // Capture timestamp to signify the beginning of a snapshot operation, apply after in case
-    // one of the field closures is mean and tries to apply a timestamp.
-    long timestamp = clock.nowMillis();
-    for (SnapshotField field : snapshotFields) {
-      field.save(storeProvider, snapshot);
-    }
-
-    SchedulerMetadata metadata = new SchedulerMetadata()
-        .setFrameworkId(storeProvider.getSchedulerStore().fetchFrameworkId().orElse(null))
-        .setDetails(buildInfo.getProperties());
-
-    snapshot.setSchedulerMetadata(metadata);
-    snapshot.setTimestamp(timestamp);
-    return snapshot;
-  }
-
-  @Timed("snapshot_create")
-  @Override
-  public Snapshot from(StoreProvider stores) {
-    return createSnapshot(stores);
-  }
-
-  @Timed("snapshot_apply")
-  @Override
-  public Stream<Op> asStream(Snapshot snapshot) {
-    requireNonNull(snapshot);
-
-    LOG.info("Restoring snapshot.");
-    return snapshotFields.stream()
-        .flatMap(field -> field.streamFrom(snapshot));
-  }
-
-  abstract class SnapshotField {
-
-    abstract String getName();
-
-    abstract void saveToSnapshot(StoreProvider storeProvider, Snapshot snapshot);
-
-    abstract Stream<Op> doStreamFrom(Snapshot snapshot);
-
-    void save(StoreProvider storeProvider, Snapshot snapshot) {
-      stats.getUnchecked(SNAPSHOT_SAVE + getName())
-          .time((Timeable.NoResult.Quiet) () -> saveToSnapshot(storeProvider, snapshot));
-    }
-
-    Stream<Op> streamFrom(Snapshot snapshot) {
-      return stats.getUnchecked(SNAPSHOT_RESTORE + getName()).time(() -> doStreamFrom(snapshot));
-    }
-  }
-
-  private final LoadingCache<String, SlidingStats> stats = CacheBuilder.newBuilder().build(
-      new CacheLoader<String, SlidingStats>() {
-        @Override
-        public SlidingStats load(String name) throws Exception {
-          return new SlidingStats(name, "nanos");
-        }
-      });
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotterImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotterImpl.java b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotterImpl.java
new file mode 100644
index 0000000..4b52be0
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotterImpl.java
@@ -0,0 +1,332 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.log;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Streams;
+
+import org.apache.aurora.common.inject.TimedInterceptor.Timed;
+import org.apache.aurora.common.stats.SlidingStats;
+import org.apache.aurora.common.stats.SlidingStats.Timeable;
+import org.apache.aurora.common.util.BuildInfo;
+import org.apache.aurora.common.util.Clock;
+import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.gen.storage.QuotaConfiguration;
+import org.apache.aurora.gen.storage.SaveCronJob;
+import org.apache.aurora.gen.storage.SaveFrameworkId;
+import org.apache.aurora.gen.storage.SaveHostAttributes;
+import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent;
+import org.apache.aurora.gen.storage.SaveJobUpdate;
+import org.apache.aurora.gen.storage.SaveJobUpdateEvent;
+import org.apache.aurora.gen.storage.SaveQuota;
+import org.apache.aurora.gen.storage.SaveTasks;
+import org.apache.aurora.gen.storage.SchedulerMetadata;
+import org.apache.aurora.gen.storage.Snapshot;
+import org.apache.aurora.gen.storage.StoredCronJob;
+import org.apache.aurora.gen.storage.StoredJobUpdateDetails;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.storage.JobUpdateStore;
+import org.apache.aurora.scheduler.storage.Snapshotter;
+import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
+import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Snapshot store implementation that delegates to underlying snapshot stores by
+ * extracting/applying fields in a snapshot thrift struct.
+ */
+public class SnapshotterImpl implements Snapshotter {
+
+  @VisibleForTesting
+  static final String SNAPSHOT_SAVE = "snapshot_save_";
+  @VisibleForTesting
+  static final String SNAPSHOT_RESTORE = "snapshot_restore_";
+
+  private static final Logger LOG = LoggerFactory.getLogger(SnapshotterImpl.class);
+
+  private static final String HOST_ATTRIBUTES_FIELD = "hosts";
+  private static final String QUOTA_FIELD = "quota";
+  private static final String TASK_FIELD = "tasks";
+  private static final String CRON_FIELD = "crons";
+  private static final String JOB_UPDATE_FIELD = "job_updates";
+  private static final String SCHEDULER_METADATA_FIELD = "scheduler_metadata";
+
+  @VisibleForTesting
+  Set<String> snapshotFieldNames() {
+    return snapshotFields.stream()
+        .map(SnapshotField::getName)
+        .collect(Collectors.toSet());
+  }
+
+  private final List<SnapshotField> snapshotFields = ImmutableList.of(
+      new SnapshotField() {
+        @Override
+        String getName() {
+          return HOST_ATTRIBUTES_FIELD;
+        }
+
+        @Override
+        void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
+          snapshot.setHostAttributes(
+              IHostAttributes.toBuildersSet(store.getAttributeStore().getHostAttributes()));
+        }
+
+        @Override
+        Stream<Op> doStreamFrom(Snapshot snapshot) {
+          if (snapshot.getHostAttributesSize() > 0) {
+            return snapshot.getHostAttributes().stream()
+                .map(attributes -> Op.saveHostAttributes(
+                    new SaveHostAttributes().setHostAttributes(attributes)));
+          }
+          return Stream.empty();
+        }
+      },
+      new SnapshotField() {
+        @Override
+        String getName() {
+          return TASK_FIELD;
+        }
+
+        @Override
+        void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
+          snapshot.setTasks(
+              IScheduledTask.toBuildersSet(store.getTaskStore().fetchTasks(Query.unscoped())));
+        }
+
+        @Override
+        Stream<Op> doStreamFrom(Snapshot snapshot) {
+          if (snapshot.getTasksSize() > 0) {
+            return Stream.of(Op.saveTasks(new SaveTasks().setTasks(snapshot.getTasks())));
+          }
+          return Stream.empty();
+        }
+      },
+      new SnapshotField() {
+        @Override
+        String getName() {
+          return CRON_FIELD;
+        }
+
+        @Override
+        void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
+          ImmutableSet.Builder<StoredCronJob> jobs = ImmutableSet.builder();
+
+          for (IJobConfiguration config : store.getCronJobStore().fetchJobs()) {
+            jobs.add(new StoredCronJob(config.newBuilder()));
+          }
+          snapshot.setCronJobs(jobs.build());
+        }
+
+        @Override
+        Stream<Op> doStreamFrom(Snapshot snapshot) {
+          if (snapshot.getCronJobsSize() > 0) {
+            return snapshot.getCronJobs().stream()
+                .map(job -> Op.saveCronJob(
+                    new SaveCronJob().setJobConfig(job.getJobConfiguration())));
+          }
+          return Stream.empty();
+        }
+      },
+      new SnapshotField() {
+        @Override
+        String getName() {
+          return SCHEDULER_METADATA_FIELD;
+        }
+
+        @Override
+        void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
+          // SchedulerMetadata is updated outside of the static list of SnapshotFields
+        }
+
+        @Override
+        Stream<Op> doStreamFrom(Snapshot snapshot) {
+          if (snapshot.isSetSchedulerMetadata()
+              && snapshot.getSchedulerMetadata().isSetFrameworkId()) {
+            // No delete necessary here since this is a single value.
+
+            return Stream.of(Op.saveFrameworkId(
+                new SaveFrameworkId().setId(snapshot.getSchedulerMetadata().getFrameworkId())));
+          }
+          return Stream.empty();
+        }
+      },
+      new SnapshotField() {
+        @Override
+        String getName() {
+          return QUOTA_FIELD;
+        }
+
+        @Override
+        void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
+          ImmutableSet.Builder<QuotaConfiguration> quotas = ImmutableSet.builder();
+          for (Map.Entry<String, IResourceAggregate> entry
+              : store.getQuotaStore().fetchQuotas().entrySet()) {
+
+            quotas.add(new QuotaConfiguration(entry.getKey(), entry.getValue().newBuilder()));
+          }
+
+          snapshot.setQuotaConfigurations(quotas.build());
+        }
+
+        @Override
+        Stream<Op> doStreamFrom(Snapshot snapshot) {
+          if (snapshot.getQuotaConfigurationsSize() > 0) {
+            return snapshot.getQuotaConfigurations().stream()
+                .map(quota -> Op.saveQuota(new SaveQuota()
+                    .setRole(quota.getRole())
+                    .setQuota(quota.getQuota())));
+          }
+          return Stream.empty();
+        }
+      },
+      new SnapshotField() {
+        @Override
+        String getName() {
+          return JOB_UPDATE_FIELD;
+        }
+
+        @Override
+        void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
+          snapshot.setJobUpdateDetails(
+              store.getJobUpdateStore().fetchJobUpdates(JobUpdateStore.MATCH_ALL).stream()
+                  .map(u -> new StoredJobUpdateDetails().setDetails(u.newBuilder()))
+                  .collect(Collectors.toSet()));
+        }
+
+        @Override
+        Stream<Op> doStreamFrom(Snapshot snapshot) {
+          if (snapshot.getJobUpdateDetailsSize() > 0) {
+            return snapshot.getJobUpdateDetails().stream()
+                .flatMap(details -> {
+                  Stream<Op> parent = Stream.of(Op.saveJobUpdate(
+                      new SaveJobUpdate().setJobUpdate(details.getDetails().getUpdate())));
+                  Stream<Op> jobEvents;
+                  if (details.getDetails().getUpdateEventsSize() > 0) {
+                    jobEvents = details.getDetails().getUpdateEvents().stream()
+                        .map(event -> Op.saveJobUpdateEvent(
+                            new SaveJobUpdateEvent()
+                                .setKey(details.getDetails().getUpdate().getSummary().getKey())
+                                .setEvent(event)));
+                  } else {
+                    jobEvents = Stream.empty();
+                  }
+
+                  Stream<Op> instanceEvents;
+                  if (details.getDetails().getInstanceEventsSize() > 0) {
+                    instanceEvents = details.getDetails().getInstanceEvents().stream()
+                        .map(event -> Op.saveJobInstanceUpdateEvent(
+                            new SaveJobInstanceUpdateEvent()
+                                .setKey(details.getDetails().getUpdate().getSummary().getKey())
+                                .setEvent(event)));
+                  } else {
+                    instanceEvents = Stream.empty();
+                  }
+
+                  return Streams.concat(parent, jobEvents, instanceEvents);
+                });
+          }
+          return Stream.empty();
+        }
+      }
+  );
+
+  private final BuildInfo buildInfo;
+  private final Clock clock;
+
+  @Inject
+  public SnapshotterImpl(BuildInfo buildInfo, Clock clock) {
+    this.buildInfo = requireNonNull(buildInfo);
+    this.clock = requireNonNull(clock);
+  }
+
+  private Snapshot createSnapshot(StoreProvider storeProvider) {
+    Snapshot snapshot = new Snapshot();
+
+    // Capture timestamp to signify the beginning of a snapshot operation, apply after in case
+    // one of the field closures is mean and tries to apply a timestamp.
+    long timestamp = clock.nowMillis();
+    for (SnapshotField field : snapshotFields) {
+      field.save(storeProvider, snapshot);
+    }
+
+    SchedulerMetadata metadata = new SchedulerMetadata()
+        .setFrameworkId(storeProvider.getSchedulerStore().fetchFrameworkId().orElse(null))
+        .setDetails(buildInfo.getProperties());
+
+    snapshot.setSchedulerMetadata(metadata);
+    snapshot.setTimestamp(timestamp);
+    return snapshot;
+  }
+
+  @Timed("snapshot_create")
+  @Override
+  public Snapshot from(StoreProvider stores) {
+    return createSnapshot(stores);
+  }
+
+  @Timed("snapshot_apply")
+  @Override
+  public Stream<Op> asStream(Snapshot snapshot) {
+    requireNonNull(snapshot);
+
+    LOG.info("Restoring snapshot.");
+    return snapshotFields.stream()
+        .flatMap(field -> field.streamFrom(snapshot));
+  }
+
+  abstract class SnapshotField {
+
+    abstract String getName();
+
+    abstract void saveToSnapshot(StoreProvider storeProvider, Snapshot snapshot);
+
+    abstract Stream<Op> doStreamFrom(Snapshot snapshot);
+
+    void save(StoreProvider storeProvider, Snapshot snapshot) {
+      stats.getUnchecked(SNAPSHOT_SAVE + getName())
+          .time((Timeable.NoResult.Quiet) () -> saveToSnapshot(storeProvider, snapshot));
+    }
+
+    Stream<Op> streamFrom(Snapshot snapshot) {
+      return stats.getUnchecked(SNAPSHOT_RESTORE + getName()).time(() -> doStreamFrom(snapshot));
+    }
+  }
+
+  private final LoadingCache<String, SlidingStats> stats = CacheBuilder.newBuilder().build(
+      new CacheLoader<String, SlidingStats>() {
+        @Override
+        public SlidingStats load(String name) throws Exception {
+          return new SlidingStats(name, "nanos");
+        }
+      });
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
index 77fa904..63c338e 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
@@ -76,12 +76,14 @@ import org.apache.aurora.scheduler.mesos.DriverSettings;
 import org.apache.aurora.scheduler.mesos.FrameworkInfoFactory;
 import org.apache.aurora.scheduler.mesos.TestExecutorSettings;
 import org.apache.aurora.scheduler.storage.backup.BackupModule;
+import org.apache.aurora.scheduler.storage.durability.DurableStorageModule;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.IServerInfo;
 import org.apache.aurora.scheduler.storage.log.EntrySerializer;
-import org.apache.aurora.scheduler.storage.log.LogStorageModule;
-import org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl;
+import org.apache.aurora.scheduler.storage.log.LogPersistenceModule;
+import org.apache.aurora.scheduler.storage.log.SnapshotModule;
+import org.apache.aurora.scheduler.storage.log.SnapshotterImpl;
 import org.apache.aurora.scheduler.storage.log.testing.LogOpMatcher;
 import org.apache.aurora.scheduler.storage.log.testing.LogOpMatcher.StreamMatcher;
 import org.apache.mesos.Protos;
@@ -198,7 +200,7 @@ public class SchedulerIT extends BaseZooKeeperTest {
         BackupModule.Options backupOptions = new BackupModule.Options();
         backupOptions.backupDir = backupDir;
 
-        install(new BackupModule(backupOptions, SnapshotStoreImpl.class));
+        install(new BackupModule(backupOptions, SnapshotterImpl.class));
 
         bind(IServerInfo.class).toInstance(
             IServerInfo.build(
@@ -217,7 +219,9 @@ public class SchedulerIT extends BaseZooKeeperTest {
         ImmutableList.<Module>builder()
             .add(SchedulerMain.getUniversalModule(new CliOptions()))
             .add(new TierModule(TaskTestUtil.TIER_CONFIG))
-            .add(new LogStorageModule(new LogStorageModule.Options()))
+            .add(new DurableStorageModule())
+            .add(new LogPersistenceModule(new LogPersistenceModule.Options()))
+            .add(new SnapshotModule(new SnapshotModule.Options()))
             .add(new ServiceDiscoveryModule(zkClientConfig, SERVERSET_PATH))
             .add(testModule)
             .build()

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java b/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
index 53a2315..f685d2e 100644
--- a/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
@@ -171,8 +171,8 @@ public class CommandLineTest {
     expected.updater.enableAffinity = true;
     expected.updater.affinityExpiration = TEST_TIME;
     expected.state.taskAssignerModules = ImmutableList.of(NoopModule.class);
-    expected.logStorage.snapshotInterval = TEST_TIME;
-    expected.logStorage.maxLogEntrySize = TEST_DATA;
+    expected.snapshot.snapshotInterval = TEST_TIME;
+    expected.logPersistence.maxLogEntrySize = TEST_DATA;
     expected.backup.backupInterval = TEST_TIME;
     expected.backup.maxSavedBackups = 42;
     expected.backup.backupDir = new File("testing");

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/test/java/org/apache/aurora/scheduler/storage/durability/RecoveryTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/durability/RecoveryTest.java b/src/test/java/org/apache/aurora/scheduler/storage/durability/RecoveryTest.java
new file mode 100644
index 0000000..4cbd0cf
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/storage/durability/RecoveryTest.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.collect.Lists;
+
+import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.gen.storage.SaveQuota;
+import org.apache.aurora.gen.storage.SaveTasks;
+import org.apache.aurora.scheduler.storage.durability.Persistence.Edit;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class RecoveryTest {
+
+  @Test
+  public void testRecover() {
+    ListPersistence from = new ListPersistence(
+        Edit.op(Op.saveQuota(new SaveQuota())),
+        Edit.op(Op.saveTasks(new SaveTasks())));
+    ListPersistence to = new ListPersistence();
+
+    Recovery.copy(from, to, 100);
+
+    assertEquals(from.edits, to.edits);
+  }
+
+  @Test
+  public void testRecoverWithDeleteAll() {
+    ListPersistence from = new ListPersistence(
+        Edit.deleteAll(),
+        Edit.op(Op.saveQuota(new SaveQuota())),
+        Edit.op(Op.saveTasks(new SaveTasks())));
+    ListPersistence to = new ListPersistence();
+
+    Recovery.copy(from, to, 100);
+
+    assertEquals(from.edits.subList(1, from.edits.size()), to.edits);
+  }
+
+  @Test
+  public void testRequiresEmptyTarget() {
+    ListPersistence from = new ListPersistence();
+    ListPersistence to = new ListPersistence(Edit.op(Op.saveQuota(new SaveQuota())));
+
+    try {
+      Recovery.copy(from, to, 100);
+      fail();
+    } catch (IllegalStateException e) {
+      // expected.
+    }
+  }
+
+  @Test
+  public void testDeleteAllAfterFirstPosition() {
+    ListPersistence from = new ListPersistence(
+        Edit.op(Op.saveQuota(new SaveQuota())),
+        Edit.deleteAll(),
+        Edit.op(Op.saveTasks(new SaveTasks())));
+    ListPersistence to = new ListPersistence();
+
+    try {
+      Recovery.copy(from, to, 100);
+      fail();
+    } catch (IllegalStateException e) {
+      // expected
+    }
+  }
+
+  private static class ListPersistence implements Persistence {
+
+    private final List<Edit> edits;
+
+    ListPersistence(Edit... edits) {
+      this.edits = Lists.newArrayList(edits);
+    }
+
+    @Override
+    public void prepare() {
+      // no-op
+    }
+
+    @Override
+    public Stream<Edit> recover() {
+      return edits.stream();
+    }
+
+    @Override
+    public void persist(Stream<Op> records) throws PersistenceException {
+      edits.addAll(records.map(Edit::op).collect(Collectors.toList()));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/test/java/org/apache/aurora/scheduler/storage/log/LogPersistenceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/LogPersistenceTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/LogPersistenceTest.java
index 3d6d555..a84e408 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/LogPersistenceTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/LogPersistenceTest.java
@@ -45,7 +45,7 @@ import org.apache.aurora.scheduler.storage.Snapshotter;
 import org.apache.aurora.scheduler.storage.Storage.Volatile;
 import org.apache.aurora.scheduler.storage.durability.Persistence;
 import org.apache.aurora.scheduler.storage.durability.Persistence.Edit;
-import org.apache.aurora.scheduler.storage.log.LogStorageModule.Options;
+import org.apache.aurora.scheduler.storage.log.LogPersistenceModule.Options;
 import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
 import org.apache.aurora.scheduler.testing.FakeStatsProvider;
 import org.junit.Before;
@@ -67,7 +67,7 @@ public class LogPersistenceTest extends EasyMockTest {
     mockStream = createMock(Stream.class);
 
     Injector injector = Guice.createInjector(
-        new LogStorageModule(new Options()),
+        new LogPersistenceModule(new Options()),
         new MemStorageModule(Bindings.annotatedKeyFactory(Volatile.class)),
         new TierModule(TaskTestUtil.TIER_CONFIG),
         new AbstractModule() {
@@ -77,7 +77,7 @@ public class LogPersistenceTest extends EasyMockTest {
             bind(EventSink.class).toInstance(e -> { });
             bind(BuildInfo.class).toInstance(FakeBuildInfo.generateBuildInfo());
             bind(Clock.class).toInstance(new FakeClock());
-            bind(Snapshotter.class).to(SnapshotStoreImpl.class);
+            bind(Snapshotter.class).to(SnapshotterImpl.class);
             bind(Log.class).toInstance(mockLog);
           }
         }

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java
index ffd4167..3bd8bf6 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java
@@ -27,7 +27,6 @@ import org.apache.aurora.common.application.ShutdownRegistry.ShutdownRegistryImp
 import org.apache.aurora.common.collections.Pair;
 import org.apache.aurora.common.inject.Bindings;
 import org.apache.aurora.common.quantity.Data;
-import org.apache.aurora.common.quantity.Time;
 import org.apache.aurora.common.stats.StatsProvider;
 import org.apache.aurora.common.testing.TearDownTestCase;
 import org.apache.aurora.common.util.BuildInfo;
@@ -36,7 +35,6 @@ import org.apache.aurora.common.util.testing.FakeBuildInfo;
 import org.apache.aurora.common.util.testing.FakeClock;
 import org.apache.aurora.scheduler.TierModule;
 import org.apache.aurora.scheduler.config.types.DataAmount;
-import org.apache.aurora.scheduler.config.types.TimeAmount;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.log.Log;
 import org.apache.aurora.scheduler.resources.ResourceTestUtil;
@@ -46,8 +44,9 @@ import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult.Quiet;
 import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
 import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.Volatile;
+import org.apache.aurora.scheduler.storage.durability.DurableStorageModule;
 import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
-import org.apache.aurora.scheduler.storage.log.LogStorageModule.Options;
+import org.apache.aurora.scheduler.storage.log.LogPersistenceModule.Options;
 import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
 import org.apache.aurora.scheduler.testing.FakeStatsProvider;
 import org.junit.Before;
@@ -74,12 +73,13 @@ public class NonVolatileStorageTest extends TearDownTestCase {
 
     Options options = new Options();
     options.maxLogEntrySize = new DataAmount(1, Data.GB);
-    options.snapshotInterval = new TimeAmount(1, Time.DAYS);
 
     ShutdownRegistryImpl shutdownRegistry = new ShutdownRegistryImpl();
     Injector injector = Guice.createInjector(
         new MemStorageModule(Bindings.annotatedKeyFactory(Volatile.class)),
-        new LogStorageModule(options),
+        new DurableStorageModule(),
+        new LogPersistenceModule(options),
+        new SnapshotModule(new SnapshotModule.Options()),
         new TierModule(new TierModule.Options()),
         new AbstractModule() {
           @Override
@@ -90,7 +90,7 @@ public class NonVolatileStorageTest extends TearDownTestCase {
             bind(ShutdownRegistry.class).toInstance(shutdownRegistry);
             bind(StatsProvider.class).toInstance(new FakeStatsProvider());
             bind(Log.class).toInstance(log);
-            bind(Snapshotter.class).to(SnapshotStoreImpl.class);
+            bind(Snapshotter.class).to(SnapshotterImpl.class);
           }
         }
     );

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e1ca428/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotServiceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotServiceTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotServiceTest.java
index 270453d..e37d566 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotServiceTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotServiceTest.java
@@ -48,7 +48,8 @@ import org.apache.aurora.scheduler.storage.SnapshotStore;
 import org.apache.aurora.scheduler.storage.Snapshotter;
 import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
 import org.apache.aurora.scheduler.storage.Storage.Volatile;
-import org.apache.aurora.scheduler.storage.log.LogStorageModule.Options;
+import org.apache.aurora.scheduler.storage.durability.DurableStorageModule;
+import org.apache.aurora.scheduler.storage.log.SnapshotModule.Options;
 import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
 import org.apache.aurora.scheduler.testing.FakeStatsProvider;
 import org.easymock.IAnswer;
@@ -84,7 +85,9 @@ public class SnapshotServiceTest extends EasyMockTest {
 
     Injector injector = Guice.createInjector(
         new SchedulerServicesModule(),
-        new LogStorageModule(options),
+        new LogPersistenceModule(new LogPersistenceModule.Options()),
+        new SnapshotModule(options),
+        new DurableStorageModule(),
         new MemStorageModule(Bindings.annotatedKeyFactory(Volatile.class)),
         new TierModule(TaskTestUtil.TIER_CONFIG),
         new AbstractModule() {


Mime
View raw message