activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject git commit: https://issues.apache.org/jira/browse/AMQ-5066
Date Wed, 19 Feb 2014 19:12:52 GMT
Repository: activemq
Updated Branches:
  refs/heads/trunk 0db7e69b4 -> 7e56f348b


https://issues.apache.org/jira/browse/AMQ-5066

apply patch after review

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/7e56f348
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/7e56f348
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/7e56f348

Branch: refs/heads/trunk
Commit: 7e56f348bc92488700b8e4484cd9260b1de1b5d8
Parents: 0db7e69
Author: Timothy Bish <tabish121@gmai.com>
Authored: Wed Feb 19 14:12:49 2014 -0500
Committer: Timothy Bish <tabish121@gmai.com>
Committed: Wed Feb 19 14:12:49 2014 -0500

----------------------------------------------------------------------
 .../transport/mqtt/MQTTProtocolConverter.java   | 35 +++++++-----
 .../transport/mqtt/MQTTSubscription.java        |  4 ++
 .../activemq/transport/mqtt/MQTTTest.java       | 56 +++++++++++++++++++-
 3 files changed, 80 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/7e56f348/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index c270566..19614a9 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -366,24 +366,33 @@ public class MQTTProtocolConverter {
     }
 
     QoS onSubscribe(Topic topic) throws MQTTProtocolException {
-        if( !mqttSubscriptionByTopic.containsKey(topic.name()) ) {
-            ActiveMQDestination destination = new ActiveMQTopic(convertMQTTToActiveMQ(topic.name().toString()));
 
-            ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
-            ConsumerInfo consumerInfo = new ConsumerInfo(id);
-            consumerInfo.setDestination(destination);
-            consumerInfo.setPrefetchSize(getActiveMQSubscriptionPrefetch());
-            consumerInfo.setDispatchAsync(true);
-            if ( connect.clientId() != null && topic.qos().ordinal() >= QoS.AT_LEAST_ONCE.ordinal() ) {
-                consumerInfo.setSubscriptionName(topic.qos()+":"+topic.name().toString());
+        if( mqttSubscriptionByTopic.containsKey(topic.name())) {
+            if (topic.qos() != mqttSubscriptionByTopic.get(topic.name()).qos()) {
+                // remove old subscription as the QoS has changed
+                onUnSubscribe(topic.name());
+            } else {
+                // duplicate SUBSCRIBE packet, nothing to do
+                return topic.qos();
             }
-            MQTTSubscription mqttSubscription = new MQTTSubscription(this, topic.qos(), consumerInfo);
+        }
 
-            subscriptionsByConsumerId.put(id, mqttSubscription);
-            mqttSubscriptionByTopic.put(topic.name(), mqttSubscription);
+        ActiveMQDestination destination = new ActiveMQTopic(convertMQTTToActiveMQ(topic.name().toString()));
 
-            sendToActiveMQ(consumerInfo, null);
+        ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
+        ConsumerInfo consumerInfo = new ConsumerInfo(id);
+        consumerInfo.setDestination(destination);
+        consumerInfo.setPrefetchSize(getActiveMQSubscriptionPrefetch());
+        consumerInfo.setDispatchAsync(true);
+        if ( connect.clientId() != null && topic.qos().ordinal() >= QoS.AT_LEAST_ONCE.ordinal() ) {
+            consumerInfo.setSubscriptionName(topic.qos()+":"+topic.name().toString());
         }
+        MQTTSubscription mqttSubscription = new MQTTSubscription(this, topic.qos(), consumerInfo);
+
+        subscriptionsByConsumerId.put(id, mqttSubscription);
+        mqttSubscriptionByTopic.put(topic.name(), mqttSubscription);
+
+        sendToActiveMQ(consumerInfo, null);
         return topic.qos();
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/7e56f348/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java
index 20510ee..99af92a 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java
@@ -67,4 +67,8 @@ class MQTTSubscription {
     public ConsumerInfo getConsumerInfo() {
         return consumerInfo;
     }
+
+    public QoS qos() {
+        return qos;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/7e56f348/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index 73397de..3e1e6cb 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -301,7 +301,7 @@ public class MQTTTest extends AbstractMQTTTest {
         initializeConnection(subscriber);
 
         String RETAINED = "retained";
-        publisher.publish("foo",RETAINED.getBytes(),AT_LEAST_ONCE,true);
+        publisher.publish("foo", RETAINED.getBytes(), AT_LEAST_ONCE, true);
 
         List<String> messages = new ArrayList<String>();
         for (int i = 0; i < 10; i++){
@@ -320,7 +320,7 @@ public class MQTTTest extends AbstractMQTTTest {
         for (int i =0; i < 10; i++){
             msg = subscriber.receive(5000);
             assertNotNull(msg);
-            assertEquals(messages.get(i),new String(msg));
+            assertEquals(messages.get(i), new String(msg));
         }
         subscriber.disconnect();
         publisher.disconnect();
@@ -377,6 +377,58 @@ public class MQTTTest extends AbstractMQTTTest {
 
     }
 
+    @Test(timeout = 60 * 1000)
+    public void testDuplicateSubscriptions() throws Exception {
+        addMQTTConnector();
+        brokerService.start();
+
+        MQTT mqtt = createMQTTConnection();
+        mqtt.setClientId("foo");
+        mqtt.setKeepAlive((short)2);
+
+        final int[] actualQoS = {-1};
+        mqtt.setTracer(new Tracer() {
+            @Override
+            public void onReceive(MQTTFrame frame) {
+                // validate the QoS
+                if (frame.messageType() == PUBLISH.TYPE) {
+                    PUBLISH publish = new PUBLISH();
+                    try {
+                        publish.decode(frame);
+                    } catch (ProtocolException e) {
+                        fail("Failed decoding " + e.getMessage());
+                    }
+                    actualQoS[0] = publish.qos().ordinal();
+                }
+            }
+        });
+
+        final BlockingConnection connection = mqtt.blockingConnection();
+        connection.connect();
+
+        final String RETAIN = "RETAIN";
+        connection.publish("TopicA", RETAIN.getBytes(), QoS.EXACTLY_ONCE, true);
+
+        QoS[] qoss = { QoS.AT_MOST_ONCE, QoS.AT_MOST_ONCE, QoS.AT_LEAST_ONCE, QoS.EXACTLY_ONCE };
+        for (QoS qos : qoss) {
+            connection.subscribe(new Topic[]{ new Topic("TopicA", qos) });
+
+            final Message msg = connection.receive(5000, TimeUnit.MILLISECONDS);
+            assertNotNull(msg);
+            assertEquals(RETAIN, new String(msg.getPayload()));
+            int waitCount = 0;
+            while (actualQoS[0] == -1 && waitCount < 10) {
+                Thread.sleep(1000);
+                waitCount++;
+            }
+            assertEquals(qos.ordinal(), actualQoS[0]);
+        }
+
+        connection.unsubscribe(new String[]{"TopicA"});
+        connection.disconnect();
+
+    }
+
     @Test(timeout=60 * 1000)
     public void testSendMQTTReceiveJMS() throws Exception {
         addMQTTConnector();


Mime
View raw message