activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-5778
Date Mon, 18 May 2015 18:52:21 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 76b60ce44 -> 7c41ebc91


https://issues.apache.org/jira/browse/AMQ-5778

Fixes and some testing around maxFrameSize handling on the AMQP
Transport.  

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/7c41ebc9
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/7c41ebc9
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/7c41ebc9

Branch: refs/heads/master
Commit: 7c41ebc9121473e803c04e74f8ef7846e514814e
Parents: 76b60ce
Author: Timothy Bish <tabish121@gmail.com>
Authored: Mon May 18 14:51:56 2015 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Mon May 18 14:51:56 2015 -0400

----------------------------------------------------------------------
 .../transport/amqp/AmqpTransportFilter.java     |   8 ++
 .../amqp/protocol/AmqpAbstractReceiver.java     |   5 +
 .../transport/amqp/protocol/AmqpConnection.java |  24 ++--
 .../transport/amqp/protocol/AmqpSession.java    |   4 +
 .../amqp/client/AmqpClientTestSupport.java      |  66 ++++++++++-
 .../transport/amqp/client/AmqpMessage.java      |  17 +++
 .../AmqpConfiguredMaxConnectionsTest.java       |  71 +-----------
 .../interop/AmqpCorruptedFrameHandlingTest.java |   2 +-
 .../amqp/interop/AmqpMaxFrameSizeTest.java      | 114 +++++++++++++++++++
 9 files changed, 232 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/7c41ebc9/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
index 9ca19b1..649af78 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
@@ -242,4 +242,12 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
     public void setConnectAttemptTimeout(int connectAttemptTimeout) {
         wireFormat.setConnectAttemptTimeout(connectAttemptTimeout);
     }
