activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-5731
Date Thu, 16 Apr 2015 19:06:13 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 9445e93ae -> a5c2f3f42


https://issues.apache.org/jira/browse/AMQ-5731

Add some additional checks and handlers for frames with an invalid size
prefix and ensure that the connection state is torn down broker side.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a5c2f3f4
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a5c2f3f4
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a5c2f3f4

Branch: refs/heads/master
Commit: a5c2f3f42373a7e091ea1a933c23cf7298b23f60
Parents: 9445e93
Author: Timothy Bish <tabish121@gmail.com>
Authored: Thu Apr 16 14:53:22 2015 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Thu Apr 16 14:53:22 2015 -0400

----------------------------------------------------------------------
 .../activemq/transport/amqp/AmqpWireFormat.java |  2 +
 .../transport/amqp/protocol/AmqpConnection.java |  9 ++
 .../interop/AmqpCorruptedFrameHandlingTest.java | 90 +++++++++++++++++++-
 3 files changed, 100 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/a5c2f3f4/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
index 3734cc5..570fd2b 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
@@ -104,6 +104,8 @@ public class AmqpWireFormat implements WireFormat {
             int size = dataIn.readInt();
             if (size > maxFrameSize) {
                 throw new AmqpProtocolException("Frame size exceeded max frame length.");
+            } else if (size <= 0) {
+                throw new AmqpProtocolException("Frame size value was invalid: " + size);
             }
             Buffer frame = new Buffer(size);
             frame.bigEndianEditor().writeInt(size);

http://git-wip-us.apache.org/repos/asf/activemq/blob/a5c2f3f4/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
index b8c6997..bab16c9 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
@@ -497,6 +497,15 @@ public class AmqpConnection implements AmqpProtocolConverter {
     public void onAMQPException(IOException error) {
         closedSocket = true;
         if (!closing) {
+            try {
+                closing = true;
+                // Attempt to inform the other end that we are going to close
+                // so that the client doesn't wait around forever.
+                protonConnection.setCondition(new ErrorCondition(AmqpError.DECODE_ERROR, error.getMessage()));
+                protonConnection.close();
+                pumpProtonToSocket();
+            } catch (Exception ignore) {
+            }
             amqpTransport.sendToActiveMQ(error);
         } else {
             try {

http://git-wip-us.apache.org/repos/asf/activemq/blob/a5c2f3f4/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpCorruptedFrameHandlingTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpCorruptedFrameHandlingTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpCorruptedFrameHandlingTest.java
index 5ee08ca..3c57ecd 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpCorruptedFrameHandlingTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpCorruptedFrameHandlingTest.java
@@ -33,8 +33,13 @@ import org.junit.Test;
  */
 public class AmqpCorruptedFrameHandlingTest extends AmqpClientTestSupport {
 
+    @Override
+    protected String getAdditionalConfig() {
+        return "?transport.wireFormat.maxFrameSize=65535";
+    }
+
     @Test(timeout = 60000)
-    public void testCanConnect() throws Exception {
+    public void testHandlingCorruptedFramePayload() throws Exception {
         Random random = new Random();
         random.setSeed(System.nanoTime());
 
@@ -65,5 +70,88 @@ public class AmqpCorruptedFrameHandlingTest extends AmqpClientTestSupport {
         }));
 
         connection.close();
+
+        // Should be able to recycle the client ID now.
+        connection = client.createConnection();
+        connection.setContainerId("ClientID:" + getTestName());
+        connection.connect();
+    }
+
+    @Test(timeout = 60000)
+    public void testHandleFrameWithNegativeSize() throws Exception {
+        Random random = new Random();
+        random.setSeed(System.nanoTime());
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.createConnection();
+
+        connection.setContainerId("ClientID:" + getTestName());
+        connection.connect();
+
+        assertEquals(1, getProxyToBroker().getCurrentConnectionsCount());
+
+        // Send frame with valid size prefix, but corrupted payload.
+        byte[] corruptedFrame = new byte[1024];
+        random.nextBytes(corruptedFrame);
+        corruptedFrame[0] = (byte) 0xFF;
+        corruptedFrame[1] = 0x0;
+        corruptedFrame[2] = 0x4;
+        corruptedFrame[3] = 0x0;
+
+        connection.sendRawBytes(corruptedFrame);
+
+        assertTrue("Connection should have dropped.", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getProxyToBroker().getCurrentConnectionsCount() == 0;
+            }
+        }));
+
+        connection.close();
+
+        // Should be able to recycle the client ID now.
+        connection = client.createConnection();
+        connection.setContainerId("ClientID:" + getTestName());
+        connection.connect();
+    }
+
+    @Test(timeout = 60000)
+    public void testHandleFrameSizeExceedsMaxFrameSize() throws Exception {
+        Random random = new Random();
+        random.setSeed(System.nanoTime());
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.createConnection();
+
+        connection.setContainerId("ClientID:" + getTestName());
+        connection.connect();
+
+        assertEquals(1, getProxyToBroker().getCurrentConnectionsCount());
+
+        // Send frame with valid size prefix, but corrupted payload.
+        byte[] corruptedFrame = new byte[1024];
+        random.nextBytes(corruptedFrame);
+        corruptedFrame[0] = 0x0;
+        corruptedFrame[1] = 0x7F;
+        corruptedFrame[2] = 0x7F;
+        corruptedFrame[3] = 0x7F;
+
+        connection.sendRawBytes(corruptedFrame);
+
+        assertTrue("Connection should have dropped.", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getProxyToBroker().getCurrentConnectionsCount() == 0;
+            }
+        }));
+
+        connection.close();
+
+        // Should be able to recycle the client ID now.
+        connection = client.createConnection();
+        connection.setContainerId("ClientID:" + getTestName());
+        connection.connect();
     }
 }


Mime
View raw message