activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-5781
Date Mon, 18 May 2015 21:51:20 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 7c41ebc91 -> 3454a8b59


https://issues.apache.org/jira/browse/AMQ-5781

add maxFrameSize to the transport and enforce across the TCP, SSL, NIO
and NIO+SSL transport connectors.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/3454a8b5
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/3454a8b5
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/3454a8b5

Branch: refs/heads/master
Commit: 3454a8b596b58d577bb38c600d47c46978c40c82
Parents: 7c41ebc
Author: Timothy Bish <tabish121@gmail.com>
Authored: Mon May 18 17:50:55 2015 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Mon May 18 17:50:55 2015 -0400

----------------------------------------------------------------------
 .../activemq/transport/mqtt/MQTTCodec.java      |  19 +++
 .../transport/mqtt/MQTTNIOSSLTransport.java     |   2 +-
 .../transport/mqtt/MQTTNIOTransport.java        |  11 +-
 .../transport/mqtt/MQTTTransportFilter.java     |  18 +++
 .../activemq/transport/mqtt/MQTTWireFormat.java |  22 +++-
 .../transport/mqtt/MQTTMaxFrameSizeTest.java    | 122 +++++++++++++++++++
 6 files changed, 189 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/3454a8b5/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTCodec.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTCodec.java