+
+    public long getMaxFrameSize() {
+        return wireFormat.getMaxFrameSize();
+    }
+
+    public void setMaxFrameSize(long maxFrameSize) {
+        wireFormat.setMaxFrameSize(maxFrameSize);
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/7c41ebc9/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractReceiver.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractReceiver.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractReceiver.java
index e119f61..7ed2f92 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractReceiver.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractReceiver.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.transport.amqp.protocol;
 
+import org.apache.activemq.transport.amqp.AmqpProtocolException;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Receiver;
 import org.fusesource.hawtbuf.Buffer;
@@ -99,6 +100,10 @@ public abstract class AmqpAbstractReceiver extends AmqpAbstractLink<Receiver> {
         int count;
         while ((count = getEndpoint().recv(recvBuffer, 0, recvBuffer.length)) > 0) {
             current.write(recvBuffer, 0, count);
+
+            if (current.size() > session.getMaxFrameSize()) {
+                throw new AmqpProtocolException("Frame size of " + current.size() + " larger than max allowed " + session.getMaxFrameSize());
+            }
         }
 
         // Expecting more deliveries..

http://git-wip-us.apache.org/repos/asf/activemq/blob/7c41ebc9/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
index d743943..365c0fc 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
@@ -256,6 +256,13 @@ public class AmqpConnection implements AmqpProtocolConverter {
         return connectionInfo.getClientId();
     }
 
+    /**
+     * @return the configured max frame size allowed for incoming messages.
+     */
+    public long getMaxFrameSize() {
+        return amqpWireFormat.getMaxFrameSize();
+    }
+
     //----- Proton Event handling and IO support -----------------------------//
 
     void pumpProtonToSocket() {
@@ -713,14 +720,17 @@ public class AmqpConnection implements AmqpProtocolConverter {
     }
 
     void handleException(Throwable exception) {
-        exception.printStackTrace();
         LOG.debug("Exception detail", exception);
-        try {
-            // Must ensure that the broker removes Connection resources.
-            sendToActiveMQ(new ShutdownInfo());
-            amqpTransport.stop();
-        } catch (Throwable e) {
-            LOG.error("Failed to stop AMQP Transport ", e);
+        if (exception instanceof AmqpProtocolException) {
+            onAMQPException((IOException) exception);
+        } else {
+            try {
+                // Must ensure that the broker removes Connection resources.
+                sendToActiveMQ(new ShutdownInfo());
+                amqpTransport.stop();
+            } catch (Throwable e) {
+                LOG.error("Failed to stop AMQP Transport ", e);
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/7c41ebc9/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
index d2901ba..abc680b 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
@@ -369,6 +369,10 @@ public class AmqpSession implements AmqpResource {
         return protonSession;
     }
 
+    public long getMaxFrameSize() {
+        return connection.getMaxFrameSize();
+    }
+
     //----- Internal Implementation ------------------------------------------//
 
     private ConsumerId getNextConsumerId() {

http://git-wip-us.apache.org/repos/asf/activemq/blob/7c41ebc9/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientTestSupport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientTestSupport.java
index 4d3f571..5504954 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientTestSupport.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientTestSupport.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.transport.amqp.client;
 
+import java.io.IOException;
 import java.net.URI;
 
 import org.apache.activemq.transport.amqp.AmqpTestSupport;
@@ -25,13 +26,76 @@ import org.apache.activemq.transport.amqp.AmqpTestSupport;
  */
 public class AmqpClientTestSupport extends AmqpTestSupport {
 
+    private String connectorScheme = "amqp";
+    private boolean useSSL;
+
+    public AmqpClientTestSupport() {
+    }
+
+    public AmqpClientTestSupport(String connectorScheme, boolean useSSL) {
+        this.connectorScheme = connectorScheme;
+        this.useSSL = useSSL;
+    }
+
+    public String getConnectorScheme() {
+        return connectorScheme;
+    }
+
+    public boolean isUseSSL() {
+        return useSSL;
+    }
+
     public String getAmqpConnectionURIOptions() {
         return "";
     }
 
+    @Override
+    protected boolean isUseTcpConnector() {
+        return !isUseSSL() && !connectorScheme.contains("nio");
+    }
+
+    @Override
+    protected boolean isUseSslConnector() {
+        return isUseSSL() && !connectorScheme.contains("nio");
+    }
+
+    @Override
+    protected boolean isUseNioConnector() {
+        return !isUseSSL() && connectorScheme.contains("nio");
+    }
+
+    @Override
+    protected boolean isUseNioPlusSslConnector() {
+        return isUseSSL() && connectorScheme.contains("nio");
+    }
+
     public URI getBrokerAmqpConnectionURI() {
         try {
-            String uri = "tcp://127.0.0.1:" + amqpPort;
+            int port = 0;
+            switch (connectorScheme) {
+                case "amqp":
+                    port = this.amqpPort;
+                    break;
+                case "amqp+ssl":
+                    port = this.amqpSslPort;
+                    break;
+                case "amqp+nio":
+                    port = this.amqpNioPort;
+                    break;
+                case "amqp+nio+ssl":
+                    port = this.amqpNioPlusSslPort;
+                    break;
+                default:
+                    throw new IOException("Invalid AMQP connector scheme passed to test.");
+            }
+
+            String uri = null;
+
+            if (isUseSSL()) {
+                uri = "ssl://127.0.0.1:" + port;
+            } else {
+                uri = "tcp://127.0.0.1:" + port;
+            }
 
             if (!getAmqpConnectionURIOptions().isEmpty()) {
                 uri = uri + "?" + getAmqpConnectionURIOptions();

http://git-wip-us.apache.org/repos/asf/activemq/blob/7c41ebc9/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
index e5d2d97..32dd1be 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
@@ -21,9 +21,11 @@ import java.util.Map;
 
 import org.apache.activemq.transport.amqp.client.util.UnmodifiableDelivery;
 import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
 import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
+import org.apache.qpid.proton.amqp.messaging.Data;
 import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
 import org.apache.qpid.proton.amqp.messaging.Properties;
 import org.apache.qpid.proton.engine.Delivery;
@@ -315,6 +317,21 @@ public class AmqpMessage {
         getWrappedMessage().setBody(body);
     }
 
+    /**
+     * Sets a byte array value into the body of an outgoing Message, throws
+     * an exception if this is an incoming message instance.
+     *
+     * @param value
+     *        the byte array value to store in the Message body.
+     *
+     * @throws IllegalStateException if the message is read only.
+     */
+    public void setBytes(byte[] bytes) throws IllegalStateException {
+        checkReadOnly();
+        Data body = new Data(new Binary(bytes));
+        getWrappedMessage().setBody(body);
+    }
+
     //----- Internal implementation ------------------------------------------//
 
     private void checkReadOnly() throws IllegalStateException {

http://git-wip-us.apache.org/repos/asf/activemq/blob/7c41ebc9/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConfiguredMaxConnectionsTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConfiguredMaxConnectionsTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConfiguredMaxConnectionsTest.java
index ae3f445..d1d1b04 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConfiguredMaxConnectionsTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConfiguredMaxConnectionsTest.java
@@ -20,8 +20,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
 
-import java.io.IOException;
-import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -43,9 +41,6 @@ public class AmqpConfiguredMaxConnectionsTest extends AmqpClientTestSupport {
 
     private static final int MAX_CONNECTIONS = 10;
 
-    protected boolean useSSL;
-    protected String connectorScheme;
-
     @Parameters(name="{0}")
     public static Collection<Object[]> data() {
         return Arrays.asList(new Object[][] {
@@ -55,8 +50,7 @@ public class AmqpConfiguredMaxConnectionsTest extends AmqpClientTestSupport {
     }
 
     public AmqpConfiguredMaxConnectionsTest(String connectorScheme, boolean useSSL) {
-        this.connectorScheme = connectorScheme;
-        this.useSSL = useSSL;
+        super(connectorScheme, useSSL);
     }
 
     @Test(timeout = 60000)
@@ -92,69 +86,6 @@ public class AmqpConfiguredMaxConnectionsTest extends AmqpClientTestSupport {
         assertEquals(0, getProxyToBroker().getCurrentConnectionsCount());
     }
 
-
-    protected String getConnectorScheme() {
-        return connectorScheme;
-    }
-
-    protected boolean isUseSSL() {
-        return useSSL;
-    }
-
-    @Override
-    protected boolean isUseSslConnector() {
-        return isUseSSL();
-    }
-
-    @Override
-    protected boolean isUseNioConnector() {
-        return true;
-    }
-
-    @Override
-    protected boolean isUseNioPlusSslConnector() {
-        return isUseSSL();
-    }
-
-    @Override
-    public URI getBrokerAmqpConnectionURI() {
-        try {
-            int port = 0;
-            switch (connectorScheme) {
-                case "amqp":
-                    port = this.amqpPort;
-                    break;
-                case "amqp+ssl":
-                    port = this.amqpSslPort;
-                    break;
-                case "amqp+nio":
-                    port = this.amqpNioPort;
-                    break;
-                case "amqp+nio+ssl":
-                    port = this.amqpNioPlusSslPort;
-                    break;
-                default:
-                    throw new IOException("Invalid AMQP connector scheme passed to test.");
-            }
-
-            String uri = null;
-
-            if (isUseSSL()) {
-                uri = "ssl://127.0.0.1:" + port;
-            } else {
-                uri = "tcp://127.0.0.1:" + port;
-            }
-
-            if (!getAmqpConnectionURIOptions().isEmpty()) {
-                uri = uri + "?" + getAmqpConnectionURIOptions();
-            }
-
-            return new URI(uri);
-        } catch (Exception e) {
-            throw new RuntimeException();
-        }
-    }
-
     @Override
     protected String getAdditionalConfig() {
         return "&maximumConnections=" + MAX_CONNECTIONS;

http://git-wip-us.apache.org/repos/asf/activemq/blob/7c41ebc9/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpCorruptedFrameHandlingTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpCorruptedFrameHandlingTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpCorruptedFrameHandlingTest.java
index 3c57ecd..58440a2 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpCorruptedFrameHandlingTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpCorruptedFrameHandlingTest.java
@@ -35,7 +35,7 @@ public class AmqpCorruptedFrameHandlingTest extends AmqpClientTestSupport {
 
     @Override
     protected String getAdditionalConfig() {
-        return "?transport.wireFormat.maxFrameSize=65535";
+        return "?transport.maxFrameSize=65535&transport.wireFormat.idleTimeout=5000";
     }
 
     @Test(timeout = 60000)

http://git-wip-us.apache.org/repos/asf/activemq/blob/7c41ebc9/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpMaxFrameSizeTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpMaxFrameSizeTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpMaxFrameSizeTest.java
new file mode 100644
index 0000000..7599c25
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpMaxFrameSizeTest.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.interop;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpConnectionListener;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Test that the maxFrameSize setting prevents large frames from being processed.
+ */
+@RunWith(Parameterized.class)
+public class AmqpMaxFrameSizeTest extends AmqpClientTestSupport {
+
+    private final String testName;
+    private final int maxFrameSize;
+    private final int maxAmqpFrameSize;
+
+    @Parameters(name="{0}")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+                { "amqp-> MFS > MAFS", "amqp", false, 1024, 2048 },
+                { "amqp-> MFS < MAFS", "amqp", false, 2048, 1024 },
+                { "amqp+nio-> MFS > MAFS", "amqp+nio", false, 1024, 2048 },
+                { "amqp+nio-> MFS < MAFS", "amqp+nio", false, 2048, 1024 },
+            });
+    }
+
+    public AmqpMaxFrameSizeTest(String testName, String connectorScheme, boolean useSSL, int maxFrameSize, int maxAmqpFrameSize) {
+        super(connectorScheme, useSSL);
+
+        this.testName = testName;
+        this.maxFrameSize = maxFrameSize;
+        this.maxAmqpFrameSize = maxAmqpFrameSize;
+    }
+
+    @Override
+    protected String getAdditionalConfig() {
+        return "&transport.wireFormat.maxAmqpFrameSize=" + maxAmqpFrameSize +
+               "&transport.maxFrameSize=" + maxFrameSize;
+    }
+
+    @Test(timeout = 600000)
+    public void testMaxFrameSizeApplied() throws Exception {
+
+        LOG.info("Test starting {} for transport {} with MFS:{} and MAFS:{}",
+            new Object[]{ testName, getConnectorScheme(), maxFrameSize, maxAmqpFrameSize });
+
+        final CountDownLatch failed = new CountDownLatch(1);
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.createConnection();
+        connection.setListener(new AmqpConnectionListener() {
+
+            @Override
+            public void onException(Throwable ex) {
+                failed.countDown();
+            }
+        });
+
+        connection.connect();
+
+        AmqpSession session = connection.createSession();
+        AmqpSender sender = session.createSender("queue://" + getTestName(), true);
+
+        byte[] payload = new byte[maxFrameSize];
+        for (int i = 0; i < payload.length; ++i) {
+            payload[i] = 42;
+        }
+
+        AmqpMessage message = new AmqpMessage();
+        message.setBytes(payload);
+
+        sender.send(message);
+
+        assertTrue("Connection should have failed", failed.await(10, TimeUnit.SECONDS));
+
+        assertNotNull(getProxyToQueue(getTestName()));
+        assertEquals(0, getProxyToQueue(getTestName()).getQueueSize());
+
+        connection.close();
+    }
+}


Mime
View raw message