activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-5303
Date Thu, 04 Jun 2015 19:15:16 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 928e815a0 -> 47e954d0f


https://issues.apache.org/jira/browse/AMQ-5303

Fix for subscription recovery of durable topic subscriptions using
default subscription strategy and subscribing to a VirtualTopic
instance.  

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/47e954d0
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/47e954d0
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/47e954d0

Branch: refs/heads/master
Commit: 47e954d0f6409da418edeb4c53597e09ba03fe43
Parents: 928e815
Author: Timothy Bish <tabish121@gmail.com>
Authored: Thu Jun 4 15:14:37 2015 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Thu Jun 4 15:14:52 2015 -0400

----------------------------------------------------------------------
 .../MQTTDefaultSubscriptionStrategy.java        |  2 +-
 .../activemq/transport/mqtt/MQTTTest.java       | 80 +++-----------------
 2 files changed, 12 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/47e954d0/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTDefaultSubscriptionStrategy.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTDefaultSubscriptionStrategy.java
b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTDefaultSubscriptionStrategy.java
index 1b3ac5d..68d6cb9 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTDefaultSubscriptionStrategy.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTDefaultSubscriptionStrategy.java
@@ -152,7 +152,7 @@ public class MQTTDefaultSubscriptionStrategy extends AbstractMQTTSubscriptionStr
                 QoS qoS = QoS.valueOf(split[0]);
                 onSubscribe(new Topic(split[1], qoS));
                 // mark this durable subscription as restored by Broker
-                restoredSubs.add(split[1]);
+                restoredSubs.add(MQTTProtocolSupport.convertMQTTToActiveMQ(split[1]));
             }
         } catch (IOException e) {
             LOG.warn("Could not restore the MQTT durable subs.", e);

http://git-wip-us.apache.org/repos/asf/activemq/blob/47e954d0/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index 360ed64..7dbf9c7 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -64,7 +64,6 @@ import org.fusesource.mqtt.client.Topic;
 import org.fusesource.mqtt.client.Tracer;
 import org.fusesource.mqtt.codec.MQTTFrame;
 import org.fusesource.mqtt.codec.PUBLISH;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -510,87 +509,28 @@ public class MQTTTest extends MQTTTestSupport {
 
     @Test(timeout = 120 * 1000)
     public void testRetainedMessage() throws Exception {
-        MQTT mqtt = createMQTTConnection();
-        mqtt.setKeepAlive((short) 60);
-
-        final String RETAIN = "RETAIN";
-        final String TOPICA = "TopicA";
-
-        final String[] clientIds = { null, "foo", "durable" };
-        for (String clientId : clientIds) {
-            LOG.info("Testing now with Client ID: {}", clientId);
-
-            mqtt.setClientId(clientId);
-            mqtt.setCleanSession(!"durable".equals(clientId));
-
-            BlockingConnection connection = mqtt.blockingConnection();
-            connection.connect();
-
-            // set retained message and check
-            connection.publish(TOPICA, RETAIN.getBytes(), QoS.EXACTLY_ONCE, true);
-            connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
-            Message msg = connection.receive(5000, TimeUnit.MILLISECONDS);
-            assertNotNull("No retained message for " + clientId, msg);
-            assertEquals(RETAIN, new String(msg.getPayload()));
-            msg.ack();
-            assertNull(connection.receive(500, TimeUnit.MILLISECONDS));
-
-            // test duplicate subscription
-            connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
-            msg = connection.receive(15000, TimeUnit.MILLISECONDS);
-            assertNotNull("No retained message on duplicate subscription for " + clientId,
msg);
-            assertEquals(RETAIN, new String(msg.getPayload()));
-            msg.ack();
-            assertNull(connection.receive(500, TimeUnit.MILLISECONDS));
-            connection.unsubscribe(new String[]{TOPICA});
-
-            // clear retained message and check that we don't receive it
-            connection.publish(TOPICA, "".getBytes(), QoS.AT_MOST_ONCE, true);
-            connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
-            msg = connection.receive(500, TimeUnit.MILLISECONDS);
-            assertNull("Retained message not cleared for " + clientId, msg);
-            connection.unsubscribe(new String[]{TOPICA});
-
-            // set retained message again and check
-            connection.publish(TOPICA, RETAIN.getBytes(), QoS.EXACTLY_ONCE, true);
-            connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
-            msg = connection.receive(5000, TimeUnit.MILLISECONDS);
-            assertNotNull("No reset retained message for " + clientId, msg);
-            assertEquals(RETAIN, new String(msg.getPayload()));
-            msg.ack();
-            assertNull(connection.receive(500, TimeUnit.MILLISECONDS));
-
-            // re-connect and check
-            connection.disconnect();
-            connection = mqtt.blockingConnection();
-            connection.connect();
-            connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
-            msg = connection.receive(5000, TimeUnit.MILLISECONDS);
-            assertNotNull("No reset retained message for " + clientId, msg);
-            assertEquals(RETAIN, new String(msg.getPayload()));
-            msg.ack();
-            assertNull(connection.receive(500, TimeUnit.MILLISECONDS));
-
-            connection.unsubscribe(new String[]{TOPICA});
-            connection.disconnect();
-        }
+        doTestRetainedMessages("TopicA");
     }
 
-    @Ignore
     @Test(timeout = 120 * 1000)
     public void testRetainedMessageOnVirtualTopics() throws Exception {
+        doTestRetainedMessages("VirtualTopic/TopicA");
+    }
+
+    public void doTestRetainedMessages(String topicName) throws Exception {
         MQTT mqtt = createMQTTConnection();
         mqtt.setKeepAlive((short) 60);
 
         final String RETAIN = "RETAIN";
-        final String TOPICA = "VirtualTopic/TopicA";
+        final String TOPICA = topicName;
 
         final String[] clientIds = { null, "foo", "durable" };
         for (String clientId : clientIds) {
-            LOG.info("Testing now with Client ID: {}", clientId);
+            boolean cleanSession = !"durable".equals(clientId);
+            LOG.info("Testing now with Client ID: {} clean: {}", clientId, cleanSession);
 
             mqtt.setClientId(clientId);
-            mqtt.setCleanSession(!"durable".equals(clientId));
+            mqtt.setCleanSession(cleanSession);
 
             BlockingConnection connection = mqtt.blockingConnection();
             connection.connect();
@@ -622,6 +562,7 @@ public class MQTTTest extends MQTTTestSupport {
 
             // set retained message again and check
             connection.publish(TOPICA, RETAIN.getBytes(), QoS.EXACTLY_ONCE, true);
+            LOG.info("Performing first subscription");
             connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
             msg = connection.receive(5000, TimeUnit.MILLISECONDS);
             assertNotNull("No reset retained message for " + clientId, msg);
@@ -633,6 +574,7 @@ public class MQTTTest extends MQTTTestSupport {
             connection.disconnect();
             connection = mqtt.blockingConnection();
             connection.connect();
+            LOG.info("Performing second subscription");
             connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
             msg = connection.receive(5000, TimeUnit.MILLISECONDS);
             assertNotNull("No reset retained message for " + clientId, msg);


Mime
View raw message