aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kevi...@apache.org
Subject git commit: Performance improvements and instrumentation for snapshot
Date Thu, 18 Sep 2014 20:00:06 GMT
Repository: incubator-aurora
Updated Branches:
  refs/heads/master 9fd87234e -> 1c4a1a880


Performance improvements and instrumentation for snapshot

* Deflate snapshots using stream API
* Make LogManager non-final

Testing Done:
./gradlew -Pq build

The @Timed annotations on StreamManager do not take effect yet because
StreamManager is not instantiated by Guice, and Guice AOP only intercepts
methods on instances it creates. Ideally we'd use AssistedInject here,
but that's a slightly larger change.

Bugs closed: AURORA-722

Reviewed at https://reviews.apache.org/r/25760/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/1c4a1a88
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/1c4a1a88
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/1c4a1a88

Branch: refs/heads/master
Commit: 1c4a1a8800a77226e367a3af2aae2db67cba477c
Parents: 9fd8723
Author: Kevin Sweeney <kevints@apache.org>
Authored: Thu Sep 18 12:59:27 2014 -0700
Committer: Kevin Sweeney <kevints@apache.org>
Committed: Thu Sep 18 12:59:27 2014 -0700

----------------------------------------------------------------------
 .../apache/aurora/codec/ThriftBinaryCodec.java  | 89 ++++++++++++++++++--
 .../aurora/scheduler/storage/log/Entries.java   | 52 ++----------
 .../scheduler/storage/log/LogManager.java       | 21 ++++-
 .../aurora/codec/ThriftBinaryCodecTest.java     | 11 +++
 4 files changed, 113 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1c4a1a88/src/main/java/org/apache/aurora/codec/ThriftBinaryCodec.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/codec/ThriftBinaryCodec.java b/src/main/java/org/apache/aurora/codec/ThriftBinaryCodec.java
index da4f0e5..45cf7ec 100644
--- a/src/main/java/org/apache/aurora/codec/ThriftBinaryCodec.java
+++ b/src/main/java/org/apache/aurora/codec/ThriftBinaryCodec.java
@@ -13,7 +13,11 @@
  */
 package org.apache.aurora.codec;
 
-import java.util.Objects;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
 
 import javax.annotation.Nullable;
 
@@ -22,7 +26,12 @@ import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TException;
 import org.apache.thrift.TSerializer;
 import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.transport.TIOStreamTransport;
