activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-5303
Date Fri, 05 Jun 2015 22:05:17 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 3649f13b8 -> af999fe2b


https://issues.apache.org/jira/browse/AMQ-5303

Resolves issues with the virtual topic subscription strategy, especially
when subscribing durably to the Topic portion of a virtual destination.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/af999fe2
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/af999fe2
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/af999fe2

Branch: refs/heads/master
Commit: af999fe2b2f5140fd7fcae187b491dddbcde1fe9
Parents: 3649f13
Author: Timothy Bish <tabish121@gmail.com>
Authored: Fri Jun 5 18:05:03 2015 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Fri Jun 5 18:05:03 2015 -0400

----------------------------------------------------------------------
 .../AbstractMQTTSubscriptionStrategy.java       |  74 +++++++++++++
 .../MQTTDefaultSubscriptionStrategy.java        |  81 +-------------
 .../MQTTVirtualTopicSubscriptionStrategy.java   |  65 +++++++++--
 .../mqtt/MQTTVirtualTopicSubscriptionsTest.java | 110 +++++++++++++++++--
 4 files changed, 231 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/af999fe2/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/AbstractMQTTSubscriptionStrategy.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/AbstractMQTTSubscriptionStrategy.java
b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/AbstractMQTTSubscriptionStrategy.java
index bb84192..f3bf94e 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/AbstractMQTTSubscriptionStrategy.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/AbstractMQTTSubscriptionStrategy.java
@@ -17,6 +17,10 @@
 package org.apache.activemq.transport.mqtt.strategy;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -24,6 +28,7 @@ import java.util.concurrent.ConcurrentMap;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.DurableTopicSubscription;
 import org.apache.activemq.broker.region.PrefetchSubscription;
 import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.broker.region.Subscription;
@@ -35,9 +40,12 @@ import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.ExceptionResponse;
 import org.apache.activemq.command.RemoveInfo;
+import org.apache.activemq.command.RemoveSubscriptionInfo;
 import org.apache.activemq.command.Response;
+import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.transport.mqtt.MQTTProtocolConverter;
 import org.apache.activemq.transport.mqtt.MQTTProtocolException;
+import org.apache.activemq.transport.mqtt.MQTTProtocolSupport;
 import org.apache.activemq.transport.mqtt.MQTTSubscription;
 import org.apache.activemq.transport.mqtt.ResponseHandler;
 import org.apache.activemq.util.LongSequenceGenerator;
@@ -61,6 +69,7 @@ public abstract class AbstractMQTTSubscriptionStrategy implements MQTTSubscripti
 
     protected final ConcurrentMap<ConsumerId, MQTTSubscription> subscriptionsByConsumerId
= new ConcurrentHashMap<ConsumerId, MQTTSubscription>();
     protected final ConcurrentMap<String, MQTTSubscription> mqttSubscriptionByTopic
= new ConcurrentHashMap<String, MQTTSubscription>();
+    protected final Set<String> restoredDurableSubs = Collections.synchronizedSet(new
HashSet<String>());
 
     protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
 
@@ -242,4 +251,69 @@ public abstract class AbstractMQTTSubscriptionStrategy implements MQTTSubscripti
             });
         }
     }
