aurora-commits mailing list archives

From kevi...@apache.org
Subject git commit: Use AssistedInject to allow Guice to construct StreamManager
Date Tue, 23 Sep 2014 01:41:49 GMT
Repository: incubator-aurora
Updated Branches:
  refs/heads/master 8363eb8b7 -> 8a5b1d6d0


Use AssistedInject to allow Guice to construct StreamManager

Use Guice AssistedInject to construct StreamManager, and refactor the
other components in the log package to be Guice-constructed.

More info: https://github.com/google/guice/wiki/AssistedInject
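
For background, AssistedInject lets a constructor mix injector-provided
dependencies with caller-supplied ("assisted") parameters, and Guice generates
the factory implementation from a plain interface. A minimal self-contained
sketch of the pattern (all names below are illustrative, not part of this
change):

import javax.inject.Inject;

import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.FactoryModuleBuilder;

// A dependency the injector can provide on its own (JIT binding).
class Clock {
  long now() { return System.currentTimeMillis(); }
}

interface Stamper {
  String stamp();
}

class StamperImpl implements Stamper {
  private final Clock clock;   // injector-provided
  private final String label;  // caller-provided ("assisted")

  @Inject
  StamperImpl(Clock clock, @Assisted String label) {
    this.clock = clock;
    this.label = label;
  }

  @Override
  public String stamp() {
    return label + "@" + clock.now();
  }
}

// Guice generates the implementation of this interface.
interface StamperFactory {
  Stamper create(String label);
}

public class AssistedDemo {
  public static void main(String[] args) {
    Injector injector = Guice.createInjector(new AbstractModule() {
      @Override
      protected void configure() {
        install(new FactoryModuleBuilder()
            .implement(Stamper.class, StamperImpl.class)
            .build(StamperFactory.class));
      }
    });
    Stamper stamper = injector.getInstance(StamperFactory.class).create("demo");
    System.out.println(stamper.stamp());
  }
}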

Testing Done:
./gradlew -Pq build

Also manually verified in Vagrant that the annotated methods are exported
(by running aurora_admin scheduler_snapshot devcluster and inspecting
/vars).

Reviewed at https://reviews.apache.org/r/25870/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/8a5b1d6d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/8a5b1d6d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/8a5b1d6d

Branch: refs/heads/master
Commit: 8a5b1d6d0f468fad786e1a446952f59d7b379f1c
Parents: 8363eb8
Author: Kevin Sweeney <kevints@apache.org>
Authored: Mon Sep 22 18:41:23 2014 -0700
Committer: Kevin Sweeney <kevints@apache.org>
Committed: Mon Sep 22 18:41:23 2014 -0700

----------------------------------------------------------------------
 build.gradle                                    |   4 +-
 .../scheduler/storage/log/EntrySerializer.java  |  94 ++++
 .../scheduler/storage/log/LogManager.java       | 461 +------------------
 .../scheduler/storage/log/LogStorage.java       |   2 -
 .../scheduler/storage/log/LogStorageModule.java |  19 +-
 .../scheduler/storage/log/StreamManager.java    |  76 +++
 .../storage/log/StreamManagerFactory.java       |  23 +
 .../storage/log/StreamManagerImpl.java          | 355 ++++++++++++++
 .../storage/log/StreamTransaction.java          |  40 ++
 .../aurora/scheduler/app/SchedulerIT.java       |   7 +-
 .../scheduler/storage/log/LogManagerTest.java   |  33 +-
 .../scheduler/storage/log/LogStorageTest.java   |  17 +-
 12 files changed, 665 insertions(+), 466 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/8a5b1d6d/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index e519121..7e3e6d8 100644
--- a/build.gradle
+++ b/build.gradle
@@ -141,6 +141,7 @@ jar {
 }
 
 dependencies {
+  def guiceRev = '3.0'
   def jerseyRev = '1.18.1'
   def log4jRev = '1.2.17'
   def slf4jRev = '1.6.1'
@@ -157,7 +158,8 @@ dependencies {
   compile 'com.google.code.findbugs:jsr305:2.0.1'
   compile gsonDep
   compile guavaDep
-  compile 'com.google.inject:guice:3.0'
+  compile "com.google.inject:guice:${guiceRev}"
+  compile "com.google.inject.extensions:guice-assistedinject:${guiceRev}"
   compile 'com.google.protobuf:protobuf-java:2.5.0'
   compile 'com.h2database:h2:1.4.177'
   compile "com.sun.jersey:jersey-core:${jerseyRev}"

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/8a5b1d6d/src/main/java/org/apache/aurora/scheduler/storage/log/EntrySerializer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/EntrySerializer.java b/src/main/java/org/apache/aurora/scheduler/storage/log/EntrySerializer.java
new file mode 100644
index 0000000..7239a6a
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/EntrySerializer.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.log;
+
+import java.nio.ByteBuffer;
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.hash.HashFunction;
+import com.twitter.common.inject.TimedInterceptor.Timed;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Data;
+
+import org.apache.aurora.gen.storage.Frame;
+import org.apache.aurora.gen.storage.FrameChunk;
+import org.apache.aurora.gen.storage.FrameHeader;
+import org.apache.aurora.gen.storage.LogEntry;
+
+import static java.util.Objects.requireNonNull;
+
+import static org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
+import static org.apache.aurora.scheduler.storage.log.LogManager.LogEntryHashFunction;
+import static org.apache.aurora.scheduler.storage.log.LogManager.MaxEntrySize;
+
+/**
+ * Logic for serializing distributed log entries.
+ */
+public interface EntrySerializer {
+  /**
+   * Serializes a log entry and splits it into chunks no larger than {@code maxEntrySizeBytes}.
+   *
+   * @param logEntry The log entry to serialize.
+   * @return Serialized and chunked log entry.
+   * @throws CodingException If the entry could not be serialized.
+   */
+  byte[][] serialize(LogEntry logEntry) throws CodingException;
+
+  @VisibleForTesting
+  class EntrySerializerImpl implements EntrySerializer {
+    private final HashFunction hashFunction;
+    private final int maxEntrySizeBytes;
+
+    @Inject
+    @VisibleForTesting
+    public EntrySerializerImpl(
+        @MaxEntrySize Amount<Integer, Data> maxEntrySize,
+        @LogEntryHashFunction HashFunction hashFunction) {
+
+      this.hashFunction = requireNonNull(hashFunction);
+      maxEntrySizeBytes = maxEntrySize.as(Data.BYTES);
+    }
+
+    @Override
+    @Timed("log_entry_serialize")
+    public byte[][] serialize(LogEntry logEntry) throws CodingException {
+      byte[] entry = Entries.thriftBinaryEncode(logEntry);
+      if (entry.length <= maxEntrySizeBytes) {
+        return new byte[][] {entry};
+      }
+
+      int chunks = (int) Math.ceil(entry.length / (double) maxEntrySizeBytes);
+      byte[][] frames = new byte[chunks + 1][];
+
+      frames[0] = encode(Frame.header(new FrameHeader(chunks, ByteBuffer.wrap(checksum(entry)))));
+      for (int i = 0; i < chunks; i++) {
+        int offset = i * maxEntrySizeBytes;
+        ByteBuffer chunk =
+            ByteBuffer.wrap(entry, offset, Math.min(maxEntrySizeBytes, entry.length - offset));
+        frames[i + 1] = encode(Frame.chunk(new FrameChunk(chunk)));
+      }
+      return frames;
+    }
+
+    private byte[] checksum(byte[] data) {
+      // TODO(ksweeney): Use the streaming API here.
+      return hashFunction.hashBytes(data).asBytes();
+    }
+
+    private static byte[] encode(Frame frame) throws CodingException {
+      return Entries.thriftBinaryEncode(LogEntry.frame(frame));
+    }
+  }
+}
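
The serialize method above is easiest to see with numbers: an entry longer
than maxEntrySizeBytes is split into ceil(length / max) chunks, preceded by a
header frame carrying the chunk count and a checksum, for chunks + 1 frames in
total. A standalone sketch of just that arithmetic (illustrative values, not
Aurora code):

// Standalone illustration of the chunking arithmetic in
// EntrySerializerImpl.serialize; the sizes here are hypothetical.
public class ChunkMath {
  public static void main(String[] args) {
    int entryLength = 25;        // hypothetical serialized entry size in bytes
    int maxEntrySizeBytes = 10;  // hypothetical @MaxEntrySize setting

    int chunks = (int) Math.ceil(entryLength / (double) maxEntrySizeBytes); // 3
    int frames = chunks + 1;     // header frame + data chunks = 4

    for (int i = 0; i < chunks; i++) {
      int offset = i * maxEntrySizeBytes;
      int size = Math.min(maxEntrySizeBytes, entryLength - offset);
      System.out.printf("chunk %d: offset=%d size=%d%n", i, offset, size); // 10, 10, 5
    }
    System.out.println("total frames written: " + frames);
  }
}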

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/8a5b1d6d/src/main/java/org/apache/aurora/scheduler/storage/log/LogManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogManager.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogManager.java
index 156cc80..4b50e20 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogManager.java
@@ -18,53 +18,11 @@ import java.lang.annotation.ElementType;
 import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
-import java.nio.ByteBuffer;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Logger;
 
-import javax.annotation.Nullable;
 import javax.inject.Inject;
 import javax.inject.Qualifier;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.primitives.Bytes;
-import com.twitter.common.base.Closure;
-import com.twitter.common.inject.TimedInterceptor.Timed;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Data;
-import com.twitter.common.stats.Stats;
-
-import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
-import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.gen.storage.Frame;
-import org.apache.aurora.gen.storage.FrameChunk;
-import org.apache.aurora.gen.storage.FrameHeader;
-import org.apache.aurora.gen.storage.LogEntry;
-import org.apache.aurora.gen.storage.LogEntry._Fields;
-import org.apache.aurora.gen.storage.Op;
-import org.apache.aurora.gen.storage.RemoveTasks;
-import org.apache.aurora.gen.storage.SaveHostAttributes;
-import org.apache.aurora.gen.storage.SaveTasks;
-import org.apache.aurora.gen.storage.Snapshot;
-import org.apache.aurora.gen.storage.Transaction;
-import org.apache.aurora.gen.storage.storageConstants;
 import org.apache.aurora.scheduler.log.Log;
-import org.apache.aurora.scheduler.log.Log.Entry;
-import org.apache.aurora.scheduler.log.Log.Position;
-import org.apache.aurora.scheduler.log.Log.Stream;
-import org.apache.aurora.scheduler.log.Log.Stream.InvalidPositionException;
-import org.apache.aurora.scheduler.log.Log.Stream.StreamAccessException;
 
 import static java.util.Objects.requireNonNull;
 
@@ -72,7 +30,6 @@ import static java.util.Objects.requireNonNull;
  * Manages opening, reading from and writing to a {@link Log}.
  */
 public class LogManager {
-
   /**
    * Identifies the maximum log entry size to permit before chunking entries into frames.
    */
@@ -87,23 +44,26 @@ public class LogManager {
   @Retention(RetentionPolicy.RUNTIME)
   @Target({ ElementType.PARAMETER, ElementType.METHOD })
   @Qualifier
-  public @interface SnapshotSetting { }
+  public @interface DeflateSnapshots { }
 
-  private static final Logger LOG = Logger.getLogger(LogManager.class.getName());
+  /**
+   * Hash function used to verify log entries.
+   */
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target({ElementType.PARAMETER, ElementType.METHOD})
+  @Qualifier
+  public @interface LogEntryHashFunction { }
 
   private final Log log;
-  private final Amount<Integer, Data> maxEntrySize;
-  private final boolean deflateSnapshots;
+  private final StreamManagerFactory streamManagerFactory;
 
   @Inject
   LogManager(
       Log log,
-      @MaxEntrySize Amount<Integer, Data> maxEntrySize,
-      @SnapshotSetting boolean deflateSnapshots) {
+      StreamManagerFactory streamManagerFactory) {
 
     this.log = requireNonNull(log);
-    this.maxEntrySize = requireNonNull(maxEntrySize);
-    this.deflateSnapshots = deflateSnapshots;
+    this.streamManagerFactory = requireNonNull(streamManagerFactory);
   }
 
   /**
@@ -113,404 +73,7 @@ public class LogManager {
    * @throws IOException If there is a problem opening the log.
    */
   public StreamManager open() throws IOException {
-    return new StreamManager(log.open(), deflateSnapshots, maxEntrySize);
+    return streamManagerFactory.create(log.open());
   }
 
-  /**
-   * Manages interaction with the log stream.  Log entries can be
-   * {@link #readFromBeginning(com.twitter.common.base.Closure) read from} the beginning,
-   * a {@link #startTransaction() transaction} consisting of one or more local storage
-   * operations can be committed atomically, or the log can be compacted by
-   * {@link #snapshot(org.apache.aurora.gen.storage.Snapshot) snapshotting}.
-   */
-  public static class StreamManager {
-
-    private static MessageDigest createDigest() {
-      try {
-        return MessageDigest.getInstance("MD5");
-      } catch (NoSuchAlgorithmException e) {
-        throw new IllegalStateException("Could not find provider for standard algorithm 'MD5'", e);
-      }
-    }
-
-    private static class Vars {
-      private final AtomicInteger unSnapshottedTransactions =
-          Stats.exportInt("scheduler_log_un_snapshotted_transactions");
-      private final AtomicLong bytesWritten = Stats.exportLong("scheduler_log_bytes_written");
-      private final AtomicLong entriesWritten = Stats.exportLong("scheduler_log_entries_written");
-      private final AtomicLong badFramesRead = Stats.exportLong("scheduler_log_bad_frames_read");
-      private final AtomicLong bytesRead = Stats.exportLong("scheduler_log_bytes_read");
-      private final AtomicLong entriesRead = Stats.exportLong("scheduler_log_entries_read");
-      private final AtomicLong deflatedEntriesRead =
-          Stats.exportLong("scheduler_log_deflated_entries_read");
-      private final AtomicLong snapshots = Stats.exportLong("scheduler_log_snapshots");
-    }
-    private final Vars vars = new Vars();
-
-    private final Object writeMutex = new Object();
-    private final Stream stream;
-    private final boolean deflateSnapshots;
-    private final MessageDigest digest;
-    private final EntrySerializer entrySerializer;
-
-    StreamManager(Stream stream, boolean deflateSnapshots, Amount<Integer, Data> maxEntrySize) {
-      this.stream = requireNonNull(stream);
-      this.deflateSnapshots = deflateSnapshots;
-      digest = createDigest();
-      entrySerializer = new EntrySerializer(digest, maxEntrySize);
-    }
-
-    /**
-     * Reads all entries in the log stream after the given position.  If the position
-     * supplied is {@code null} then all log entries in the stream will be read.
-     *
-     * @param reader A reader that will be handed log entries decoded from the stream.
-     * @throws CodingException if there was a problem decoding a log entry from the stream.
-     * @throws InvalidPositionException if the given position is not found in the log.
-     * @throws StreamAccessException if there is a problem reading from the log.
-     */
-    public void readFromBeginning(Closure<LogEntry> reader)
-        throws CodingException, InvalidPositionException, StreamAccessException {
-
-      Iterator<Entry> entries = stream.readAll();
-
-      while (entries.hasNext()) {
-        LogEntry logEntry = decodeLogEntry(entries.next());
-        while (logEntry != null && isFrame(logEntry)) {
-          logEntry = tryDecodeFrame(logEntry.getFrame(), entries);
-        }
-        if (logEntry != null) {
-          if (logEntry.isSet(_Fields.DEFLATED_ENTRY)) {
-            logEntry = Entries.inflate(logEntry);
-            vars.deflatedEntriesRead.incrementAndGet();
-          }
-
-          reader.execute(logEntry);
-          vars.entriesRead.incrementAndGet();
-        }
-      }
-    }
-
-    @Nullable
-    private LogEntry tryDecodeFrame(Frame frame, Iterator<Entry> entries) throws CodingException {
-      if (!isHeader(frame)) {
-        LOG.warning("Found a frame with no preceding header, skipping.");
-        return null;
-      }
-      FrameHeader header = frame.getHeader();
-      byte[][] chunks = new byte[header.getChunkCount()][];
-
-      digest.reset();
-      for (int i = 0; i < header.getChunkCount(); i++) {
-        if (!entries.hasNext()) {
-          logBadFrame(header, i);
-          return null;
-        }
-        LogEntry logEntry = decodeLogEntry(entries.next());
-        if (!isFrame(logEntry)) {
-          logBadFrame(header, i);
-          return logEntry;
-        }
-        Frame chunkFrame = logEntry.getFrame();
-        if (!isChunk(chunkFrame)) {
-          logBadFrame(header, i);
-          return logEntry;
-        }
-        byte[] chunkData = chunkFrame.getChunk().getData();
-        digest.update(chunkData);
-        chunks[i] = chunkData;
-      }
-      if (!Arrays.equals(header.getChecksum(), digest.digest())) {
-        throw new CodingException("Read back a framed log entry that failed its checksum");
-      }
-      return Entries.thriftBinaryDecode(Bytes.concat(chunks));
-    }
-
-    private static boolean isFrame(LogEntry logEntry) {
-      return logEntry.getSetField() == LogEntry._Fields.FRAME;
-    }
-
-    private static boolean isChunk(Frame frame) {
-      return frame.getSetField() == Frame._Fields.CHUNK;
-    }
-
-    private static boolean isHeader(Frame frame) {
-      return frame.getSetField() == Frame._Fields.HEADER;
-    }
-
-    private void logBadFrame(FrameHeader header, int chunkIndex) {
-      LOG.info(String.format("Found an aborted transaction, required %d frames and found %d",
-          header.getChunkCount(), chunkIndex));
-      vars.badFramesRead.incrementAndGet();
-    }
-
-    private LogEntry decodeLogEntry(Entry entry) throws CodingException {
-      byte[] contents = entry.contents();
-      vars.bytesRead.addAndGet(contents.length);
-      return Entries.thriftBinaryDecode(contents);
-    }
-
-    /**
-     * Truncates all entries in the log stream occuring before the given position.  The entry at the
-     * given position becomes the first entry in the stream when this call completes.
-     *
-     * @param position The last position to keep in the stream.
-     * @throws InvalidPositionException if the specified position does not exist in this log.
-     * @throws StreamAccessException if the stream could not be truncated.
-     */
-    void truncateBefore(Position position) {
-      stream.truncateBefore(position);
-    }
-
-    /**
-     * Starts a transaction that can be used to commit a series of {@link Op}s to the log stream
-     * atomically.
-     *
-     * @return StreamTransaction A transaction manager to handle batching up commits to the
-     *    underlying stream.
-     */
-    StreamTransaction startTransaction() {
-      return new StreamTransaction();
-    }
-
-    /**
-     * Adds a snapshot to the log and if successful, truncates the log entries preceding the
-     * snapshot.
-     *
-     * @param snapshot The snapshot to add.
-     * @throws CodingException if the was a problem encoding the snapshot into a log entry.
-     * @throws InvalidPositionException if there was a problem truncating before the snapshot.
-     * @throws StreamAccessException if there was a problem appending the snapshot to the log.
-     */
-    @Timed("log_manager_snapshot")
-    void snapshot(Snapshot snapshot)
-        throws CodingException, InvalidPositionException, StreamAccessException {
-
-      LogEntry entry;
-      if (deflateSnapshots) {
-        entry = deflate(snapshot);
-      } else {
-        entry = LogEntry.snapshot(snapshot);
-      }
-
-      Position position = appendAndGetPosition(entry);
-      vars.snapshots.incrementAndGet();
-      vars.unSnapshottedTransactions.set(0);
-      stream.truncateBefore(position);
-    }
-
-    // Not meant to be subclassed, but timed methods must be non-private.
-    // See https://github.com/google/guice/wiki/AOP#limitations
-    @VisibleForTesting
-    @Timed("log_manager_deflate")
-    LogEntry deflate(Snapshot snapshot) throws CodingException {
-      return Entries.deflate(LogEntry.snapshot(snapshot));
-    }
-
-    // Not meant to be subclassed, but timed methods must be non-private.
-    // See https://github.com/google/guice/wiki/AOP#limitations
-    @VisibleForTesting
-    @Timed("log_manager_append")
-    Position appendAndGetPosition(LogEntry logEntry) throws CodingException {
-      Position firstPosition = null;
-      byte[][] entries = entrySerializer.serialize(logEntry);
-      synchronized (writeMutex) { // ensure all sub-entries are written as a unit
-        for (byte[] entry : entries) {
-          Position position = stream.append(entry);
-          if (firstPosition == null) {
-            firstPosition = position;
-          }
-          vars.bytesWritten.addAndGet(entry.length);
-        }
-      }
-      vars.entriesWritten.incrementAndGet();
-      return firstPosition;
-    }
-
-    @VisibleForTesting
-    public static class EntrySerializer {
-      private final MessageDigest digest;
-      private final int maxEntrySizeBytes;
-
-      EntrySerializer(MessageDigest digest, Amount<Integer, Data> maxEntrySize) {
-        this.digest = requireNonNull(digest);
-        maxEntrySizeBytes = maxEntrySize.as(Data.BYTES);
-      }
-
-      public EntrySerializer(Amount<Integer, Data> maxEntrySize) {
-        this(createDigest(), maxEntrySize);
-      }
-
-      /**
-       * Serializes a log entry and splits it into chunks no larger than {@code maxEntrySizeBytes}.
-       *
-       * @param logEntry The log entry to serialize.
-       * @return Serialized and chunked log entry.
-       * @throws CodingException If the entry could not be serialized.
-       */
-      @VisibleForTesting
-      public byte[][] serialize(LogEntry logEntry) throws CodingException {
-        byte[] entry = Entries.thriftBinaryEncode(logEntry);
-        if (entry.length <= maxEntrySizeBytes) {
-          return new byte[][] {entry};
-        }
-
-        int chunks = (int) Math.ceil(entry.length / (double) maxEntrySizeBytes);
-        byte[][] frames = new byte[chunks + 1][];
-
-        frames[0] = encode(Frame.header(new FrameHeader(chunks, ByteBuffer.wrap(checksum(entry)))));
-        for (int i = 0; i < chunks; i++) {
-          int offset = i * maxEntrySizeBytes;
-          ByteBuffer chunk =
-              ByteBuffer.wrap(entry, offset, Math.min(maxEntrySizeBytes, entry.length - offset));
-          frames[i + 1] = encode(Frame.chunk(new FrameChunk(chunk)));
-        }
-        return frames;
-      }
-
-      private byte[] checksum(byte[] data) {
-        digest.reset();
-        return digest.digest(data);
-      }
-
-      private static byte[] encode(Frame frame) throws CodingException {
-        return Entries.thriftBinaryEncode(LogEntry.frame(frame));
-      }
-    }
-
-    /**
-     * Manages a single log stream append transaction.  Local storage ops can be added to the
-     * transaction and then later committed as an atomic unit.
-     */
-    final class StreamTransaction {
-      private final Transaction transaction =
-          new Transaction().setSchemaVersion(storageConstants.CURRENT_SCHEMA_VERSION);
-      private final AtomicBoolean committed = new AtomicBoolean(false);
-
-      StreamTransaction() {
-        // supplied by factory method
-      }
-
-      /**
-       * Appends any ops that have been added to this transaction to the log stream in a single
-       * atomic record.
-       *
-       * @return The position of the log entry committed in this transaction, if any.
-       * @throws CodingException If there was a problem encoding a log entry for commit.
-       */
-      Position commit() throws CodingException {
-        Preconditions.checkState(!committed.getAndSet(true),
-            "Can only call commit once per transaction.");
-
-        if (!transaction.isSetOps()) {
-          return null;
-        }
-
-        Position position = appendAndGetPosition(LogEntry.transaction(transaction));
-        vars.unSnapshottedTransactions.incrementAndGet();
-        return position;
-      }
-
-      /**
-       * Adds a local storage operation to this transaction.
-       *
-       * @param op The local storage op to add.
-       */
-      void add(Op op) {
-        Preconditions.checkState(!committed.get());
-
-        Op prior = transaction.isSetOps() ? Iterables.getLast(transaction.getOps(), null) : null;
-        if (prior == null || !coalesce(prior, op)) {
-          transaction.addToOps(op);
-        }
-      }
-
-      /**
-       * Tries to coalesce a new op into the prior to compact the binary representation and increase
-       * batching.
-       *
-       * <p>Its recommended that as new {@code Op}s are added, they be treated here although they
-       * need not be</p>
-       *
-       * @param prior The previous op.
-       * @param next The next op to be added.
-       * @return {@code true} if the next op was coalesced into the prior, {@code false} otherwise.
-       */
-      private boolean coalesce(Op prior, Op next) {
-        if (!prior.isSet() && !next.isSet()) {
-          return false;
-        }
-
-        Op._Fields priorType = prior.getSetField();
-        if (!priorType.equals(next.getSetField())) {
-          return false;
-        }
-
-        switch (priorType) {
-          case SAVE_FRAMEWORK_ID:
-            prior.setSaveFrameworkId(next.getSaveFrameworkId());
-            return true;
-
-          case SAVE_ACCEPTED_JOB:
-          case REMOVE_JOB:
-          case SAVE_QUOTA:
-          case REMOVE_QUOTA:
-            return false;
-
-          case SAVE_TASKS:
-            coalesce(prior.getSaveTasks(), next.getSaveTasks());
-            return true;
-          case REMOVE_TASKS:
-            coalesce(prior.getRemoveTasks(), next.getRemoveTasks());
-            return true;
-          case SAVE_HOST_ATTRIBUTES:
-            return coalesce(prior.getSaveHostAttributes(), next.getSaveHostAttributes());
-          default:
-            LOG.warning("Unoptimized op: " + priorType);
-            return false;
-        }
-      }
-
-      private void coalesce(SaveTasks prior, SaveTasks next) {
-        if (next.isSetTasks()) {
-          if (prior.isSetTasks()) {
-            // It is an expected invariant that an operation may reference a task (identified by
-            // task ID) no more than one time.  Therefore, to coalesce two SaveTasks operations,
-            // the most recent task definition overrides the prior operation.
-            Map<String, ScheduledTask> coalesced = Maps.newHashMap();
-            for (ScheduledTask task : prior.getTasks()) {
-              coalesced.put(task.getAssignedTask().getTaskId(), task);
-            }
-            for (ScheduledTask task : next.getTasks()) {
-              coalesced.put(task.getAssignedTask().getTaskId(), task);
-            }
-            prior.setTasks(ImmutableSet.copyOf(coalesced.values()));
-          } else {
-            prior.setTasks(next.getTasks());
-          }
-        }
-      }
-
-      private void coalesce(RemoveTasks prior, RemoveTasks next) {
-        if (next.isSetTaskIds()) {
-          if (prior.isSetTaskIds()) {
-            prior.setTaskIds(ImmutableSet.<String>builder()
-                .addAll(prior.getTaskIds())
-                .addAll(next.getTaskIds())
-                .build());
-          } else {
-            prior.setTaskIds(next.getTaskIds());
-          }
-        }
-      }
-
-      private boolean coalesce(SaveHostAttributes prior, SaveHostAttributes next) {
-        if (prior.getHostAttributes().getHost().equals(next.getHostAttributes().getHost())) {
-          prior.getHostAttributes().setAttributes(next.getHostAttributes().getAttributes());
-          return true;
-        }
-        return false;
-      }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/8a5b1d6d/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
index 06e2e8d..f806297 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
@@ -70,8 +70,6 @@ import org.apache.aurora.scheduler.storage.entities.ILockKey;
 import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.apache.aurora.scheduler.storage.log.LogManager.StreamManager;
-import org.apache.aurora.scheduler.storage.log.LogManager.StreamManager.StreamTransaction;
 
 import static java.util.Objects.requireNonNull;
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/8a5b1d6d/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
index 23ee32b..769348e 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
@@ -18,9 +18,12 @@ import java.lang.annotation.Annotation;
 import javax.inject.Singleton;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
 import com.google.inject.AbstractModule;
 import com.google.inject.Key;
 import com.google.inject.TypeLiteral;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
 import com.twitter.common.application.ShutdownRegistry;
 import com.twitter.common.args.Arg;
 import com.twitter.common.args.CmdLine;
@@ -33,10 +36,11 @@ import org.apache.aurora.scheduler.log.Log;
 import org.apache.aurora.scheduler.storage.CallOrderEnforcingStorage;
 import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
 import org.apache.aurora.scheduler.storage.log.LogManager.MaxEntrySize;
-import org.apache.aurora.scheduler.storage.log.LogManager.SnapshotSetting;
 import org.apache.aurora.scheduler.storage.log.LogStorage.ShutdownGracePeriod;
 import org.apache.aurora.scheduler.storage.log.LogStorage.SnapshotInterval;
 
+import static org.apache.aurora.scheduler.storage.log.EntrySerializer.EntrySerializerImpl;
+
 /**
  * Bindings for scheduler distributed log based storage.
  * <p/>
@@ -84,11 +88,22 @@ public class LogStorageModule extends AbstractModule {
     bind(new TypeLiteral<Amount<Integer, Data>>() { }).annotatedWith(MaxEntrySize.class)
         .toInstance(MAX_LOG_ENTRY_SIZE.get());
     bind(LogManager.class).in(Singleton.class);
-    bind(Boolean.class).annotatedWith(SnapshotSetting.class).toInstance(DEFLATE_SNAPSHOTS.get());
+    bindConstant().annotatedWith(LogManager.DeflateSnapshots.class).to(DEFLATE_SNAPSHOTS.get());
     bind(LogStorage.class).in(Singleton.class);
 
     install(CallOrderEnforcingStorage.wrappingModule(LogStorage.class));
     bind(DistributedSnapshotStore.class).to(LogStorage.class);
+
+    bind(EntrySerializer.class).to(EntrySerializerImpl.class);
+    // TODO(ksweeney): We don't need a cryptographic checksum here - assess performance of MD5
+    // versus a faster error-detection checksum like CRC32 for large Snapshots.
+    bind(HashFunction.class).annotatedWith(LogManager.LogEntryHashFunction.class)
+        .toInstance(Hashing.md5());
+
+    install(new FactoryModuleBuilder()
+        .implement(StreamManager.class, StreamManagerImpl.class)
+        .build(StreamManagerFactory.class));
+
   }
 
   private void bindInterval(Class<? extends Annotation> key, Arg<Amount<Long, Time>> value) {
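
The FactoryModuleBuilder install above is what lets LogManager depend on a
StreamManagerFactory with no hand-written implementation: Guice synthesizes a
factory whose create(stream) forwards its argument to StreamManagerImpl's
@Assisted parameter and pulls the remaining constructor arguments from the
injector. Roughly equivalent hand-written code (a sketch of the semantics
only, assuming the same package; the real factory is generated at runtime by
guice-assistedinject):

import com.google.common.hash.HashFunction;

import org.apache.aurora.scheduler.log.Log;

class HandWrittenStreamManagerFactory implements StreamManagerFactory {
  private final EntrySerializer entrySerializer;  // from the injector
  private final boolean deflateSnapshots;         // from @DeflateSnapshots
  private final HashFunction hashFunction;        // from @LogEntryHashFunction

  HandWrittenStreamManagerFactory(
      EntrySerializer entrySerializer,
      boolean deflateSnapshots,
      HashFunction hashFunction) {
    this.entrySerializer = entrySerializer;
    this.deflateSnapshots = deflateSnapshots;
    this.hashFunction = hashFunction;
  }

  @Override
  public StreamManager create(Log.Stream stream) {
    // The assisted parameter (stream) comes from the caller; everything else
    // was captured from the injector when the factory was built.
    return new StreamManagerImpl(stream, entrySerializer, deflateSnapshots, hashFunction);
  }
}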

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/8a5b1d6d/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java
new file mode 100644
index 0000000..22db80e
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.log;
+
+import com.twitter.common.base.Closure;
+
+import org.apache.aurora.gen.storage.LogEntry;
+import org.apache.aurora.gen.storage.Snapshot;
+import org.apache.aurora.scheduler.log.Log;
+
+import static org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
+import static org.apache.aurora.scheduler.log.Log.Stream.InvalidPositionException;
+import static org.apache.aurora.scheduler.log.Log.Stream.StreamAccessException;
+
+/**
+ * Manages interaction with the log stream.  Log entries can be
+ * {@link #readFromBeginning(com.twitter.common.base.Closure) read from} the beginning,
+ * a {@link #startTransaction() transaction} consisting of one or more local storage
+ * operations can be committed atomically, or the log can be compacted by
+ * {@link #snapshot(org.apache.aurora.gen.storage.Snapshot) snapshotting}.
+ */
+interface StreamManager {
+  /**
+   * Reads all entries in the log stream after the given position.  If the position
+   * supplied is {@code null} then all log entries in the stream will be read.
+   *
+   * @param reader A reader that will be handed log entries decoded from the stream.
+   * @throws CodingException if there was a problem decoding a log entry from the stream.
+   * @throws InvalidPositionException if the given position is not found in the log.
+   * @throws StreamAccessException if there is a problem reading from the log.
+   */
+  void readFromBeginning(Closure<LogEntry> reader)
+      throws CodingException, InvalidPositionException, StreamAccessException;
+
+  /**
+   * Truncates all entries in the log stream occurring before the given position.  The entry at the
+   * given position becomes the first entry in the stream when this call completes.
+   *
+   * @param position The last position to keep in the stream.
+   * @throws InvalidPositionException if the specified position does not exist in this log.
+   * @throws StreamAccessException if the stream could not be truncated.
+   */
+  void truncateBefore(Log.Position position);
+
+  /**
+   * Starts a transaction that can be used to commit a series of {@link Op}s to the log stream
+   * atomically.
+   *
+   * @return StreamTransaction A transaction manager to handle batching up commits to the
+   *    underlying stream.
+   */
+  StreamTransaction startTransaction();
+
+  /**
+   * Adds a snapshot to the log and if successful, truncates the log entries preceding the
+   * snapshot.
+   *
+   * @param snapshot The snapshot to add.
+   * @throws CodingException if there was a problem encoding the snapshot into a log entry.
+   * @throws InvalidPositionException if there was a problem truncating before the snapshot.
+   * @throws StreamAccessException if there was a problem appending the snapshot to the log.
+   */
+  void snapshot(Snapshot snapshot)
+      throws CodingException, InvalidPositionException, StreamAccessException;
+}
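
Taken together with LogManager.open(), typical use of this interface looks
roughly like the following (an illustrative sketch, not a test from this
commit; error handling and real Op/Snapshot construction are omitted):

// How a caller might drive a StreamManager obtained from LogManager.open().
// The op and snapshot parameters are assumed to be built elsewhere.
void replayCommitAndSnapshot(LogManager logManager, Op op, Snapshot snapshot)
    throws IOException, CodingException {
  StreamManager streamManager = logManager.open();

  // Replay every entry already in the log.
  streamManager.readFromBeginning(new Closure<LogEntry>() {
    @Override
    public void execute(LogEntry entry) {
      // apply the replayed entry to local storage here
    }
  });

  // Batch ops and commit them to the stream as one atomic record.
  StreamTransaction transaction = streamManager.startTransaction();
  transaction.add(op);
  transaction.commit();

  // Compact the log: append a snapshot, then truncate what precedes it.
  streamManager.snapshot(snapshot);
}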

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/8a5b1d6d/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerFactory.java b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerFactory.java
new file mode 100644
index 0000000..52b2e83
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerFactory.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.log;
+
+import org.apache.aurora.scheduler.log.Log;
+
+/**
+ * Creates a StreamManager from an open stream.
+ */
+interface StreamManagerFactory {
+  StreamManager create(Log.Stream stream);
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/8a5b1d6d/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java
new file mode 100644
index 0000000..e5cfbf5
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java
@@ -0,0 +1,355 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.log;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Logger;
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.primitives.Bytes;
+import com.google.inject.assistedinject.Assisted;
+import com.twitter.common.base.Closure;
+import com.twitter.common.stats.Stats;
+
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.storage.Frame;
+import org.apache.aurora.gen.storage.FrameHeader;
+import org.apache.aurora.gen.storage.LogEntry;
+import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.gen.storage.RemoveTasks;
+import org.apache.aurora.gen.storage.SaveHostAttributes;
+import org.apache.aurora.gen.storage.SaveTasks;
+import org.apache.aurora.gen.storage.Snapshot;
+import org.apache.aurora.gen.storage.Transaction;
+import org.apache.aurora.gen.storage.storageConstants;
+import org.apache.aurora.scheduler.log.Log;
+
+import static java.util.Objects.requireNonNull;
+
+import static com.twitter.common.inject.TimedInterceptor.Timed;
+
+import static org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
+import static org.apache.aurora.scheduler.log.Log.Stream.InvalidPositionException;
+import static org.apache.aurora.scheduler.log.Log.Stream.StreamAccessException;
+import static org.apache.aurora.scheduler.storage.log.LogManager.DeflateSnapshots;
+import static org.apache.aurora.scheduler.storage.log.LogManager.LogEntryHashFunction;
+
+@VisibleForTesting
+class StreamManagerImpl implements StreamManager {
+  private static final Logger LOG = Logger.getLogger(StreamManagerImpl.class.getName());
+
+  private static class Vars {
+    private final AtomicInteger unSnapshottedTransactions =
+        Stats.exportInt("scheduler_log_un_snapshotted_transactions");
+    private final AtomicLong bytesWritten = Stats.exportLong("scheduler_log_bytes_written");
+    private final AtomicLong entriesWritten = Stats.exportLong("scheduler_log_entries_written");
+    private final AtomicLong badFramesRead = Stats.exportLong("scheduler_log_bad_frames_read");
+    private final AtomicLong bytesRead = Stats.exportLong("scheduler_log_bytes_read");
+    private final AtomicLong entriesRead = Stats.exportLong("scheduler_log_entries_read");
+    private final AtomicLong deflatedEntriesRead =
+        Stats.exportLong("scheduler_log_deflated_entries_read");
+    private final AtomicLong snapshots = Stats.exportLong("scheduler_log_snapshots");
+  }
+  private final Vars vars = new Vars();
+
+  private final Object writeMutex = new Object();
+  private final Log.Stream stream;
+  private final EntrySerializer entrySerializer;
+  private final boolean deflateSnapshots;
+  private final HashFunction hashFunction;
+
+  @Inject
+  StreamManagerImpl(
+      @Assisted Log.Stream stream,
+      EntrySerializer entrySerializer,
+      @DeflateSnapshots boolean deflateSnapshots,
+      @LogEntryHashFunction HashFunction hashFunction) {
+
+    this.stream = requireNonNull(stream);
+    this.entrySerializer = requireNonNull(entrySerializer);
+    this.deflateSnapshots = deflateSnapshots;
+    this.hashFunction = requireNonNull(hashFunction);
+  }
+
+  @Override
+  public void readFromBeginning(Closure<LogEntry> reader)
+      throws CodingException, InvalidPositionException, StreamAccessException {
+
+    Iterator<Log.Entry> entries = stream.readAll();
+
+    while (entries.hasNext()) {
+      LogEntry logEntry = decodeLogEntry(entries.next());
+      while (logEntry != null && isFrame(logEntry)) {
+        logEntry = tryDecodeFrame(logEntry.getFrame(), entries);
+      }
+      if (logEntry != null) {
+        if (logEntry.isSet(LogEntry._Fields.DEFLATED_ENTRY)) {
+          logEntry = Entries.inflate(logEntry);
+          vars.deflatedEntriesRead.incrementAndGet();
+        }
+
+        reader.execute(logEntry);
+        vars.entriesRead.incrementAndGet();
+      }
+    }
+  }
+
+  @Nullable
+  private LogEntry tryDecodeFrame(Frame frame, Iterator<Log.Entry> entries) throws CodingException {
+    if (!isHeader(frame)) {
+      LOG.warning("Found a frame with no preceding header, skipping.");
+      return null;
+    }
+    FrameHeader header = frame.getHeader();
+    byte[][] chunks = new byte[header.getChunkCount()][];
+
+    Hasher hasher = hashFunction.newHasher();
+    for (int i = 0; i < header.getChunkCount(); i++) {
+      if (!entries.hasNext()) {
+        logBadFrame(header, i);
+        return null;
+      }
+      LogEntry logEntry = decodeLogEntry(entries.next());
+      if (!isFrame(logEntry)) {
+        logBadFrame(header, i);
+        return logEntry;
+      }
+      Frame chunkFrame = logEntry.getFrame();
+      if (!isChunk(chunkFrame)) {
+        logBadFrame(header, i);
+        return logEntry;
+      }
+      byte[] chunkData = chunkFrame.getChunk().getData();
+      hasher.putBytes(chunkData);
+      chunks[i] = chunkData;
+    }
+    if (!Arrays.equals(header.getChecksum(), hasher.hash().asBytes())) {
+      throw new CodingException("Read back a framed log entry that failed its checksum");
+    }
+    return Entries.thriftBinaryDecode(Bytes.concat(chunks));
+  }
+
+  private static boolean isFrame(LogEntry logEntry) {
+    return logEntry.getSetField() == LogEntry._Fields.FRAME;
+  }
+
+  private static boolean isChunk(Frame frame) {
+    return frame.getSetField() == Frame._Fields.CHUNK;
+  }
+
+  private static boolean isHeader(Frame frame) {
+    return frame.getSetField() == Frame._Fields.HEADER;
+  }
+
+  private void logBadFrame(FrameHeader header, int chunkIndex) {
+    LOG.info(String.format("Found an aborted transaction, required %d frames and found %d",
+        header.getChunkCount(), chunkIndex));
+    vars.badFramesRead.incrementAndGet();
+  }
+
+  private LogEntry decodeLogEntry(Log.Entry entry) throws CodingException {
+    byte[] contents = entry.contents();
+    vars.bytesRead.addAndGet(contents.length);
+    return Entries.thriftBinaryDecode(contents);
+  }
+
+  @Override
+  public void truncateBefore(Log.Position position) {
+    stream.truncateBefore(position);
+  }
+
+  @Override
+  public StreamTransactionImpl startTransaction() {
+    return new StreamTransactionImpl();
+  }
+
+  @Override
+  @Timed("log_manager_snapshot")
+  public void snapshot(Snapshot snapshot)
+      throws CodingException, InvalidPositionException, StreamAccessException {
+
+    LogEntry entry;
+    if (deflateSnapshots) {
+      entry = deflate(snapshot);
+    } else {
+      entry = LogEntry.snapshot(snapshot);
+    }
+
+    Log.Position position = appendAndGetPosition(entry);
+    vars.snapshots.incrementAndGet();
+    vars.unSnapshottedTransactions.set(0);
+    stream.truncateBefore(position);
+  }
+
+  // Not meant to be subclassed, but timed methods must be non-private.
+  // See https://github.com/google/guice/wiki/AOP#limitations
+  @VisibleForTesting
+  @Timed("log_manager_deflate")
+  LogEntry deflate(Snapshot snapshot) throws CodingException {
+    return Entries.deflate(LogEntry.snapshot(snapshot));
+  }
+
+  // Not meant to be subclassed, but timed methods must be non-private.
+  // See https://github.com/google/guice/wiki/AOP#limitations
+  @VisibleForTesting
+  @Timed("log_manager_append")
+  Log.Position appendAndGetPosition(LogEntry logEntry) throws CodingException {
+    Log.Position firstPosition = null;
+    byte[][] entries = entrySerializer.serialize(logEntry);
+    synchronized (writeMutex) { // ensure all sub-entries are written as a unit
+      for (byte[] entry : entries) {
+        Log.Position position = stream.append(entry);
+        if (firstPosition == null) {
+          firstPosition = position;
+        }
+        vars.bytesWritten.addAndGet(entry.length);
+      }
+    }
+    vars.entriesWritten.incrementAndGet();
+    return firstPosition;
+  }
+
+  final class StreamTransactionImpl implements StreamTransaction {
+    private final Transaction transaction =
+        new Transaction().setSchemaVersion(storageConstants.CURRENT_SCHEMA_VERSION);
+    private final AtomicBoolean committed = new AtomicBoolean(false);
+
+    StreamTransactionImpl() {
+      // supplied by factory method
+    }
+
+    @Override
+    public Log.Position commit() throws CodingException {
+      Preconditions.checkState(!committed.getAndSet(true),
+          "Can only call commit once per transaction.");
+
+      if (!transaction.isSetOps()) {
+        return null;
+      }
+
+      Log.Position position = appendAndGetPosition(LogEntry.transaction(transaction));
+      vars.unSnapshottedTransactions.incrementAndGet();
+      return position;
+    }
+
+    @Override
+    public void add(Op op) {
+      Preconditions.checkState(!committed.get());
+
+      Op prior = transaction.isSetOps() ? Iterables.getLast(transaction.getOps(), null) : null;
+      if (prior == null || !coalesce(prior, op)) {
+        transaction.addToOps(op);
+      }
+    }
+
+    /**
+     * Tries to coalesce a new op into the prior to compact the binary representation and increase
+     * batching.
+     *
+     * <p>It's recommended that new {@code Op}s be handled here as they are added, although they
+     * need not be.</p>
+     *
+     * @param prior The previous op.
+     * @param next The next op to be added.
+     * @return {@code true} if the next op was coalesced into the prior, {@code false} otherwise.
+     */
+    private boolean coalesce(Op prior, Op next) {
+      if (!prior.isSet() && !next.isSet()) {
+        return false;
+      }
+
+      Op._Fields priorType = prior.getSetField();
+      if (!priorType.equals(next.getSetField())) {
+        return false;
+      }
+
+      switch (priorType) {
+        case SAVE_FRAMEWORK_ID:
+          prior.setSaveFrameworkId(next.getSaveFrameworkId());
+          return true;
+
+        case SAVE_ACCEPTED_JOB:
+        case REMOVE_JOB:
+        case SAVE_QUOTA:
+        case REMOVE_QUOTA:
+          return false;
+
+        case SAVE_TASKS:
+          coalesce(prior.getSaveTasks(), next.getSaveTasks());
+          return true;
+        case REMOVE_TASKS:
+          coalesce(prior.getRemoveTasks(), next.getRemoveTasks());
+          return true;
+        case SAVE_HOST_ATTRIBUTES:
+          return coalesce(prior.getSaveHostAttributes(), next.getSaveHostAttributes());
+        default:
+          LOG.warning("Unoptimized op: " + priorType);
+          return false;
+      }
+    }
+
+    private void coalesce(SaveTasks prior, SaveTasks next) {
+      if (next.isSetTasks()) {
+        if (prior.isSetTasks()) {
+          // It is an expected invariant that an operation may reference a task (identified by
+          // task ID) no more than one time.  Therefore, to coalesce two SaveTasks operations,
+          // the most recent task definition overrides the prior operation.
+          Map<String, ScheduledTask> coalesced = Maps.newHashMap();
+          for (ScheduledTask task : prior.getTasks()) {
+            coalesced.put(task.getAssignedTask().getTaskId(), task);
+          }
+          for (ScheduledTask task : next.getTasks()) {
+            coalesced.put(task.getAssignedTask().getTaskId(), task);
+          }
+          prior.setTasks(ImmutableSet.copyOf(coalesced.values()));
+        } else {
+          prior.setTasks(next.getTasks());
+        }
+      }
+    }
+
+    private void coalesce(RemoveTasks prior, RemoveTasks next) {
+      if (next.isSetTaskIds()) {
+        if (prior.isSetTaskIds()) {
+          prior.setTaskIds(ImmutableSet.<String>builder()
+              .addAll(prior.getTaskIds())
+              .addAll(next.getTaskIds())
+              .build());
+        } else {
+          prior.setTaskIds(next.getTaskIds());
+        }
+      }
+    }
+
+    private boolean coalesce(SaveHostAttributes prior, SaveHostAttributes next) {
+      if (prior.getHostAttributes().getHost().equals(next.getHostAttributes().getHost())) {
+        prior.getHostAttributes().setAttributes(next.getHostAttributes().getAttributes());
+        return true;
+      }
+      return false;
+    }
+  }
+}
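
The SaveTasks coalescing above deserves a concrete illustration: because an
operation may reference a task ID at most once, merging two SaveTasks ops
keeps the most recent definition for each ID. A standalone sketch of the same
merge using plain maps (illustrative stand-ins, not Aurora types):

import java.util.LinkedHashMap;
import java.util.Map;

public class CoalesceDemo {
  public static void main(String[] args) {
    // Hypothetical task-ID -> definition maps standing in for SaveTasks contents.
    Map<String, String> prior = new LinkedHashMap<>();
    prior.put("task-1", "v1");
    prior.put("task-2", "v1");

    Map<String, String> next = new LinkedHashMap<>();
    next.put("task-2", "v2");  // redefines task-2
    next.put("task-3", "v1");

    // The most recent definition for each task ID wins, as in
    // StreamTransactionImpl.coalesce(SaveTasks, SaveTasks).
    Map<String, String> coalesced = new LinkedHashMap<>(prior);
    coalesced.putAll(next);

    System.out.println(coalesced); // {task-1=v1, task-2=v2, task-3=v1}
  }
}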

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/8a5b1d6d/src/main/java/org/apache/aurora/scheduler/storage/log/StreamTransaction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamTransaction.java b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamTransaction.java
new file mode 100644
index 0000000..a51fd18
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamTransaction.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.log;
+
+import org.apache.aurora.codec.ThriftBinaryCodec;
+import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.scheduler.log.Log;
+
+/**
+ * Manages a single log stream append transaction.  Local storage ops can be added to the
+ * transaction and then later committed as an atomic unit.
+ */
+interface StreamTransaction {
+  /**
+   * Appends any ops that have been added to this transaction to the log stream in a single
+   * atomic record.
+   *
+   * @return The position of the log entry committed in this transaction, if any.
+   * @throws CodingException If there was a problem encoding a log entry for commit.
+   */
+  Log.Position commit() throws ThriftBinaryCodec.CodingException;
+
+  /**
+   * Adds a local storage operation to this transaction.
+   *
+   * @param op The local storage op to add.
+   */
+  void add(Op op);
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/8a5b1d6d/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
index 899416f..606c443 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
@@ -32,6 +32,7 @@ import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
+import com.google.common.hash.Hashing;
 import com.google.common.testing.TearDown;
 import com.google.common.util.concurrent.Atomics;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -81,7 +82,7 @@ import org.apache.aurora.scheduler.log.Log.Entry;
 import org.apache.aurora.scheduler.log.Log.Position;
 import org.apache.aurora.scheduler.log.Log.Stream;
 import org.apache.aurora.scheduler.storage.backup.BackupModule;
-import org.apache.aurora.scheduler.storage.log.LogManager.StreamManager.EntrySerializer;
+import org.apache.aurora.scheduler.storage.log.EntrySerializer;
 import org.apache.aurora.scheduler.storage.log.LogStorageModule;
 import org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl;
 import org.apache.aurora.scheduler.storage.log.testing.LogOpMatcher;
@@ -160,7 +161,9 @@ public class SchedulerIT extends BaseZooKeeperTest {
     log = control.createMock(Log.class);
     logStream = control.createMock(Stream.class);
     streamMatcher = LogOpMatcher.matcherFor(logStream);
-    entrySerializer = new EntrySerializer(LogStorageModule.MAX_LOG_ENTRY_SIZE.get());
+    entrySerializer = new EntrySerializer.EntrySerializerImpl(
+        LogStorageModule.MAX_LOG_ENTRY_SIZE.get(),
+        Hashing.md5());
 
     zkClient = createZkClient();
   }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/8a5b1d6d/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java
index 0cfa73f..39729b3 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java
@@ -27,6 +27,8 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
 import com.twitter.common.base.Closure;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Data;
@@ -57,8 +59,6 @@ import org.apache.aurora.scheduler.log.Log;
 import org.apache.aurora.scheduler.log.Log.Entry;
 import org.apache.aurora.scheduler.log.Log.Position;
 import org.apache.aurora.scheduler.log.Log.Stream;
-import org.apache.aurora.scheduler.storage.log.LogManager.StreamManager;
-import org.apache.aurora.scheduler.storage.log.LogManager.StreamManager.StreamTransaction;
 import org.easymock.EasyMock;
 import org.easymock.IArgumentMatcher;
 import org.junit.Before;
@@ -102,8 +102,12 @@ public class LogManagerTest extends EasyMockTest {
     return createStreamManager(NO_FRAMES_EVER_SIZE);
   }
 
-  private StreamManager createStreamManager(Amount<Integer, Data> maxEntrySize) {
-    return new StreamManager(stream, false, maxEntrySize);
+  private StreamManager createStreamManager(final Amount<Integer, Data> maxEntrySize) {
+    return new StreamManagerImpl(
+        stream,
+        new EntrySerializer.EntrySerializerImpl(maxEntrySize, Hashing.md5()),
+        false,
+        Hashing.md5());
   }
 
   @Test
@@ -272,9 +276,10 @@ public class LogManagerTest extends EasyMockTest {
     Op deleteJob = Op.removeJob(new RemoveJob(JobKeys.from("role", "env", "name").newBuilder()));
     expectTransaction(position1, saveFrameworkId, deleteJob);
 
+    StreamManager streamManager = createNoMessagesStreamManager();
     control.replay();
 
-    StreamTransaction transaction = createNoMessagesStreamManager().startTransaction();
+    StreamTransaction transaction = streamManager.startTransaction();
     transaction.add(saveFrameworkId);
     transaction.add(deleteJob);
 
@@ -329,9 +334,9 @@ public class LogManagerTest extends EasyMockTest {
     Message message = frame(createLogEntry(saveFrameworkId));
     expectFrames(position1, message);
 
+    StreamManager streamManager = createStreamManager(message.chunkSize);
     control.replay();
 
-    StreamManager streamManager = createStreamManager(message.chunkSize);
     StreamTransaction transaction = streamManager.startTransaction();
     transaction.add(saveFrameworkId);
 
@@ -391,7 +396,11 @@ public class LogManagerTest extends EasyMockTest {
       }
     };
 
-    final StreamManager streamManager = new StreamManager(mockStream, false, message1.chunkSize);
+    final StreamManagerImpl streamManager = new StreamManagerImpl(
+        mockStream,
+        new EntrySerializer.EntrySerializerImpl(message1.chunkSize, Hashing.md5()),
+        false,
+        Hashing.md5());
     StreamTransaction tr1 = streamManager.startTransaction();
     tr1.add(op1);
 
@@ -470,9 +479,10 @@ public class LogManagerTest extends EasyMockTest {
     reader.execute(transaction1);
     reader.execute(transaction2);
 
+    StreamManager streamManager = createStreamManager(message.chunkSize);
     control.replay();
 
-    createStreamManager(message.chunkSize).readFromBeginning(reader);
+    streamManager.readFromBeginning(reader);
   }
 
   @Test
@@ -494,7 +504,12 @@ public class LogManagerTest extends EasyMockTest {
 
     control.replay();
 
-    StreamManager streamManager = new StreamManager(stream, true, NO_FRAMES_EVER_SIZE);
+    HashFunction md5 = Hashing.md5();
+    StreamManagerImpl streamManager = new StreamManagerImpl(
+        stream,
+        new EntrySerializer.EntrySerializerImpl(NO_FRAMES_EVER_SIZE, md5),
+        true,
+        md5);
     streamManager.snapshot(snapshot);
     streamManager.readFromBeginning(reader);
   }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/8a5b1d6d/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
index 8fbade1..68df0d5 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
@@ -22,6 +22,8 @@ import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterators;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
 import com.google.common.testing.TearDown;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Data;
@@ -131,7 +133,20 @@ public class LogStorageTest extends EasyMockTest {
   public void setUp() {
     log = createMock(Log.class);
 
-    LogManager logManager = new LogManager(log, Amount.of(1, Data.GB), false);
+    StreamManagerFactory streamManagerFactory = new StreamManagerFactory() {
+      @Override
+      public StreamManager create(Stream logStream) {
+        HashFunction md5 = Hashing.md5();
+        return new StreamManagerImpl(
+            logStream,
+            new EntrySerializer.EntrySerializerImpl(
+                Amount.of(1, Data.GB),
+                md5),
+            false,
+            md5);
+      }
+    };
+    LogManager logManager = new LogManager(log, streamManagerFactory);
 
     schedulingService = createMock(SchedulingService.class);
     snapshotStore = createMock(new Clazz<SnapshotStore<Snapshot>>() { });

