cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject git commit: Error out correctly when frame is too large
Date Mon, 04 Nov 2013 15:13:00 GMT
Updated Branches:
  refs/heads/cassandra-2.0 ce206e201 -> 4a439d22c


Error out correctly when frame is too large

patch by slebresne; reviewed by dnorberg for CASSANDRA-5981


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4a439d22
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4a439d22
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4a439d22

Branch: refs/heads/cassandra-2.0
Commit: 4a439d22c53c658a1f05bf84f56a319312669a96
Parents: ce206e2
Author: Sylvain Lebresne <sylvain@datastax.com>
Authored: Mon Nov 4 16:12:02 2013 +0100
Committer: Sylvain Lebresne <sylvain@datastax.com>
Committed: Mon Nov 4 16:12:02 2013 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |   6 +
 conf/cassandra.yaml                             |   4 +
 .../org/apache/cassandra/config/Config.java     |   1 +
 .../cassandra/config/DatabaseDescriptor.java    |   8 +
 .../org/apache/cassandra/transport/Frame.java   | 150 +++++++++++--------
 6 files changed, 111 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a439d22/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 70d295b..a6636a5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -13,6 +13,7 @@
  * Require Permission.SELECT for CAS updates (CASSANDRA-6247)
  * New CQL-aware SSTableWriter (CASSANDRA-5894)
  * Reject CAS operation when the protocol v1 is used (CASSANDRA-6270)
+ * Correctly throw error when frame too large (CASSANDRA-5981)
 Merged from 1.2:
  * Require logging in for Thrift CQL2/3 statement preparation (CASSANDRA-6254)
  * restrict max_num_tokens to 1536 (CASSANDRA-6267)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a439d22/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index a34947b..b89be56 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -15,6 +15,12 @@ using the provided 'sstableupgrade' tool.
 
 2.0.3
 =====
+
+New features
+------------
+    - It's now possible to configure the maximum allowed size of the native
+      protocol frames (native_transport_max_frame_size_in_mb in the yaml file).
+
 Upgrading
 ---------
     - The IEndpointStateChangeSubscriber has a new method, beforeChange, that

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a439d22/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 455421a..b2e3a42 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -315,6 +315,10 @@ native_transport_port: 9042
 # there is no native_transport_min_threads, idle threads will always be stopped
 # after 30 seconds).
 # native_transport_max_threads: 128
+#
+# The maximum size of allowed frame. Frame (requests) larger than this will
+# be rejected as invalid. The default is 256MB.
+# native_transport_max_frame_size_in_mb: 256
 
 # Whether to start the thrift rpc server.
 start_rpc: true

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a439d22/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 8f0f22e..3a7407e 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -96,6 +96,7 @@ public class Config
     public Boolean start_native_transport = false;
     public Integer native_transport_port = 9042;
     public Integer native_transport_max_threads = 128;
+    public Integer native_transport_max_frame_size_in_mb = 256;
 
     @Deprecated
     public Integer thrift_max_message_length_in_mb = 16;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a439d22/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index aca58b7..7cdc4e6 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -323,6 +323,9 @@ public class DatabaseDescriptor
         if (conf.thrift_framed_transport_size_in_mb <= 0)
             throw new ConfigurationException("thrift_framed_transport_size_in_mb must be positive");
 
+        if (conf.native_transport_max_frame_size_in_mb <= 0)
+            throw new ConfigurationException("native_transport_max_frame_size_in_mb must be positive");
+
         /* end point snitch */
         if (conf.endpoint_snitch == null)
         {
@@ -1028,6 +1031,11 @@ public class DatabaseDescriptor
         return conf.native_transport_max_threads;
     }
 