+
+    //----- Durable Subscription management methods --------------------------//
+
+    protected void deleteDurableSubs(List<SubscriptionInfo> subs) {
+        try {
+            for (SubscriptionInfo sub : subs) {
+                RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
+                rsi.setConnectionId(protocol.getConnectionId());
+                rsi.setSubscriptionName(sub.getSubcriptionName());
+                rsi.setClientId(sub.getClientId());
+                protocol.sendToActiveMQ(rsi, new ResponseHandler() {
+                    @Override
+                    public void onResponse(MQTTProtocolConverter converter, Response response)
throws IOException {
+                        // ignore failures..
+                    }
+                });
+            }
+        } catch (Throwable e) {
+            LOG.warn("Could not delete the MQTT durable subs.", e);
+        }
+    }
+
+    protected void restoreDurableSubs(List<SubscriptionInfo> subs) {
+        try {
+            for (SubscriptionInfo sub : subs) {
+                String name = sub.getSubcriptionName();
+                String[] split = name.split(":", 2);
+                QoS qoS = QoS.valueOf(split[0]);
+                onSubscribe(new Topic(split[1], qoS));
+                // mark this durable subscription as restored by Broker
+                restoredDurableSubs.add(MQTTProtocolSupport.convertMQTTToActiveMQ(split[1]));
+            }
+        } catch (IOException e) {
+            LOG.warn("Could not restore the MQTT durable subs.", e);
+        }
+    }
+
+    protected List<SubscriptionInfo> lookupSubscription(String clientId) throws MQTTProtocolException
{
+        List<SubscriptionInfo> result = new ArrayList<SubscriptionInfo>();
+        RegionBroker regionBroker;
+
+        try {
+            regionBroker = (RegionBroker) brokerService.getBroker().getAdaptor(RegionBroker.class);
+        } catch (Exception e) {
+            throw new MQTTProtocolException("Error recovering durable subscriptions: " +
e.getMessage(), false, e);
+        }
+
+        final TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
+        List<DurableTopicSubscription> subscriptions = topicRegion.lookupSubscriptions(clientId);
+        if (subscriptions != null) {
+            for (DurableTopicSubscription subscription : subscriptions) {
+                LOG.debug("Recovered durable sub:{} on connect", subscription);
+
+                SubscriptionInfo info = new SubscriptionInfo();
+
+                info.setDestination(subscription.getActiveMQDestination());
+                info.setSubcriptionName(subscription.getSubscriptionKey().getSubscriptionName());
+                info.setClientId(clientId);
+
+                result.add(info);
+            }
+        }
+
+        return result;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/af999fe2/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTDefaultSubscriptionStrategy.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTDefaultSubscriptionStrategy.java
b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTDefaultSubscriptionStrategy.java
index 68d6cb9..63b0e85 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTDefaultSubscriptionStrategy.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTDefaultSubscriptionStrategy.java
@@ -17,16 +17,9 @@
 package org.apache.activemq.transport.mqtt.strategy;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 
 import org.apache.activemq.ActiveMQPrefetchPolicy;
-import org.apache.activemq.broker.region.DurableTopicSubscription;
-import org.apache.activemq.broker.region.RegionBroker;
-import org.apache.activemq.broker.region.TopicRegion;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.ConsumerInfo;
@@ -39,20 +32,13 @@ import org.apache.activemq.transport.mqtt.MQTTProtocolSupport;
 import org.apache.activemq.transport.mqtt.MQTTSubscription;
 import org.apache.activemq.transport.mqtt.ResponseHandler;
 import org.fusesource.mqtt.client.QoS;
-import org.fusesource.mqtt.client.Topic;
 import org.fusesource.mqtt.codec.CONNECT;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Default implementation that uses unmapped topic subscriptions.
  */
 public class MQTTDefaultSubscriptionStrategy extends AbstractMQTTSubscriptionStrategy {
 
-    private static final Logger LOG = LoggerFactory.getLogger(MQTTDefaultSubscriptionStrategy.class);
-
-    private final Set<String> restoredSubs = Collections.synchronizedSet(new HashSet<String>());
-
     @Override
     public void onConnect(CONNECT connect) throws MQTTProtocolException {
         List<SubscriptionInfo> subs = lookupSubscription(protocol.getClientId());
@@ -93,7 +79,7 @@ public class MQTTDefaultSubscriptionStrategy extends AbstractMQTTSubscriptionStr
 
         // check whether the Topic has been recovered in restoreDurableSubs
         // mark subscription available for recovery for duplicate subscription
-        if (restoredSubs.remove(destination.getPhysicalName())) {
+        if (restoredDurableSubs.remove(destination.getPhysicalName())) {
             return;
         }
 
@@ -109,7 +95,7 @@ public class MQTTDefaultSubscriptionStrategy extends AbstractMQTTSubscriptionStr
             // check if the durable sub also needs to be removed
             if (subscription.getConsumerInfo().getSubscriptionName() != null) {
                 // also remove it from restored durable subscriptions set
-                restoredSubs.remove(MQTTProtocolSupport.convertMQTTToActiveMQ(subscription.getTopicName()));
+                restoredDurableSubs.remove(MQTTProtocolSupport.convertMQTTToActiveMQ(subscription.getTopicName()));
 
                 RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
                 rsi.setConnectionId(protocol.getConnectionId());
@@ -124,67 +110,4 @@ public class MQTTDefaultSubscriptionStrategy extends AbstractMQTTSubscriptionStr
             }
         }
     }
-
-    private void deleteDurableSubs(List<SubscriptionInfo> subs) {
-        try {
-            for (SubscriptionInfo sub : subs) {
-                RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
-                rsi.setConnectionId(protocol.getConnectionId());
-                rsi.setSubscriptionName(sub.getSubcriptionName());
-                rsi.setClientId(sub.getClientId());
-                protocol.sendToActiveMQ(rsi, new ResponseHandler() {
-                    @Override
-                    public void onResponse(MQTTProtocolConverter converter, Response response)
throws IOException {
-                        // ignore failures..
-                    }
-                });
-            }
-        } catch (Throwable e) {
-            LOG.warn("Could not delete the MQTT durable subs.", e);
-        }
-    }
-
-    private void restoreDurableSubs(List<SubscriptionInfo> subs) {
-        try {
-            for (SubscriptionInfo sub : subs) {
-                String name = sub.getSubcriptionName();
-                String[] split = name.split(":", 2);
-                QoS qoS = QoS.valueOf(split[0]);
-                onSubscribe(new Topic(split[1], qoS));
-                // mark this durable subscription as restored by Broker
-                restoredSubs.add(MQTTProtocolSupport.convertMQTTToActiveMQ(split[1]));
-            }
-        } catch (IOException e) {
-            LOG.warn("Could not restore the MQTT durable subs.", e);
-        }
-    }
-
-    List<SubscriptionInfo> lookupSubscription(String clientId) throws MQTTProtocolException
{
-        List<SubscriptionInfo> result = new ArrayList<SubscriptionInfo>();
-        RegionBroker regionBroker;
-
-        try {
-            regionBroker = (RegionBroker) brokerService.getBroker().getAdaptor(RegionBroker.class);
-        } catch (Exception e) {
-            throw new MQTTProtocolException("Error recovering durable subscriptions: " +
e.getMessage(), false, e);
-        }
-
-        final TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
-        List<DurableTopicSubscription> subscriptions = topicRegion.lookupSubscriptions(clientId);
-        if (subscriptions != null) {
-            for (DurableTopicSubscription subscription : subscriptions) {
-                LOG.debug("Recovered durable sub:{} on connect", subscription);
-
-                SubscriptionInfo info = new SubscriptionInfo();
-
-                info.setDestination(subscription.getActiveMQDestination());
-                info.setSubcriptionName(subscription.getSubscriptionKey().getSubscriptionName());
-                info.setClientId(clientId);
-
-                result.add(info);
-            }
-        }
-
-        return result;
-    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/af999fe2/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java
b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java
index d0735e1..468e823 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java
@@ -35,9 +35,12 @@ import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.DestinationInfo;
+import org.apache.activemq.command.RemoveSubscriptionInfo;
 import org.apache.activemq.command.Response;
+import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.transport.mqtt.MQTTProtocolConverter;
 import org.apache.activemq.transport.mqtt.MQTTProtocolException;
+import org.apache.activemq.transport.mqtt.MQTTProtocolSupport;
 import org.apache.activemq.transport.mqtt.MQTTSubscription;
 import org.apache.activemq.transport.mqtt.ResponseHandler;
 import org.fusesource.mqtt.client.QoS;
@@ -62,35 +65,53 @@ public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscripti
     @Override
     public void onConnect(CONNECT connect) throws MQTTProtocolException {
         List<ActiveMQQueue> queues = lookupQueues(protocol.getClientId());
+        List<SubscriptionInfo> subs = lookupSubscription(protocol.getClientId());
+
+        // When clean session is true we must purge all of the client's old Queue subscriptions
+        // and any durable subscriptions created on the VirtualTopic instance as well.
 
         if (connect.cleanSession()) {
             deleteDurableQueues(queues);
+            deleteDurableSubs(subs);
         } else {
             restoreDurableQueue(queues);
+            restoreDurableSubs(subs);
         }
     }
 
     @Override
     public byte onSubscribe(String topicName, QoS requestedQoS) throws MQTTProtocolException
{
         ActiveMQDestination destination = null;
+        int prefetch = ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH;
         ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId());
+
         if (!protocol.isCleanSession() && protocol.getClientId() != null &&
requestedQoS.ordinal() >= QoS.AT_LEAST_ONCE.ordinal()) {
-            String converted = VIRTUALTOPIC_CONSUMER_PREFIX + protocol.getClientId() + ":"
+ requestedQoS + "." +
-                               VIRTUALTOPIC_PREFIX + convertMQTTToActiveMQ(topicName);
-            destination = new ActiveMQQueue(converted);
-            consumerInfo.setPrefetchSize(ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH);
+            String converted = convertMQTTToActiveMQ(topicName);
+            if (converted.startsWith(VIRTUALTOPIC_PREFIX)) {
+                destination = new ActiveMQTopic(converted);
+                prefetch = ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH;
+                consumerInfo.setSubscriptionName(requestedQoS + ":" + topicName);
+            } else {
+                converted = VIRTUALTOPIC_CONSUMER_PREFIX +
+                            protocol.getClientId() + ":" + requestedQoS + "." +
+                            VIRTUALTOPIC_PREFIX + converted;
+                destination = new ActiveMQQueue(converted);
+                prefetch = ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH;
+            }
         } else {
             String converted = convertMQTTToActiveMQ(topicName);
             if (!converted.startsWith(VIRTUALTOPIC_PREFIX)) {
-                converted = VIRTUALTOPIC_PREFIX + convertMQTTToActiveMQ(topicName);
+                converted = VIRTUALTOPIC_PREFIX + converted;
             }
             destination = new ActiveMQTopic(converted);
-            consumerInfo.setPrefetchSize(ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH);
+            prefetch = ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH;
         }
 
         consumerInfo.setDestination(destination);
         if (protocol.getActiveMQSubscriptionPrefetch() > 0) {
             consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch());
+        } else {
+            consumerInfo.setPrefetchSize(prefetch);
         }
         consumerInfo.setRetroactive(true);
         consumerInfo.setDispatchAsync(true);
@@ -103,9 +124,15 @@ public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscripti
 
         ActiveMQDestination destination = mqttSubscription.getDestination();
 
+        // check whether the Queue has been recovered in restoreDurableQueue
+        // mark subscription available for recovery for duplicate subscription
+        if (destination.isQueue() && restoredQueues.remove(destination)) {
+            return;
+        }
+
         // check whether the Topic has been recovered in restoreDurableSubs
         // mark subscription available for recovery for duplicate subscription
-        if (restoredQueues.remove(destination)) {
+        if (destination.isTopic() && restoredDurableSubs.remove(destination.getPhysicalName()))
{
             return;
         }
 
@@ -136,6 +163,20 @@ public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscripti
                         // ignore failures..
                     }
                 });