b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTCodec.java
index 5a78fda..12c9981 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTCodec.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTCodec.java
@@ -26,6 +26,7 @@ import org.fusesource.mqtt.codec.MQTTFrame;
 public class MQTTCodec {
 
     private final MQTTFrameSink frameSink;
+    private final MQTTWireFormat wireFormat;
 
     private byte header;
     private int contentLength = -1;
@@ -43,10 +44,20 @@ public class MQTTCodec {
     }
 
     public MQTTCodec(MQTTFrameSink sink) {
+        this(sink, null);
+    }
+
+    public MQTTCodec(MQTTFrameSink sink, MQTTWireFormat wireFormat) {
         this.frameSink = sink;
+        this.wireFormat = wireFormat;
     }
 
     public MQTTCodec(final TcpTransport transport) {
+        this(transport, null);
+    }
+
+    public MQTTCodec(final TcpTransport transport, MQTTWireFormat wireFormat) {
+        this.wireFormat = wireFormat;
         this.frameSink = new MQTTFrameSink() {
 
             @Override
@@ -79,6 +90,10 @@ public class MQTTCodec {
         frameSink.onFrame(frame);
     }
 
+    private int getMaxFrameSize() {
+        return wireFormat != null ? wireFormat.getMaxFrameSize() : MQTTWireFormat.MAX_MESSAGE_LENGTH;
+    }
+
     //----- Prepare the current frame parser for use -------------------------//
 
     private FrameParser initializeHeaderParser() throws IOException {
@@ -151,6 +166,10 @@ public class MQTTCodec {
                         processCommand();
                         currentParser = initializeHeaderParser();
                     } else {
+                        if (length > getMaxFrameSize()) {
+                            throw new IOException("The maximum message length was exceeded");
+                        }
+
                         currentParser = initializeContentParser();
                         contentLength = length;
                     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/3454a8b5/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransport.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransport.java
b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransport.java
index f982848..d82fcb5 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransport.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransport.java
@@ -42,7 +42,7 @@ public class MQTTNIOSSLTransport extends NIOSSLTransport {
 
     @Override
     protected void initializeStreams() throws IOException {
-        codec = new MQTTCodec(this);
+        codec = new MQTTCodec(this, (MQTTWireFormat) getWireFormat());
         super.initializeStreams();
         if (inputBuffer.position() != 0 && inputBuffer.hasRemaining()) {
             serviceRead();

http://git-wip-us.apache.org/repos/asf/activemq/blob/3454a8b5/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransport.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransport.java
b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransport.java
index 8b8a6c9..e750366 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransport.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransport.java
@@ -56,17 +56,20 @@ public class MQTTNIOTransport extends TcpTransport {
         super(wireFormat, socket);
     }
 
+    @Override
     protected void initializeStreams() throws IOException {
         channel = socket.getChannel();
         channel.configureBlocking(false);
         // listen for events telling us when the socket is readable.
         selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() {
+            @Override
             public void onSelect(SelectorSelection selection) {
                 if (!isStopped()) {
                     serviceRead();
                 }
             }
 
+            @Override
             public void onError(SelectorSelection selection, Throwable error) {
                 if (error instanceof IOException) {
                     onException((IOException) error);
@@ -78,9 +81,9 @@ public class MQTTNIOTransport extends TcpTransport {
 
         inputBuffer = ByteBuffer.allocate(8 * 1024);
         NIOOutputStream outPutStream = new NIOOutputStream(channel, 8 * 1024);
-        this.dataOut = new DataOutputStream(outPutStream);
-        this.buffOut = outPutStream;
-        codec = new MQTTCodec(this);
+        dataOut = new DataOutputStream(outPutStream);
+        buffOut = outPutStream;
+        codec = new MQTTCodec(this, (MQTTWireFormat) getWireFormat());
     }
 
     private void serviceRead() {
@@ -116,12 +119,14 @@ public class MQTTNIOTransport extends TcpTransport {
         }
     }
 
+    @Override
     protected void doStart() throws Exception {
         connect();
         selection.setInterestOps(SelectionKey.OP_READ);
         selection.enable();
     }
 
+    @Override
     protected void doStop(ServiceStopper stopper) throws Exception {
         try {
             if (selection != null) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/3454a8b5/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
index 1cb6580..da84b1a 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
@@ -258,4 +258,22 @@ public class MQTTTransportFilter extends TransportFilter implements MQTTTranspor
     public void setActiveMQSubscriptionPrefetch(int activeMQSubscriptionPrefetch) {
         protocolConverter.setActiveMQSubscriptionPrefetch(activeMQSubscriptionPrefetch);
     }
+
+    /**
+     * @return the maximum number of bytes a single MQTT message frame is allowed to be.
+     */
+    public int getMaxFrameSize() {
+        return wireFormat.getMaxFrameSize();
+    }
+
+    /**
+     * Sets the maximum frame size for an incoming MQTT frame.  The protocol limit is
+     * 256 megabytes and this value cannot be set higher.
+     *
+     * @param maxFrameSize
+     *        the maximum allowed frame size for a single MQTT frame.
+     */
+    public void setMaxFrameSize(int maxFrameSize) {
+        wireFormat.setMaxFrameSize(maxFrameSize);
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/3454a8b5/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java
b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java
index 70eaec8..fe1c6aa 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java
@@ -40,6 +40,8 @@ public class MQTTWireFormat implements WireFormat {
 
     private int version = 1;
 
+    private int maxFrameSize = MAX_MESSAGE_LENGTH;
+
     @Override
     public ByteSequence marshal(Object command) throws IOException {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -93,7 +95,7 @@ public class MQTTWireFormat implements WireFormat {
         while ((digit & 0x80) != 0);
 
         if (length >= 0) {
-            if (length > MAX_MESSAGE_LENGTH) {
+            if (length > getMaxFrameSize()) {
                 throw new IOException("The maximum message length was exceeded");
             }
 
@@ -124,4 +126,22 @@ public class MQTTWireFormat implements WireFormat {
     public int getVersion() {
         return this.version;
     }
+
+    /**
+     * @return the maximum number of bytes a single MQTT message frame is allowed to be.
+     */
+    public int getMaxFrameSize() {
+        return maxFrameSize;
+    }
+
+    /**
+     * Sets the maximum frame size for an incoming MQTT frame.  The protocol limit is
+     * 256 megabytes and this value cannot be set higher.
+     *
+     * @param maxFrameSize
+     *        the maximum allowed frame size for a single MQTT frame.
+     */
+    public void setMaxFrameSize(int maxFrameSize) {
+        this.maxFrameSize = Math.min(MAX_MESSAGE_LENGTH, maxFrameSize);
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/3454a8b5/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTMaxFrameSizeTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTMaxFrameSizeTest.java
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTMaxFrameSizeTest.java
new file mode 100644
index 0000000..8f5ad2e
--- /dev/null
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTMaxFrameSizeTest.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.mqtt;
+
+import static org.junit.Assert.fail;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.QoS;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test that the maxFrameSize configuration value is applied across the transports.
+ */
+@RunWith(Parameterized.class)
+public class MQTTMaxFrameSizeTest extends MQTTTestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MQTTMaxFrameSizeTest.class);
+
+    private final int maxFrameSize;
+
+    @Parameters(name="{0}")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+                { "mqtt", false, 1024 },
+                { "mqtt+ssl", true, 1024 },
+                { "mqtt+nio", false, 1024 },
+                { "mqtt+nio+ssl", true, 1024 }
+            });
+    }
+
+    public MQTTMaxFrameSizeTest(String connectorScheme, boolean useSSL, int maxFrameSize) {
+        super(connectorScheme, useSSL);
+
+        this.maxFrameSize = maxFrameSize;
+    }
+
+    @Override
+    public String getProtocolConfig() {
+        return "?transport.maxFrameSize=" + maxFrameSize;
+    }
+
+    @Test(timeout = 30000)
+    public void testFrameSizeToLargeClosesConnection() throws Exception {
+
+        LOG.debug("Starting test on connector {} for frame size: {}", getProtocolScheme(), maxFrameSize);
+
+        MQTT mqtt = createMQTTConnection();
+        mqtt.setClientId(getName());
+        mqtt.setKeepAlive((short) 10);
+        mqtt.setVersion("3.1.1");
+
+        BlockingConnection connection = mqtt.blockingConnection();
+        connection.connect();
+
+        final int payloadSize = maxFrameSize + 100;
+
+        byte[] payload = new byte[payloadSize];
+        for (int i = 0; i < payloadSize; ++i) {
+            payload[i] = 42;
+        }
+
+        try {
+            connection.publish(getTopicName(), payload, QoS.AT_LEAST_ONCE, false);
+            fail("should have thrown an exception");
+        } catch (Exception ex) {
+        } finally {
+            connection.disconnect();
+        }
+    }
+
+    @Test(timeout = 30000)
+    public void testFrameSizeNotExceededWorks() throws Exception {
+
+        LOG.debug("Starting test on connector {} for frame size: {}", getProtocolScheme(), maxFrameSize);
+
+        MQTT mqtt = createMQTTConnection();
+        mqtt.setClientId(getName());
+        mqtt.setKeepAlive((short) 10);
+        mqtt.setVersion("3.1.1");
+
+        BlockingConnection connection = mqtt.blockingConnection();
+        connection.connect();
+
+        final int payloadSize = maxFrameSize / 2;
+
+        byte[] payload = new byte[payloadSize];
+        for (int i = 0; i < payloadSize; ++i) {
+            payload[i] = 42;
+        }
+
+        try {
+            connection.publish(getTopicName(), payload, QoS.AT_LEAST_ONCE, false);
+        } catch (Exception ex) {
+            fail("should not have thrown an exception");
+        } finally {
+            connection.disconnect();
+        }
+    }
+}


Mime
View raw message