+import org.apache.thrift.transport.TTransport;
+
+import static java.util.Objects.requireNonNull;
 
 /**
  * Codec that works for thrift objects.
@@ -69,17 +78,13 @@ public final class ThriftBinaryCodec {
   public static <T extends TBase<T, ?>> T decodeNonNull(Class<T> clazz, byte[] buffer)
       throws CodingException {
 
-    Objects.requireNonNull(clazz);
-    Objects.requireNonNull(buffer);
+    requireNonNull(clazz);
+    requireNonNull(buffer);
 
     try {
-      T t = clazz.newInstance();
+      T t = newInstance(clazz);
       new TDeserializer(PROTOCOL_FACTORY).deserialize(t, buffer);
       return t;
-    } catch (IllegalAccessException e) {
-      throw new CodingException("Failed to access constructor for target type.", e);
-    } catch (InstantiationException e) {
-      throw new CodingException("Failed to instantiate target type.", e);
     } catch (TException e) {
       throw new CodingException("Failed to deserialize thrift object.", e);
     }
@@ -108,7 +113,7 @@ public final class ThriftBinaryCodec {
    * @throws CodingException If the object could not be encoded.
    */
   public static byte[] encodeNonNull(TBase<?, ?> tBase) throws CodingException {
-    Objects.requireNonNull(tBase);
+    requireNonNull(tBase);
 
     try {
       return new TSerializer(PROTOCOL_FACTORY).serialize(tBase);
@@ -118,6 +123,72 @@ public final class ThriftBinaryCodec {
   }
 
   /**
+   * Encodes a thrift object into a DEFLATE-compressed binary array.
+   *
+   * @param tBase Object to encode.
+   * @return Deflated, encoded object.
+   * @throws CodingException If the object could not be encoded.
+   */
+  public static byte[] deflateNonNull(TBase<?, ?> tBase) throws CodingException {
+    requireNonNull(tBase);
+
+    ByteArrayOutputStream outBytes = new ByteArrayOutputStream();
+    try {
+      TTransport transport = new TIOStreamTransport(new DeflaterOutputStream(outBytes));
+      TProtocol protocol = PROTOCOL_FACTORY.getProtocol(transport);
+      tBase.write(protocol);
+      transport.close();
+      return outBytes.toByteArray();
+    } catch (TException e) {
+      throw new CodingException("Failed to serialize: " + tBase, e);
+    }
+  }
+
+  /**
+   * Decodes a thrift object from a DEFLATE-compressed byte array into a target type.
+   *
+   * @param clazz Class to instantiate and deserialize to.
+   * @param buffer Compressed buffer to decode.
+   * @return A populated message.
+   * @throws CodingException If the message could not be decoded.
+   */
+  public static <T extends TBase<T, ?>> T inflateNonNull(Class<T> clazz, byte[] buffer)
+      throws CodingException {
+
+    requireNonNull(clazz);
+    requireNonNull(buffer);
+
+    T tBase = newInstance(clazz);
+    try {
+      TTransport transport = new TIOStreamTransport(
+          new InflaterInputStream(new ByteArrayInputStream(buffer)));
+      TProtocol protocol = PROTOCOL_FACTORY.getProtocol(transport);
+      tBase.read(protocol);
+      return tBase;
+    } catch (TException e) {
+      throw new CodingException("Failed to deserialize: " + e, e);
+    }
+  }
+
+  private static <T extends TBase<T, ?>> T newInstance(Class<T> clazz) throws CodingException {
+    try {
+      return clazz.getConstructor().newInstance();
+    } catch (InvocationTargetException e) {
+      throw new CodingException("Exception in constructor for target type: " + e, e);
+    } catch (NoSuchMethodException e) {
+      throw new CodingException(
+          "No no-args constructor for target type: "
+              + clazz
+              + ". Did the thrift code generator change?",
+          e);
+    } catch (InstantiationException e) {
+      throw new CodingException("Failed to instantiate target type.", e);
+    } catch (IllegalAccessException e) {
+      throw new CodingException("Failed to access constructor for target type.", e);
+    }
+  }
+
+  /**
    * Thrown when serialization or deserialization failed.
    */
   public static class CodingException extends Exception {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1c4a1a88/src/main/java/org/apache/aurora/scheduler/storage/log/Entries.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/Entries.java b/src/main/java/org/apache/aurora/scheduler/storage/log/Entries.java
index 1eca768..6f94af5 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/Entries.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/Entries.java
@@ -13,21 +13,13 @@
  */
 package org.apache.aurora.scheduler.storage.log;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Logger;
-import java.util.zip.DeflaterOutputStream;
-import java.util.zip.InflaterInputStream;
 
 import com.google.common.base.Preconditions;
-import com.google.common.io.ByteStreams;
-import com.twitter.common.stats.Stats;
 
 import org.apache.aurora.codec.ThriftBinaryCodec;
 import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
+
 import org.apache.aurora.gen.storage.LogEntry;
 import org.apache.aurora.gen.storage.LogEntry._Fields;
 
@@ -38,9 +30,6 @@ final class Entries {
 
   private static final Logger LOG = Logger.getLogger(Entries.class.getName());
 
-  private static final AtomicLong COMPRESSION_BYTES_SAVED =
-      Stats.exportLong("log_compressed_entry_bytes_saved");
-
   private Entries() {
     // Utility class.
   }
@@ -59,28 +48,7 @@ final class Entries {
    * @throws CodingException If the value could not be encoded or deflated.
    */
   static LogEntry deflate(LogEntry entry) throws CodingException {
-    byte[] data = thriftBinaryEncode(entry);
-    int initialLength = data.length;
-    LOG.info("Deflating log entry of size " + initialLength);
-    ByteArrayOutputStream deflated = new ByteArrayOutputStream();
-    DeflaterOutputStream deflater = new DeflaterOutputStream(deflated);
-    try {
-      deflater.write(data);
-      deflater.flush();
-      deflater.close();
-      byte[] deflatedData = deflated.toByteArray();
-      int bytesSaved = initialLength - deflatedData.length;
-      if (bytesSaved < 0) {
-        LOG.warning("Deflated entry is larger than original by " + (bytesSaved * -1) + " bytes");
-      } else {
-        LOG.info("Deflated log entry size: " + deflatedData.length + " (saved " + bytesSaved + ")");
-      }
-
-      COMPRESSION_BYTES_SAVED.addAndGet(bytesSaved);
-      return LogEntry.deflatedEntry(ByteBuffer.wrap(deflatedData));
-    } catch (IOException e) {
-      throw new CodingException("Failed to deflate snapshot: " + e, e);
-    }
+    return LogEntry.deflatedEntry(ThriftBinaryCodec.deflateNonNull(entry));
   }
 
   /**
@@ -96,19 +64,9 @@ final class Entries {
   static LogEntry inflate(LogEntry entry) throws CodingException {
     Preconditions.checkArgument(entry.isSet(_Fields.DEFLATED_ENTRY));
 
-    ByteArrayOutputStream inflated = new ByteArrayOutputStream();
-    ByteBuffer data = entry.bufferForDeflatedEntry();
-    LOG.info("Inflating deflated log entry of size " + data.remaining());
-    InflaterInputStream inflater = new InflaterInputStream(
-        new ByteArrayInputStream(data.array(), data.position(), data.remaining()));
-    try {
-      ByteStreams.copy(inflater, inflated);
-      byte[] inflatedData = inflated.toByteArray();
-      LOG.info("Inflated log entry size: " + inflatedData.length);
-      return thriftBinaryDecode(inflatedData);
-    } catch (IOException e) {
-      throw new CodingException("Failed to inflate compressed log entry.", e);
-    }
+    byte[] data = entry.getDeflatedEntry();
+    LOG.info("Inflating deflated log entry of size " + data.length);
+    return ThriftBinaryCodec.inflateNonNull(LogEntry.class, entry.getDeflatedEntry());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1c4a1a88/src/main/java/org/apache/aurora/scheduler/storage/log/LogManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogManager.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogManager.java
index 0b59043..87bd657 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogManager.java
@@ -71,7 +71,7 @@ import static java.util.Objects.requireNonNull;
 /**
  * Manages opening, reading from and writing to a {@link Log}.
  */
-public final class LogManager {
+public class LogManager {
 
   /**
    * Identifies the maximum log entry size to permit before chunking entries into frames.
@@ -286,9 +286,11 @@ public final class LogManager {
     void snapshot(Snapshot snapshot)
         throws CodingException, InvalidPositionException, StreamAccessException {
 
-      LogEntry entry = LogEntry.snapshot(snapshot);
+      LogEntry entry;
       if (deflateSnapshots) {
-        entry = Entries.deflate(entry);
+        entry = deflate(snapshot);
+      } else {
+        entry = LogEntry.snapshot(snapshot);
       }
 
       Position position = appendAndGetPosition(entry);
@@ -297,8 +299,19 @@ public final class LogManager {
       stream.truncateBefore(position);
     }
 
+    // Not meant to be subclassed, but timed methods must be non-private.
+    // See https://github.com/google/guice/wiki/AOP#limitations
+    @VisibleForTesting
+    @Timed("log_manager_deflate")
+    LogEntry deflate(Snapshot snapshot) throws CodingException {
+      return Entries.deflate(LogEntry.snapshot(snapshot));
+    }
+
+    // Not meant to be subclassed, but timed methods must be non-private.
+    // See https://github.com/google/guice/wiki/AOP#limitations
+    @VisibleForTesting
     @Timed("log_manager_append")
-    private Position appendAndGetPosition(LogEntry logEntry) throws CodingException {
+    Position appendAndGetPosition(LogEntry logEntry) throws CodingException {
       Position firstPosition = null;
       byte[][] entries = entrySerializer.serialize(logEntry);
       synchronized (writeMutex) { // ensure all sub-entries are written as a unit

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1c4a1a88/src/test/java/org/apache/aurora/codec/ThriftBinaryCodecTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/codec/ThriftBinaryCodecTest.java b/src/test/java/org/apache/aurora/codec/ThriftBinaryCodecTest.java
index 7aaab1e..ebb4f9a 100644
--- a/src/test/java/org/apache/aurora/codec/ThriftBinaryCodecTest.java
+++ b/src/test/java/org/apache/aurora/codec/ThriftBinaryCodecTest.java
@@ -50,4 +50,15 @@ public class ThriftBinaryCodecTest {
   public void testDecodeNonNull() throws CodingException {
     ThriftBinaryCodec.decodeNonNull(Identity.class, null);
   }
+
+  @Test
+  public void testInflateDeflateRoundTrip() throws CodingException {
+    Identity original = new Identity("aurora", "jsmith");
+
+    byte[] deflated = ThriftBinaryCodec.deflateNonNull(original);
+
+    Identity inflated = ThriftBinaryCodec.inflateNonNull(Identity.class, deflated);
+
+    assertEquals(original, inflated);
+  }
 }


Mime
View raw message