activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-6669
Date Thu, 04 May 2017 20:54:21 GMT
Repository: activemq
Updated Branches:
  refs/heads/activemq-5.14.x 675729c20 -> bc879d762


https://issues.apache.org/jira/browse/AMQ-6669

Respect the wireFormat.maxFrameSize option on WS and WSS transports
allowing binary content larger than 65535
(cherry picked from commit 2e2d5ddd3de7d0fe36ce5eb3d4fe81e97fe990a4)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/bc879d76
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/bc879d76
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/bc879d76

Branch: refs/heads/activemq-5.14.x
Commit: bc879d762ab4b5e32df9d4bd50c19c192a87a777
Parents: 675729c
Author: Timothy Bish <tabish121@gmail.com>
Authored: Thu May 4 16:37:53 2017 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Thu May 4 16:39:49 2017 -0400

----------------------------------------------------------------------
 .../transport/amqp/AmqpWSTransport.java         |  5 +++
 .../activemq/transport/amqp/AmqpWireFormat.java |  3 +-
 .../transport/amqp/AmqpWireFormatFactory.java   |  2 +-
 .../transport/amqp/JMSClientContext.java        | 34 +++++++++++++++-
 .../transport/amqp/JMSClientTestSupport.java    | 34 +++++++++++++++-
 .../amqp/JMSLargeMessageSendRecvTest.java       | 26 +++++++++++-
 .../transport/amqp/client/AmqpSession.java      |  6 +++
 .../AmqpBrokerReuqestedHearbeatsTest.java       |  4 +-
 .../AmqpClientRequestsHeartbeatsTest.java       |  4 +-
 .../amqp/interop/AmqpConnectionsTest.java       | 43 ++++++++++++++++++++
 .../amqp/interop/AmqpMaxFrameSizeTest.java      |  9 +++-
 .../activemq/transport/ws/WSTransport.java      |  5 +++
 .../activemq/transport/ws/WSTransportProxy.java |  5 +++
 .../transport/ws/WSTransportServer.java         |  4 +-
 .../activemq/transport/ws/jetty9/WSServlet.java |  2 +-
 15 files changed, 170 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/bc879d76/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWSTransport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWSTransport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWSTransport.java
index 2ec3a09..26b3561 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWSTransport.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWSTransport.java
@@ -100,6 +100,11 @@ public class AmqpWSTransport extends TransportSupport implements WSTransport, AM
     }
 
     @Override
