cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject git commit: Add LZ4 compression to the native protocol
Date Fri, 19 Jul 2013 06:40:55 GMT
Updated Branches:
  refs/heads/trunk dc51727a1 -> 4c129ae7e


Add LZ4 compression to the native protocol

patch by slebresne; reviewed by iamaleksey for CASSANDRA-5765


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4c129ae7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4c129ae7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4c129ae7

Branch: refs/heads/trunk
Commit: 4c129ae7ef02180bb9dc961420f1f2572c215163
Parents: dc51727
Author: Sylvain Lebresne <sylvain@datastax.com>
Authored: Fri Jul 19 08:38:01 2013 +0200
Committer: Sylvain Lebresne <sylvain@datastax.com>
Committed: Fri Jul 19 08:40:27 2013 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 doc/native_protocol_v2.spec                     |  8 ++
 .../cassandra/transport/FrameCompressor.java    | 77 ++++++++++++++++++++
 .../transport/messages/OptionsMessage.java      |  2 +
 .../transport/messages/StartupMessage.java      |  4 +
 5 files changed, 92 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c129ae7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 00abb86..d872f08 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,7 @@
  * cqlsh: add missing table options to DESCRIBE output (CASSANDRA-5749)
  * Fix assertion error during repair (CASSANDRA-5757)
  * Fix bulkloader (CASSANDRA-5542)
+ * Add LZ4 compression to the native protocol (CASSANDRA-5765)
 
 
 2.0.0-beta1

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c129ae7/doc/native_protocol_v2.spec
----------------------------------------------------------------------
diff --git a/doc/native_protocol_v2.spec b/doc/native_protocol_v2.spec
index ae730eb..3a519a9 100644
--- a/doc/native_protocol_v2.spec
+++ b/doc/native_protocol_v2.spec
@@ -662,6 +662,14 @@ Table of Contents
   discretion). A frame body should be compressed if and only if the compressed
   flag (see Section 2.2) is set.
 
+  As of version 2 of the protocol, the following compressions are available:
+    - lz4 (https://code.google.com/p/lz4/). In that case, note that the first 4
+      bytes of the body will be the uncompressed length (followed by the
+      compressed bytes).
+    - snappy (https://code.google.com/p/snappy/). This compression might not be
+      available as it depends on a native lib (server-side) that might not be
+      available on some installations.
+
 
 6. Collection types
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c129ae7/src/java/org/apache/cassandra/transport/FrameCompressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/FrameCompressor.java b/src/java/org/apache/cassandra/transport/FrameCompressor.java
index 3f96948..e1d90f1 100644
--- a/src/java/org/apache/cassandra/transport/FrameCompressor.java
+++ b/src/java/org/apache/cassandra/transport/FrameCompressor.java
@@ -23,6 +23,9 @@ import org.jboss.netty.buffer.ChannelBuffers;
 import org.xerial.snappy.Snappy;
 import org.xerial.snappy.SnappyError;
 
+import net.jpountz.lz4.LZ4Exception;
+import net.jpountz.lz4.LZ4Factory;
+
 public interface FrameCompressor
 {
     public Frame compress(Frame frame) throws IOException;
@@ -91,4 +94,78 @@ public interface FrameCompressor
             return frame.with(ChannelBuffers.wrappedBuffer(output, 0, size));
         }
     }
+
+    /*
+     * This is very close to the ICompressor implementation, and in particular
+     * it also lays out the uncompressed size at the beginning of the message to
+     * make decompression faster, but contrary to the ICompressor, that length
+     * is written in big-endian. The native protocol is entirely big-endian, so
+     * it feels like putting little-endian here would be an annoying trap for
+     * client writers.
+     */
+    public static class LZ4Compressor implements FrameCompressor
+    {
+        public static final LZ4Compressor instance = new LZ4Compressor();
+
+        private static final int INTEGER_BYTES = 4;
+        private final net.jpountz.lz4.LZ4Compressor compressor;
+        private final net.jpountz.lz4.LZ4Decompressor decompressor;
+
+        private LZ4Compressor()
+        {
+            final LZ4Factory lz4Factory = LZ4Factory.fastestInstance();
+            compressor = lz4Factory.fastCompressor();
+            decompressor = lz4Factory.decompressor();
+        }
+
+        public Frame compress(Frame frame) throws IOException
+        {
+            byte[] input = new byte[frame.body.readableBytes()];
+            frame.body.readBytes(input);
+
+            int maxCompressedLength = compressor.maxCompressedLength(input.length);
+            byte[] output = new byte[INTEGER_BYTES + maxCompressedLength];
+
+            output[0] = (byte) (input.length >>> 24);
+            output[1] = (byte) (input.length >>> 16);
+            output[2] = (byte) (input.length >>>  8);
+            output[3] = (byte) (input.length);
+
+            try
+            {
+                int written = compressor.compress(input, 0, input.length, output, INTEGER_BYTES, maxCompressedLength);
+                return frame.with(ChannelBuffers.wrappedBuffer(output, 0, INTEGER_BYTES + written));
+            }
+            catch (LZ4Exception e)
+            {
+                throw new IOException(e);
+            }
+        }
+
+        public Frame decompress(Frame frame) throws IOException
+        {
+            byte[] input = new byte[frame.body.readableBytes()];
+            frame.body.readBytes(input);
+
+            int uncompressedLength = ((input[0] & 0xFF) << 24)
+                                   | ((input[1] & 0xFF) << 16)
+                                   | ((input[2] & 0xFF) <<  8)
+                                   | ((input[3] & 0xFF));
+
+            byte[] output = new byte[uncompressedLength];
+
+            try
+            {
+                int read = decompressor.decompress(input, INTEGER_BYTES, output, 0, uncompressedLength);
+                if (read != input.length - INTEGER_BYTES)
+                    throw new IOException("Compressed lengths mismatch");
+
+                return frame.with(ChannelBuffers.wrappedBuffer(output));
+            }
+            catch (LZ4Exception e)
+            {
+                throw new IOException(e);
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c129ae7/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java b/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
index 5afefb5..49d8e1b 100644
--- a/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
@@ -66,6 +66,8 @@ public class OptionsMessage extends Message.Request
         List<String> compressions = new ArrayList<String>();
         if (FrameCompressor.SnappyCompressor.instance != null)
             compressions.add("snappy");
+        // LZ4 is always available since, in the worst case, it defaults to a pure Java implementation.
+        compressions.add("lz4");
 
         Map<String, List<String>> supported = new HashMap<String, List<String>>();
         supported.put(StartupMessage.CQL_VERSION, cqlVersions);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c129ae7/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
index ea6ae99..6d6d1d9 100644
--- a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
@@ -94,6 +94,10 @@ public class StartupMessage extends Message.Request
                     throw new ProtocolException("This instance does not support Snappy compression");
                 connection.setCompressor(FrameCompressor.SnappyCompressor.instance);
             }
+            else if (compression.equals("lz4"))
+            {
+                connection.setCompressor(FrameCompressor.LZ4Compressor.instance);
+            }
             else
             {
                 throw new ProtocolException(String.format("Unknown compression algorithm: %s", compression));


Mime
View raw message