+            } else if (subscription.getConsumerInfo().getSubscriptionName() != null) {
+                // also remove it from restored durable subscriptions set
+                restoredDurableSubs.remove(MQTTProtocolSupport.convertMQTTToActiveMQ(subscription.getTopicName()));
+
+                RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
+                rsi.setConnectionId(protocol.getConnectionId());
+                rsi.setSubscriptionName(subscription.getConsumerInfo().getSubscriptionName());
+                rsi.setClientId(protocol.getClientId());
+                protocol.sendToActiveMQ(rsi, new ResponseHandler() {
+                    @Override
+                    public void onResponse(MQTTProtocolConverter converter, Response response)
throws IOException {
+                        // ignore failures..
+                    }
+                });
             }
         }
     }
@@ -154,7 +195,7 @@ public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscripti
         String destinationName = destination.getPhysicalName();
         int position = destinationName.indexOf(VIRTUALTOPIC_PREFIX);
         if (position >= 0) {
-            destinationName = destinationName.substring(position+VIRTUALTOPIC_PREFIX.length()).substring(0);
+            destinationName = destinationName.substring(position + VIRTUALTOPIC_PREFIX.length()).substring(0);
         }
         return destinationName;
     }
@@ -171,7 +212,7 @@ public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscripti
     private void deleteDurableQueues(List<ActiveMQQueue> queues) {
         try {
             for (ActiveMQQueue queue : queues) {
-                LOG.debug("Removing subscription for {} ",queue.getPhysicalName());
+                LOG.debug("Removing queue subscription for {} ",queue.getPhysicalName());
                 DestinationInfo removeAction = new DestinationInfo();
                 removeAction.setConnectionId(protocol.getConnectionId());
                 removeAction.setDestination(queue);
@@ -185,7 +226,7 @@ public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscripti
                 });
             }
         } catch (Throwable e) {
-            LOG.warn("Could not delete the MQTT durable subs.", e);
+            LOG.warn("Could not delete the MQTT queue subsscriptions.", e);
         }
     }
 