+    public int getMaxFrameSize() {
+        return (int) Math.min(((AmqpWireFormat) getWireFormat()).getMaxFrameSize(), Integer.MAX_VALUE);
+    }
+
+    @Override
     protected void doStop(ServiceStopper stopper) throws Exception {
         // Currently nothing needed here since we have no async workers.
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/bc879d76/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
index 89facbe..7ddfc1f 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
@@ -45,12 +45,13 @@ public class AmqpWireFormat implements WireFormat {
     public static final int DEFAULT_IDLE_TIMEOUT = 30000;
     public static final int DEFAULT_PRODUCER_CREDIT = 1000;
     public static final boolean DEFAULT_ALLOW_NON_SASL_CONNECTIONS = false;
+    public static final int DEFAULT_ANQP_FRAME_SIZE = NO_AMQP_MAX_FRAME_SIZE;
 
     private static final int SASL_PROTOCOL = 3;
 
     private int version = 1;
     private long maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
-    private int maxAmqpFrameSize = NO_AMQP_MAX_FRAME_SIZE;
+    private int maxAmqpFrameSize = DEFAULT_ANQP_FRAME_SIZE;
     private int connectAttemptTimeout = DEFAULT_CONNECTION_TIMEOUT;
     private int idelTimeout = DEFAULT_IDLE_TIMEOUT;
     private int producerCredit = DEFAULT_PRODUCER_CREDIT;

http://git-wip-us.apache.org/repos/asf/activemq/blob/bc879d76/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java
index bb428b4..196046c 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java
@@ -26,7 +26,7 @@ import org.apache.activemq.wireformat.WireFormatFactory;
 public class AmqpWireFormatFactory implements WireFormatFactory {
 
     private long maxFrameSize = AmqpWireFormat.DEFAULT_MAX_FRAME_SIZE;
-    private int maxAmqpFrameSize = AmqpWireFormat.NO_AMQP_MAX_FRAME_SIZE;
+    private int maxAmqpFrameSize = AmqpWireFormat.DEFAULT_ANQP_FRAME_SIZE;
     private int idelTimeout = AmqpWireFormat.DEFAULT_IDLE_TIMEOUT;
     private int producerCredit = AmqpWireFormat.DEFAULT_PRODUCER_CREDIT;
     private String transformer = InboundTransformer.TRANSFORMER_NATIVE;

http://git-wip-us.apache.org/repos/asf/activemq/blob/bc879d76/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientContext.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientContext.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientContext.java
index 574e9f0..f249e7c 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientContext.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientContext.java
@@ -168,9 +168,39 @@ public class JMSClientContext {
     private ConnectionFactory createConnectionFactory(
         URI remoteURI, String username, String password, boolean syncPublish) {
 
-        boolean useSSL = remoteURI.getScheme().toLowerCase().contains("ssl");
+        String clientScheme;
+        boolean useSSL = false;
+
+        switch (remoteURI.getScheme()) {
+            case "tcp" :
+            case "amqp":
+            case "auto":
+            case "amqp+nio":
+            case "auto+nio":
+                clientScheme = "amqp://";
+                break;
+            case "ssl":
+            case "amqp+ssl":
+            case "auto+ssl":
+            case "amqp+nio+ssl":
+            case "auto+nio+ssl":
+                clientScheme = "amqps://";
+                useSSL = true;
+                break;
+            case "ws":
+            case "amqp+ws":
+                clientScheme = "amqpws://";
+                break;
+            case "wss":
+            case "amqp+wss":
+                clientScheme = "amqpwss://";
+                useSSL = true;
+                break;
+            default:
+                clientScheme = "amqp://";
+        }
 
-        String amqpURI = (useSSL ? "amqps://" : "amqp://") + remoteURI.getHost() + ":" + remoteURI.getPort();
+        String amqpURI = clientScheme + remoteURI.getHost() + ":" + remoteURI.getPort();
 
         if (useSSL) {
             amqpURI += "?transport.verifyHost=false";

http://git-wip-us.apache.org/repos/asf/activemq/blob/bc879d76/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTestSupport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTestSupport.java
index d855c6b..8408652 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTestSupport.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTestSupport.java
@@ -92,9 +92,39 @@ public class JMSClientTestSupport extends AmqpTestSupport {
 
     protected URI getAmqpURI(String uriOptions) {
 
-        boolean useSSL = getBrokerURI().getScheme().toLowerCase().contains("ssl");
+        String clientScheme;
+        boolean useSSL = false;
+
+        switch (getBrokerURI().getScheme()) {
+            case "tcp" :
+            case "amqp":
+            case "auto":
+            case "amqp+nio":
+            case "auto+nio":
+                clientScheme = "amqp://";
+                break;
+            case "ssl":
+            case "amqp+ssl":
+            case "auto+ssl":
+            case "amqp+nio+ssl":
+            case "auto+nio+ssl":
+                clientScheme = "amqps://";
+                useSSL = true;
+                break;
+            case "ws":
+            case "amqp+ws":
+                clientScheme = "amqpws://";
+                break;
+            case "wss":
+            case "amqp+wss":
+                clientScheme = "amqpwss://";
+                useSSL = true;
+                break;
+            default:
+                clientScheme = "amqp://";
+        }
 
-        String amqpURI = (useSSL ? "amqps://" : "amqp://") + getBrokerURI().getHost() + ":" + getBrokerURI().getPort();
+        String amqpURI = clientScheme + getBrokerURI().getHost() + ":" + getBrokerURI().getPort();
 
         if (uriOptions != null && !uriOptions.isEmpty()) {
             if (uriOptions.startsWith("?") || uriOptions.startsWith("&")) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/bc879d76/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSLargeMessageSendRecvTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSLargeMessageSendRecvTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSLargeMessageSendRecvTest.java
index ef6eaba..20ad2d9 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSLargeMessageSendRecvTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSLargeMessageSendRecvTest.java
@@ -20,6 +20,9 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
+import java.util.Arrays;
+import java.util.Collection;
+
 import javax.jms.Connection;
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -29,13 +32,32 @@ import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
+import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class JMSLargeMessageSendRecvTest extends AmqpTestSupport {
+@RunWith(Parameterized.class)
+public class JMSLargeMessageSendRecvTest extends AmqpClientTestSupport {
+
+    @Parameters(name="{0}")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+            {"amqp", false},
+            {"amqp+ws", false},
+            {"amqp+ssl", true},
+            {"amqp+wss", true}
+        });
+    }
+
+    public JMSLargeMessageSendRecvTest(String connectorScheme, boolean secure) {
+        super(connectorScheme, secure);
+    }
 
     @Rule
     public TestName testName = new TestName();
@@ -77,7 +99,7 @@ public class JMSLargeMessageSendRecvTest extends AmqpTestSupport {
         String payload = createLargeString(expectedSize);
         assertEquals(expectedSize, payload.getBytes().length);
 
-        Connection connection = JMSClientContext.INSTANCE.createConnection(amqpURI);
+        Connection connection = JMSClientContext.INSTANCE.createConnection(getBrokerAmqpConnectionURI());
         long startTime = System.currentTimeMillis();
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         Queue queue = session.createQueue(testName.getMethodName());

http://git-wip-us.apache.org/repos/asf/activemq/blob/bc879d76/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
index b8d38e2..8956692 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
@@ -681,6 +681,12 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
     //----- Private implementation details -----------------------------------//
 
     @Override
+    protected void doOpen() {
+        getEndpoint().setIncomingCapacity(Integer.MAX_VALUE);
+        super.doOpen();
+    }
+
+    @Override
     protected void doOpenInspection() {
         try {
             getStateInspector().inspectOpenedResource(getSession());

http://git-wip-us.apache.org/repos/asf/activemq/blob/bc879d76/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpBrokerReuqestedHearbeatsTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpBrokerReuqestedHearbeatsTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpBrokerReuqestedHearbeatsTest.java
index dc13369..e794274 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpBrokerReuqestedHearbeatsTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpBrokerReuqestedHearbeatsTest.java
@@ -44,7 +44,7 @@ import org.junit.runners.Parameterized.Parameters;
 @RunWith(Parameterized.class)
 public class AmqpBrokerReuqestedHearbeatsTest extends AmqpClientTestSupport {
 
-    private final int TEST_IDLE_TIMEOUT = 3000;
+    private final int TEST_IDLE_TIMEOUT = 1000;
 
     @Parameters(name="connector={0}")
     public static Collection<Object[]> data() {
@@ -165,7 +165,7 @@ public class AmqpBrokerReuqestedHearbeatsTest extends AmqpClientTestSupport {
         connection.connect();
 
         assertEquals(1, getProxyToBroker().getCurrentConnectionsCount());
-        assertFalse(disconnected.await(10, TimeUnit.SECONDS));
+        assertFalse(disconnected.await(5, TimeUnit.SECONDS));
 
         connection.close();
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/bc879d76/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpClientRequestsHeartbeatsTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpClientRequestsHeartbeatsTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpClientRequestsHeartbeatsTest.java
index de47fd2..97b38fb 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpClientRequestsHeartbeatsTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpClientRequestsHeartbeatsTest.java
@@ -44,7 +44,7 @@ import org.junit.runners.Parameterized.Parameters;
 @RunWith(Parameterized.class)
 public class AmqpClientRequestsHeartbeatsTest extends AmqpClientTestSupport {
 
-    private final int TEST_IDLE_TIMEOUT = 3000;
+    private final int TEST_IDLE_TIMEOUT = 1000;
 
     @Parameters(name="connector={0}")
     public static Collection<Object[]> data() {
@@ -106,7 +106,7 @@ public class AmqpClientRequestsHeartbeatsTest extends AmqpClientTestSupport {
         connection.connect();
 
         assertEquals(1, getProxyToBroker().getCurrentConnectionsCount());
-        assertFalse(disconnected.await(10, TimeUnit.SECONDS));
+        assertFalse(disconnected.await(5, TimeUnit.SECONDS));
         assertEquals(1, getProxyToBroker().getCurrentConnectionsCount());
 
         connection.close();

http://git-wip-us.apache.org/repos/asf/activemq/blob/bc879d76/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java
index 11cade7..a3474a9 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java
@@ -28,11 +28,16 @@ import static org.junit.Assert.fail;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.transport.amqp.AmqpSupport;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
 import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
 import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
 import org.apache.activemq.transport.amqp.client.AmqpValidator;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.transport.AmqpError;
@@ -258,4 +263,42 @@ public class AmqpConnectionsTest extends AmqpClientTestSupport {
         connection1.close();
         assertEquals(0, getProxyToBroker().getCurrentConnectionsCount());
     }
+
+    @Test(timeout = 60000)
+    public void testSimpleSendOneReceive() throws Exception {
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = trackConnection(client.connect());
+        AmqpSession session = connection.createSession();
+
+        AmqpSender sender = session.createSender("queue://" + getTestName());
+        AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+
+        AmqpMessage message = new AmqpMessage();
+
+        final int PAYLOAD_SIZE = 1024 * 1024;
+
+        byte[] payload = new byte[PAYLOAD_SIZE];
+        for (int i = 0; i < PAYLOAD_SIZE; i++) {
+            payload[i] = (byte) (i % PAYLOAD_SIZE);
+        }
+
+        message.setMessageId("msg" + 1);
+        message.setMessageAnnotation("serialNo", 1);
+        message.setBytes(payload);
+
+        sender.send(message);
+        sender.close();
+
+        LOG.info("Attempting to read message with receiver");
+        receiver.flow(2);
+        AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS);
+        assertNotNull("Should have read message", received);
+        assertEquals("msg1", received.getMessageId());
+        received.accept();
+
+        receiver.close();
+
+        connection.close();
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/bc879d76/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpMaxFrameSizeTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpMaxFrameSizeTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpMaxFrameSizeTest.java
index c818abe..84415d7 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpMaxFrameSizeTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpMaxFrameSizeTest.java
@@ -43,6 +43,8 @@ import org.junit.runners.Parameterized.Parameters;
 @RunWith(Parameterized.class)
 public class AmqpMaxFrameSizeTest extends AmqpClientTestSupport {
 
+    private final int TEST_IDLE_TIMEOUT = 500;
+
     private final String testName;
     private final int maxFrameSize;
     private final int maxAmqpFrameSize;
@@ -54,6 +56,8 @@ public class AmqpMaxFrameSizeTest extends AmqpClientTestSupport {
                 { "amqp-> MFS < MAFS", "amqp", false, 2048, 1024 },
                 { "amqp+nio-> MFS > MAFS", "amqp+nio", false, 1024, 2048 },
                 { "amqp+nio-> MFS < MAFS", "amqp+nio", false, 2048, 1024 },
+                { "amqp+ws-> MFS > MAFS", "amqp+ws", false, 1024, 2048 },
+                { "amqp+ws-> MFS < MAFS", "amqp+ws", false, 2048, 1024 },
             });
     }
 
@@ -89,12 +93,13 @@ public class AmqpMaxFrameSizeTest extends AmqpClientTestSupport {
             }
         });
 
+        connection.setIdleTimeout(TEST_IDLE_TIMEOUT);
         connection.connect();
 
         AmqpSession session = connection.createSession();
         AmqpSender sender = session.createSender("queue://" + getTestName(), true);
 
-        byte[] payload = new byte[maxFrameSize];
+        byte[] payload = new byte[maxFrameSize * 2];
         for (int i = 0; i < payload.length; ++i) {
             payload[i] = 42;
         }
@@ -104,7 +109,7 @@ public class AmqpMaxFrameSizeTest extends AmqpClientTestSupport {
 
         sender.send(message);
 
-        assertTrue("Connection should have failed", failed.await(10, TimeUnit.SECONDS));
+        assertTrue("Connection should have failed", failed.await(30, TimeUnit.SECONDS));
 
         assertNotNull(getProxyToQueue(getTestName()));
         assertEquals(0, getProxyToQueue(getTestName()).getQueueSize());

http://git-wip-us.apache.org/repos/asf/activemq/blob/bc879d76/activemq-client/src/main/java/org/apache/activemq/transport/ws/WSTransport.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/ws/WSTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/ws/WSTransport.java
index e15f86f..9a30660 100644
--- a/activemq-client/src/main/java/org/apache/activemq/transport/ws/WSTransport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/ws/WSTransport.java
@@ -56,6 +56,11 @@ public interface WSTransport extends Transport {
     }
 
     /**
+     * @return the maximum frame size allowed for this WS Transport.
+     */
+    int getMaxFrameSize();
+
+    /**
      * @return the WS sub-protocol that this transport is supplying.
      */
     String getSubProtocol();

http://git-wip-us.apache.org/repos/asf/activemq/blob/bc879d76/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportProxy.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportProxy.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportProxy.java
index 0ca80ef..d5a5207 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportProxy.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportProxy.java
@@ -218,6 +218,11 @@ public final class WSTransportProxy extends TransportSupport implements Transpor
     @Override
     public void onWebSocketConnect(Session session) {
         this.session = session;
+
+        if (wsTransport.getMaxFrameSize() > 0) {
+            this.session.getPolicy().setMaxBinaryMessageSize(wsTransport.getMaxFrameSize());
+            this.session.getPolicy().setMaxTextMessageSize(wsTransport.getMaxFrameSize());
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/bc879d76/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java
index 3029668..ea8867d 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java
@@ -145,9 +145,11 @@ public class WSTransportServer extends WebTransportServerSupport implements Brok
 
     @Override
     public void setTransportOption(Map<String, Object> transportOptions) {
+        // String transport from options and
         Map<String, Object> socketOptions = IntrospectionSupport.extractProperties(transportOptions, "transport.");
         socketConnectorFactory.setTransportOptions(socketOptions);
-        super.setTransportOption(socketOptions);
+        transportOptions.putAll(socketOptions);
+        super.setTransportOption(transportOptions);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/bc879d76/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/WSServlet.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/WSServlet.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/WSServlet.java
index 21754ad..8cb3811 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/WSServlet.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/WSServlet.java
@@ -112,7 +112,7 @@ public class WSServlet extends WebSocketServlet implements BrokerServiceAware {
                 switch (requestedProtocol) {
                     case MQTT:
                         socket = new MQTTSocket(HttpTransportUtils.generateWsRemoteAddress(req.getHttpServletRequest()));
-                        ((MQTTSocket) socket).setTransportOptions(new HashMap<String, Object>(transportOptions));
+                        ((MQTTSocket) socket).setTransportOptions(new HashMap<>(transportOptions));
                         ((MQTTSocket) socket).setPeerCertificates(req.getCertificates());
                         resp.setAcceptedSubProtocol(getAcceptedSubProtocol(mqttProtocols, req.getSubProtocols(), "mqtt"));
                         break;


Mime
View raw message