activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbert...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-826 Fix MQTT protocol detection
Date Tue, 25 Apr 2017 19:46:33 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master d0219bea1 -> 53ace34b4


ARTEMIS-826 Fix MQTT protocol detection


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/1c84bd39
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/1c84bd39
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/1c84bd39

Branch: refs/heads/master
Commit: 1c84bd39c45a8f3cf91d7d60d212d693905c9ad8
Parents: d0219be
Author: Martyn Taylor <mtaylor@redhat.com>
Authored: Mon Apr 24 16:27:46 2017 +0100
Committer: Martyn Taylor <mtaylor@redhat.com>
Committed: Tue Apr 25 13:53:21 2017 +0100

----------------------------------------------------------------------
 .../core/protocol/mqtt/MQTTProtocolManager.java | 48 +++++++++++++++-----
 .../artemis/core/protocol/ProtocolHandler.java  |  3 +-
 .../integration/mqtt/imported/MQTTTest.java     | 21 +++++++++
 3 files changed, 60 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1c84bd39/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
index f4cba64..6118b0d 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
@@ -20,6 +20,8 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelPipeline;
 import io.netty.handler.codec.mqtt.MqttDecoder;
 import io.netty.handler.codec.mqtt.MqttEncoder;
@@ -115,19 +117,43 @@ class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage,
MQTTInter
       pipeline.addLast(new MQTTProtocolHandler(server, this));
    }
 
+   /**
+    * The protocol handler passes us an 8 byte long array from the transport.  We sniff these
first 8 bytes to see
+    * if they match the first 8 bytes from MQTT Connect packet.  In many other protocols
the protocol name is the first
+    * thing sent on the wire.  However, in MQTT the protocol name doesn't come until later
on in the CONNECT packet.
+    *
+    * In order to fully identify MQTT protocol via protocol name, we need up to 12 bytes.
 However, we can use other
+    * information from the connect packet to infer that the MQTT protocol is being used.
 This is enough to identify MQTT
+    * and add the Netty codec in the pipeline.  The Netty codec takes care of things from
here.
+    *
+    * MQTT CONNECT PACKET: See MQTT 3.1.1 Spec for more info.
+    *
+    * Byte 1: Fixed Header Packet Type.  0b0001000 (16) = MQTT Connect
+    * Byte 2-[N]: Remaining length of the Connect Packet (encoded with 1-4 bytes).
+    *
+    * The next set of bytes represents the UTF8 encoded string MQTT (MQTT 3.1.1) or MQIsdp
(MQTT 3.1)
+    * Byte N: UTF8 MSB must be 0
+    * Byte N+1: UTF8 LSB must be (4(MQTT) or 6(MQIsdp))
+    * Byte N+1: M (first char from the protocol name).
+    *
+    * Max no bytes used in the sequence = 8.
+    */
    @Override
    public boolean isProtocol(byte[] array) {
-      boolean mqtt311 = array[4] == 77 && // M
-         array[5] == 81 && // Q
-         array[6] == 84 && // T
-         array[7] == 84;   // T
-
-      // FIXME The actual protocol name is 'MQIsdp' (However we are only passed the first
4 bytes of the protocol name)
-      boolean mqtt31 = array[4] == 77 && // M
-         array[5] == 81 && // Q
-         array[6] == 73 && // I
-         array[7] == 115;   // s
-      return mqtt311 || mqtt31;
+      ByteBuf buf = Unpooled.wrappedBuffer(array);
+
+      if (!(buf.readByte() == 16 && validateRemainingLength(buf) && buf.readByte()
== (byte) 0)) return false;
+      byte b = buf.readByte();
+      return ((b == 4 || b == 6) && (buf.readByte() == 77));
+   }
+
+   private boolean validateRemainingLength(ByteBuf buffer) {
+      byte msb = (byte) 0b10000000;
+      for (byte i = 0; i < 4; i++) {
+         if ((buffer.readByte() & msb) != msb)
+            return true;
+      }
+      return false;
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1c84bd39/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java
index ca78f29..327d870 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java
@@ -135,7 +135,7 @@ public class ProtocolHandler {
             return;
          }
 
-         // Will use the first five bytes to detect a protocol.
+         // Will use the first N bytes to detect a protocol depending on the protocol.
          if (in.readableBytes() < 8) {
             return;
          }
@@ -175,6 +175,7 @@ public class ProtocolHandler {
                protocolToUse = ActiveMQClient.DEFAULT_CORE_PROTOCOL;
             }
          }
+
          ProtocolManager protocolManagerToUse = protocolMap.get(protocolToUse);
          ConnectionCreator channelHandler = nettyAcceptor.createConnectionCreator();
          ChannelPipeline pipeline = ctx.pipeline();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1c84bd39/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
index 6b58fa2..28b7984 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
@@ -83,6 +83,27 @@ public class MQTTTest extends MQTTTestSupport {
 
    }
 
+   @Test
+   public void testConnectWithLargePassword() throws Exception {
+      for (String version : Arrays.asList("3.1", "3.1.1")) {
+         String longString = new String(new char[65535]);
+
+         BlockingConnection connection = null;
+         try {
+            MQTT mqtt = createMQTTConnection("test-" + version, true);
+            mqtt.setUserName(longString);
+            mqtt.setPassword(longString);
+            mqtt.setConnectAttemptsMax(1);
+            mqtt.setVersion(version);
+            connection = mqtt.blockingConnection();
+            connection.connect();
+            assertTrue(connection.isConnected());
+         } finally {
+            if (connection != null && connection.isConnected()) connection.disconnect();
+         }
+      }
+   }
+
    @Test(timeout = 60 * 1000)
    public void testSendAndReceiveMQTT() throws Exception {
       final MQTTClientProvider subscriptionProvider = getMQTTClientProvider();


Mime
View raw message