@@ -199,7 +240,7 @@ public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscripti
                 tokenizer.nextToken();
                 String topicName = convertActiveMQToMQTT(tokenizer.nextToken("").substring(1));
                 QoS qoS = QoS.valueOf(qosString);
-                LOG.trace("Restoring subscription: {}:{}", topicName, qoS);
+                LOG.trace("Restoring queue subscription: {}:{}", topicName, qoS);
 
                 ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId());
                 consumerInfo.setDestination(queue);
@@ -216,7 +257,7 @@ public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscripti
                 restoredQueues.add(queue);
             }
         } catch (IOException e) {
-            LOG.warn("Could not restore the MQTT durable subs.", e);
+            LOG.warn("Could not restore the MQTT queue subscriptions.", e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/af999fe2/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTVirtualTopicSubscriptionsTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTVirtualTopicSubscriptionsTest.java
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTVirtualTopicSubscriptionsTest.java
index b4f985c..052a7ee 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTVirtualTopicSubscriptionsTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTVirtualTopicSubscriptionsTest.java
@@ -16,8 +16,14 @@
  */
 package org.apache.activemq.transport.mqtt;
 
+import static org.junit.Assert.assertTrue;
+
+import org.apache.activemq.util.Wait;
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.client.Topic;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
 /**
@@ -32,13 +38,6 @@ public class MQTTVirtualTopicSubscriptionsTest extends MQTTTest {
         super.setUp();
     }
 
-    // TODO - This currently fails on the durable case because we have a hard time
-    //        recovering the original Topic name when a client tries to subscribe
-    //        durable to a VirtualTopic.* type topic.
-    @Override
-    @Ignore
-    public void testRetainedMessageOnVirtualTopics() throws Exception {}
-
     @Override
     @Test(timeout = 60 * 1000)
     public void testSendMQTTReceiveJMS() throws Exception {
@@ -56,4 +55,99 @@ public class MQTTVirtualTopicSubscriptionsTest extends MQTTTest {
     public void testJmsMapping() throws Exception {
         doTestJmsMapping("VirtualTopic.test.foo");
     }
+
+    @Test(timeout = 60 * 1000)
+    public void testSubscribeOnVirtualTopicAsDurable() throws Exception {
+        MQTT mqtt = createMQTTConnection();
+        mqtt.setClientId("VirtualTopicSubscriber");
+        mqtt.setKeepAlive((short) 2);
+        mqtt.setCleanSession(false);
+
+        final BlockingConnection connection = mqtt.blockingConnection();
+        connection.connect();
+
+        final String topicName = "VirtualTopic/foo/bah";
+
+        connection.subscribe(new Topic[] { new Topic(topicName, QoS.EXACTLY_ONCE)});
+
+        assertTrue("Should create a durable subscription", Wait.waitFor(new Wait.Condition()
{
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return brokerService.getAdminView().getDurableTopicSubscribers().length ==
1;
+            }
+        }));
+
+        connection.unsubscribe(new String[] { topicName });
+
+        assertTrue("Should remove a durable subscription", Wait.waitFor(new Wait.Condition()
{
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return brokerService.getAdminView().getDurableTopicSubscribers().length ==
0;
+            }
+        }));
+
+        connection.disconnect();
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testDurableVirtaulTopicSubIsRecovered() throws Exception {
+        MQTT mqtt = createMQTTConnection();
+        mqtt.setClientId("VirtualTopicSubscriber");
+        mqtt.setKeepAlive((short) 2);
+        mqtt.setCleanSession(false);
+
+        final String topicName = "VirtualTopic/foo/bah";
+
+        {
+            final BlockingConnection connection = mqtt.blockingConnection();
+            connection.connect();
+
+            connection.subscribe(new Topic[] { new Topic(topicName, QoS.EXACTLY_ONCE)});
+
+            assertTrue("Should create a durable subscription", Wait.waitFor(new Wait.Condition()
{
+
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return brokerService.getAdminView().getDurableTopicSubscribers().length
== 1;
+                }
+            }));
+
+            connection.disconnect();
+        }
+
+        assertTrue("Should be one inactive subscription", Wait.waitFor(new Wait.Condition()
{
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return brokerService.getAdminView().getInactiveDurableTopicSubscribers().length
== 1;
+            }
+        }));
+
+        {
+            final BlockingConnection connection = mqtt.blockingConnection();
+            connection.connect();
+
+            assertTrue("Should recover a durable subscription", Wait.waitFor(new Wait.Condition()
{
+
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return brokerService.getAdminView().getDurableTopicSubscribers().length
== 1;
+                }
+            }));
+
+            connection.subscribe(new Topic[] { new Topic(topicName, QoS.EXACTLY_ONCE)});
+
+            assertTrue("Should still be just one durable subscription", Wait.waitFor(new
Wait.Condition() {
+
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return brokerService.getAdminView().getDurableTopicSubscribers().length
== 1;
+                }
+            }));
+
+            connection.disconnect();
+        }
+    }
 }


Mime
View raw message