+    public static int getNativeTransportMaxFrameSize()
+    {
+        return conf.native_transport_max_frame_size_in_mb * 1024 * 1024;
+    }
+
     public static double getCommitLogSyncBatchWindow()
     {
         return conf.commitlog_sync_batch_window_in_ms;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a439d22/src/java/org/apache/cassandra/transport/Frame.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Frame.java b/src/java/org/apache/cassandra/transport/Frame.java
index 0d5185c..6472b39 100644
--- a/src/java/org/apache/cassandra/transport/Frame.java
+++ b/src/java/org/apache/cassandra/transport/Frame.java
@@ -28,6 +28,9 @@ import org.jboss.netty.handler.codec.oneone.OneToOneDecoder;
 import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
 import org.jboss.netty.handler.codec.frame.*;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.transport.messages.ErrorMessage;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class Frame
@@ -52,27 +55,6 @@ public class Frame
         this.body = body;
     }
 
-    public static Frame create(ChannelBuffer fullFrame)
-    {
-        assert fullFrame.readableBytes() >= Header.LENGTH : String.format("Frame too short (%d bytes = %s)",
-                                                                          fullFrame.readableBytes(),
-                                                                          ByteBufferUtil.bytesToHex(fullFrame.toByteBuffer()));
-
-        int version = fullFrame.readByte();
-        int flags = fullFrame.readByte();
-        int streamId = fullFrame.readByte();
-        int opcode = fullFrame.readByte();
-        int length = fullFrame.readInt();
-        assert length == fullFrame.readableBytes();
-
-        // version first byte is the "direction" of the frame (request or response)
-        Message.Direction direction = Message.Direction.extractFromVersion(version);
-        version = version & 0x7F;
-
-        Header header = new Header(version, flags, streamId, Message.Type.fromOpcode(opcode, direction));
-        return new Frame(header, fullFrame);
-    }
-
     public static Frame create(Message.Type type, int streamId, int version, EnumSet<Header.Flag> flags, ChannelBuffer body)
     {
         Header header = new Header(version, flags, streamId, type);
@@ -83,6 +65,9 @@ public class Frame
     {
         public static final int LENGTH = 8;
 
+        public static final int BODY_LENGTH_OFFSET = 4;
+        public static final int BODY_LENGTH_SIZE = 4;
+
         public final int version;
         public final EnumSet<Flag> flags;
         public final int streamId;
@@ -134,15 +119,19 @@ public class Frame
         return new Frame(header, newBody);
     }
 
-    public static class Decoder extends LengthFieldBasedFrameDecoder
+    public static class Decoder extends FrameDecoder
     {
-        private static final int MAX_FRAME_LENTH = 256 * 1024 * 1024; // 256 MB
+        private static final int MAX_FRAME_LENGTH = DatabaseDescriptor.getNativeTransportMaxFrameSize();
+
+        private boolean discardingTooLongFrame;
+        private long tooLongFrameLength;
+        private long bytesToDiscard;
+        private int tooLongStreamId;
 
         private final Connection.Factory factory;
 
         public Decoder(Connection.Factory factory)
         {
-            super(MAX_FRAME_LENTH, 4, 4, 0, 0, true);
             this.factory = factory;
         }
 
@@ -150,55 +139,98 @@ public class Frame
         protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer)
         throws Exception
         {
-            try
+            if (discardingTooLongFrame)
             {
-                // We must at least validate that the frame version is something we support/know and it doesn't hurt to
-                // check the opcode is not garbage. And we should do that indenpently of what is the the bytes corresponding
-                // to the frame length are, i.e. we shouldn't wait for super.decode() to return non-null.
-                if (buffer.readableBytes() == 0)
-                    return null;
+                bytesToDiscard = discard(buffer, bytesToDiscard);
+                // If we have discarded everything, throw the exception
+                if (bytesToDiscard <= 0)
+                    fail();
+                return null;
+            }
 
-                int firstByte = buffer.getByte(0);
-                Message.Direction direction = Message.Direction.extractFromVersion(firstByte);
-                int version = firstByte & 0x7F;
+            // Wait until we have read at least the header
+            if (buffer.readableBytes() < Header.LENGTH)
+                return null;
 
-                if (version > Server.CURRENT_VERSION)
-                    throw new ProtocolException("Invalid or unsupported protocol version: " + version);
+            int idx = buffer.readerIndex();
 
-                // Validate the opcode
-                if (buffer.readableBytes() >= 4)
-                    Message.Type.fromOpcode(buffer.getByte(3), direction);
+            int firstByte = buffer.getByte(idx);
+            Message.Direction direction = Message.Direction.extractFromVersion(firstByte);
+            int version = firstByte & 0x7F;
 
-                ChannelBuffer frame = (ChannelBuffer) super.decode(ctx, channel, buffer);
-                if (frame == null)
-                {
-                    return null;
-                }
+            if (version > Server.CURRENT_VERSION)
+                throw new ProtocolException("Invalid or unsupported protocol version: " + version);
 
-                Connection connection = (Connection)channel.getAttachment();
-                if (connection == null)
-                {
-                    // First message seen on this channel, attach the connection object
-                    connection = factory.newConnection(channel, version);
-                    channel.setAttachment(connection);
-                }
-                else if (connection.getVersion() != version)
-                {
-                    throw new ProtocolException(String.format("Invalid message version. Got %d but previous messages on this connection had version %d", version, connection.getVersion()));
-                }
-                return Frame.create(frame);
+            int flags = buffer.getByte(idx + 1);
+            int streamId = buffer.getByte(idx + 2);
+
+            // This throws a protocol exceptions if the opcode is unknown
+            Message.Type type = Message.Type.fromOpcode(buffer.getByte(idx + 3), direction);
+
+            long bodyLength = buffer.getUnsignedInt(idx + Header.BODY_LENGTH_OFFSET);
+
+            if (bodyLength < 0)
+            {
+                buffer.skipBytes(Header.LENGTH);
+                throw new ProtocolException("Invalid frame body length: " + bodyLength);
+            }
+
+            long frameLength = bodyLength + Header.LENGTH;
+            if (frameLength > MAX_FRAME_LENGTH)
+            {
+                // Enter the discard mode and discard everything received so far.
+                discardingTooLongFrame = true;
+                tooLongStreamId = streamId;
+                tooLongFrameLength = frameLength;
+                bytesToDiscard = discard(buffer, frameLength);
+                if (bytesToDiscard <= 0)
+                    fail();
+                return null;
             }
-            catch (CorruptedFrameException e)
+
+            // never overflows because it's less than the max frame length
+            int frameLengthInt = (int) frameLength;
+            if (buffer.readableBytes() < frameLengthInt)
+                return null;
+
+            // extract body
+            ChannelBuffer body = extractFrame(buffer, idx + Header.LENGTH, (int)bodyLength);
+            buffer.readerIndex(idx + frameLengthInt);
+
+            Connection connection = (Connection)channel.getAttachment();
+            if (connection == null)
             {
-                throw new ProtocolException(e.getMessage());
+                // First message seen on this channel, attach the connection object
+                connection = factory.newConnection(channel, version);
+                channel.setAttachment(connection);
             }
-            catch (TooLongFrameException e)
+            else if (connection.getVersion() != version)
             {
-                throw new ProtocolException(e.getMessage());
+                throw new ProtocolException(String.format("Invalid message version. Got %d but previous messages on this connection had version %d", version, connection.getVersion()));
             }
+
+            return new Frame(new Header(version, flags, streamId, type), body);
+        }
+
+        private void fail()
+        {
+            // Reset to the initial state and throw the exception
+            long tooLongFrameLength = this.tooLongFrameLength;
+            this.tooLongFrameLength = 0;
+            discardingTooLongFrame = false;
+            String msg = String.format("Request is too big: length %d exceeds maximum allowed length %d.", tooLongFrameLength,  MAX_FRAME_LENGTH);
+            throw ErrorMessage.wrap(new InvalidRequestException(msg), tooLongStreamId);
         }
     }
 
+    // How much remains to be discarded
+    private static long discard(ChannelBuffer buffer, long remainingToDiscard)
+    {
+        int availableToDiscard = (int) Math.min(remainingToDiscard, buffer.readableBytes());
+        buffer.skipBytes(availableToDiscard);
+        return remainingToDiscard - availableToDiscard;
+    }
+
     public static class Encoder extends OneToOneEncoder
     {
         public Object encode(ChannelHandlerContext ctx, Channel channel, Object msg)


Mime
View raw message