activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject git commit: AMQ-5101 - MQTT SUBACK packet MUST use return code 0x80 to report failed topic subscriptions
Date Fri, 14 Mar 2014 13:10:41 GMT
Repository: activemq
Updated Branches:
  refs/heads/trunk 167152307 -> ac23b0174


AMQ-5101 - MQTT SUBACK packet MUST use return code 0x80 to report failed topic subscriptions


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ac23b017
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ac23b017
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ac23b017

Branch: refs/heads/trunk
Commit: ac23b017497a297c73a0e8ce11980cad3e16c546
Parents: 1671523
Author: Dejan Bosanac <dejan@nighttale.net>
Authored: Fri Mar 14 14:10:12 2014 +0100
Committer: Dejan Bosanac <dejan@nighttale.net>
Committed: Fri Mar 14 14:10:12 2014 +0100

----------------------------------------------------------------------
 .../transport/mqtt/MQTTProtocolConverter.java   | 35 +++++++++---
 .../activemq/transport/mqtt/MQTTTest.java       | 58 +++++++++++++++++++-
 2 files changed, 85 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/ac23b017/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index 1f912b7..91f81ab 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -52,6 +52,7 @@ public class MQTTProtocolConverter {
     private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode();
     private static final double MQTT_KEEP_ALIVE_GRACE_PERIOD= 0.5;
     private static final int DEFAULT_CACHE_SIZE = 5000;
+    private static final byte SUBSCRIBE_ERROR = (byte) 0x80;
 
     private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
     private final SessionId sessionId = new SessionId(connectionId, -1);
@@ -332,7 +333,7 @@ public class MQTTProtocolConverter {
         if (topics != null) {
             byte[] qos = new byte[topics.length];
             for (int i = 0; i < topics.length; i++) {
-                qos[i] = (byte) onSubscribe(topics[i]).ordinal();
+                qos[i] = onSubscribe(topics[i]);
             }
             SUBACK ack = new SUBACK();
             ack.messageId(command.messageId());
@@ -344,6 +345,10 @@ public class MQTTProtocolConverter {
             }
             // check retained messages
             for (int i = 0; i < topics.length; i++) {
+                if (qos[i] == SUBSCRIBE_ERROR) {
+                    // skip this topic if subscribe failed
+                    continue;
+                }
                 final Topic topic = topics[i];
                 ActiveMQTopic destination = new ActiveMQTopic(convertMQTTToActiveMQ(topic.name().toString()));
                 for (PUBLISH msg : retainedMessages.getMessages(destination)) {
@@ -374,7 +379,7 @@ public class MQTTProtocolConverter {
 
     }
 
-    QoS onSubscribe(Topic topic) throws MQTTProtocolException {
+    byte onSubscribe(final Topic topic) throws MQTTProtocolException {
 
         if( mqttSubscriptionByTopic.containsKey(topic.name())) {
             if (topic.qos() != mqttSubscriptionByTopic.get(topic.name()).qos()) {
@@ -382,7 +387,7 @@ public class MQTTProtocolConverter {
                 onUnSubscribe(topic.name());
             } else {
                 // duplicate SUBSCRIBE packet, nothing to do
-                return topic.qos();
+                return (byte) topic.qos().ordinal();
             }
         }
 
@@ -398,11 +403,27 @@ public class MQTTProtocolConverter {
         }
         MQTTSubscription mqttSubscription = new MQTTSubscription(this, topic.qos(), consumerInfo);
 
-        subscriptionsByConsumerId.put(id, mqttSubscription);
-        mqttSubscriptionByTopic.put(topic.name(), mqttSubscription);
+        final byte[] qos = {-1};
+        sendToActiveMQ(consumerInfo, new ResponseHandler() {
+            @Override
+            public void onResponse(MQTTProtocolConverter converter, Response response) throws
IOException {
+                // validate subscription request
+                if (response.isException()) {
+                    final Throwable throwable = ((ExceptionResponse) response).getException();
+                    LOG.debug("Error subscribing to " + topic.name(), throwable);
+                    qos[0] = SUBSCRIBE_ERROR;
+                } else {
+                    qos[0] = (byte) topic.qos().ordinal();
+                }
+            }
+        });
+
+        if (qos[0] != SUBSCRIBE_ERROR) {
+            subscriptionsByConsumerId.put(id, mqttSubscription);
+            mqttSubscriptionByTopic.put(topic.name(), mqttSubscription);
+        }
 
-        sendToActiveMQ(consumerInfo, null);
-        return topic.qos();
+        return qos[0];
     }
 
     void onUnSubscribe(UNSUBSCRIBE command) throws MQTTProtocolException {

http://git-wip-us.apache.org/repos/asf/activemq/blob/ac23b017/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index 76d1597..f44dfd4 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -34,8 +34,15 @@ import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertNotEquals;
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerPlugin;
 import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.jaas.GroupPrincipal;
+import org.apache.activemq.security.AuthorizationPlugin;
+import org.apache.activemq.security.DefaultAuthorizationMap;
+import org.apache.activemq.security.SimpleAuthenticationPlugin;
+import org.apache.activemq.security.SimpleAuthorizationMap;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.Wait;
 import org.fusesource.hawtbuf.Buffer;
@@ -516,6 +523,55 @@ public class MQTTTest extends AbstractMQTTTest {
     }
 
     @Test(timeout = 60 * 1000)
+    public void testFailedSubscription() throws Exception {
+        addMQTTConnector();
+
+        final SimpleAuthenticationPlugin authenticationPlugin = new SimpleAuthenticationPlugin();
+        authenticationPlugin.setAnonymousAccessAllowed(true);
+
+        final String ANONYMOUS = "anonymous";
+        authenticationPlugin.setAnonymousGroup(ANONYMOUS);
+        final DefaultAuthorizationMap map = new DefaultAuthorizationMap();
+        // only one authorized destination, anonymous for anonymous group!
+        map.put(new ActiveMQTopic(ANONYMOUS), new GroupPrincipal(ANONYMOUS));
+        final AuthorizationPlugin authorizationPlugin = new AuthorizationPlugin(new SimpleAuthorizationMap(map,
map, map));
+
+        brokerService.setPlugins(new BrokerPlugin[] {authorizationPlugin, authenticationPlugin});
+        brokerService.start();
+
+        MQTT mqtt = createMQTTConnection();
+        mqtt.setClientId("foo");
+        mqtt.setKeepAlive((short) 2);
+
+        final BlockingConnection connection = mqtt.blockingConnection();
+        connection.connect();
+
+        final String NAMED = "named";
+        byte[] qos = connection.subscribe(new Topic[] {
+            new Topic(NAMED, QoS.AT_MOST_ONCE), new Topic(ANONYMOUS, QoS.EXACTLY_ONCE)});
+        assertEquals((byte)0x80, qos[0]);
+        assertEquals((byte)QoS.EXACTLY_ONCE.ordinal(), qos[1]);
+
+        // validate the subscription by sending a retained message
+        connection.publish(ANONYMOUS, ANONYMOUS.getBytes(), QoS.AT_MOST_ONCE, true);
+        Message msg = connection.receive(1000, TimeUnit.MILLISECONDS);
+        assertNotNull(msg);
+        assertEquals(ANONYMOUS, new String(msg.getPayload()));
+        msg.ack();
+
+        connection.unsubscribe(new String[]{ANONYMOUS});
+        qos = connection.subscribe(new Topic[]{new Topic(ANONYMOUS, QoS.AT_LEAST_ONCE)});
+        assertEquals((byte)QoS.AT_LEAST_ONCE.ordinal(), qos[0]);
+
+        msg = connection.receive(1000, TimeUnit.MILLISECONDS);
+        assertNotNull(msg);
+        assertEquals(ANONYMOUS, new String(msg.getPayload()));
+        msg.ack();
+
+        connection.disconnect();
+    }
+
+    @Test(timeout = 60 * 1000)
     public void testUniqueMessageIds() throws Exception {
         addMQTTConnector();
         brokerService.start();
@@ -612,7 +668,7 @@ public class MQTTTest extends AbstractMQTTTest {
         connection.disconnect();
     }
 
-    @Test(timeout = 600 * 1000)
+    @Test(timeout = 60 * 1000)
     public void testResendMessageId() throws Exception {
         addMQTTConnector();
         brokerService.start();


Mime
View raw message